diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs index 7737ba3..11a08ef 100644 --- a/gst-warp-sink/src/moqsink/imp.rs +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -29,6 +29,113 @@ const DEFAULT_NAME_LEN: usize = 10; // Length of the random string const DEFAULT_PORT: i32 = 4443; //default port for relay server const DEFAULT_ADDRESS: Option<&str> = Some("localhost"); //default host is localhost +const ATOM_TYPE_FTYPE: u32 = 1718909296; +const ATOM_TYPE_MOOV: u32 = 1836019574; +const ATOM_TYPE_MOOF: u32 = 1836019558; +const ATOM_TYPE_MDAT: u32 = 1835295092; + +#[derive(Debug)] +struct Mp4Atom { + pub atom_type: u32, + // Includes atom size and type. + pub atom_bytes: Vec, +} + +impl Mp4Atom { + pub fn len(&self) -> usize { + self.atom_bytes.len() + } +} + +#[derive(Debug)] +struct Mp4Parser { + buf: Vec, +} + +impl Mp4Parser { + pub fn new() -> Mp4Parser { + Mp4Parser { buf: Vec::new() } + } + + pub fn add(&mut self, buf: &[u8]) { + self.buf.extend_from_slice(buf); + } + + // Returns true if all or part of an MDAT body has been added. + pub fn have_mdat(&self) -> bool { + if self.buf.len() > 8 { + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + atom_type == ATOM_TYPE_MDAT + } else { + false + } + } + + pub fn pop_atom(&mut self) -> Option { + if self.buf.len() >= 8 { + let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + if self.buf.len() >= atom_size { + let mut atom_bytes = Vec::with_capacity(atom_size); + // TODO: Swap vectors? + atom_bytes.extend_from_slice(&self.buf[0..atom_size]); + assert_eq!(self.buf.len(), atom_size); + self.buf.clear(); + Some(Mp4Atom { + atom_type, + atom_bytes, + }) + } else { + None + } + } else { + None + } + } +} + +#[derive(Debug)] +struct Started { + broadcast: broadcast::Publisher, + + mp4_parser: Mp4Parser, + // Atoms in init sequence that must be repeated at each key frame. + ftype_atom: Option, + moov_atom: Option, + // These atoms that must be buffered and pushed as a single buffer. + moof_atom: Option, + // Below members that track current fragment (moof, mdat). + /// Minimum PTS in fragment. + fragment_pts: ClockTime, + /// Minimum DTS in fragment. + fragment_dts: ClockTime, + /// Maximum PTS + duration in fragment. + fragment_max_pts_plus_duration: ClockTime, + /// Minimum offset in fragment. + fragment_offset: Option, + /// Maximum offset_end in fragment. + fragment_offset_end: Option, + fragment_buffer_flags: gst::BufferFlags, +} + +impl Started { + pub fn new(broadcast: broadcast::Publisher) -> Started { + Started { + broadcast, + mp4_parser: Mp4Parser::new(), + ftype_atom: None, + moov_atom: None, + moof_atom: None, + fragment_pts: ClockTime::none(), + fragment_dts: ClockTime::none(), + fragment_max_pts_plus_duration: ClockTime::none(), + fragment_offset: None, + fragment_offset_end: None, + fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, + } + } +} + struct Settings { host: Option, port: Option, @@ -67,9 +174,7 @@ impl Default for Settings { enum State { #[default] Stopped, - Started { - broadcast: Mutex, - }, + Started(Started), } static CAT: Lazy = Lazy::new(|| { @@ -82,7 +187,7 @@ static CAT: Lazy = Lazy::new(|| { #[derive(Default, object_subclass)] pub struct MoqSink { - state: Mutex>, + state: Mutex, url: Mutex>, settings: Mutex, } @@ -199,9 +304,7 @@ impl MoqSink { }); // Update the state to indicate the element has started - *state = Some(State::Started { - broadcast: Mutex::new(publisher), - }); + *state = State::Started(Started::new(broadcast)); Ok(()) } @@ -393,4 +496,201 @@ impl BaseSinkImpl for MoqSink { fn start(&self) -> Result<(), gst::ErrorMessage> { self.start() } + fn render( + &self, + element: &Self::Type, + buffer: &gst::Buffer, + ) -> Result { + if let State::Stopped = *self.state.lock().unwrap() { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); + return Err(gst::FlowError::Error); + } + + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); + + let mut state = self.state.lock().unwrap(); + + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + unreachable!("Element should be started already"); + } + }; + + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + let input_buf = map.as_ref(); + + started_state.mp4_parser.add(input_buf); + + // Update cummulative fragment variables. + // Buffer PTS, etc. are only valid if this buffer contains MDAT data. + if started_state.mp4_parser.have_mdat() { + assert!(buffer.pts().is_some()); + if started_state.fragment_pts.is_none() || started_state.fragment_pts > buffer.pts() { + started_state.fragment_pts = buffer.pts(); + } + if started_state.fragment_dts.is_none() || started_state.fragment_dts > buffer.dts() { + started_state.fragment_dts = buffer.dts(); + } + let pts_plus_duration = buffer.pts() + buffer.duration(); + if started_state.fragment_max_pts_plus_duration.is_none() + || started_state.fragment_max_pts_plus_duration < pts_plus_duration + { + started_state.fragment_max_pts_plus_duration = pts_plus_duration; + } + if buffer.offset() != gst::BUFFER_OFFSET_NONE + && (started_state.fragment_offset.is_none() + || started_state.fragment_offset.unwrap() > buffer.offset()) + { + started_state.fragment_offset = Some(buffer.offset()); + } + if buffer.offset_end() != gst::BUFFER_OFFSET_NONE + && (started_state.fragment_offset_end.is_none() + || started_state.fragment_offset_end.unwrap() < buffer.offset_end()) + { + started_state.fragment_offset_end = Some(buffer.offset_end()); + } + if started_state + .fragment_buffer_flags + .contains(gst::BufferFlags::DELTA_UNIT) + && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + { + started_state + .fragment_buffer_flags + .remove(gst::BufferFlags::DELTA_UNIT); + } + if buffer.flags().contains(gst::BufferFlags::DISCONT) { + started_state + .fragment_buffer_flags + .insert(gst::BufferFlags::DISCONT); + } + gst::trace!(CAT, imp: self, "Updated state={:?}", started_state); + } + + loop { + match started_state.mp4_parser.pop_atom() { + Some(atom) => { + gst::log!(CAT, imp: self, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); + match atom.atom_type { + ATOM_TYPE_FTYPE => { + started_state.ftype_atom = Some(atom); + gst::log!(CAT, imp: self, "ftype_atom={:?}", started_state.ftype_atom); + } + ATOM_TYPE_MOOV => { + started_state.moov_atom = Some(atom); + gst::log!(CAT, imp: self, "moov_atom={:?}", started_state.moov_atom); + } + ATOM_TYPE_MOOF => { + started_state.moof_atom = Some(atom); + gst::log!(CAT, imp: self, "moof_atom={:?}", started_state.moof_atom); + } + ATOM_TYPE_MDAT => { + let mdat_atom = atom; + match ( + started_state.ftype_atom.as_ref(), + started_state.moov_atom.as_ref(), + started_state.moof_atom.as_ref(), + ) { + (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { + let include_header = !started_state + .fragment_buffer_flags + .contains(gst::BufferFlags::DELTA_UNIT); + let header_len = if include_header { + ftype_atom.len() + moov_atom.len() + } else { + 0 + }; + let output_buf_len = + header_len + moof_atom.len() + mdat_atom.len(); + + gst::log!(CAT, imp:self, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", + include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); + + let mut gst_buffer = + gst::Buffer::with_size(output_buf_len).unwrap(); + { + let buffer_ref = gst_buffer.get_mut().unwrap(); + buffer_ref.set_pts(started_state.fragment_pts); + buffer_ref.set_dts(started_state.fragment_dts); + let duration = started_state.fragment_max_pts_plus_duration + - started_state.fragment_pts; + buffer_ref.set_duration(duration); + buffer_ref.set_offset( + started_state + .fragment_offset + .unwrap_or(gst::BUFFER_OFFSET_NONE), + ); + buffer_ref.set_offset_end( + started_state + .fragment_offset_end + .unwrap_or(gst::BUFFER_OFFSET_NONE), + ); + buffer_ref.set_flags(started_state.fragment_buffer_flags); + let mut buffer_map = buffer_ref.map_writable().unwrap(); + let slice = buffer_map.as_mut_slice(); + let mut pos = 0; + if include_header { + slice[pos..pos + ftype_atom.len()] + .copy_from_slice(&ftype_atom.atom_bytes); + pos += ftype_atom.len(); + slice[pos..pos + moov_atom.len()] + .copy_from_slice(&moov_atom.atom_bytes); + pos += moov_atom.len(); + } + slice[pos..pos + moof_atom.len()] + .copy_from_slice(&moof_atom.atom_bytes); + pos += moof_atom.len(); + slice[pos..pos + mdat_atom.len()] + .copy_from_slice(&mdat_atom.atom_bytes); + pos += mdat_atom.len(); + assert_eq!(pos, output_buf_len); + + //FIXME: Work on the Json here, instead of redoing it in a new method. + } + // Clear fragment variables. + started_state.fragment_pts = ClockTime::none(); + started_state.fragment_dts = ClockTime::none(); + started_state.fragment_max_pts_plus_duration = ClockTime::none(); + started_state.fragment_offset = None; + started_state.fragment_offset_end = None; + started_state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; + // Push new buffer. + gst::log!(CAT, imp: self, "Pushing buffer {:?}", gst_buffer); + } + _ => { + gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); + } + } + } + _ => { + gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); + } + } + } + None => break, + } + } + gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); + + Ok(gst::FlowSuccess::Ok) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut state) = *state { + gst::warning!(CAT, imp: self, "Stopping all threads"); + + //TODO: Gracefully kill all running servers here + } + + *state = State::Stopped; + gst::info!(CAT, imp: self, "Stopped"); + + Ok(()) + } } diff --git a/gst-warp-sink/src/moqsink/imp.rs.txt b/gst-warp-sink/src/moqsink/imp.rs.txt deleted file mode 100644 index f051bf4..0000000 --- a/gst-warp-sink/src/moqsink/imp.rs.txt +++ /dev/null @@ -1,632 +0,0 @@ -use crate::relayurl::*; -use gst::glib; -use gst::prelude::*; -use gst::subclass::prelude::*; -use gst_base::prelude::*; -use gst_base::subclass::prelude::*; -use gst::glib::once_cell::sync::Lazy; - -use moq_transport::cache::{broadcast, fragment, segment, track}; -use moq_transport::VarInt; -use std::convert::TryInto; -use std::sync::{Arc, Mutex}; -use tokio::runtime::Runtime; -use tokio::sync::mpsc; -use url::Url; - -pub const ELEMENT_NAME: &str = "MoqSink"; -const ELEMENT_CLASS_NAME: &str = "MoqSink"; -const ELEMENT_LONG_NAME: &str = "Media Over Quic Sink"; -const ELEMENT_DESCRIPTION: &str = - "This element accepts fragmented MP4 input from mp4mux and publishes them to a Moq-Relay."; -const ELEMENT_AUTHOR: &str = "Wanjohi Ryan "; -const DEBUG_CATEGORY: &str = ELEMENT_NAME; - -const ATOM_TYPE_FTYPE: u32 = 1718909296; -const ATOM_TYPE_MOOV: u32 = 1836019574; -const ATOM_TYPE_MOOF: u32 = 1836019558; -const ATOM_TYPE_MDAT: u32 = 1835295092; -const DEFAULT_PORT: u16 = 4443; - -#[derive(Debug)] -struct Mp4Atom { - pub atom_type: u32, - // Includes atom size and type. - pub atom_bytes: Vec, -} - -impl Mp4Atom { - pub fn len(&self) -> usize { - self.atom_bytes.len() - } -} - -#[derive(Debug)] -struct Mp4Parser { - buf: Vec, -} - -impl Mp4Parser { - pub fn new() -> Mp4Parser { - Mp4Parser { buf: Vec::new() } - } - - pub fn add(&mut self, buf: &[u8]) { - self.buf.extend_from_slice(buf); - } - - // Returns true if all or part of an MDAT body has been added. - pub fn have_mdat(&self) -> bool { - if self.buf.len() > 8 { - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - atom_type == ATOM_TYPE_MDAT - } else { - false - } - } - - pub fn pop_atom(&mut self) -> Option { - if self.buf.len() >= 8 { - let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - if self.buf.len() >= atom_size { - let mut atom_bytes = Vec::with_capacity(atom_size); - // TODO: Swap vectors? - atom_bytes.extend_from_slice(&self.buf[0..atom_size]); - assert_eq!(self.buf.len(), atom_size); - self.buf.clear(); - Some(Mp4Atom { - atom_type, - atom_bytes, - }) - } else { - None - } - } else { - None - } - } -} - -struct Settings { - host: Option, - port: Option, - name: Option, -} - -impl Settings { - fn to_uri(&self) -> String { - RelayUrl { - host: self.host.clone(), - port: self.port.clone().unwrap(), - name: self.name.clone().unwrap(), - } - .to_string() - } -} - -impl Default for Settings { - fn default() -> Self { - Settings { - host: None, - port: None, - name: None, - } - } -} - -#[derive(Debug)] -struct StartedState { - // // We hold on to publisher so we don't close then while media is still being published. - // broadcast: broadcast::Publisher, - // //parses the mp4 boxes - // mp4_parser: Mp4Parser, - // // Atoms in init sequence that must be repeated at each key frame. - // ftype_atom: Option, - // moov_atom: Option, - // // These atoms that must be buffered and pushed as a single buffer. - // moof_atom: Option, - // // Below members that track current fragment (moof, mdat). - // /// Minimum PTS in fragment. - // fragment_pts: ClockTime, - // /// Minimum DTS in fragment. - // fragment_dts: ClockTime, - // /// Maximum PTS + duration in fragment. - // fragment_max_pts_plus_duration: ClockTime, - // /// Minimum offset in fragment. - // fragment_offset: Option, - // /// Maximum offset_end in fragment. - // fragment_offset_end: Option, - // fragment_buffer_flags: gst::BufferFlags, - // We hold on to publisher so we don't close then while media is still being published. - broadcast: broadcast::Publisher, -} - -impl StartedState { - pub fn new(broadcast: broadcast::Publisher) -> StartedState { - StartedState { broadcast } - } -} - -#[derive(Default)] -enum State { - #[default] - Stopped, - Completed, - Started(StartedState), -} - -// impl Default for State { -// fn default() -> State { -// State::Started { -// state: StartedState { -// mp4_parser: Mp4Parser::new(), -// broadcast: None, -// ftype_atom: None, -// moov_atom: None, -// moof_atom: None, -// fragment_pts: ClockTime::none(), -// fragment_dts: ClockTime::none(), -// fragment_max_pts_plus_duration: ClockTime::none(), -// fragment_offset: None, -// fragment_offset_end: None, -// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, -// }, -// } -// } -// } - -pub struct MoqSink { - state: Mutex, - url: Mutex>, - settings: Mutex, -} - -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - DEBUG_CATEGORY, - gst::DebugColorFlags::empty(), - Some(ELEMENT_LONG_NAME), - ) -}); - -// impl MoqSink { -// fn sink_chain( -// &self, -// pad: &gst::Pad, -// element: &super::MoqSink, -// buffer: gst::Buffer, -// ) -> Result { -// gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); - -// let mut state = self.state.lock().unwrap(); - -// let state = match *state { -// State::Started { ref mut state, .. } => state, -// }; - -// let map = buffer.map_readable().map_err(|_| { -// gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); -// gst::FlowError::Error -// })?; -// let input_buf = map.as_ref(); - -// state.mp4_parser.add(input_buf); - -// // Update cummulative fragment variables. -// // Buffer PTS, etc. are only valid if this buffer contains MDAT data. -// if state.mp4_parser.have_mdat() { -// assert!(buffer.pts().is_some()); -// if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { -// state.fragment_pts = buffer.pts(); -// } -// if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { -// state.fragment_dts = buffer.dts(); -// } -// let pts_plus_duration = buffer.pts() + buffer.duration(); -// if state.fragment_max_pts_plus_duration.is_none() -// || state.fragment_max_pts_plus_duration < pts_plus_duration -// { -// state.fragment_max_pts_plus_duration = pts_plus_duration; -// } -// if buffer.offset() != gst::BUFFER_OFFSET_NONE -// && (state.fragment_offset.is_none() -// || state.fragment_offset.unwrap() > buffer.offset()) -// { -// state.fragment_offset = Some(buffer.offset()); -// } -// if buffer.offset_end() != gst::BUFFER_OFFSET_NONE -// && (state.fragment_offset_end.is_none() -// || state.fragment_offset_end.unwrap() < buffer.offset_end()) -// { -// state.fragment_offset_end = Some(buffer.offset_end()); -// } -// if state -// .fragment_buffer_flags -// .contains(gst::BufferFlags::DELTA_UNIT) -// && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) -// { -// state -// .fragment_buffer_flags -// .remove(gst::BufferFlags::DELTA_UNIT); -// } -// if buffer.flags().contains(gst::BufferFlags::DISCONT) { -// state -// .fragment_buffer_flags -// .insert(gst::BufferFlags::DISCONT); -// } -// gst_trace!(CAT, obj: pad, "Updated state={:?}", state); -// } - -// loop { -// match state.mp4_parser.pop_atom() { -// Some(atom) => { -// gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); -// match atom.atom_type { -// ATOM_TYPE_FTYPE => { -// state.ftype_atom = Some(atom); -// gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); -// } -// ATOM_TYPE_MOOV => { -// state.moov_atom = Some(atom); -// gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); -// } -// ATOM_TYPE_MOOF => { -// state.moof_atom = Some(atom); -// gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); -// } -// ATOM_TYPE_MDAT => { -// let mdat_atom = atom; -// match ( -// state.ftype_atom.as_ref(), -// state.moov_atom.as_ref(), -// state.moof_atom.as_ref(), -// ) { -// (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { -// let include_header = !state -// .fragment_buffer_flags -// .contains(gst::BufferFlags::DELTA_UNIT); -// let header_len = if include_header { -// ftype_atom.len() + moov_atom.len() -// } else { -// 0 -// }; -// let output_buf_len = -// header_len + moof_atom.len() + mdat_atom.len(); - -// gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", -// include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); - -// let mut gst_buffer = -// gst::Buffer::with_size(output_buf_len).unwrap(); -// { -// let buffer_ref = gst_buffer.get_mut().unwrap(); -// buffer_ref.set_pts(state.fragment_pts); -// buffer_ref.set_dts(state.fragment_dts); -// let duration = state.fragment_max_pts_plus_duration -// - state.fragment_pts; -// buffer_ref.set_duration(duration); -// buffer_ref.set_offset( -// state -// .fragment_offset -// .unwrap_or(gst::BUFFER_OFFSET_NONE), -// ); -// buffer_ref.set_offset_end( -// state -// .fragment_offset_end -// .unwrap_or(gst::BUFFER_OFFSET_NONE), -// ); -// buffer_ref.set_flags(state.fragment_buffer_flags); -// let mut buffer_map = buffer_ref.map_writable().unwrap(); -// let slice = buffer_map.as_mut_slice(); -// let mut pos = 0; -// if include_header { -// slice[pos..pos + ftype_atom.len()] -// .copy_from_slice(&ftype_atom.atom_bytes); -// pos += ftype_atom.len(); -// slice[pos..pos + moov_atom.len()] -// .copy_from_slice(&moov_atom.atom_bytes); -// pos += moov_atom.len(); -// } -// slice[pos..pos + moof_atom.len()] -// .copy_from_slice(&moof_atom.atom_bytes); -// pos += moof_atom.len(); -// slice[pos..pos + mdat_atom.len()] -// .copy_from_slice(&mdat_atom.atom_bytes); -// pos += mdat_atom.len(); -// assert_eq!(pos, output_buf_len); - -// //FIXME: Work on the Json here, instead of redoing it in a new method. -// } -// // Clear fragment variables. -// state.fragment_pts = ClockTime::none(); -// state.fragment_dts = ClockTime::none(); -// state.fragment_max_pts_plus_duration = ClockTime::none(); -// state.fragment_offset = None; -// state.fragment_offset_end = None; -// state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; -// // Push new buffer. -// gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); -// // let _ = self.srcpad.push(gst_buffer)?; -// // self.send_to_network(gst_buffer)?; -// } -// _ => { -// gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); -// } -// } -// } -// _ => { -// gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); -// } -// } -// } -// None => break, -// } -// } -// gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); -// Ok(gst::FlowSuccess::Ok) -// } - -// fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { -// self.srcpad.push_event(event) -// } - -// fn sink_query( -// &self, -// _pad: &gst::Pad, -// _element: &super::MoqSink, -// query: &mut gst::QueryRef, -// ) -> bool { -// self.srcpad.peer_query(query) -// } - -// fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box> { -// //Let this be our publisher -// Ok(()) -// } -// } -impl MoqSink { - fn start(&self) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().unwrap(); - let settings = self.settings.lock().unwrap(); - - if let State::Started { .. } = *state { - unreachable!("Element already started"); - } - - let relay_url = { - let url = self.url.lock().unwrap(); - match *url { - Some(ref url) => url.clone(), - None => { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["Cannot start without a URL being set"] - )); - } - } - }; - - // Disable tracing so we don't get a bunch of Quinn spam. - let tracer = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(tracing::Level::WARN) - .finish(); - tracing::subscriber::set_global_default(tracer).unwrap(); - - // Create a list of acceptable root certificates. - let mut roots = rustls::RootCertStore::empty(); - - // Add the platform's native root certificates. - for cert in - rustls_native_certs::load_native_certs().context("could not load platform certs")? - { - roots - .add(&rustls::Certificate(cert.0)) - .context("failed to add root cert")?; - } - - let mut tls_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth(); - - tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important - - let arc_tls_config = std::sync::Arc::new(tls_config); - let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); - - let mut endpoint = quinn::Endpoint::client("[::]:0")?; - endpoint.set_default_client_config(quinn_client_config); - - gst::debug!(CAT,imp: self,"connecting to relay: url={:?}",relay_url); - - let session = webtransport_quinn::connect(&endpoint, &relay_url) - .await - .context("failed to create WebTransport session")?; - - let (mut publisher, subscriber) = broadcast::new(""); - - let session = moq_transport::session::Client::publisher(session, subscriber) - .await - .context("failed to create MoQ Transport session")?; - - // let publisher = publisher - // .create_track(&config.track) - // .context("failed to create clock track")?; - *state = State::Started(StartedState::new(broadcast)); - - Ok({ session }) - } - - fn set_uri(self: &MoqSink, url_str: Option<&str>) -> Result<(), glib::Error> { - let state = self.state.lock().unwrap(); - - if let State::Started { .. } = *state { - return Err(glib::Error::new( - gst::URIError::BadState, - "Cannot set URI on a started MoqSink", - )); - } - - let mut url = self.url.lock().unwrap(); - - if url_str.is_none() { - *url = None; - return Ok(()); - } - - gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str); - - let url_str = url_str.unwrap(); - match parse_relay_url(url_str) { - Ok(relayUrl) => { - *url = Some(relayUrl); - Ok(()) - } - Err(_) => Err(glib::Error::new( - gst::URIError::BadUri, - "Could not parse URI", - )), - } - } -} - -#[glib::object_subclass] -impl ObjectSubclass for MoqSink { - const NAME: &'static str = ELEMENT_CLASS_NAME; - type Type = super::MoqSink; - type ParentType = gst_base::BaseSink; - - type Interfaces = (gst::URIHandler); - -} - -impl ObjectImpl for MoqSink { - fn constructed(&self, obj: &Self::Type) { - self.parent_constructed(); - self.obj().set_sync(false); - } - - fn properties() -> &'static [glib::ParamSpec] { - static PROPERTIES: Lazy> = Lazy::new(|| { - vec![ - glib::ParamSpecString::builder("host") - .nick("Url Host") - .blurb("The host of the relay server to connect to, e.g. example.com") - .mutable_ready() - .build(), - glib::ParamSpecString::builder("port") - .nick("Url Port") - .blurb("The port of the relay server to connect to, most probably this is a 4443") - .default_value(DEFAULT_PORT) - .build(), - glib::ParamSpecString::builder("name") - .nick("Url Name") - .blurb("This is a very long random string to identify your stream on the relay server") - .mutable_ready() - .build(), - ] - }); - PROPERTIES.as_ref() - } - - fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { - let mut settings = self.settings.lock().unwrap(); - - gst::debug!( - CAT, - imp: self, - "Setting property '{}' to '{:?}'", - pspec.name(), - value - ); - - match pspec.name() { - "host" => { - settings.host = value.get::>().expect("type checked upstream"); - if settings.port.is_some() && settings.name.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - "port" => { - settings.port = value.get::>().expect("type checked upstream"); - if settings.host.is_some() && settings.name.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - "name" => { - settings.name = value - .get::>() - .expect("type checked upstream"); - if settings.host.is_some() && settings.port.is_some() { - let _ = self.set_uri(Some(&settings.to_uri())); - } - } - _ => unimplemented!(), - } - } - - fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { - let settings = self.settings.lock().unwrap(); - - match pspec.name() { - "host" => settings.host.to_value(), - "name" => settings.name.to_value(), - "port" => settings.port.to_value(), - _ => unimplemented!(), - } - } -} - -impl GstObjectImpl for MoqSink {} - -impl ElementImpl for MoqSink { - fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { - static ELEMENT_METADATA: Lazy = Lazy::new(|| { - gst::subclass::ElementMetadata::new( - ELEMENT_LONG_NAME, - "Generic", - ELEMENT_DESCRIPTION, - ELEMENT_AUTHOR, - ) - }); - Some(&*ELEMENT_METADATA) - } - - fn pad_templates() -> &'static [gst::PadTemplate] { - static PAD_TEMPLATES: Lazy> = Lazy::new(|| { - let caps = gst::Caps::new_any(); - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - vec![sink_pad_template] - }); - PAD_TEMPLATES.as_ref() - } -} - -impl URIHandlerImpl for MoqSink { - const URI_TYPE: gst::URIType = gst::URIType::Sink; - - fn protocols() -> &'static [&'static str] { - &["https"] - } - - fn uri(&self) -> Option { - self.url.lock().unwrap().as_ref().map(|s| s.to_string()) - } - - fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { - self.set_uri(Some(uri)) - } -} - -impl BaseSinkImpl for MoqSink { - fn start(&self) -> Result<(), gst::ErrorMessage> { - self.start() - } -}