From 0791fe7e0699f886211d1317362f97be486dbef4 Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Fri, 15 Dec 2023 10:03:30 +0300 Subject: [PATCH] I had to refactor --- gst-warp-sink/Cargo.lock | 80 +++- gst-warp-sink/Cargo.toml | 7 +- gst-warp-sink/src/lib.rs | 1 + gst-warp-sink/src/moqsink/imp.rs | 510 ++------------------- gst-warp-sink/src/moqsink/imp.rs.txt | 632 +++++++++++++++++++++++++++ gst-warp-sink/src/moqsink/mod.rs | 10 +- gst-warp-sink/src/relayurl.rs | 2 +- 7 files changed, 763 insertions(+), 479 deletions(-) create mode 100644 gst-warp-sink/src/moqsink/imp.rs.txt diff --git a/gst-warp-sink/Cargo.lock b/gst-warp-sink/Cargo.lock index 6821391..b4b3975 100644 --- a/gst-warp-sink/Cargo.lock +++ b/gst-warp-sink/Cargo.lock @@ -37,6 +37,9 @@ name = "anyhow" version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +dependencies = [ + "backtrace", +] [[package]] name = "async-channel" @@ -1048,6 +1051,16 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.20" @@ -1059,9 +1072,9 @@ dependencies = [ [[package]] name = "m3u8-rs" -version = "5.0.4" +version = "5.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39af8845edca961e3286dcbafeb9e6407d3df6a616ef086847162d46f438d75" +checksum = "0c1d7ba86f7ea62f17f4310c55e93244619ddc7dadfc7e565de1967e4e41e6e7" dependencies = [ "chrono", "nom", @@ -1087,10 +1100,13 @@ dependencies = [ "m3u8-rs", "moq-transport", "mp4", + "once_cell", "quinn", + "rand", "rustls", "rustls-native-certs", "rustls-pemfile", + "tokio", "tracing", "tracing-subscriber", "url", @@ -1228,6 +1244,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.32.1" @@ -1270,6 +1296,29 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.48.5", +] + [[package]] name = "paste" version = "1.0.14" @@ -1482,6 +1531,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "ring" version = "0.16.20" @@ -1608,6 +1666,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "sct" version = "0.7.1" @@ -1690,6 +1754,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1832,7 +1905,10 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", diff --git a/gst-warp-sink/Cargo.toml b/gst-warp-sink/Cargo.toml index 2fb6b92..c10241e 100644 --- a/gst-warp-sink/Cargo.toml +++ b/gst-warp-sink/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1" +anyhow = { version = "1", features = ["backtrace"] } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } @@ -20,6 +20,10 @@ chrono = "0.4.31" m3u8-rs = "5.0.4" isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" } bytes = "1.5.0" +once_cell = "1" + +# Async stuff +tokio = { version = "1", features = ["full"] } # QUIC quinn = "0.10" @@ -35,6 +39,7 @@ rustls-pemfile = "1" mp4 = "0.14.0" moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.2.0" } +rand = "0.8.5" [lib] name = "gstmoq" diff --git a/gst-warp-sink/src/lib.rs b/gst-warp-sink/src/lib.rs index 28306e9..4eb4a7e 100644 --- a/gst-warp-sink/src/lib.rs +++ b/gst-warp-sink/src/lib.rs @@ -1,5 +1,6 @@ mod moqsink; mod relayurl; +use gst::glib; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { moqsink::register(plugin)?; diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs index bc967e4..73bc11b 100644 --- a/gst-warp-sink/src/moqsink/imp.rs +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -1,17 +1,19 @@ -use crate::relayurl::*; -use glib::subclass::prelude::*; +use gst::glib; +use gst::glib::once_cell::sync::Lazy; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::ClockTime; -#[allow(unused_imports)] -use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning}; -use moq_transport::cache::{broadcast, fragment, segment, track}; -use moq_transport::VarInt; -use once_cell::sync::Lazy; -use std::convert::TryInto; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; use std::sync::Mutex; + +use crate::relayurl::*; + +use moq_transport::cache::{broadcast, fragment, segment, track}; use url::Url; +extern crate rand; +use rand::{distributions::Alphanumeric, Rng}; + pub const ELEMENT_NAME: &str = "MoqSink"; const ELEMENT_CLASS_NAME: &str = "MoqSink"; const ELEMENT_LONG_NAME: &str = "Media Over Quic Sink"; @@ -20,74 +22,10 @@ const ELEMENT_DESCRIPTION: &str = const ELEMENT_AUTHOR: &str = "Wanjohi Ryan "; const DEBUG_CATEGORY: &str = ELEMENT_NAME; -const ATOM_TYPE_FTYPE: u32 = 1718909296; -const ATOM_TYPE_MOOV: u32 = 1836019574; -const ATOM_TYPE_MOOF: u32 = 1836019558; -const ATOM_TYPE_MDAT: u32 = 1835295092; -const DEFAULT_PORT: u16 = 4443; - -#[derive(Debug)] -struct Mp4Atom { - pub atom_type: u32, - // Includes atom size and type. - pub atom_bytes: Vec, -} - -impl Mp4Atom { - pub fn len(&self) -> usize { - self.atom_bytes.len() - } -} - -#[derive(Debug)] -struct Mp4Parser { - buf: Vec, -} - -impl Mp4Parser { - pub fn new() -> Mp4Parser { - Mp4Parser { buf: Vec::new() } - } - - pub fn add(&mut self, buf: &[u8]) { - self.buf.extend_from_slice(buf); - } - - // Returns true if all or part of an MDAT body has been added. - pub fn have_mdat(&self) -> bool { - if self.buf.len() > 8 { - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - atom_type == ATOM_TYPE_MDAT - } else { - false - } - } - - pub fn pop_atom(&mut self) -> Option { - if self.buf.len() >= 8 { - let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - if self.buf.len() >= atom_size { - let mut atom_bytes = Vec::with_capacity(atom_size); - // TODO: Swap vectors? - atom_bytes.extend_from_slice(&self.buf[0..atom_size]); - assert_eq!(self.buf.len(), atom_size); - self.buf.clear(); - Some(Mp4Atom { - atom_type, - atom_bytes, - }) - } else { - None - } - } else { - None - } - } -} +const DEFAULT_NAME_LEN: usize = 10; // Length of the random string struct Settings { - host: Option, + host: String, port: Option, name: Option, } @@ -105,43 +43,30 @@ impl Settings { impl Default for Settings { fn default() -> Self { + //generate a random string on + let random_name: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(DEFAULT_NAME_LEN) + .map(char::from) + .collect(); + Settings { - host: None, - port: None, - name: None, + host: "localhost".to_owned(), + port: Some(4443), + name: Some(random_name), } } } #[derive(Debug)] struct StartedState { - // // We hold on to publisher so we don't close then while media is still being published. - // broadcast: broadcast::Publisher, - // //parses the mp4 boxes - // mp4_parser: Mp4Parser, - // // Atoms in init sequence that must be repeated at each key frame. - // ftype_atom: Option, - // moov_atom: Option, - // // These atoms that must be buffered and pushed as a single buffer. - // moof_atom: Option, - // // Below members that track current fragment (moof, mdat). - // /// Minimum PTS in fragment. - // fragment_pts: ClockTime, - // /// Minimum DTS in fragment. - // fragment_dts: ClockTime, - // /// Maximum PTS + duration in fragment. - // fragment_max_pts_plus_duration: ClockTime, - // /// Minimum offset in fragment. - // fragment_offset: Option, - // /// Maximum offset_end in fragment. - // fragment_offset_end: Option, - // fragment_buffer_flags: gst::BufferFlags, - // We hold on to publisher so we don't close then while media is still being published. broadcast: broadcast::Publisher, } impl StartedState { - + pub fn new(broadcast: broadcast::Publisher) -> StartedState { + StartedState { broadcast } + } } #[derive(Default)] @@ -149,33 +74,7 @@ enum State { #[default] Stopped, Completed, - Started(StartedState) -} - -// impl Default for State { -// fn default() -> State { -// State::Started { -// state: StartedState { -// mp4_parser: Mp4Parser::new(), -// broadcast: None, -// ftype_atom: None, -// moov_atom: None, -// moof_atom: None, -// fragment_pts: ClockTime::none(), -// fragment_dts: ClockTime::none(), -// fragment_max_pts_plus_duration: ClockTime::none(), -// fragment_offset: None, -// fragment_offset_end: None, -// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, -// }, -// } -// } -// } - -pub struct MoqSink { - state: Mutex, - url: Mutex>, - settings: Mutex, + Started(StartedState), } static CAT: Lazy = Lazy::new(|| { @@ -186,273 +85,14 @@ static CAT: Lazy = Lazy::new(|| { ) }); -// impl MoqSink { -// fn sink_chain( -// &self, -// pad: &gst::Pad, -// element: &super::MoqSink, -// buffer: gst::Buffer, -// ) -> Result { -// gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); +#[derive(Default)] +pub struct MoqSink { + state: Mutex, + url: Mutex>, + settings: Mutex, +} -// let mut state = self.state.lock().unwrap(); - -// let state = match *state { -// State::Started { ref mut state, .. } => state, -// }; - -// let map = buffer.map_readable().map_err(|_| { -// gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); -// gst::FlowError::Error -// })?; -// let input_buf = map.as_ref(); - -// state.mp4_parser.add(input_buf); - -// // Update cummulative fragment variables. -// // Buffer PTS, etc. are only valid if this buffer contains MDAT data. -// if state.mp4_parser.have_mdat() { -// assert!(buffer.pts().is_some()); -// if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { -// state.fragment_pts = buffer.pts(); -// } -// if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { -// state.fragment_dts = buffer.dts(); -// } -// let pts_plus_duration = buffer.pts() + buffer.duration(); -// if state.fragment_max_pts_plus_duration.is_none() -// || state.fragment_max_pts_plus_duration < pts_plus_duration -// { -// state.fragment_max_pts_plus_duration = pts_plus_duration; -// } -// if buffer.offset() != gst::BUFFER_OFFSET_NONE -// && (state.fragment_offset.is_none() -// || state.fragment_offset.unwrap() > buffer.offset()) -// { -// state.fragment_offset = Some(buffer.offset()); -// } -// if buffer.offset_end() != gst::BUFFER_OFFSET_NONE -// && (state.fragment_offset_end.is_none() -// || state.fragment_offset_end.unwrap() < buffer.offset_end()) -// { -// state.fragment_offset_end = Some(buffer.offset_end()); -// } -// if state -// .fragment_buffer_flags -// .contains(gst::BufferFlags::DELTA_UNIT) -// && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) -// { -// state -// .fragment_buffer_flags -// .remove(gst::BufferFlags::DELTA_UNIT); -// } -// if buffer.flags().contains(gst::BufferFlags::DISCONT) { -// state -// .fragment_buffer_flags -// .insert(gst::BufferFlags::DISCONT); -// } -// gst_trace!(CAT, obj: pad, "Updated state={:?}", state); -// } - -// loop { -// match state.mp4_parser.pop_atom() { -// Some(atom) => { -// gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); -// match atom.atom_type { -// ATOM_TYPE_FTYPE => { -// state.ftype_atom = Some(atom); -// gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); -// } -// ATOM_TYPE_MOOV => { -// state.moov_atom = Some(atom); -// gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); -// } -// ATOM_TYPE_MOOF => { -// state.moof_atom = Some(atom); -// gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); -// } -// ATOM_TYPE_MDAT => { -// let mdat_atom = atom; -// match ( -// state.ftype_atom.as_ref(), -// state.moov_atom.as_ref(), -// state.moof_atom.as_ref(), -// ) { -// (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { -// let include_header = !state -// .fragment_buffer_flags -// .contains(gst::BufferFlags::DELTA_UNIT); -// let header_len = if include_header { -// ftype_atom.len() + moov_atom.len() -// } else { -// 0 -// }; -// let output_buf_len = -// header_len + moof_atom.len() + mdat_atom.len(); - -// gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", -// include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); - -// let mut gst_buffer = -// gst::Buffer::with_size(output_buf_len).unwrap(); -// { -// let buffer_ref = gst_buffer.get_mut().unwrap(); -// buffer_ref.set_pts(state.fragment_pts); -// buffer_ref.set_dts(state.fragment_dts); -// let duration = state.fragment_max_pts_plus_duration -// - state.fragment_pts; -// buffer_ref.set_duration(duration); -// buffer_ref.set_offset( -// state -// .fragment_offset -// .unwrap_or(gst::BUFFER_OFFSET_NONE), -// ); -// buffer_ref.set_offset_end( -// state -// .fragment_offset_end -// .unwrap_or(gst::BUFFER_OFFSET_NONE), -// ); -// buffer_ref.set_flags(state.fragment_buffer_flags); -// let mut buffer_map = buffer_ref.map_writable().unwrap(); -// let slice = buffer_map.as_mut_slice(); -// let mut pos = 0; -// if include_header { -// slice[pos..pos + ftype_atom.len()] -// .copy_from_slice(&ftype_atom.atom_bytes); -// pos += ftype_atom.len(); -// slice[pos..pos + moov_atom.len()] -// .copy_from_slice(&moov_atom.atom_bytes); -// pos += moov_atom.len(); -// } -// slice[pos..pos + moof_atom.len()] -// .copy_from_slice(&moof_atom.atom_bytes); -// pos += moof_atom.len(); -// slice[pos..pos + mdat_atom.len()] -// .copy_from_slice(&mdat_atom.atom_bytes); -// pos += mdat_atom.len(); -// assert_eq!(pos, output_buf_len); - -// //FIXME: Work on the Json here, instead of redoing it in a new method. -// } -// // Clear fragment variables. -// state.fragment_pts = ClockTime::none(); -// state.fragment_dts = ClockTime::none(); -// state.fragment_max_pts_plus_duration = ClockTime::none(); -// state.fragment_offset = None; -// state.fragment_offset_end = None; -// state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; -// // Push new buffer. -// gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); -// // let _ = self.srcpad.push(gst_buffer)?; -// // self.send_to_network(gst_buffer)?; -// } -// _ => { -// gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); -// } -// } -// } -// _ => { -// gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); -// } -// } -// } -// None => break, -// } -// } -// gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); -// Ok(gst::FlowSuccess::Ok) -// } - -// fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { -// self.srcpad.push_event(event) -// } - -// fn sink_query( -// &self, -// _pad: &gst::Pad, -// _element: &super::MoqSink, -// query: &mut gst::QueryRef, -// ) -> bool { -// self.srcpad.peer_query(query) -// } - -// fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box> { -// //Let this be our publisher -// Ok(()) -// } -// } impl MoqSink { - fn start(&self) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().unwrap(); - let settings = self.settings.lock().unwrap(); - - if let State::Started { .. } = *state { - unreachable!("Element already started"); - } - - let relay_url = { - let url = self.url.lock().unwrap(); - match *url { - Some(ref url) => url.clone(), - None => { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["Cannot start without a URL being set"] - )); - } - } - }; - - // Disable tracing so we don't get a bunch of Quinn spam. - let tracer = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::WARN) - .finish(); - tracing::subscriber::set_global_default(tracer).unwrap(); - - // Create a list of acceptable root certificates. - let mut roots = rustls::RootCertStore::empty(); - - // Add the platform's native root certificates. - for cert in - rustls_native_certs::load_native_certs().context("could not load platform certs")? - { - roots - .add(&rustls::Certificate(cert.0)) - .context("failed to add root cert")?; - } - - let mut tls_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth(); - - tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important - - let arc_tls_config = std::sync::Arc::new(tls_config); - let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); - - let mut endpoint = quinn::Endpoint::client("[::]:0")?; - endpoint.set_default_client_config(quinn_client_config); - - gst::debug!(CAT,imp: self,"connecting to relay: url={:?}",relay_url); - - let session = webtransport_quinn::connect(&endpoint, &relay_url) - .await - .context("failed to create WebTransport session")?; - - let (mut publisher, subscriber) = broadcast::new(""); - - let session = moq_transport::session::Client::publisher(session, subscriber) - .await - .context("failed to create MoQ Transport session")?; - - // let publisher = publisher - // .create_track(&config.track) - // .context("failed to create clock track")?; - - Ok(()) - } - fn set_uri(self: &MoqSink, url_str: Option<&str>) -> Result<(), glib::Error> { let state = self.state.lock().unwrap(); @@ -492,88 +132,15 @@ impl ObjectSubclass for MoqSink { type Type = super::MoqSink; type ParentType = gst_base::BaseSink; - type Interfaces = (gst::URIHandler); - - type Instance; - - type Class; + type Interfaces = (gst::URIHandler,); } impl ObjectImpl for MoqSink { - fn constructed(&self, obj: &Self::Type) { + fn constructed(&self) { self.parent_constructed(); + self.obj().set_sync(false); } - - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("host") - .nick("Url Host") - .blurb("The host of the relay server to connect to, e.g. example.com") - .mutable_ready() - .build(), - glib::ParamSpecString::builder("port") - .nick("Url Port") - .blurb("The port of the relay server to connect to, most probably this is a 4443") - .default_value(DEFAULT_PORT) - .build(), - glib::ParamSpecString::builder("name") - .nick("Url Name") - .blurb("This is a very long random string to identify your stream on the relay server") - .mutable_ready() - .build(), - ] - }); - PROPERTIES.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); - - gst::debug!( - CAT, - imp: self, - "Setting property '{}' to '{:?}'", - pspec.name(), - value - ); - - match pspec.name() { - "host" => { - settings.host = value.get::().expect("type checked upstream"); - if settings.port.is_some() && settings.name.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - "port" => { - settings.port = value.get::>().expect("type checked upstream"); - if settings.host.is_some() && settings.name.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - "name" => { - settings.name = value - .get::>() - .expect("type checked upstream"); - if settings.host.is_some() && settings.port.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - - match pspec.name() { - "host" => settings.host.to_value(), - "name" => settings.name.to_value(), - "port" => settings.port.to_value(), - _ => unimplemented!(), - } - } } impl GstObjectImpl for MoqSink {} @@ -601,8 +168,10 @@ impl ElementImpl for MoqSink { &caps, ) .unwrap(); + vec![sink_pad_template] }); + PAD_TEMPLATES.as_ref() } } @@ -622,3 +191,6 @@ impl URIHandlerImpl for MoqSink { self.set_uri(Some(uri)) } } + + +impl BaseSinkImpl for MoqSink {} \ No newline at end of file diff --git a/gst-warp-sink/src/moqsink/imp.rs.txt b/gst-warp-sink/src/moqsink/imp.rs.txt new file mode 100644 index 0000000..f051bf4 --- /dev/null +++ b/gst-warp-sink/src/moqsink/imp.rs.txt @@ -0,0 +1,632 @@ +use crate::relayurl::*; +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; +use gst::glib::once_cell::sync::Lazy; + +use moq_transport::cache::{broadcast, fragment, segment, track}; +use moq_transport::VarInt; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use url::Url; + +pub const ELEMENT_NAME: &str = "MoqSink"; +const ELEMENT_CLASS_NAME: &str = "MoqSink"; +const ELEMENT_LONG_NAME: &str = "Media Over Quic Sink"; +const ELEMENT_DESCRIPTION: &str = + "This element accepts fragmented MP4 input from mp4mux and publishes them to a Moq-Relay."; +const ELEMENT_AUTHOR: &str = "Wanjohi Ryan "; +const DEBUG_CATEGORY: &str = ELEMENT_NAME; + +const ATOM_TYPE_FTYPE: u32 = 1718909296; +const ATOM_TYPE_MOOV: u32 = 1836019574; +const ATOM_TYPE_MOOF: u32 = 1836019558; +const ATOM_TYPE_MDAT: u32 = 1835295092; +const DEFAULT_PORT: u16 = 4443; + +#[derive(Debug)] +struct Mp4Atom { + pub atom_type: u32, + // Includes atom size and type. + pub atom_bytes: Vec, +} + +impl Mp4Atom { + pub fn len(&self) -> usize { + self.atom_bytes.len() + } +} + +#[derive(Debug)] +struct Mp4Parser { + buf: Vec, +} + +impl Mp4Parser { + pub fn new() -> Mp4Parser { + Mp4Parser { buf: Vec::new() } + } + + pub fn add(&mut self, buf: &[u8]) { + self.buf.extend_from_slice(buf); + } + + // Returns true if all or part of an MDAT body has been added. + pub fn have_mdat(&self) -> bool { + if self.buf.len() > 8 { + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + atom_type == ATOM_TYPE_MDAT + } else { + false + } + } + + pub fn pop_atom(&mut self) -> Option { + if self.buf.len() >= 8 { + let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + if self.buf.len() >= atom_size { + let mut atom_bytes = Vec::with_capacity(atom_size); + // TODO: Swap vectors? + atom_bytes.extend_from_slice(&self.buf[0..atom_size]); + assert_eq!(self.buf.len(), atom_size); + self.buf.clear(); + Some(Mp4Atom { + atom_type, + atom_bytes, + }) + } else { + None + } + } else { + None + } + } +} + +struct Settings { + host: Option, + port: Option, + name: Option, +} + +impl Settings { + fn to_uri(&self) -> String { + RelayUrl { + host: self.host.clone(), + port: self.port.clone().unwrap(), + name: self.name.clone().unwrap(), + } + .to_string() + } +} + +impl Default for Settings { + fn default() -> Self { + Settings { + host: None, + port: None, + name: None, + } + } +} + +#[derive(Debug)] +struct StartedState { + // // We hold on to publisher so we don't close then while media is still being published. + // broadcast: broadcast::Publisher, + // //parses the mp4 boxes + // mp4_parser: Mp4Parser, + // // Atoms in init sequence that must be repeated at each key frame. + // ftype_atom: Option, + // moov_atom: Option, + // // These atoms that must be buffered and pushed as a single buffer. + // moof_atom: Option, + // // Below members that track current fragment (moof, mdat). + // /// Minimum PTS in fragment. + // fragment_pts: ClockTime, + // /// Minimum DTS in fragment. + // fragment_dts: ClockTime, + // /// Maximum PTS + duration in fragment. + // fragment_max_pts_plus_duration: ClockTime, + // /// Minimum offset in fragment. + // fragment_offset: Option, + // /// Maximum offset_end in fragment. + // fragment_offset_end: Option, + // fragment_buffer_flags: gst::BufferFlags, + // We hold on to publisher so we don't close then while media is still being published. + broadcast: broadcast::Publisher, +} + +impl StartedState { + pub fn new(broadcast: broadcast::Publisher) -> StartedState { + StartedState { broadcast } + } +} + +#[derive(Default)] +enum State { + #[default] + Stopped, + Completed, + Started(StartedState), +} + +// impl Default for State { +// fn default() -> State { +// State::Started { +// state: StartedState { +// mp4_parser: Mp4Parser::new(), +// broadcast: None, +// ftype_atom: None, +// moov_atom: None, +// moof_atom: None, +// fragment_pts: ClockTime::none(), +// fragment_dts: ClockTime::none(), +// fragment_max_pts_plus_duration: ClockTime::none(), +// fragment_offset: None, +// fragment_offset_end: None, +// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, +// }, +// } +// } +// } + +pub struct MoqSink { + state: Mutex, + url: Mutex>, + settings: Mutex, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + DEBUG_CATEGORY, + gst::DebugColorFlags::empty(), + Some(ELEMENT_LONG_NAME), + ) +}); + +// impl MoqSink { +// fn sink_chain( +// &self, +// pad: &gst::Pad, +// element: &super::MoqSink, +// buffer: gst::Buffer, +// ) -> Result { +// gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + +// let mut state = self.state.lock().unwrap(); + +// let state = match *state { +// State::Started { ref mut state, .. } => state, +// }; + +// let map = buffer.map_readable().map_err(|_| { +// gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); +// gst::FlowError::Error +// })?; +// let input_buf = map.as_ref(); + +// state.mp4_parser.add(input_buf); + +// // Update cummulative fragment variables. +// // Buffer PTS, etc. are only valid if this buffer contains MDAT data. +// if state.mp4_parser.have_mdat() { +// assert!(buffer.pts().is_some()); +// if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { +// state.fragment_pts = buffer.pts(); +// } +// if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { +// state.fragment_dts = buffer.dts(); +// } +// let pts_plus_duration = buffer.pts() + buffer.duration(); +// if state.fragment_max_pts_plus_duration.is_none() +// || state.fragment_max_pts_plus_duration < pts_plus_duration +// { +// state.fragment_max_pts_plus_duration = pts_plus_duration; +// } +// if buffer.offset() != gst::BUFFER_OFFSET_NONE +// && (state.fragment_offset.is_none() +// || state.fragment_offset.unwrap() > buffer.offset()) +// { +// state.fragment_offset = Some(buffer.offset()); +// } +// if buffer.offset_end() != gst::BUFFER_OFFSET_NONE +// && (state.fragment_offset_end.is_none() +// || state.fragment_offset_end.unwrap() < buffer.offset_end()) +// { +// state.fragment_offset_end = Some(buffer.offset_end()); +// } +// if state +// .fragment_buffer_flags +// .contains(gst::BufferFlags::DELTA_UNIT) +// && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) +// { +// state +// .fragment_buffer_flags +// .remove(gst::BufferFlags::DELTA_UNIT); +// } +// if buffer.flags().contains(gst::BufferFlags::DISCONT) { +// state +// .fragment_buffer_flags +// .insert(gst::BufferFlags::DISCONT); +// } +// gst_trace!(CAT, obj: pad, "Updated state={:?}", state); +// } + +// loop { +// match state.mp4_parser.pop_atom() { +// Some(atom) => { +// gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); +// match atom.atom_type { +// ATOM_TYPE_FTYPE => { +// state.ftype_atom = Some(atom); +// gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); +// } +// ATOM_TYPE_MOOV => { +// state.moov_atom = Some(atom); +// gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); +// } +// ATOM_TYPE_MOOF => { +// state.moof_atom = Some(atom); +// gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); +// } +// ATOM_TYPE_MDAT => { +// let mdat_atom = atom; +// match ( +// state.ftype_atom.as_ref(), +// state.moov_atom.as_ref(), +// state.moof_atom.as_ref(), +// ) { +// (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { +// let include_header = !state +// .fragment_buffer_flags +// .contains(gst::BufferFlags::DELTA_UNIT); +// let header_len = if include_header { +// ftype_atom.len() + moov_atom.len() +// } else { +// 0 +// }; +// let output_buf_len = +// header_len + moof_atom.len() + mdat_atom.len(); + +// gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", +// include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); + +// let mut gst_buffer = +// gst::Buffer::with_size(output_buf_len).unwrap(); +// { +// let buffer_ref = gst_buffer.get_mut().unwrap(); +// buffer_ref.set_pts(state.fragment_pts); +// buffer_ref.set_dts(state.fragment_dts); +// let duration = state.fragment_max_pts_plus_duration +// - state.fragment_pts; +// buffer_ref.set_duration(duration); +// buffer_ref.set_offset( +// state +// .fragment_offset +// .unwrap_or(gst::BUFFER_OFFSET_NONE), +// ); +// buffer_ref.set_offset_end( +// state +// .fragment_offset_end +// .unwrap_or(gst::BUFFER_OFFSET_NONE), +// ); +// buffer_ref.set_flags(state.fragment_buffer_flags); +// let mut buffer_map = buffer_ref.map_writable().unwrap(); +// let slice = buffer_map.as_mut_slice(); +// let mut pos = 0; +// if include_header { +// slice[pos..pos + ftype_atom.len()] +// .copy_from_slice(&ftype_atom.atom_bytes); +// pos += ftype_atom.len(); +// slice[pos..pos + moov_atom.len()] +// .copy_from_slice(&moov_atom.atom_bytes); +// pos += moov_atom.len(); +// } +// slice[pos..pos + moof_atom.len()] +// .copy_from_slice(&moof_atom.atom_bytes); +// pos += moof_atom.len(); +// slice[pos..pos + mdat_atom.len()] +// .copy_from_slice(&mdat_atom.atom_bytes); +// pos += mdat_atom.len(); +// assert_eq!(pos, output_buf_len); + +// //FIXME: Work on the Json here, instead of redoing it in a new method. +// } +// // Clear fragment variables. +// state.fragment_pts = ClockTime::none(); +// state.fragment_dts = ClockTime::none(); +// state.fragment_max_pts_plus_duration = ClockTime::none(); +// state.fragment_offset = None; +// state.fragment_offset_end = None; +// state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; +// // Push new buffer. +// gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); +// // let _ = self.srcpad.push(gst_buffer)?; +// // self.send_to_network(gst_buffer)?; +// } +// _ => { +// gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); +// } +// } +// } +// _ => { +// gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); +// } +// } +// } +// None => break, +// } +// } +// gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); +// Ok(gst::FlowSuccess::Ok) +// } + +// fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { +// self.srcpad.push_event(event) +// } + +// fn sink_query( +// &self, +// _pad: &gst::Pad, +// _element: &super::MoqSink, +// query: &mut gst::QueryRef, +// ) -> bool { +// self.srcpad.peer_query(query) +// } + +// fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box> { +// //Let this be our publisher +// Ok(()) +// } +// } +impl MoqSink { + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if let State::Started { .. } = *state { + unreachable!("Element already started"); + } + + let relay_url = { + let url = self.url.lock().unwrap(); + match *url { + Some(ref url) => url.clone(), + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + )); + } + } + }; + + // Disable tracing so we don't get a bunch of Quinn spam. + let tracer = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::WARN) + .finish(); + tracing::subscriber::set_global_default(tracer).unwrap(); + + // Create a list of acceptable root certificates. + let mut roots = rustls::RootCertStore::empty(); + + // Add the platform's native root certificates. + for cert in + rustls_native_certs::load_native_certs().context("could not load platform certs")? + { + roots + .add(&rustls::Certificate(cert.0)) + .context("failed to add root cert")?; + } + + let mut tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important + + let arc_tls_config = std::sync::Arc::new(tls_config); + let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); + + let mut endpoint = quinn::Endpoint::client("[::]:0")?; + endpoint.set_default_client_config(quinn_client_config); + + gst::debug!(CAT,imp: self,"connecting to relay: url={:?}",relay_url); + + let session = webtransport_quinn::connect(&endpoint, &relay_url) + .await + .context("failed to create WebTransport session")?; + + let (mut publisher, subscriber) = broadcast::new(""); + + let session = moq_transport::session::Client::publisher(session, subscriber) + .await + .context("failed to create MoQ Transport session")?; + + // let publisher = publisher + // .create_track(&config.track) + // .context("failed to create clock track")?; + *state = State::Started(StartedState::new(broadcast)); + + Ok({ session }) + } + + fn set_uri(self: &MoqSink, url_str: Option<&str>) -> Result<(), glib::Error> { + let state = self.state.lock().unwrap(); + + if let State::Started { .. } = *state { + return Err(glib::Error::new( + gst::URIError::BadState, + "Cannot set URI on a started MoqSink", + )); + } + + let mut url = self.url.lock().unwrap(); + + if url_str.is_none() { + *url = None; + return Ok(()); + } + + gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str); + + let url_str = url_str.unwrap(); + match parse_relay_url(url_str) { + Ok(relayUrl) => { + *url = Some(relayUrl); + Ok(()) + } + Err(_) => Err(glib::Error::new( + gst::URIError::BadUri, + "Could not parse URI", + )), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for MoqSink { + const NAME: &'static str = ELEMENT_CLASS_NAME; + type Type = super::MoqSink; + type ParentType = gst_base::BaseSink; + + type Interfaces = (gst::URIHandler); + +} + +impl ObjectImpl for MoqSink { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(); + self.obj().set_sync(false); + } + + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("host") + .nick("Url Host") + .blurb("The host of the relay server to connect to, e.g. example.com") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("port") + .nick("Url Port") + .blurb("The port of the relay server to connect to, most probably this is a 4443") + .default_value(DEFAULT_PORT) + .build(), + glib::ParamSpecString::builder("name") + .nick("Url Name") + .blurb("This is a very long random string to identify your stream on the relay server") + .mutable_ready() + .build(), + ] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + gst::debug!( + CAT, + imp: self, + "Setting property '{}' to '{:?}'", + pspec.name(), + value + ); + + match pspec.name() { + "host" => { + settings.host = value.get::>().expect("type checked upstream"); + if settings.port.is_some() && settings.name.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "port" => { + settings.port = value.get::>().expect("type checked upstream"); + if settings.host.is_some() && settings.name.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + "name" => { + settings.name = value + .get::>() + .expect("type checked upstream"); + if settings.host.is_some() && settings.port.is_some() { + let _ = self.set_uri(Some(&settings.to_uri())); + } + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "host" => settings.host.to_value(), + "name" => settings.name.to_value(), + "port" => settings.port.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for MoqSink {} + +impl ElementImpl for MoqSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + ELEMENT_LONG_NAME, + "Generic", + ELEMENT_DESCRIPTION, + ELEMENT_AUTHOR, + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + vec![sink_pad_template] + }); + PAD_TEMPLATES.as_ref() + } +} + +impl URIHandlerImpl for MoqSink { + const URI_TYPE: gst::URIType = gst::URIType::Sink; + + fn protocols() -> &'static [&'static str] { + &["https"] + } + + fn uri(&self) -> Option { + self.url.lock().unwrap().as_ref().map(|s| s.to_string()) + } + + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { + self.set_uri(Some(uri)) + } +} + +impl BaseSinkImpl for MoqSink { + fn start(&self) -> Result<(), gst::ErrorMessage> { + self.start() + } +} diff --git a/gst-warp-sink/src/moqsink/mod.rs b/gst-warp-sink/src/moqsink/mod.rs index 7c2195f..e181743 100644 --- a/gst-warp-sink/src/moqsink/mod.rs +++ b/gst-warp-sink/src/moqsink/mod.rs @@ -1,19 +1,17 @@ -use glib::prelude::*; +use gst::glib; +use gst::prelude::*; mod imp; glib::wrapper! { - pub struct MoqSink(ObjectSubclass) @extends gst::Element, gst::Object; + pub struct MoqSink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; } -unsafe impl Send for MoqSink {} -unsafe impl Sync for MoqSink {} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gst::Element::register( Some(plugin), imp::ELEMENT_NAME, - gst::Rank::None, + gst::Rank::PRIMARY, MoqSink::static_type(), ) } \ No newline at end of file diff --git a/gst-warp-sink/src/relayurl.rs b/gst-warp-sink/src/relayurl.rs index c6b6498..e0e63e8 100644 --- a/gst-warp-sink/src/relayurl.rs +++ b/gst-warp-sink/src/relayurl.rs @@ -13,7 +13,7 @@ impl ToString for RelayUrl { } pub fn parse_relay_url(url_str: &str) -> Result { - let url = Url::try_from(s).map_err(|e| e.to_string())?; + let url = Url::try_from(url_str).map_err(|e| e.to_string())?; //TODO: I know this is redundant, but this might come in handy in the future // Make sure the scheme is moq