feat: protobuf input messaging (#165)

Replace json protocol by protobuf
generate protobuf files with `bun buf generate` or just `buf generate`

- [x]  Implement all datatypes with proto files

- [x] Map to ts types or use the generated proto types directly with:
   - [x] web frontend
   - [x] relay
   - [x] runner

- [ ] final performance test (to be done when CI builds new images)

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
This commit is contained in:
Philipp Neumann
2025-01-28 15:04:20 +01:00
committed by GitHub
parent 431733a0e8
commit fbaa8835a3
38 changed files with 2758 additions and 647 deletions

View File

@@ -10,10 +10,10 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::{Message, Utf8Bytes};
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
type Callback = Box<dyn Fn(Vec<u8>) + Send + Sync>;
type Callback = Box<dyn Fn(String) + Send + Sync>;
type WSRead = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
type WSWrite = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
@@ -23,7 +23,7 @@ pub struct NestriWebSocket {
reader: Arc<Mutex<Option<WSRead>>>,
writer: Arc<Mutex<Option<WSWrite>>>,
callbacks: Arc<RwLock<HashMap<String, Callback>>>,
message_tx: mpsc::UnboundedSender<Vec<u8>>,
message_tx: mpsc::UnboundedSender<String>,
reconnected_notify: Arc<Notify>,
}
impl NestriWebSocket {
@@ -95,8 +95,8 @@ impl NestriWebSocket {
while let Some(message_result) = ws_read.next().await {
match message_result {
Ok(message) => {
let data = message.into_data();
let base_message = match decode_message(&data) {
let data = message.into_text().expect("failed to turn message into text");
let base_message = match decode_message(data.to_string()) {
Ok(base_message) => base_message,
Err(e) => {
eprintln!("Failed to decode message: {:?}", e);
@@ -107,11 +107,14 @@ impl NestriWebSocket {
let callbacks_lock = callbacks.read().unwrap();
if let Some(callback) = callbacks_lock.get(&base_message.payload_type) {
let data = data.clone();
callback(data);
callback(data.to_string());
}
}
Err(e) => {
eprintln!("Error receiving message: {:?}, reconnecting in 3 seconds...", e);
eprintln!(
"Error receiving message: {:?}, reconnecting in 3 seconds...",
e
);
sleep(Duration::from_secs(3)).await;
self_clone.reconnect().await.unwrap();
break; // Break the inner loop to get a new ws_read
@@ -123,7 +126,7 @@ impl NestriWebSocket {
});
}
fn spawn_write_loop(&self, mut message_rx: mpsc::UnboundedReceiver<Vec<u8>>) {
fn spawn_write_loop(&self, mut message_rx: mpsc::UnboundedReceiver<String>) {
let writer = self.writer.clone();
let self_clone = self.clone();
@@ -136,7 +139,10 @@ impl NestriWebSocket {
let mut writer_lock = writer.lock().await;
if let Some(writer) = writer_lock.as_mut() {
// Try to send the message over the WebSocket
match writer.send(Message::Binary(message.clone())).await {
match writer
.send(Message::Text(Utf8Bytes::from(message.clone())))
.await
{
Ok(_) => {
// Message sent successfully
break;
@@ -196,7 +202,7 @@ impl NestriWebSocket {
}
/// Send a message through the WebSocket
pub fn send_message(&self, message: Vec<u8>) -> Result<(), Box<dyn Error>> {
pub fn send_message(&self, message: String) -> Result<(), Box<dyn Error>> {
self.message_tx
.send(message)
.map_err(|e| format!("Failed to send message: {:?}", e).into())
@@ -205,7 +211,7 @@ impl NestriWebSocket {
/// Register a callback for a specific response type
pub fn register_callback<F>(&self, response_type: &str, callback: F)
where
F: Fn(Vec<u8>) + Send + Sync + 'static,
F: Fn(String) + Send + Sync + 'static,
{
let mut callbacks_lock = self.callbacks.write().unwrap();
callbacks_lock.insert(response_type.to_string(), Box::new(callback));
@@ -234,6 +240,7 @@ impl Log for NestriWebSocket {
let log_message = MessageLog {
base: MessageBase {
payload_type: "log".to_string(),
latency: None,
},
level,
message,