diff --git a/gst-warp-sink/Cargo.lock b/gst-warp-sink/Cargo.lock index b4b3975..0941271 100644 --- a/gst-warp-sink/Cargo.lock +++ b/gst-warp-sink/Cargo.lock @@ -1070,16 +1070,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "m3u8-rs" -version = "5.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1d7ba86f7ea62f17f4310c55e93244619ddc7dadfc7e565de1967e4e41e6e7" -dependencies = [ - "chrono", - "nom", -] - [[package]] name = "me" version = "0.1.0" @@ -1091,13 +1081,11 @@ dependencies = [ "gst-plugin-version-helper 0.8.0 (git+https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs)", "gstreamer", "gstreamer-app", - "gstreamer-base", "gstreamer-check", "gstreamer-gl", "gstreamer-pbutils", "gstreamer-video", "isobmff", - "m3u8-rs", "moq-transport", "mp4", "once_cell", @@ -1119,12 +1107,6 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1181,16 +1163,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" -[[package]] -name = "nom" -version = "7.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] - [[package]] name = "nu-ansi-term" version = "0.46.0" diff --git a/gst-warp-sink/Cargo.toml b/gst-warp-sink/Cargo.toml index c10241e..8588377 100644 --- a/gst-warp-sink/Cargo.toml +++ b/gst-warp-sink/Cargo.toml @@ -8,16 +8,14 @@ edition = "2021" [dependencies] anyhow = { version = "1", features = ["backtrace"] } -gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } -gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"} +gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"} gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-plugin-fmp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" } chrono = "0.4.31" -m3u8-rs = "5.0.4" isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" } bytes = "1.5.0" once_cell = "1" diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs index 3fa8f27..0e60c31 100644 --- a/gst-warp-sink/src/moqsink/imp.rs +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -2,13 +2,13 @@ use gst::glib; use gst::glib::once_cell::sync::Lazy; use gst::prelude::*; use gst::subclass::prelude::*; -use gst_base::prelude::*; -use gst_base::subclass::prelude::*; + use std::sync::Mutex; use crate::relayurl::*; +use crate::RUNTIME; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use tokio::runtime::Runtime; use tokio::sync::mpsc; use moq_transport::cache::{broadcast, fragment, segment, track}; @@ -90,65 +90,128 @@ static CAT: Lazy = Lazy::new(|| { ) }); -#[derive(Default)] +#[derive(Default, object_subclass)] pub struct MoqSink { - state: Mutex, + state: Mutex>, url: Mutex>, settings: Mutex, } +impl Default for MoqSink { + fn default() -> Self { + MoqSink { + state: Mutex::new(None), + url: Mutex::new(None), + settings: Mutex::new(Settings::default()), + } + } +} + impl MoqSink { fn start(&self) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().unwrap(); - let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().map_err(|e| { + gst::error_msg!( + gst::ResourceError::Settings, + ["Failed to acquire state lock: {}", e] + ) + })?; if let State::Started { .. } = *state { unreachable!("Element already started"); } - let relay_url = { - let url = self.url.lock().unwrap(); - match *url { - Some(ref url) => url.clone(), - None => { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["Cannot start without a URL being set"] - )); - } - } - }; - - // // Initialize shared state and channels - // let (sender, receiver) = mpsc::channel(32); - // // self.sender = Some(sender); - - // // Spawn a new thread to run the Tokio runtime - // std::thread::spawn(move || { - // // Create a new Tokio runtime - // let rt = Runtime::new().unwrap(); - - // // Block on the async code within the Tokio runtime - // rt.block_on(async move { - // // Set up your async tasks here - // let session_task = session.run(); - // let media_task = media.run(); - - // tokio::select! { - // res = session_task => { - // // Handle session result - // // You might want to send a message to the GStreamer element using the channel - // }, - // res = media_task => { - // // Handle media result - // // You might want to send a message to the GStreamer element using the channel - // }, - // msg = receiver.recv() => { - // // Handle messages sent from the GStreamer element to the Tokio runtime - // // These might be control commands or data to process - // } + // let relay_url = { + // let url = self.url.lock().unwrap(); + // match *url { + // Some(ref url) => url.clone(), + // None => { + // return Err(gst::error_msg!( + // gst::ResourceError::Settings, + // ["Cannot start without a URL being set"] + // )); // } - // }); + // } + // }; + let relay_url = self + .url + .lock() + .map_err(|e| { + gst::error_msg!( + gst::ResourceError::Settings, + ["Failed to acquire URL lock: {}", e] + ) + })? + .clone() + .ok_or_else(|| { + gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + ) + })?; + + gst::trace!( + CAT, + imp: self, + "connecting to relay: url={}", + relay_url + ); + + // Initialize shared state and channels + let (sender, receiver) = mpsc::channel(32); + // self.sender = Some(sender); + + // Spawn a new thread to run the Moq server + RUNTIME.spawn(async move { + let tracer = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::WARN) + .finish(); + tracing::subscriber::set_global_default(tracer).unwrap(); + + let (publisher, subscriber) = broadcast::new(""); + + // Create a list of acceptable root certificates. + let mut roots = rustls::RootCertStore::empty(); + + // Add the platform's native root certificates. + for cert in + rustls_native_certs::load_native_certs().context("could not load platform certs")? + { + roots + .add(&rustls::Certificate(cert.0)) + .context("failed to add root cert")?; + } + + let mut tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + + // this one is important + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; + + let arc_tls_config = std::sync::Arc::new(tls_config); + let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); + + let mut endpoint = + quinn::Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0))?; + endpoint.set_default_client_config(quinn_client_config); + + let session = webtransport_quinn::connect(&endpoint, &relay_url) + .await + .context("failed to create WebTransport session")?; + + let session = moq_transport::session::Client::publisher(session, subscriber) + .await + .context("failed to create MoQ Transport session")?; + + session.run().await.context("session error")?; + + Ok::<(), anyhow::Error>(()) + }); + + // Update the state to indicate the element has started + // *state = State::Started(StartedState { + // broadcast: publisher, // }); Ok(()) @@ -179,27 +242,28 @@ impl MoqSink { *url = Some(relay_url); Ok(()) } - Err(_) => Err(glib::Error::new( - gst::URIError::BadUri, - "Could not parse URI", - )), + Err(err) => Err(glib::Error::new(gst::URIError::BadUri, &err)), } } } -#[glib::object_subclass] impl ObjectSubclass for MoqSink { const NAME: &'static str = ELEMENT_CLASS_NAME; type Type = super::MoqSink; type ParentType = gst_base::BaseSink; type Interfaces = (gst::URIHandler,); + + type Instance; + + type Class; } +impl GstObjectImpl for WaylandDisplaySrc {} + impl ObjectImpl for MoqSink { fn constructed(&self) { self.parent_constructed(); - self.obj().set_sync(false); } @@ -208,7 +272,7 @@ impl ObjectImpl for MoqSink { vec![ glib::ParamSpecString::builder("host") .nick("Host") - .blurb("The host of the relay server to connect tom, this can be a web url") + .blurb("The host of the relay server to connect to, this can be a web url") .default_value(DEFAULT_ADDRESS) .build(), glib::ParamSpecInt::builder("port") @@ -339,4 +403,4 @@ impl BaseSinkImpl for MoqSink { fn start(&self) -> Result<(), gst::ErrorMessage> { self.start() } -} \ No newline at end of file +}