From f99766b42a969654e187e3bdd3c29c4aa4ff6d2c Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:46:23 +0300 Subject: [PATCH] Fix: Added support for ``url`` property. TODO: add buffer --- gst-warp-sink/Cargo.lock | 75 ++++ gst-warp-sink/Cargo.toml | 2 + gst-warp-sink/src/moqsink/imp.rs | 565 +++++++++++++++++++------------ gst-warp-sink/src/relayurl.rs | 13 + 4 files changed, 432 insertions(+), 223 deletions(-) diff --git a/gst-warp-sink/Cargo.lock b/gst-warp-sink/Cargo.lock index 24a7c7c..6821391 100644 --- a/gst-warp-sink/Cargo.lock +++ b/gst-warp-sink/Cargo.lock @@ -1024,6 +1024,12 @@ dependencies = [ "log", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.150" @@ -1085,6 +1091,8 @@ dependencies = [ "rustls", "rustls-native-certs", "rustls-pemfile", + "tracing", + "tracing-subscriber", "url", "webtransport-quinn", ] @@ -1167,6 +1175,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -1240,6 +1258,12 @@ dependencies = [ "paste", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.2.0" @@ -1657,6 +1681,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "slab" version = "0.4.9" @@ -1764,6 +1797,16 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -1881,6 +1924,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -1927,6 +1996,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.4.2" diff --git a/gst-warp-sink/Cargo.toml b/gst-warp-sink/Cargo.toml index 54a667b..2fb6b92 100644 --- a/gst-warp-sink/Cargo.toml +++ b/gst-warp-sink/Cargo.toml @@ -25,6 +25,8 @@ bytes = "1.5.0" quinn = "0.10" webtransport-quinn = "0.6.1" url = "2" +tracing = "0.1" +tracing-subscriber = "0.3" # Crypto rustls = { version = "0.21", features = ["dangerous_configuration"] } diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs index b9f44c4..bc967e4 100644 --- a/gst-warp-sink/src/moqsink/imp.rs +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -10,6 +10,7 @@ use moq_transport::VarInt; use once_cell::sync::Lazy; use std::convert::TryInto; use std::sync::Mutex; +use url::Url; pub const ELEMENT_NAME: &str = "MoqSink"; const ELEMENT_CLASS_NAME: &str = "MoqSink"; @@ -114,52 +115,66 @@ impl Default for Settings { #[derive(Debug)] struct StartedState { - mp4_parser: Mp4Parser, - // Atoms in init sequence that must be repeated at each key frame. - ftype_atom: Option, - moov_atom: Option, - // These atoms that must be buffered and pushed as a single buffer. - moof_atom: Option, - // Below members that track current fragment (moof, mdat). - /// Minimum PTS in fragment. - fragment_pts: ClockTime, - /// Minimum DTS in fragment. - fragment_dts: ClockTime, - /// Maximum PTS + duration in fragment. - fragment_max_pts_plus_duration: ClockTime, - /// Minimum offset in fragment. - fragment_offset: Option, - /// Maximum offset_end in fragment. - fragment_offset_end: Option, - fragment_buffer_flags: gst::BufferFlags, -} -enum State { - Started { state: StartedState }, + // // We hold on to publisher so we don't close then while media is still being published. + // broadcast: broadcast::Publisher, + // //parses the mp4 boxes + // mp4_parser: Mp4Parser, + // // Atoms in init sequence that must be repeated at each key frame. + // ftype_atom: Option, + // moov_atom: Option, + // // These atoms that must be buffered and pushed as a single buffer. + // moof_atom: Option, + // // Below members that track current fragment (moof, mdat). + // /// Minimum PTS in fragment. + // fragment_pts: ClockTime, + // /// Minimum DTS in fragment. + // fragment_dts: ClockTime, + // /// Maximum PTS + duration in fragment. + // fragment_max_pts_plus_duration: ClockTime, + // /// Minimum offset in fragment. + // fragment_offset: Option, + // /// Maximum offset_end in fragment. + // fragment_offset_end: Option, + // fragment_buffer_flags: gst::BufferFlags, + // We hold on to publisher so we don't close then while media is still being published. + broadcast: broadcast::Publisher, } -impl Default for State { - fn default() -> State { - State::Started { - state: StartedState { - mp4_parser: Mp4Parser::new(), - ftype_atom: None, - moov_atom: None, - moof_atom: None, - fragment_pts: ClockTime::none(), - fragment_dts: ClockTime::none(), - fragment_max_pts_plus_duration: ClockTime::none(), - fragment_offset: None, - fragment_offset_end: None, - fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, - }, - } - } +impl StartedState { + } +#[derive(Default)] +enum State { + #[default] + Stopped, + Completed, + Started(StartedState) +} + +// impl Default for State { +// fn default() -> State { +// State::Started { +// state: StartedState { +// mp4_parser: Mp4Parser::new(), +// broadcast: None, +// ftype_atom: None, +// moov_atom: None, +// moof_atom: None, +// fragment_pts: ClockTime::none(), +// fragment_dts: ClockTime::none(), +// fragment_max_pts_plus_duration: ClockTime::none(), +// fragment_offset: None, +// fragment_offset_end: None, +// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, +// }, +// } +// } +// } + pub struct MoqSink { state: Mutex, - url: Mutex>, - srcpad: gst::Pad, + url: Mutex>, settings: Mutex, } @@ -171,207 +186,311 @@ static CAT: Lazy = Lazy::new(|| { ) }); +// impl MoqSink { +// fn sink_chain( +// &self, +// pad: &gst::Pad, +// element: &super::MoqSink, +// buffer: gst::Buffer, +// ) -> Result { +// gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + +// let mut state = self.state.lock().unwrap(); + +// let state = match *state { +// State::Started { ref mut state, .. } => state, +// }; + +// let map = buffer.map_readable().map_err(|_| { +// gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); +// gst::FlowError::Error +// })?; +// let input_buf = map.as_ref(); + +// state.mp4_parser.add(input_buf); + +// // Update cummulative fragment variables. +// // Buffer PTS, etc. are only valid if this buffer contains MDAT data. +// if state.mp4_parser.have_mdat() { +// assert!(buffer.pts().is_some()); +// if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { +// state.fragment_pts = buffer.pts(); +// } +// if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { +// state.fragment_dts = buffer.dts(); +// } +// let pts_plus_duration = buffer.pts() + buffer.duration(); +// if state.fragment_max_pts_plus_duration.is_none() +// || state.fragment_max_pts_plus_duration < pts_plus_duration +// { +// state.fragment_max_pts_plus_duration = pts_plus_duration; +// } +// if buffer.offset() != gst::BUFFER_OFFSET_NONE +// && (state.fragment_offset.is_none() +// || state.fragment_offset.unwrap() > buffer.offset()) +// { +// state.fragment_offset = Some(buffer.offset()); +// } +// if buffer.offset_end() != gst::BUFFER_OFFSET_NONE +// && (state.fragment_offset_end.is_none() +// || state.fragment_offset_end.unwrap() < buffer.offset_end()) +// { +// state.fragment_offset_end = Some(buffer.offset_end()); +// } +// if state +// .fragment_buffer_flags +// .contains(gst::BufferFlags::DELTA_UNIT) +// && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) +// { +// state +// .fragment_buffer_flags +// .remove(gst::BufferFlags::DELTA_UNIT); +// } +// if buffer.flags().contains(gst::BufferFlags::DISCONT) { +// state +// .fragment_buffer_flags +// .insert(gst::BufferFlags::DISCONT); +// } +// gst_trace!(CAT, obj: pad, "Updated state={:?}", state); +// } + +// loop { +// match state.mp4_parser.pop_atom() { +// Some(atom) => { +// gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); +// match atom.atom_type { +// ATOM_TYPE_FTYPE => { +// state.ftype_atom = Some(atom); +// gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); +// } +// ATOM_TYPE_MOOV => { +// state.moov_atom = Some(atom); +// gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); +// } +// ATOM_TYPE_MOOF => { +// state.moof_atom = Some(atom); +// gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); +// } +// ATOM_TYPE_MDAT => { +// let mdat_atom = atom; +// match ( +// state.ftype_atom.as_ref(), +// state.moov_atom.as_ref(), +// state.moof_atom.as_ref(), +// ) { +// (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { +// let include_header = !state +// .fragment_buffer_flags +// .contains(gst::BufferFlags::DELTA_UNIT); +// let header_len = if include_header { +// ftype_atom.len() + moov_atom.len() +// } else { +// 0 +// }; +// let output_buf_len = +// header_len + moof_atom.len() + mdat_atom.len(); + +// gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", +// include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); + +// let mut gst_buffer = +// gst::Buffer::with_size(output_buf_len).unwrap(); +// { +// let buffer_ref = gst_buffer.get_mut().unwrap(); +// buffer_ref.set_pts(state.fragment_pts); +// buffer_ref.set_dts(state.fragment_dts); +// let duration = state.fragment_max_pts_plus_duration +// - state.fragment_pts; +// buffer_ref.set_duration(duration); +// buffer_ref.set_offset( +// state +// .fragment_offset +// .unwrap_or(gst::BUFFER_OFFSET_NONE), +// ); +// buffer_ref.set_offset_end( +// state +// .fragment_offset_end +// .unwrap_or(gst::BUFFER_OFFSET_NONE), +// ); +// buffer_ref.set_flags(state.fragment_buffer_flags); +// let mut buffer_map = buffer_ref.map_writable().unwrap(); +// let slice = buffer_map.as_mut_slice(); +// let mut pos = 0; +// if include_header { +// slice[pos..pos + ftype_atom.len()] +// .copy_from_slice(&ftype_atom.atom_bytes); +// pos += ftype_atom.len(); +// slice[pos..pos + moov_atom.len()] +// .copy_from_slice(&moov_atom.atom_bytes); +// pos += moov_atom.len(); +// } +// slice[pos..pos + moof_atom.len()] +// .copy_from_slice(&moof_atom.atom_bytes); +// pos += moof_atom.len(); +// slice[pos..pos + mdat_atom.len()] +// .copy_from_slice(&mdat_atom.atom_bytes); +// pos += mdat_atom.len(); +// assert_eq!(pos, output_buf_len); + +// //FIXME: Work on the Json here, instead of redoing it in a new method. +// } +// // Clear fragment variables. +// state.fragment_pts = ClockTime::none(); +// state.fragment_dts = ClockTime::none(); +// state.fragment_max_pts_plus_duration = ClockTime::none(); +// state.fragment_offset = None; +// state.fragment_offset_end = None; +// state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; +// // Push new buffer. +// gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); +// // let _ = self.srcpad.push(gst_buffer)?; +// // self.send_to_network(gst_buffer)?; +// } +// _ => { +// gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); +// } +// } +// } +// _ => { +// gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); +// } +// } +// } +// None => break, +// } +// } +// gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); +// Ok(gst::FlowSuccess::Ok) +// } + +// fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { +// self.srcpad.push_event(event) +// } + +// fn sink_query( +// &self, +// _pad: &gst::Pad, +// _element: &super::MoqSink, +// query: &mut gst::QueryRef, +// ) -> bool { +// self.srcpad.peer_query(query) +// } + +// fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box> { +// //Let this be our publisher +// Ok(()) +// } +// } impl MoqSink { - // fn sink_chain( - // &self, - // pad: &gst::Pad, - // element: &super::MoqSink, - // buffer: gst::Buffer, - // ) -> Result { - // gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); - // let mut state = self.state.lock().unwrap(); + if let State::Started { .. } = *state { + unreachable!("Element already started"); + } - // let state = match *state { - // State::Started { ref mut state, .. } => state, - // }; + let relay_url = { + let url = self.url.lock().unwrap(); + match *url { + Some(ref url) => url.clone(), + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Cannot start without a URL being set"] + )); + } + } + }; - // let map = buffer.map_readable().map_err(|_| { - // gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); - // gst::FlowError::Error - // })?; - // let input_buf = map.as_ref(); + // Disable tracing so we don't get a bunch of Quinn spam. + let tracer = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::WARN) + .finish(); + tracing::subscriber::set_global_default(tracer).unwrap(); - // state.mp4_parser.add(input_buf); + // Create a list of acceptable root certificates. + let mut roots = rustls::RootCertStore::empty(); - // // Update cummulative fragment variables. - // // Buffer PTS, etc. are only valid if this buffer contains MDAT data. - // if state.mp4_parser.have_mdat() { - // assert!(buffer.pts().is_some()); - // if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { - // state.fragment_pts = buffer.pts(); - // } - // if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { - // state.fragment_dts = buffer.dts(); - // } - // let pts_plus_duration = buffer.pts() + buffer.duration(); - // if state.fragment_max_pts_plus_duration.is_none() - // || state.fragment_max_pts_plus_duration < pts_plus_duration - // { - // state.fragment_max_pts_plus_duration = pts_plus_duration; - // } - // if buffer.offset() != gst::BUFFER_OFFSET_NONE - // && (state.fragment_offset.is_none() - // || state.fragment_offset.unwrap() > buffer.offset()) - // { - // state.fragment_offset = Some(buffer.offset()); - // } - // if buffer.offset_end() != gst::BUFFER_OFFSET_NONE - // && (state.fragment_offset_end.is_none() - // || state.fragment_offset_end.unwrap() < buffer.offset_end()) - // { - // state.fragment_offset_end = Some(buffer.offset_end()); - // } - // if state - // .fragment_buffer_flags - // .contains(gst::BufferFlags::DELTA_UNIT) - // && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) - // { - // state - // .fragment_buffer_flags - // .remove(gst::BufferFlags::DELTA_UNIT); - // } - // if buffer.flags().contains(gst::BufferFlags::DISCONT) { - // state - // .fragment_buffer_flags - // .insert(gst::BufferFlags::DISCONT); - // } - // gst_trace!(CAT, obj: pad, "Updated state={:?}", state); - // } + // Add the platform's native root certificates. + for cert in + rustls_native_certs::load_native_certs().context("could not load platform certs")? + { + roots + .add(&rustls::Certificate(cert.0)) + .context("failed to add root cert")?; + } - // loop { - // match state.mp4_parser.pop_atom() { - // Some(atom) => { - // gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); - // match atom.atom_type { - // ATOM_TYPE_FTYPE => { - // state.ftype_atom = Some(atom); - // gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); - // } - // ATOM_TYPE_MOOV => { - // state.moov_atom = Some(atom); - // gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); - // } - // ATOM_TYPE_MOOF => { - // state.moof_atom = Some(atom); - // gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); - // } - // ATOM_TYPE_MDAT => { - // let mdat_atom = atom; - // match ( - // state.ftype_atom.as_ref(), - // state.moov_atom.as_ref(), - // state.moof_atom.as_ref(), - // ) { - // (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { - // let include_header = !state - // .fragment_buffer_flags - // .contains(gst::BufferFlags::DELTA_UNIT); - // let header_len = if include_header { - // ftype_atom.len() + moov_atom.len() - // } else { - // 0 - // }; - // let output_buf_len = - // header_len + moof_atom.len() + mdat_atom.len(); + let mut tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); - // gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", - // include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important - // let mut gst_buffer = - // gst::Buffer::with_size(output_buf_len).unwrap(); - // { - // let buffer_ref = gst_buffer.get_mut().unwrap(); - // buffer_ref.set_pts(state.fragment_pts); - // buffer_ref.set_dts(state.fragment_dts); - // let duration = state.fragment_max_pts_plus_duration - // - state.fragment_pts; - // buffer_ref.set_duration(duration); - // buffer_ref.set_offset( - // state - // .fragment_offset - // .unwrap_or(gst::BUFFER_OFFSET_NONE), - // ); - // buffer_ref.set_offset_end( - // state - // .fragment_offset_end - // .unwrap_or(gst::BUFFER_OFFSET_NONE), - // ); - // buffer_ref.set_flags(state.fragment_buffer_flags); - // let mut buffer_map = buffer_ref.map_writable().unwrap(); - // let slice = buffer_map.as_mut_slice(); - // let mut pos = 0; - // if include_header { - // slice[pos..pos + ftype_atom.len()] - // .copy_from_slice(&ftype_atom.atom_bytes); - // pos += ftype_atom.len(); - // slice[pos..pos + moov_atom.len()] - // .copy_from_slice(&moov_atom.atom_bytes); - // pos += moov_atom.len(); - // } - // slice[pos..pos + moof_atom.len()] - // .copy_from_slice(&moof_atom.atom_bytes); - // pos += moof_atom.len(); - // slice[pos..pos + mdat_atom.len()] - // .copy_from_slice(&mdat_atom.atom_bytes); - // pos += mdat_atom.len(); - // assert_eq!(pos, output_buf_len); + let arc_tls_config = std::sync::Arc::new(tls_config); + let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); - // //FIXME: Work on the Json here, instead of redoing it in a new method. - // } - // // Clear fragment variables. - // state.fragment_pts = ClockTime::none(); - // state.fragment_dts = ClockTime::none(); - // state.fragment_max_pts_plus_duration = ClockTime::none(); - // state.fragment_offset = None; - // state.fragment_offset_end = None; - // state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; - // // Push new buffer. - // gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); - // // let _ = self.srcpad.push(gst_buffer)?; - // // self.send_to_network(gst_buffer)?; - // } - // _ => { - // gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); - // } - // } - // } - // _ => { - // gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); - // } - // } - // } - // None => break, - // } - // } - // gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); - // Ok(gst::FlowSuccess::Ok) - // } + let mut endpoint = quinn::Endpoint::client("[::]:0")?; + endpoint.set_default_client_config(quinn_client_config); - // fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { - // self.srcpad.push_event(event) - // } + gst::debug!(CAT,imp: self,"connecting to relay: url={:?}",relay_url); - // fn sink_query( - // &self, - // _pad: &gst::Pad, - // _element: &super::MoqSink, - // query: &mut gst::QueryRef, - // ) -> bool { - // self.srcpad.peer_query(query) - // } + let session = webtransport_quinn::connect(&endpoint, &relay_url) + .await + .context("failed to create WebTransport session")?; - // fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box> { - // //Let this be our publisher - // Ok(()) - // } + let (mut publisher, subscriber) = broadcast::new(""); + + let session = moq_transport::session::Client::publisher(session, subscriber) + .await + .context("failed to create MoQ Transport session")?; + + // let publisher = publisher + // .create_track(&config.track) + // .context("failed to create clock track")?; + + Ok(()) + } + + fn set_uri(self: &MoqSink, url_str: Option<&str>) -> Result<(), glib::Error> { + let state = self.state.lock().unwrap(); + + if let State::Started { .. } = *state { + return Err(glib::Error::new( + gst::URIError::BadState, + "Cannot set URI on a started MoqSink", + )); + } + + let mut url = self.url.lock().unwrap(); + + if url_str.is_none() { + *url = None; + return Ok(()); + } + + gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str); + + let url_str = url_str.unwrap(); + match parse_relay_url(url_str) { + Ok(relayUrl) => { + *url = Some(relayUrl); + Ok(()) + } + Err(_) => Err(glib::Error::new( + gst::URIError::BadUri, + "Could not parse URI", + )), + } + } } #[glib::object_subclass] impl ObjectSubclass for MoqSink { const NAME: &'static str = ELEMENT_CLASS_NAME; type Type = super::MoqSink; - type ParentType = gst::Element; + type ParentType = gst_base::BaseSink; type Interfaces = (gst::URIHandler); @@ -502,4 +621,4 @@ impl URIHandlerImpl for MoqSink { fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { self.set_uri(Some(uri)) } -} \ No newline at end of file +} diff --git a/gst-warp-sink/src/relayurl.rs b/gst-warp-sink/src/relayurl.rs index da5e52b..c6b6498 100644 --- a/gst-warp-sink/src/relayurl.rs +++ b/gst-warp-sink/src/relayurl.rs @@ -1,3 +1,4 @@ +use url::Url; #[derive(Clone)] pub struct RelayUrl { pub host: String, @@ -10,3 +11,15 @@ impl ToString for RelayUrl { format!("https://{}:{}/{}", self.host, self.port, self.name) } } + +pub fn parse_relay_url(url_str: &str) -> Result { + let url = Url::try_from(s).map_err(|e| e.to_string())?; + + //TODO: I know this is redundant, but this might come in handy in the future + // Make sure the scheme is moq + if url.scheme() != "https" { + return Err("url scheme must be https:// for WebTransport".to_string()); + } + + Ok(url) +}