feat(runner): Fixes and improvements (#259)

## Description

- Improves latency for runner
- Fixes bugs in entrypoint bash scripts
- Package updates, gstreamer 1.26 and workaround for it

Modified runner workflow to hopefully pull latest cachyos base image on
nightlies. This will cause a full build but for nightlies should be
fine?

Also removed the duplicate key-down workaround as we've enabled ordered
datachannels now. Increased retransmit to 2 from 0 to see if it'll help
with some network issues.

Marked as draft as I need to do bug testing still, I'll do it after
fever calms down 😅



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Enhanced deployment workflows with optimized container image
management.
- Improved audio and video processing for lower latency and better
synchronization.
  - Consolidated debugging options to ease command-line monitoring.

- **Refactor**
- Streamlined internal script flow and process handling for smoother
performance.
- Updated dependency management and communication protocols to boost
overall stability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
This commit is contained in:
Kristian Ollikainen
2025-04-13 23:13:09 +03:00
committed by GitHub
parent f408ec56cb
commit 9a6826b069
15 changed files with 1257 additions and 672 deletions

View File

@@ -7,47 +7,53 @@ use crate::proto::proto::proto_input::InputType::{
};
use crate::proto::proto::{ProtoInput, ProtoMessageInput};
use crate::websocket::NestriWebSocket;
use atomic_refcell::AtomicRefCell;
use glib::subclass::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst_webrtc::{WebRTCSDPType, WebRTCSessionDescription, gst_sdp};
use gstrswebrtc::signaller::{Signallable, SignallableImpl};
use parking_lot::RwLock as PLRwLock;
use prost::Message;
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};
use std::sync::{Mutex, RwLock};
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
pub struct Signaller {
nestri_ws: RwLock<Option<Arc<NestriWebSocket>>>,
pipeline: RwLock<Option<Arc<gst::Pipeline>>>,
data_channel: RwLock<Option<gst_webrtc::WebRTCDataChannel>>,
nestri_ws: PLRwLock<Option<Arc<NestriWebSocket>>>,
pipeline: PLRwLock<Option<Arc<gst::Pipeline>>>,
data_channel: AtomicRefCell<Option<gst_webrtc::WebRTCDataChannel>>,
}
impl Default for Signaller {
fn default() -> Self {
Self {
nestri_ws: RwLock::new(None),
pipeline: RwLock::new(None),
data_channel: RwLock::new(None),
nestri_ws: PLRwLock::new(None),
pipeline: PLRwLock::new(None),
data_channel: AtomicRefCell::new(None),
}
}
}
impl Signaller {
pub fn set_nestri_ws(&self, nestri_ws: Arc<NestriWebSocket>) {
*self.nestri_ws.write().unwrap() = Some(nestri_ws);
*self.nestri_ws.write() = Some(nestri_ws);
}
pub fn set_pipeline(&self, pipeline: Arc<gst::Pipeline>) {
*self.pipeline.write().unwrap() = Some(pipeline);
*self.pipeline.write() = Some(pipeline);
}
pub fn get_pipeline(&self) -> Option<Arc<gst::Pipeline>> {
self.pipeline.read().unwrap().clone()
self.pipeline.read().clone()
}
pub fn set_data_channel(&self, data_channel: gst_webrtc::WebRTCDataChannel) {
*self.data_channel.write().unwrap() = Some(data_channel);
match self.data_channel.try_borrow_mut() {
Ok(mut dc) => *dc = Some(data_channel),
Err(_) => gst::warning!(
gst::CAT_DEFAULT,
"Failed to set data channel - already borrowed"
),
}
}
/// Helper method to clean things up
@@ -55,7 +61,6 @@ impl Signaller {
let nestri_ws = {
self.nestri_ws
.read()
.unwrap()
.clone()
.expect("NestriWebSocket not set")
};
@@ -145,8 +150,9 @@ impl Signaller {
&"nestri-data-channel",
&gst::Structure::builder("config")
.field("ordered", &true)
.field("max-retransmits", &0u32)
.field("max-retransmits", &2u32)
.field("priority", "high")
.field("protocol", "raw")
.build(),
],
),
@@ -175,7 +181,6 @@ impl SignallableImpl for Signaller {
let nestri_ws = {
self.nestri_ws
.read()
.unwrap()
.clone()
.expect("NestriWebSocket not set")
};
@@ -266,7 +271,6 @@ impl SignallableImpl for Signaller {
let nestri_ws = {
self.nestri_ws
.read()
.unwrap()
.clone()
.expect("NestriWebSocket not set")
};
@@ -297,7 +301,6 @@ impl SignallableImpl for Signaller {
let nestri_ws = {
self.nestri_ws
.read()
.unwrap()
.clone()
.expect("NestriWebSocket not set")
};
@@ -360,9 +363,6 @@ impl ObjectImpl for Signaller {
fn setup_data_channel(data_channel: &gst_webrtc::WebRTCDataChannel, pipeline: &gst::Pipeline) {
let pipeline = pipeline.clone();
// A shared state to track currently pressed keys
let pressed_keys = Arc::new(Mutex::new(HashSet::new()));
let pressed_buttons = Arc::new(Mutex::new(HashSet::new()));
data_channel.connect_on_message_data(move |_data_channel, data| {
if let Some(data) = data {
@@ -370,9 +370,7 @@ fn setup_data_channel(data_channel: &gst_webrtc::WebRTCDataChannel, pipeline: &g
Ok(message_input) => {
if let Some(input_msg) = message_input.data {
// Process the input message and create an event
if let Some(event) =
handle_input_message(input_msg, &pressed_keys, &pressed_buttons)
{
if let Some(event) = handle_input_message(input_msg) {
// Send the event to pipeline, result bool is ignored
let _ = pipeline.send_event(event);
}
@@ -388,11 +386,7 @@ fn setup_data_channel(data_channel: &gst_webrtc::WebRTCDataChannel, pipeline: &g
});
}
fn handle_input_message(
input_msg: ProtoInput,
pressed_keys: &Arc<Mutex<HashSet<i32>>>,
pressed_buttons: &Arc<Mutex<HashSet<i32>>>,
) -> Option<gst::Event> {
fn handle_input_message(input_msg: ProtoInput) -> Option<gst::Event> {
if let Some(input_type) = input_msg.input_type {
match input_type {
MouseMove(data) => {
@@ -412,13 +406,6 @@ fn handle_input_message(
Some(gst::event::CustomUpstream::new(structure))
}
KeyDown(data) => {
let mut keys = pressed_keys.lock().unwrap();
// If the key is already pressed, return to prevent key lockup
if keys.contains(&data.key) {
return None;
}
keys.insert(data.key);
let structure = gst::Structure::builder("KeyboardKey")
.field("key", data.key as u32)
.field("pressed", true)
@@ -427,10 +414,6 @@ fn handle_input_message(
Some(gst::event::CustomUpstream::new(structure))
}
KeyUp(data) => {
let mut keys = pressed_keys.lock().unwrap();
// Remove the key from the pressed state when released
keys.remove(&data.key);
let structure = gst::Structure::builder("KeyboardKey")
.field("key", data.key as u32)
.field("pressed", false)
@@ -447,13 +430,6 @@ fn handle_input_message(
Some(gst::event::CustomUpstream::new(structure))
}
MouseKeyDown(data) => {
let mut buttons = pressed_buttons.lock().unwrap();
// If the button is already pressed, return to prevent button lockup
if buttons.contains(&data.key) {
return None;
}
buttons.insert(data.key);
let structure = gst::Structure::builder("MouseButton")
.field("button", data.key as u32)
.field("pressed", true)
@@ -462,10 +438,6 @@ fn handle_input_message(
Some(gst::event::CustomUpstream::new(structure))
}
MouseKeyUp(data) => {
let mut buttons = pressed_buttons.lock().unwrap();
// Remove the button from the pressed state when released
buttons.remove(&data.key);
let structure = gst::Structure::builder("MouseButton")
.field("button", data.key as u32)
.field("pressed", false)