From 1f2e3a35653b349eaa0ee2350f84586a3a00b715 Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Mon, 1 Jan 2024 14:01:00 +0300 Subject: [PATCH] Serve catalog, add Tracks, add Fragments --- Cargo.lock | 34 ++++++ Cargo.toml | 2 + src/media.rs | 288 +++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 259 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8155f8d..1fb3aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,6 +464,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "four-cc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3958af68a31b1d1384d3f39b6aa33eb14b6009065b5ca305ddd9712a4237124f" + [[package]] name = "futures" version = "0.3.30" @@ -1106,6 +1112,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "mp4ra-rust" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be9daf03b43bf3842962947c62ba40f411e46a58774c60838038f04a67d17626" +dependencies = [ + "four-cc", +] + +[[package]] +name = "mpeg4-audio-const" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a1fe2275b68991faded2c80aa4a33dba398b77d276038b8f50701a22e55918" + [[package]] name = "muldiv" version = "1.0.1" @@ -1484,6 +1505,17 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "rfc6381-codec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4395f46a67f0d57c57f6a5361f3a9a0c0183a19cab3998892ecdc003de6d8037" +dependencies = [ + "four-cc", + "mp4ra-rust", + "mpeg4-audio-const", +] + [[package]] name = "ring" version = "0.16.20" @@ -2071,9 +2103,11 @@ dependencies = [ "moq-transport", "mp4", "quinn", + "rfc6381-codec", "rustls", "rustls-native-certs", "rustls-pemfile", + "serde_json", "tokio", "toml_datetime", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 71548bd..587e077 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,5 @@ tracing-subscriber = "0.3" anyhow = { version = "1", features = ["backtrace"] } mp4 = "0.13" moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.2.0" } +serde_json = "1" +rfc6381-codec = "0.1" diff --git a/src/media.rs b/src/media.rs index a8e1d41..3b06b1b 100644 --- a/src/media.rs +++ b/src/media.rs @@ -1,3 +1,4 @@ +use anyhow::{self,Context}; use gst::prelude::*; use gst::ClockTime; @@ -8,6 +9,9 @@ use moq_transport::cache::{broadcast, fragment, segment, track}; use moq_transport::VarInt; use std::collections::HashMap; use std::io::Cursor; +use serde_json::json; +use std::cmp::max; +use std::time; use mp4::{self, ReadBox}; @@ -107,32 +111,6 @@ struct State { mp4_parser: Mp4Parser, } -fn probe_encoder(state: Arc>, enc: gst::Element, is_video: bool) { - enc.static_pad("src").unwrap().add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, info| { - let Some(ev) = info.event() else { - return gst::PadProbeReturn::Ok; - }; - let gst::EventView::Caps(ev) = ev.view() else { - return gst::PadProbeReturn::Ok; - }; - - let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps()); - - let mut state = state.lock().unwrap(); - - if is_video { - state.video_enc = Some(mime.unwrap().into()) - } else { - // state.audio_enc - } - - gst::PadProbeReturn::Remove - }, - ); -} - pub struct GST {} impl GST { @@ -354,9 +332,11 @@ impl GST { } // Handle the case where one or both values are `None` _ => None, - }; buffer_ref.set_duration(duration); + }; + buffer_ref.set_duration(duration); buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE)); buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE)); + //this is a header buffer_ref.set_flags(gst::BufferFlags::HEADER); let mut buffer_map = buffer_ref.map_writable().unwrap(); let slice = buffer_map.as_mut_slice(); @@ -408,6 +388,11 @@ impl GST { tracks.insert(id, track); } + let mut catalog = broadcast.create_track(".catalog").map_err(|_| gst::FlowError::Error)?; + + // Create the catalog track + Self::serve_catalog(&mut catalog, &init_track.name, &moov).map_err(|_| gst::FlowError::Error)?; + } _ => { log::warn!("Received moov without ftype"); @@ -500,6 +485,107 @@ impl GST { Ok(()) } + + //Copied from https://github.com/kixelated/moq-rs/blob/f64c2e894e4f09873aeef4620dbf20aaabe5d12e/moq-pub/src/media.rs#L127 + fn serve_catalog( + track: &mut track::Publisher, + init_track_name: &str, + moov: &mp4::MoovBox, + ) -> Result<(), anyhow::Error> { + let segment = track.create_segment(segment::Info { + sequence: VarInt::ZERO, + priority: 0, + expires: None, + })?; + + let mut tracks = Vec::new(); + + for trak in &moov.traks { + let mut track = json!({ + "container": "mp4", + "init_track": init_track_name, + "data_track": format!("{}.m4s", trak.tkhd.track_id), + }); + + let stsd = &trak.mdia.minf.stbl.stsd; + if let Some(avc1) = &stsd.avc1 { + // avc1[.PPCCLL] + // + // let profile = 0x64; + // let constraints = 0x00; + // let level = 0x1f; + let profile = avc1.avcc.avc_profile_indication; + let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video + let level = avc1.avcc.avc_level_indication; + + let width = avc1.width; + let height = avc1.height; + + let codec = rfc6381_codec::Codec::avc1(profile, constraints, level); + let codec_str = codec.to_string(); + + track["kind"] = json!("video"); + track["codec"] = json!(codec_str); + track["width"] = json!(width); + track["height"] = json!(height); + } else if let Some(_hev1) = &stsd.hev1 { + // TODO https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L106 + anyhow::bail!("HEVC not yet supported") + } else if let Some(mp4a) = &stsd.mp4a { + let desc = &mp4a + .esds + .as_ref() + .context("missing esds box for MP4a")? + .es_desc + .dec_config; + + let codec_str = format!("mp4a.{:02x}.{}", desc.object_type_indication, desc.dec_specific.profile); + + track["kind"] = json!("audio"); + track["codec"] = json!(codec_str); + track["channel_count"] = json!(mp4a.channelcount); + track["sample_rate"] = json!(mp4a.samplerate.value()); + track["sample_size"] = json!(mp4a.samplesize); + + let bitrate = max(desc.max_bitrate, desc.avg_bitrate); + if bitrate > 0 { + track["bit_rate"] = json!(bitrate); + } + } else if let Some(vp09) = &stsd.vp09 { + // https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238 + let vpcc = &vp09.vpcc; + let codec_str = format!("vp09.0.{:02x}.{:02x}.{:02x}", vpcc.profile, vpcc.level, vpcc.bit_depth); + + track["kind"] = json!("video"); + track["codec"] = json!(codec_str); + track["width"] = json!(vp09.width); // no idea if this needs to be multiplied + track["height"] = json!(vp09.height); // no idea if this needs to be multiplied + + // TODO Test if this actually works; I'm just guessing based on mp4box.js + anyhow::bail!("VP9 not yet supported") + } else { + // TODO add av01 support: https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L251 + anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id); + } + + tracks.push(track); + } + + let catalog = json!({ + "tracks": tracks + }); + + let catalog_str = serde_json::to_string_pretty(&catalog)?; + log::info!("catalog: {}", catalog_str); + + // Create a single fragment for the segment. + let mut fragment = segment.final_fragment(VarInt::ZERO)?; + + // Add the segment and add the fragment. + fragment.chunk(catalog_str.into())?; + + Ok(()) + } } struct Track { @@ -526,56 +612,128 @@ impl Track { } } - // pub fn header(&mut self, raw: Vec, fragment: Fragment) -> anyhow::Result<()> { - // if let Some(current) = self.current.as_mut() { - // if !fragment.keyframe { - // // Use the existing segment - // current.chunk(raw.into())?; - // return Ok(()); - // } - // } + pub fn header(&mut self, raw: Vec, fragment: Fragment) -> anyhow::Result<()> { + if let Some(current) = self.current.as_mut() { + if !fragment.keyframe { + // Use the existing segment + current.chunk(raw.into())?; + return Ok(()); + } + } - // // Otherwise make a new segment + // Otherwise make a new segment - // // Compute the timestamp in milliseconds. - // // Overflows after 583 million years, so we're fine. - // let timestamp: u32 = fragment - // .timestamp(self.timescale) - // .as_millis() - // .try_into() - // .context("timestamp too large")?; + // Compute the timestamp in milliseconds. + // Overflows after 583 million years, so we're fine. + let timestamp: u32 = fragment + .timestamp(self.timescale) + .as_millis() + .try_into() + .context("timestamp too large")?; - // // Create a new segment. - // let segment = self.track.create_segment(segment::Info { - // sequence: VarInt::try_from(self.sequence).context("sequence too large")?, + // Create a new segment. + let segment = self.track.create_segment(segment::Info { + sequence: VarInt::try_from(self.sequence).context("sequence too large")?, - // // Newer segments are higher priority - // priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, + // Newer segments are higher priority + priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, - // // Delete segments after 10s. - // expires: Some(time::Duration::from_secs(10)), - // })?; + // Delete segments after 10s. + expires: Some(time::Duration::from_secs(10)), + })?; - // // Create a single fragment for the segment that we will keep appending. - // let mut fragment = segment.final_fragment(VarInt::ZERO)?; + // Create a single fragment for the segment that we will keep appending. + let mut fragment = segment.final_fragment(VarInt::ZERO)?; - // self.sequence += 1; + self.sequence += 1; - // // Insert the raw atom into the segment. - // fragment.chunk(raw.into())?; + // Insert the raw atom into the segment. + fragment.chunk(raw.into())?; - // // Save for the next iteration - // self.current = Some(fragment); + // Save for the next iteration + self.current = Some(fragment); - // Ok(()) - // } + Ok(()) + } - // pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { - // let fragment = self.current.as_mut().context("missing current fragment")?; - // fragment.chunk(raw.into())?; + pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { + let fragment = self.current.as_mut().context("missing current fragment")?; + fragment.chunk(raw.into())?; - // Ok(()) - // } + Ok(()) + } +} + +struct Fragment { + // The track for this fragment. + track: u32, + + // The timestamp of the first sample in this fragment, in timescale units. + timestamp: u64, + + // True if this fragment is a keyframe. + keyframe: bool, +} + +impl Fragment { + fn new(moof: mp4::MoofBox) -> anyhow::Result { + // We can't split the mdat atom, so this is impossible to support + anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom"); + let track = moof.trafs[0].tfhd.track_id; + + // Parse the moof to get some timing information to sleep. + let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp"); + + // Detect if we should start a new segment. + let keyframe = sample_keyframe(&moof); + + Ok(Self { + track, + timestamp, + keyframe, + }) + } + + // Convert from timescale units to a duration. + fn timestamp(&self, timescale: u64) -> time::Duration { + time::Duration::from_millis(1000 * self.timestamp / timescale) + } +} + +fn sample_timestamp(moof: &mp4::MoofBox) -> Option { + Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) +} + +fn sample_keyframe(moof: &mp4::MoofBox) -> bool { + for traf in &moof.trafs { + // TODO trak default flags if this is None + let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); + let trun = match &traf.trun { + Some(t) => t, + None => return false, + }; + + for i in 0..trun.sample_count { + let mut flags = match trun.sample_flags.get(i as usize) { + Some(f) => *f, + None => default_flags, + }; + + if i == 0 && trun.first_sample_flags.is_some() { + flags = trun.first_sample_flags.unwrap(); + } + + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + + if keyframe && !non_sync { + return true; + } + } + } + + false } // Find the timescale for the given track.