diff --git a/src/media.rs b/src/media.rs index 8c8b84b..801e405 100644 --- a/src/media.rs +++ b/src/media.rs @@ -1,16 +1,12 @@ -#![allow(dead_code)] -#![allow(unused_imports)] -#![allow(unused_mut)] -#![allow(unused_assignments)] -#![allow(unused_variables)] +// #![allow(dead_code)] +// #![allow(unused_imports)] +// #![allow(unused_mut)] +// #![allow(unused_assignments)] +// #![allow(unused_variables)] use anyhow::{self, Context}; use gst::prelude::*; -use gst::ClockTime; -use gst_app::glib; -use gst_app::gst_base; use std::io::SeekFrom; use std::sync::{Arc, Mutex}; -use tokio::io::AsyncReadExt; use moq_transport::cache::{broadcast, fragment, segment, track}; use moq_transport::VarInt; @@ -20,95 +16,16 @@ use std::collections::HashMap; use std::io::{Cursor, Seek}; use std::time; -use mp4::{self, ReadBox}; - -const ATOM_TYPE_FTYPE: u32 = 1718909296; -const ATOM_TYPE_MOOV: u32 = 1836019574; -const ATOM_TYPE_MOOF: u32 = 1836019558; -const ATOM_TYPE_MDAT: u32 = 1835295092; - -#[derive(Debug)] -struct Mp4Atom { - pub atom_type: u32, - // Includes atom size and type. - pub atom_bytes: Vec, -} - -impl Mp4Atom { - pub fn len(&self) -> usize { - self.atom_bytes.len() - } -} - -#[derive(Debug)] -struct Mp4Parser { - buf: Vec, -} - -impl Mp4Parser { - pub fn new() -> Mp4Parser { - Mp4Parser { buf: Vec::new() } - } - - pub fn add(&mut self, buf: &[u8]) { - self.buf.extend_from_slice(buf); - } - - // Returns true if all or part of an MDAT body has been added. - pub fn have_mdat(&self) -> bool { - if self.buf.len() > 8 { - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - atom_type == ATOM_TYPE_MDAT - } else { - false - } - } - - pub fn pop_atom(&mut self) -> Option { - if self.buf.len() >= 8 { - let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize; - let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap()); - if self.buf.len() >= atom_size { - let mut atom_bytes = Vec::with_capacity(atom_size); - // TODO: Swap vectors? - atom_bytes.extend_from_slice(&self.buf[0..atom_size]); - // assert_eq!(self.buf.len(), atom_size); - self.buf.clear(); - Some(Mp4Atom { - atom_type, - atom_bytes, - }) - } else { - None - } - } else { - None - } - } -} +use mp4::{self, BoxHeader}; struct State { // Atoms in init sequence that must be repeated at each key frame. - ftype_atom: Option, - moov_atom: Option, - // Below members that track current fragment (moof, mdat). - /// Minimum PTS in fragment. - fragment_pts: Option, - /// Minimum DTS in fragment. - fragment_dts: Option, - /// Maximum PTS + duration in fragment. - fragment_max_pts_plus_duration: Option, - /// Minimum offset in fragment. - fragment_offset: Option, - /// Maximum offset_end in fragment. - fragment_offset_end: Option, - fragment_buffer_flags: gst::BufferFlags, + ftyp_atom: Option>, + moov_atom: Option>, bitrate: u64, width: u64, height: u64, - // wave: String, - mp4_parser: Mp4Parser, // We hold on to publisher so we don't close then while media is still being published. broadcast: broadcast::Publisher, @@ -129,120 +46,23 @@ impl GST { gstfmp4::plugin_register_static()?; gstmp4::plugin_register_static().unwrap(); - // let pipeline = gst::Pipeline::default(); + let state = Arc::new(Mutex::new(State { + ftyp_atom: Some(Vec::new()), + moov_atom: Some(Vec::new()), + bitrate: 2_048_000, + width: 1280, + height: 720, + broadcast: broadcast.to_owned(), + catalog: None, + init: None, - // let state = Arc::new(Mutex::new(State { - // ftype_atom: None, - // moov_atom: None, - // fragment_pts: None, - // fragment_dts: None, - // fragment_max_pts_plus_duration: None, - // fragment_offset: None, - // fragment_offset_end: None, - // fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT, - // bitrate: 2_048_000, - // width: 1280, - // height: 720, - // broadcast: broadcast.to_owned(), - // mp4_parser: Mp4Parser::new(), - // // wave: "sine".to_string(), - // catalog: None, - // init: None, - - // // Tracks based on their track ID. - // tracks: None, - // current: None, - // })); + // Tracks based on their track ID. + tracks: None, + current: None, + })); // let state_lock = state.lock().unwrap(); - // let video_src = gst::ElementFactory::make("v4l2src") - // // .property("is-live", true) - // .property("num-buffers", 500i32) - // .build()?; - - // let raw_capsfilter = gst::ElementFactory::make("capsfilter") - // .property( - // "caps", - // gst_video::VideoCapsBuilder::new() - // .format(gst_video::VideoFormat::I420) - // .width(state_lock.width as i32) - // .height(state_lock.height as i32) - // .framerate(30.into()) - // .build(), - // ) - // .build()?; - - // // let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; - - // let videoconvert = gst::ElementFactory::make("videoconvert").build()?; - - // // let video_enc = gst::ElementFactory::make("x264enc") - // // // .property("bframes", 0u32) - // // .property("key-int-max", 60u32) - // // .property("bitrate", state_lock.bitrate as u32 / 1000u32) - // // .property_from_str("tune", "zerolatency") - // // .build()?; - - // // let h264_capsfilter = gst::ElementFactory::make("capsfilter") - // // .property( - // // "caps", - // // gst::Caps::builder("video/x-h264") - // // .field("profile", "main") - // // .build(), - // // ) - // // .build()?; - - // // let audio_src = gst::ElementFactory::make("audiotestsrc") - // // .property("is-live", true) - // // .property_from_str("wave", &state.wave) - // // .build()?; - - // // let audio_enc = gst::ElementFactory::make("avenc_aac").build()?; - - // // let mux = gst::ElementFactory::make("cmafmux") - // // .property_from_str("header-update-mode", "update") - // // .property("write-mehd", false) - // // .property("fragment-duration", 1.mseconds()) - // // .build()?; - - // let mux = gst::ElementFactory::make("qtmux") - // .property_from_str("streamable", "true") - // // .property("fragment-duration", 1u32 ) - // .build()?; - - // let appsink = gst_app::AppSink::builder().buffer_list(true).build(); - - // pipeline.add_many([ - // &video_src, - // &raw_capsfilter, - // // &timeoverlay, - // &videoconvert, - // // &video_enc, - // // &h264_capsfilter, - // // &audio_src, - // // &audio_enc, - // &mux, - // appsink.upcast_ref(), - // ])?; - - // gst::Element::link_many([ - // &video_src, - // &raw_capsfilter, - // // &timeoverlay, - // &videoconvert, - // // &video_enc, - // // &h264_capsfilter, - // // &audio_src, - // // &audio_enc, - // &mux, - // appsink.upcast_ref(), - // ])?; - - //drop the choke hold here - // drop(state_lock); - // let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=1 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::().unwrap(); - // let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! x264enc ! isomp4mux ! appsink name=sink").unwrap().downcast::().unwrap(); //FIXME: run one pipeline at a time, then let downstream push accordingly let pipeline = gst::parse::launch( " @@ -253,9 +73,9 @@ impl GST { .unwrap() .downcast::() .unwrap(); //interleave-time=1 movie-timescale=1 - // audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ + // audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \ - //videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ + //videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \ let appsink = pipeline .by_name("sink") @@ -283,7 +103,6 @@ impl GST { match query.view_mut() { gst::QueryViewMut::Seeking(q) => { let format = q.format(); - use gst::Format::Bytes; //https://github.com/Kurento/gstreamer/blob/f2553fb153edeeecc2f4f74fca996c74dc8210df/plugins/elements/gstfilesink.c#L494 match format { gst::Format::Bytes | gst::Format::Default => { @@ -317,6 +136,7 @@ impl GST { note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Got error from /GstPipeline:pipeline0/GstAppSink:sink: Panicked: no buffer list () */ + let mut state = state.lock().unwrap(); let sample = sink .pull_sample() @@ -324,7 +144,7 @@ impl GST { .map_err(|e| gst::FlowError::Eos)?; // The muxer only outputs non-empty buffer lists - let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + let buffer_list = sample.buffer_list_owned().expect("no buffer list"); assert!(!buffer_list.is_empty()); @@ -341,33 +161,33 @@ impl GST { let mut cursor = Cursor::new(data.to_vec()); while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) { + let box_data = vec![0u8; header.size as usize]; + match header.name { + mp4::BoxType::FtypBox => { + println!("Found 'ftyp' box"); + state.ftyp_atom = Some(box_data); // Store the entire 'ftyp' atom. + } + mp4::BoxType::MoovBox => { + println!("Found 'moov' box"); + let mut init_segment = Vec::new(); // Buffer to store the concatenated 'ftyp' and 'moov' atoms. + match state.ftyp_atom.as_ref() { + Some(ftyp_atom) => { + // Concatenate 'ftyp' and 'moov' atoms. + init_segment.extend_from_slice(&ftyp_atom); + init_segment.extend_from_slice(&box_data); + state.moov_atom = Some(init_segment) + } + None => {} + } + } mp4::BoxType::MoofBox => { println!("Found 'moof' box"); // Process 'moof' box } - mp4::BoxType::MdatBox => { println!("Found 'mdat' box"); - // Process 'mdat' box } - mp4::BoxType::EmsgBox => { - println!("Found 'emsg' box"); - // Process 'mdat' box - } - mp4::BoxType::FreeBox => { - println!("Found 'free' box"); - // Process 'mdat' box - } - mp4::BoxType::FtypBox => { - println!("Found 'ftyp' box"); - // Process 'mdat' box - } - mp4::BoxType::MoovBox => { - println!("Found 'moov' box"); - // Process 'mdat' box - } - // Handle other boxes if needed _ => {} } cursor