diff --git a/gst-warp-sink/Cargo.lock b/gst-warp-sink/Cargo.lock index 0941271..9cf8ec2 100644 --- a/gst-warp-sink/Cargo.lock +++ b/gst-warp-sink/Cargo.lock @@ -645,29 +645,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "gst-plugin-fmp4" -version = "0.12.0-alpha.1" -source = "git+https://github.com/sdroege/gst-plugin-rs#c7f961cc2216f24e282ed81dbed9be7d29d5ecd0" -dependencies = [ - "anyhow", - "gst-plugin-version-helper 0.8.0 (git+https://github.com/sdroege/gst-plugin-rs)", - "gstreamer", - "gstreamer-audio", - "gstreamer-base", - "gstreamer-pbutils", - "gstreamer-video", -] - -[[package]] -name = "gst-plugin-version-helper" -version = "0.8.0" -source = "git+https://github.com/sdroege/gst-plugin-rs#c7f961cc2216f24e282ed81dbed9be7d29d5ecd0" -dependencies = [ - "chrono", - "toml_edit 0.21.0", -] - [[package]] name = "gst-plugin-version-helper" version = "0.8.0" @@ -800,33 +777,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "gstreamer-gl" -version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" -dependencies = [ - "glib", - "gstreamer", - "gstreamer-base", - "gstreamer-gl-sys", - "gstreamer-video", - "libc", -] - -[[package]] -name = "gstreamer-gl-sys" -version = "0.22.0" -source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-base-sys", - "gstreamer-sys", - "gstreamer-video-sys", - "libc", - "system-deps", -] - [[package]] name = "gstreamer-pbutils" version = "0.22.0" @@ -1077,14 +1027,11 @@ dependencies = [ "anyhow", "bytes", "chrono", - "gst-plugin-fmp4", - "gst-plugin-version-helper 0.8.0 (git+https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs)", + "gst-plugin-version-helper", "gstreamer", "gstreamer-app", "gstreamer-check", - "gstreamer-gl", "gstreamer-pbutils", - "gstreamer-video", "isobmff", "moq-transport", "mp4", diff --git a/gst-warp-sink/Cargo.toml b/gst-warp-sink/Cargo.toml index 8588377..fdaa54b 100644 --- a/gst-warp-sink/Cargo.toml +++ b/gst-warp-sink/Cargo.toml @@ -12,9 +12,6 @@ gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/g gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"} gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } -gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-plugin-fmp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" } chrono = "0.4.31" isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" } bytes = "1.5.0" diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs index 0e60c31..7737ba3 100644 --- a/gst-warp-sink/src/moqsink/imp.rs +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -3,7 +3,7 @@ use gst::glib::once_cell::sync::Lazy; use gst::prelude::*; use gst::subclass::prelude::*; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use crate::relayurl::*; use crate::RUNTIME; @@ -63,23 +63,13 @@ impl Default for Settings { } } -#[derive(Debug)] -struct StartedState { - broadcast: broadcast::Publisher, -} - -impl StartedState { - pub fn new(broadcast: broadcast::Publisher) -> StartedState { - StartedState { broadcast } - } -} - #[derive(Default)] enum State { #[default] Stopped, - Completed, - Started(StartedState), + Started { + broadcast: Mutex, + }, } static CAT: Lazy = Lazy::new(|| { @@ -120,34 +110,37 @@ impl MoqSink { unreachable!("Element already started"); } - // let relay_url = { - // let url = self.url.lock().unwrap(); - // match *url { - // Some(ref url) => url.clone(), - // None => { - // return Err(gst::error_msg!( - // gst::ResourceError::Settings, - // ["Cannot start without a URL being set"] - // )); - // } - // } - // }; - let relay_url = self - .url - .lock() - .map_err(|e| { - gst::error_msg!( - gst::ResourceError::Settings, - ["Failed to acquire URL lock: {}", e] - ) - })? - .clone() - .ok_or_else(|| { - gst::error_msg!( - gst::ResourceError::Settings, - ["Cannot start without a URL being set"] - ) - })?; + let relay_url = { + let url = self.url.lock().unwrap(); + match *url { + Some(ref url) => url.clone(), + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + )); + } + } + }; + + //More complex but with error handling + + // let relay_url = self + // .url + // .lock() + // .map_err(|e| { + // gst::error_msg!( + // gst::ResourceError::Settings, + // ["Failed to acquire URL lock: {}", e] + // ) + // })? + // .clone() + // .ok_or_else(|| { + // gst::error_msg!( + // gst::ResourceError::Settings, + // ["Cannot start without a URL being set"] + // ) + // })?; gst::trace!( CAT, @@ -156,9 +149,7 @@ impl MoqSink { relay_url ); - // Initialize shared state and channels - let (sender, receiver) = mpsc::channel(32); - // self.sender = Some(sender); + let (publisher, subscriber) = broadcast::new(""); // Spawn a new thread to run the Moq server RUNTIME.spawn(async move { @@ -167,8 +158,6 @@ impl MoqSink { .finish(); tracing::subscriber::set_global_default(tracer).unwrap(); - let (publisher, subscriber) = broadcast::new(""); - // Create a list of acceptable root certificates. let mut roots = rustls::RootCertStore::empty(); @@ -210,9 +199,9 @@ impl MoqSink { }); // Update the state to indicate the element has started - // *state = State::Started(StartedState { - // broadcast: publisher, - // }); + *state = Some(State::Started { + broadcast: Mutex::new(publisher), + }); Ok(()) } @@ -247,6 +236,7 @@ impl MoqSink { } } +#[glib::object_subclass] impl ObjectSubclass for MoqSink { const NAME: &'static str = ELEMENT_CLASS_NAME; type Type = super::MoqSink;