From d00c3383e0b8b50514bf9ac715c63a6c949fab24 Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Wed, 31 Jan 2024 09:09:33 +0300 Subject: [PATCH] feat: add mp4_parser --- Cargo.lock | 23 +++++ Cargo.toml | 9 ++ src/media.rs | 246 ++++++++++++++++++++++++++++++++------------------- src/test.rs | 0 4 files changed, 185 insertions(+), 93 deletions(-) create mode 100644 src/test.rs diff --git a/Cargo.lock b/Cargo.lock index 72166cc..a20fd85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,6 +2158,28 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", + "rand", + "uuid-macro-internal", +] + +[[package]] +name = "uuid-macro-internal" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "valuable" version = "0.1.0" @@ -2211,6 +2233,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", "webtransport-quinn", ] diff --git a/Cargo.toml b/Cargo.toml index 75c7ac8..8df5f93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,17 @@ rfc6381-codec = "0.1" clap = { version = "4", features = ["derive"] } gst-plugin-mp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" } gst-plugin-fmp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" } +# uuid = "1.7.0" [build-dependencies] clap = { version = "4", features = ["derive"] } clap_mangen = "0.2" url = "2" + +[dependencies.uuid] +version = "1.7.0" +features = [ + "v4", # Lets you generate random UUIDs + "fast-rng", # Use a faster (but still sufficiently random) RNG + "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs +] diff --git a/src/media.rs b/src/media.rs index 4aa108f..86cf1e2 100644 --- a/src/media.rs +++ b/src/media.rs @@ -5,7 +5,6 @@ // #![allow(unused_variables)] use anyhow::{self, Context}; use gst::prelude::*; -use std::io::SeekFrom; use std::sync::{Arc, Mutex}; use moq_transport::cache::{broadcast, fragment, segment, track}; @@ -13,15 +12,69 @@ use moq_transport::VarInt; use serde_json::json; use std::cmp::max; use std::collections::HashMap; -use std::io::{Cursor, Seek}; +use std::io::Cursor; use std::time; +use uuid::Uuid; + use mp4::{self, ReadBox}; +const ATOM_TYPE_FTYP: u32 = 1718909296; +const ATOM_TYPE_MOOV: u32 = 1836019574; +const ATOM_TYPE_MOOF: u32 = 1836019558; +const ATOM_TYPE_MDAT: u32 = 1835295092; + +#[derive(Debug)] +struct Mp4Atom { + pub atom_type: u32, + // Includes atom size and type. + pub atom_bytes: Vec, +} + +impl Mp4Atom { + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.atom_bytes.len() + } +} + +#[derive(Debug)] +struct Mp4Parser { + buf: Vec, +} + +impl Mp4Parser { + pub fn new() -> Mp4Parser { + Mp4Parser { buf: Vec::new() } + } + + pub fn add(&mut self, buf: &[u8]) { + self.buf.extend_from_slice(buf); + } + + pub fn pop_atom(&mut self) -> Option { + if self.buf.len() >= 8 { + let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; + let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); + if self.buf.len() >= atom_size { + let atom_bytes = self.buf.drain(0..atom_size).collect(); + Some(Mp4Atom { + atom_type, + atom_bytes, + }) + } else { + None + } + } else { + None + } + } +} + struct State { // Atoms in init sequence that must be repeated at each key frame. - ftyp_atom: Option>, - moov_atom: Option>, + ftyp_atom: Option, + mp4_parser: Mp4Parser, // bitrate: u64, // width: u64, @@ -47,14 +100,14 @@ impl GST { gstmp4::plugin_register_static().unwrap(); let state = Arc::new(Mutex::new(State { - ftyp_atom: Some(Vec::new()), - moov_atom: Some(Vec::new()), + ftyp_atom: None, // bitrate: 2_048_000, // width: 1280, // height: 720, broadcast: broadcast.to_owned(), catalog: None, init: None, + mp4_parser: Mp4Parser::new(), // Tracks based on their track ID. tracks: None, @@ -66,7 +119,7 @@ impl GST { //FIXME: run one pipeline at a time, then let downstream push accordingly let pipeline = gst::parse::launch( " - audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ + videotestsrc num-buffers=60000000 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ cmafmux write-mehd=true header-update-mode=update fragment-duration=1 name=mux ! appsink name=sink \ ", ) @@ -158,44 +211,49 @@ impl GST { data.extend_from_slice(map.as_slice()); } + state.mp4_parser.add(&data); + // The current track name - let mut current = None; + let mut current = state.current.clone(); - let mut cursor = Cursor::new(data.to_vec()); - //TODO: Research how the mp4::BoxHeader works... - while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) { - let box_data = vec![0u8; header.size as usize]; + loop { + match state.mp4_parser.pop_atom() { + Some(atom) => { + // println!("atom_size={}, atom_type={}", atom.len(), atom.atom_type); - match header.name { - mp4::BoxType::FtypBox => { - println!("Found 'ftyp' box"); - state.ftyp_atom = Some(box_data); // Store the entire 'ftyp' atom. - } - mp4::BoxType::MoovBox => { - println!("Found 'moov' box"); - let mut init = Vec::new(); // Buffer to store the concatenated 'ftyp' and 'moov' atoms. - let mut broadcast = state.broadcast.clone(); - // Parse the moov box so we can detect the timescales for each track. - // let moov_box_cursor = Cursor::new(box_data.clone()); // Create a cursor for the 'moov' box data. - let moov = mp4::MoovBox::read_box( - &mut cursor.clone(), - // &mut moov_box_cursor.clone(), - header.size, - ) - .expect("could not read moov box"); + // We're going to parse the moov box. + // We have to read the moov box header to correctly advance the cursor for the mp4 crate. + let mut reader = Cursor::new(&atom.atom_bytes); + let header = mp4::BoxHeader::read(&mut reader) + .expect("could not read box header"); - match state.ftyp_atom.as_ref() { - Some(ftyp_atom) => { - // Concatenate 'ftyp' and 'moov' atoms. - init.extend_from_slice(&ftyp_atom); - init.extend_from_slice(&box_data); - // state.moov_atom = Some(init); //FIXME: just realised, this is wrong + match atom.atom_type { + ATOM_TYPE_FTYP => { + //save for later + state.ftyp_atom = Some(atom); + } + ATOM_TYPE_MOOV => { + println!("moov_atom"); + let ftyp = state.ftyp_atom.as_ref().unwrap(); + let mut init = ftyp.atom_bytes.clone(); + init.extend(&atom.atom_bytes); + + // Parse the moov box so we can detect the timescales for each track. + let moov = mp4::MoovBox::read_box( + &mut reader, + header.size, + ) + .expect("could not read the moov box"); + + let uuid = Uuid::new_v4(); + + let init_name = format!("{}.mp4", uuid); // Create the catalog track with a single segment. let mut init_track = broadcast - .create_track("0.mp4") - .expect("could not create track"); + .create_track(&init_name) + .expect("could not create the init broadcast track"); let init_segment = init_track .create_segment(segment::Info { @@ -203,84 +261,86 @@ impl GST { priority: 0, expires: None, }) - .expect("could not create init_segment"); + .expect("could not create init segment"); // Create a single fragment, optionally setting the size - let mut init_fragment = init_segment - .final_fragment(VarInt::ZERO) - .expect("could not create a single fragment"); + let mut init_fragment = + init_segment.final_fragment(VarInt::ZERO).expect("could not create the init fragment"); - init_fragment - .chunk(init.into()) - .expect("could not create a broadcast chunk"); + init_fragment.chunk(init.into()).expect("could not insert the moov+ftyp box into the init fragment"); let mut tracks = HashMap::new(); for trak in &moov.traks { let id = trak.tkhd.track_id; - let name = format!("{}.m4s", id); + let uuid = Uuid::new_v4(); + + let name = format!("{}.m4s", uuid); + // let name = format!("{}.m4s", id); + let timescale = track_timescale(&moov, id); - + // Store the track publisher in a map so we can update it later. let track = broadcast - .create_track(&name) - .expect("could not broadcast the track"); + .create_track(&name) + .expect("could not create a broadcast track"); + let track = Track::new(track, timescale); + tracks.insert(id, track); } - let mut catalog = broadcast - .create_track(".catalog") - .expect("could not create a catalog"); + let uuid = Uuid::new_v4(); - // Create the catalog track - Self::serve_catalog(&mut catalog, &init_track.name, &moov) - .expect("could not serve catalog"); + let catalog_name = format!(".catalog.{}", uuid); - state.catalog = Some(catalog); - state.init = Some(init_track); - state.tracks = Some(tracks); - state.broadcast = broadcast; + let mut catalog = broadcast.create_track(&catalog_name).expect("could not create a catalog"); + + // Create the catalog track + Self::serve_catalog(&mut catalog, &init_track.name, &moov).expect("could not serve the catalog"); + + state.broadcast = broadcast.clone(); + state.catalog = Some(catalog); + state.init= Some(init_track); + state.tracks = Some(tracks); + + } + ATOM_TYPE_MOOF => { + // state.moof_atom = Some(atom); + // println!("moof_atom"); + + let moof = mp4::MoofBox::read_box(&mut reader, header.size).expect("failed to read MP4"); + + // Process the moof. + let fragment = Fragment::new(moof).expect("failed to create a new fragment for moof atom"); + + // Get the track for this moof. + let track = state.tracks.as_mut().unwrap().get_mut(&fragment.track).expect("failed to find track"); + + // Save the track ID for the next iteration, which must be a mdat. + assert!(current.is_none()); + current.replace(fragment.track); + + // Publish the moof header, creating a new segment if it's a keyframe. + track.header(atom.atom_bytes, fragment).expect("failed to publish moof"); + } + ATOM_TYPE_MDAT => { + // println!("mdat_atom"); + // Get the track ID from the previous moof. + let track = current.take().expect("missing moof"); + let track = state.tracks.as_mut().unwrap().get_mut(&track).expect("failed to find track"); + + // Publish the mdat atom. + track.data(atom.atom_bytes).expect("failed to publish mdat"); + } + _ => { + //Skip unkown atoms } - None => {} } } - mp4::BoxType::MoofBox => { - println!("Found 'moof' box"); - - let moof = mp4::MoofBox::read_box(&mut cursor.clone(), header.size) - .expect("failed to read MP4 moof box"); - - // Process the moof. - let fragment = Fragment::new(moof).expect("failed to create a fragment"); - - // Get the track for this moof. - let track = state - .tracks - .unwrap() - .get_mut(&fragment.track) - .context("failed to find track") - .expect("could not get the track from moof atom"); - - // Save the track ID for the next iteration, which must be a mdat. - assert!(current.is_none(), "multiple moof atoms"); - current.replace(fragment.track); - - // Publish the moof header, creating a new segment if it's a keyframe. - track - .header(, fragment) - .context("failed to publish moof")?; - // Process 'moof' box - } - mp4::BoxType::MdatBox => { - println!("Found 'mdat' box"); - } - _ => {} + None => break, } - cursor - .seek(SeekFrom::Current(header.size as i64)) - .expect("Seeking failed"); } Ok(gst::FlowSuccess::Ok) diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..e69de29