From 19a61e87178b96e67a9d26e9f702203eeb5493a7 Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Wed, 6 Dec 2023 09:33:52 +0300 Subject: [PATCH] For @spidervirus to see am working on Netris --- gst-warp-sink/build.rs | 3 + gst-warp-sink/src/lib.rs | 18 ++ gst-warp-sink/src/moqsink/imp.rs | 364 +++++++++++++++++++++++++++++++ gst-warp-sink/src/moqsink/mod.rs | 19 ++ src/main.rs | 247 +-------------------- 5 files changed, 405 insertions(+), 246 deletions(-) create mode 100644 gst-warp-sink/build.rs create mode 100644 gst-warp-sink/src/lib.rs create mode 100644 gst-warp-sink/src/moqsink/imp.rs create mode 100644 gst-warp-sink/src/moqsink/mod.rs diff --git a/gst-warp-sink/build.rs b/gst-warp-sink/build.rs new file mode 100644 index 0000000..d79c7f2 --- /dev/null +++ b/gst-warp-sink/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} \ No newline at end of file diff --git a/gst-warp-sink/src/lib.rs b/gst-warp-sink/src/lib.rs new file mode 100644 index 0000000..66ff807 --- /dev/null +++ b/gst-warp-sink/src/lib.rs @@ -0,0 +1,18 @@ +mod moqsink; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + moqsink::register(plugin)?; + Ok(()) +} + +gst::plugin_define!( + moqsink, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "unknown", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); \ No newline at end of file diff --git a/gst-warp-sink/src/moqsink/imp.rs b/gst-warp-sink/src/moqsink/imp.rs new file mode 100644 index 0000000..24f7a1d --- /dev/null +++ b/gst-warp-sink/src/moqsink/imp.rs @@ -0,0 +1,364 @@ +use glib::subclass::prelude::*; +use gst::ClockTime; +use gst::prelude::*; +use gst::subclass::prelude::*; +#[allow(unused_imports)] +use gst::{gst_debug, gst_error, gst_warning, gst_info, gst_log, gst_trace}; +use once_cell::sync::Lazy; +use std::convert::TryInto; +use std::sync::Mutex; + +pub const ELEMENT_NAME: &str = "MoqSink"; +const ELEMENT_CLASS_NAME: &str = "MoqSink"; +const ELEMENT_LONG_NAME: &str = "Media Over Quic Sink"; +const ELEMENT_DESCRIPTION: &str = "This element accepts fragmented MP4 input from mp4mux and publishes them to a Moq-Relay."; +const ELEMENT_AUTHOR: &str = "Wanjohi Ryan "; +const DEBUG_CATEGORY: &str = ELEMENT_NAME; + +const ATOM_TYPE_FTYPE: u32 = 1718909296; +const ATOM_TYPE_MOOV: u32 = 1836019574; +const ATOM_TYPE_MOOF: u32 = 1836019558; +const ATOM_TYPE_MDAT: u32 = 1835295092; + +#[derive(Debug)] +struct Mp4Atom { + pub atom_type: u32, + // Includes atom size and type. + pub atom_bytes: Vec, +} + +impl Mp4Atom { + pub fn len(&self) -> usize { + self.atom_bytes.len() + } +} + +#[derive(Debug)] +struct Mp4Parser { + buf: Vec, +} + +impl Mp4Parser { + pub fn new() -> Mp4Parser { + Mp4Parser { + buf: Vec::new(), + } + } + + pub fn add(&mut self, buf: &[u8]) { + self.buf.extend_from_slice(buf); + } + + // Returns true if all or part of an MDAT body has been added. + pub fn have_mdat(&self) -> bool { + if self.buf.len() > 8 { + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + atom_type == ATOM_TYPE_MDAT + } else { + false + } + } + + pub fn pop_atom(&mut self) -> Option { + if self.buf.len() >= 8 { + let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + if self.buf.len() >= atom_size { + let mut atom_bytes = Vec::with_capacity(atom_size); + // TODO: Swap vectors? + atom_bytes.extend_from_slice(&self.buf[0..atom_size]); + assert_eq!(self.buf.len(), atom_size); + self.buf.clear(); + Some(Mp4Atom { + atom_type, + atom_bytes, + }) + } else { + None + } + } else { + None + } + } +} + +#[derive(Debug)] +struct StartedState { + mp4_parser: Mp4Parser, + // Atoms in init sequence that must be repeated at each key frame. + ftype_atom: Option, + moov_atom: Option, + // These atoms that must be buffered and pushed as a single buffer. + moof_atom: Option, + // Below members that track current fragment (moof, mdat). + /// Minimum PTS in fragment. + fragment_pts: ClockTime, + /// Minimum DTS in fragment. + fragment_dts: ClockTime, + /// Maximum PTS + duration in fragment. + fragment_max_pts_plus_duration: ClockTime, + /// Minimum offset in fragment. + fragment_offset: Option, + /// Maximum offset_end in fragment. + fragment_offset_end: Option, + fragment_buffer_flags: gst::BufferFlags, +} + +enum State { + Started { + state: StartedState, + } +} + +impl Default for State { + fn default() -> State { + State::Started { + state: StartedState { + mp4_parser: Mp4Parser::new(), + ftype_atom: None, + moov_atom: None, + moof_atom: None, + fragment_pts: ClockTime::none(), + fragment_dts: ClockTime::none(), + fragment_max_pts_plus_duration: ClockTime::none(), + fragment_offset: None, + fragment_offset_end: None, + fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, + } + } + } +} + +pub struct MoqSink { + state: Mutex, + srcpad: gst::Pad, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + DEBUG_CATEGORY, + gst::DebugColorFlags::empty(), + Some(ELEMENT_LONG_NAME), + ) +}); + +impl MoqSink { + fn sink_chain( + &self, + pad: &gst::Pad, + element: &super::MoqSink, + buffer: gst::Buffer, + ) -> Result { + gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + let mut state = self.state.lock().unwrap(); + + let state = match *state { + State::Started { + ref mut state, + .. + } => state, + }; + + let map = buffer.map_readable().map_err(|_| { + gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + let input_buf = map.as_ref(); + + state.mp4_parser.add(input_buf); + + // Update cummulative fragment variables. + // Buffer PTS, etc. are only valid if this buffer contains MDAT data. + if state.mp4_parser.have_mdat() { + assert!(buffer.pts().is_some()); + if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { + state.fragment_pts = buffer.pts(); + } + if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { + state.fragment_dts = buffer.dts(); + } + let pts_plus_duration = buffer.pts() + buffer.duration(); + if state.fragment_max_pts_plus_duration.is_none() || state.fragment_max_pts_plus_duration < pts_plus_duration { + state.fragment_max_pts_plus_duration = pts_plus_duration; + } + if buffer.offset() != gst::BUFFER_OFFSET_NONE && (state.fragment_offset.is_none() || state.fragment_offset.unwrap() > buffer.offset()) { + state.fragment_offset = Some(buffer.offset()); + } + if buffer.offset_end() != gst::BUFFER_OFFSET_NONE && (state.fragment_offset_end.is_none() || state.fragment_offset_end.unwrap() < buffer.offset_end()) { + state.fragment_offset_end = Some(buffer.offset_end()); + } + if state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT) && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + state.fragment_buffer_flags.remove(gst::BufferFlags::DELTA_UNIT); + } + if buffer.flags().contains(gst::BufferFlags::DISCONT) { + state.fragment_buffer_flags.insert(gst::BufferFlags::DISCONT); + } + gst_trace!(CAT, obj: pad, "Updated state={:?}", state); + } + + loop { + match state.mp4_parser.pop_atom() { + Some(atom) => { + gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type); + match atom.atom_type { + ATOM_TYPE_FTYPE => { + state.ftype_atom = Some(atom); + gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom); + }, + ATOM_TYPE_MOOV => { + state.moov_atom = Some(atom); + gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom); + }, + ATOM_TYPE_MOOF => { + state.moof_atom = Some(atom); + gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom); + }, + ATOM_TYPE_MDAT => { + let mdat_atom = atom; + match (state.ftype_atom.as_ref(), state.moov_atom.as_ref(), state.moof_atom.as_ref()) { + (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { + let include_header = !state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT); + let header_len = if include_header { + ftype_atom.len() + moov_atom.len() + } else { + 0 + }; + let output_buf_len = header_len + moof_atom.len() + mdat_atom.len(); + gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", + include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); + let mut gst_buffer = gst::Buffer::with_size(output_buf_len).unwrap(); + { + let buffer_ref = gst_buffer.get_mut().unwrap(); + buffer_ref.set_pts(state.fragment_pts); + buffer_ref.set_dts(state.fragment_dts); + let duration = state.fragment_max_pts_plus_duration - state.fragment_pts; + buffer_ref.set_duration(duration); + buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst::BUFFER_OFFSET_NONE)); + buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst::BUFFER_OFFSET_NONE)); + buffer_ref.set_flags(state.fragment_buffer_flags); + let mut buffer_map = buffer_ref.map_writable().unwrap(); + let slice = buffer_map.as_mut_slice(); + let mut pos = 0; + if include_header { + slice[pos..pos+ftype_atom.len()].copy_from_slice(&ftype_atom.atom_bytes); + pos += ftype_atom.len(); + slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes); + pos += moov_atom.len(); + } + slice[pos..pos+moof_atom.len()].copy_from_slice(&moof_atom.atom_bytes); + pos += moof_atom.len(); + slice[pos..pos+mdat_atom.len()].copy_from_slice(&mdat_atom.atom_bytes); + pos += mdat_atom.len(); + assert_eq!(pos, output_buf_len); + } + // Clear fragment variables. + state.fragment_pts = ClockTime::none(); + state.fragment_dts = ClockTime::none(); + state.fragment_max_pts_plus_duration = ClockTime::none(); + state.fragment_offset = None; + state.fragment_offset_end = None; + state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; + // Push new buffer. + gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer); + // let _ = self.srcpad.push(gst_buffer)?; + self.send_to_network(gst_buffer)?; + }, + _ => { + gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof"); + }, + } + }, + _ => { + gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom); + }, + } + }, + None => break, + } + }; + gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state); + Ok(gst::FlowSuccess::Ok) + } + + fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool { + self.srcpad.push_event(event) + } + + fn sink_query(&self, _pad: &gst::Pad, _element: &super::MoqSink, query: &mut gst::QueryRef) -> bool { + self.srcpad.peer_query(query) + } + + fn send_to_network(&self, data: &[u8]) -> Result<(), Box> { + + } +} + +#[glib::object_subclass] +impl ObjectSubclass for MoqSink { + const NAME: &'static str = ELEMENT_CLASS_NAME; + type Type = super::MoqSink; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .event_function(|pad, parent, event| { + MoqSink::catch_panic_pad_function( + parent, + || false, + |identity, element| identity.src_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + MoqSink::catch_panic_pad_function( + parent, + || false, + |identity, element| identity.src_query(pad, element, query), + ) + }) + .build(); + + Self { + state: Mutex::new(Default::default()), + srcpad, + } + } +} + +impl ObjectImpl for MoqSink { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl ElementImpl for MoqSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + ELEMENT_LONG_NAME, + "Generic", + ELEMENT_DESCRIPTION, + ELEMENT_AUTHOR, + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + vec![src_pad_template] + }); + PAD_TEMPLATES.as_ref() + } +} \ No newline at end of file diff --git a/gst-warp-sink/src/moqsink/mod.rs b/gst-warp-sink/src/moqsink/mod.rs new file mode 100644 index 0000000..7c2195f --- /dev/null +++ b/gst-warp-sink/src/moqsink/mod.rs @@ -0,0 +1,19 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct MoqSink(ObjectSubclass) @extends gst::Element, gst::Object; +} + +unsafe impl Send for MoqSink {} +unsafe impl Sync for MoqSink {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + imp::ELEMENT_NAME, + gst::Rank::None, + MoqSink::static_type(), + ) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index d9d7bba..05cb0d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,252 +5,7 @@ fn main() -> Result<(), Error> { gst::init()?; gstfmp4::plugin_register_static().expect("Failed to register fmp4 plugin"); - let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! videoconvert ! queue ! x264enc tune=zerolatency key-int-max=30 ! mp4mux streamable=true fragment-duration=1 ! ! appsink name=sink ").unwrap().downcast::().unwrap(); - - let sink = pipeline - .by_name("sink") - .unwrap() - .dynamic_cast::() - .unwrap(); - sink.set_buffer_list(true); - - sink.set_callbacks( - gst_app::AppSinkCallbacks::builder() - .new_sample(move |sink| { - let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; - // let mut state = state.lock().unwrap(); - - // // The muxer only outputs non-empty buffer lists - - //TODO: figure out how to iterate over all the buffers and then do this - - let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); - let map = buffer.map_readable().map_err(|_| { - gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]); - gst::FlowError::Error - })?; - let input_buf = map.as_ref(); - // assert!(!buffer_list.is_empty()); - - // let mut first = buffer_list.get(0).unwrap(); - - // // Each list contains a full segment, i.e. does not start with a DELTA_UNIT - // assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); - - // // If the buffer has the DISCONT and HEADER flag set then it contains the media - // // header, i.e. the `ftyp`, `moov` and other media boxes. - // // - // // This might be the initial header or the updated header at the end of the stream. - // if first - // .flags() - // .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) - // { - // let mut path = state.path.clone(); - // std::fs::create_dir_all(&path).expect("failed to create directory"); - // path.push("init.cmfi"); - - // println!("writing header to {}", path.display()); - // let map = first.map_readable().unwrap(); - // std::fs::write(path, &map).expect("failed to write header"); - // drop(map); - - // // Remove the header from the buffer list - // buffer_list.make_mut().remove(0, 1); - - // // If the list is now empty then it only contained the media header and nothing - // // else. - // if buffer_list.is_empty() { - // return Ok(gst::FlowSuccess::Ok); - // } - - // // Otherwise get the next buffer and continue working with that. - // first = buffer_list.get(0).unwrap(); - // } - - // // If the buffer only has the HEADER flag set then this is a segment header that is - // // followed by one or more actual media buffers. - // assert!(first.flags().contains(gst::BufferFlags::HEADER)); - - // let segment = sample - // .segment() - // .expect("no segment") - // .downcast_ref::() - // .expect("no time segment"); - - // // Initialize the start time with the first PTS we observed. This will be used - // // later for calculating the duration of the whole media for the DASH manifest. - // // - // // The PTS of the segment header is equivalent to the earliest PTS of the whole - // // segment. - // let pts = segment - // .to_running_time(first.pts().unwrap()) - // .expect("can't get running time"); - // if state.start_time.is_none() { - // state.start_time = Some(pts); - // } - - // // The metadata of the first media buffer is duplicated to the segment header. - // // Based on this we can know the timecode of the first frame in this segment. - // let meta = first - // .meta::() - // .expect("no timecode meta"); - - // let mut path = state.path.clone(); - // path.push(format!("segment_{}.cmfv", state.segments.len() + 1)); - // println!( - // "writing segment with timecode {} to {}", - // meta.tc(), - // path.display() - // ); - - // // Calculate the end time at this point. The duration of the segment header is set - // // to the whole duration of this segment. - // let duration = first.duration().unwrap(); - // let end_time = first.pts().unwrap() + first.duration().unwrap(); - // state.end_time = Some( - // segment - // .to_running_time(end_time) - // .expect("can't get running time"), - // ); - - // let mut file = std::fs::File::create(path).expect("failed to open fragment"); - // for buffer in &*buffer_list { - // use std::io::prelude::*; - - // let map = buffer.map_readable().unwrap(); - // file.write_all(&map).expect("failed to write fragment"); - // } - - // state.segments.push(Segment { - // start_time: pts, - // duration, - // }); - - Ok(gst::FlowSuccess::Ok) - }) - .eos(move |_sink| { -// let state = state_clone.lock().unwrap(); - -// // Now write the manifest -// let mut path = state.path.clone(); -// path.push("manifest.mpd"); - -// println!("writing manifest to {}", path.display()); - -// let duration = state -// .end_time -// .opt_checked_sub(state.start_time) -// .ok() -// .flatten() -// .unwrap() -// .mseconds(); - -// // Write the whole segment timeline out here, compressing multiple segments with -// // the same duration to a repeated segment. -// let mut segments = vec![]; -// let mut write_segment = -// |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| { -// let mut s = dash_mpd::S { -// t: Some(start.mseconds() as i64), -// d: duration.mseconds() as i64, -// ..Default::default() -// }; -// if repeat > 0 { -// s.r = Some(repeat as i64); -// } - -// segments.push(s); -// }; - -// let mut start = None; -// let mut num_segments = 0; -// let mut last_duration = None; -// for segment in &state.segments { -// if start.is_none() { -// start = Some(segment.start_time); -// } -// if last_duration.is_none() { -// last_duration = Some(segment.duration); -// } - -// // If the duration of this segment is different from the previous one then we -// // have to write out the segment now. -// if last_duration != Some(segment.duration) { -// write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); -// start = Some(segment.start_time); -// last_duration = Some(segment.duration); -// num_segments = 1; -// } else { -// num_segments += 1; -// } -// } - -// // Write the last segment if any -// if num_segments > 0 { -// write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1); -// } - -// let segment_template = dash_mpd::SegmentTemplate { -// timescale: Some(1000), -// initialization: Some("init.cmfi".to_string()), -// media: Some("segment_$Number$.cmfv".to_string()), -// SegmentTimeline: Some(dash_mpd::SegmentTimeline { segments }), -// ..Default::default() -// }; - -// let rep = dash_mpd::Representation { -// id: Some("A".to_string()), -// width: Some(1280), -// height: Some(720), -// bandwidth: Some(2048000), -// SegmentTemplate: Some(segment_template), -// ..Default::default() -// }; - -// let adapt = dash_mpd::AdaptationSet { -// contentType: Some("video".to_string()), -// mimeType: Some("video/mp4".to_string()), -// codecs: Some("avc1.4d0228".to_string()), -// frameRate: Some("30/1".to_string()), -// segmentAlignment: Some(true), -// subsegmentStartsWithSAP: Some(1), -// representations: vec![rep], -// ..Default::default() -// }; - -// let period = dash_mpd::Period { -// adaptations: vec![adapt], -// ..Default::default() -// }; - -// let mpd = dash_mpd::MPD { -// mpdtype: Some("static".to_string()), -// xmlns: Some("urn:mpeg:dash:schema:mpd:2011".to_string()), -// schemaLocation: Some("urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd".to_string()), -// profiles: Some("urn:mpeg:dash:profile:isoff-on-demand:2011".to_string()), -// periods: vec![period], -// mediaPresentationDuration: Some(std::time::Duration::from_millis(duration)), -// minBufferTime: Some(std::time::Duration::from_secs(1)), -// ..Default::default() -// }; - -// use serde::ser::Serialize; - -// let mut xml = String::new(); -// let mut ser = quick_xml::se::Serializer::new(&mut xml); -// ser.indent(' ', 4); -// mpd.serialize(ser).unwrap(); - -// let manifest = format!( -// r###" -// {xml} -// "### -// ); - -// std::fs::write(path, manifest).expect("failed to write manifest"); - }) - .build(), - ); + let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! videoconvert ! queue ! x264enc tune=zerolatency key-int-max=30 ! mp4mux streamable=true fragment-duration=1 ! ! testsink name=sink ").unwrap().downcast::().unwrap(); pipeline.set_state(gst::State::Playing)?;