diff --git a/.gitignore b/.gitignore index f6a98cc..c2427c7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target **/target + +*.mp4 diff --git a/output.mp4 b/output.mp4 deleted file mode 100644 index 6cc6016..0000000 Binary files a/output.mp4 and /dev/null differ diff --git a/src/media.rs b/src/media.rs index 8a9fcf8..560fb29 100644 --- a/src/media.rs +++ b/src/media.rs @@ -1,12 +1,11 @@ -#![allow(dead_code)] -#![allow(unused_imports)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_variables)] +// #![allow(dead_code)] +// #![allow(unused_imports)] +// #![allow(unused_mut)] +// #![allow(unused_assignments)] +// #![allow(unused_variables)] use anyhow::{self, Context}; use gst::prelude::*; -use std::io::SeekFrom; use std::sync::{Arc, Mutex}; use moq_transport::cache::{broadcast, fragment, segment, track}; @@ -14,7 +13,7 @@ use moq_transport::VarInt; use serde_json::json; use std::cmp::max; use std::collections::HashMap; -use std::io::{Cursor, Seek}; +use std::io::Cursor; use std::time; use mp4::{self, ReadBox}; @@ -72,12 +71,7 @@ impl Mp4Parser { struct State { // Atoms in init sequence that must be repeated at each key frame. - ftyp_atom: Option>, - moov_atom: Option>, - - // bitrate: u64, - // width: u64, - // height: u64, + ftyp_atom: Option, // We hold on to publisher so we don't close then while media is still being published. broadcast: broadcast::Publisher, @@ -100,8 +94,7 @@ impl GST { gstmp4::plugin_register_static().unwrap(); let state = Arc::new(Mutex::new(State { - ftyp_atom: Some(Vec::new()), - moov_atom: Some(Vec::new()), + ftyp_atom: None, // bitrate: 2_048_000, // width: 1280, // height: 720, @@ -120,17 +113,18 @@ impl GST { //FIXME: run one pipeline at a time, then let downstream push accordingly let pipeline = gst::parse::launch( " - videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ - audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ - mp4mux fragment-duration=1 name=mux ! appsink name=sink \ + filesrc location=meh.mp4 \ + ! qtdemux \ + ! h264parse \ + ! mp4mux fragment-duration=1 streamable=true name=mux ! appsink name=sink \ ", ) .unwrap() .downcast::() - .unwrap(); //interleave-time=1 movie-timescale=1 - // audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ - - //videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ + .unwrap(); + //mp4mux fragment-duration=1 name=mux ! filesink location=test.mp4 \ + //audiotestsrc num-buffers=6000000 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ + //videotestsrc num-buffers=6000000 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ let appsink = pipeline .by_name("sink") @@ -217,25 +211,152 @@ impl GST { // We're going to parse the moov box. // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut reader = Cursor::new(&atom.atom_bytes); - // let header = mp4::BoxHeader::read(&mut reader) - // .expect("could not read box header"); + let mut reader = Cursor::new(&atom.atom_bytes); + let header = mp4::BoxHeader::read(&mut reader) + .expect("could not read box header"); match atom.atom_type { ATOM_TYPE_FTYP => { // save for later - println!("ftyp_atom"); - // state.ftyp_atom = Some(atom); + // println!("ftyp_atom"); + state.ftyp_atom = Some(atom); } ATOM_TYPE_MOOV => { - println!("moov_atom"); + // println!("moov_atom"); + let ftyp = state.ftyp_atom.as_ref().unwrap(); + let mut init = ftyp.atom_bytes.clone(); + init.extend(&atom.atom_bytes); + + // Parse the moov box so we can detect the timescales for each track. + let moov = mp4::MoovBox::read_box(&mut reader, header.size) + .expect("could not read the moov box"); + + // Create the catalog track with a single segment. + if !state.init.is_some() { + let mut init_track = broadcast + .create_track("0.mp4") + .expect("could not create the init broadcast track"); + + let init_segment = init_track + .create_segment(segment::Info { + sequence: VarInt::ZERO, + priority: 0, + expires: None, + }) + .expect("could not create init segment"); + + // Create a single fragment, optionally setting the size + let mut init_fragment = init_segment + .final_fragment(VarInt::ZERO) + .expect("could not create the init fragment"); + + init_fragment.chunk(init.into()).expect("could not insert the moov+ftyp box into the init fragment"); + + let mut tracks = HashMap::new(); + + for trak in &moov.traks { + let id = trak.tkhd.track_id; + + let name = format!("{}.m4s", id); + // let name = format!("{}.m4s", id); + + let timescale = track_timescale(&moov, id); + + // Store the track publisher in a map so we can update it later. + let track = broadcast + .create_track(&name) + .expect("could not create a broadcast track"); + + let track = Track::new(track, timescale); + + tracks.insert(id, track); + } + + + let mut catalog = broadcast + .create_track(".catalog") + .expect("could not create a catalog"); + + // Create the catalog track + Self::serve_catalog(&mut catalog, &init_track.name, &moov) + .expect("could not serve the catalog"); + + state.broadcast = broadcast.clone(); + state.catalog = Some(catalog); + state.init = Some(init_track); + state.tracks = Some(tracks); + } + } ATOM_TYPE_MOOF => { - // state.moof_atom = Some(atom); - println!("moof_atom") + // The current track name + let mut current = None; + + // println!("moof_atom"); + let moof = mp4::MoofBox::read_box(&mut reader, header.size).expect("failed to read MP4"); + + // Process the moof. + let fragment = Fragment::new(moof) + .expect("failed to create a new fragment for moof atom"); + + // // Get the track for this moof. + // let track = state + // .tracks + // .as_mut() + // .unwrap() + // .get_mut(&fragment.track) + // .expect("failed to find track"); + + // // Save the track ID for the next iteration, which must be a mdat. + // assert!(current.is_none()); + // current.replace(fragment.track); + let id = { + + // Get the track for this moof. + let track = state + .tracks + .as_mut() + .unwrap() + .get_mut(&fragment.track) + .expect("failed to find track"); + + // Process the track here if necessary. + // ... + + // Save the track ID for the next iteration, which must be a mdat. + assert!(current.is_none()); + current.replace(fragment.track); + + + // Publish the moof header, creating a new segment if it's a keyframe. + track + .header(atom.atom_bytes, fragment) + .expect("failed to publish moof"); + + // The borrow of `state` ends here, so `track` goes out of scope and the mutable borrow ends. + current.clone() + }; + + // println!("current moof {:?}", id); + + state.current = id; } ATOM_TYPE_MDAT => { - println!("mdat_atom"); + // println!("mdat_atom"); + let mut current = state.current.clone(); + // println!("current mdat {:?}", current); + + // Get the track ID from the previous moof. + let track = current.take().expect("missing moof"); + let track = state + .tracks + .as_mut() + .unwrap() + .get_mut(&track) + .expect("failed to find track"); + + // Publish the mdat atom. + track.data(atom.atom_bytes).expect("failed to publish mdat"); } _ => { //Skip unkown atoms @@ -297,7 +418,7 @@ impl GST { init_track_name: &str, moov: &mp4::MoovBox, ) -> Result<(), anyhow::Error> { - println!("serving the catalog"); + // println!("serving the catalog"); let segment = track.create_segment(segment::Info { sequence: VarInt::ZERO,