feat(sink): process moov and ftyp atoms accordingly

Wanjohi
2024-01-29 09:09:14 +03:00
parent bc968037cd
commit dee6e07f24


@@ -1,16 +1,12 @@
#![allow(dead_code)]
#![allow(unused_imports)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_variables)]
// #![allow(dead_code)]
// #![allow(unused_imports)]
// #![allow(unused_mut)]
// #![allow(unused_assignments)]
// #![allow(unused_variables)]
use anyhow::{self, Context};
use gst::prelude::*;
use gst::ClockTime;
use gst_app::glib;
use gst_app::gst_base;
use std::io::SeekFrom;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncReadExt;
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::VarInt;
@@ -20,95 +16,16 @@ use std::collections::HashMap;
use std::io::{Cursor, Seek};
use std::time;
use mp4::{self, ReadBox};
const ATOM_TYPE_FTYPE: u32 = 1718909296;
const ATOM_TYPE_MOOV: u32 = 1836019574;
const ATOM_TYPE_MOOF: u32 = 1836019558;
const ATOM_TYPE_MDAT: u32 = 1835295092;
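// These constants are the big-endian u32 values of the four-character atom names,
// e.g. u32::from_be_bytes(*b"ftyp") == 1718909296 and u32::from_be_bytes(*b"mdat") == 1835295092,
// so they could equally be written as `u32::from_be_bytes(*b"ftyp")` etc.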
#[derive(Debug)]
struct Mp4Atom {
pub atom_type: u32,
// Includes atom size and type.
pub atom_bytes: Vec<u8>,
}
impl Mp4Atom {
pub fn len(&self) -> usize {
self.atom_bytes.len()
}
}
#[derive(Debug)]
struct Mp4Parser {
buf: Vec<u8>,
}
impl Mp4Parser {
pub fn new() -> Mp4Parser {
Mp4Parser { buf: Vec::new() }
}
pub fn add(&mut self, buf: &[u8]) {
self.buf.extend_from_slice(buf);
}
// Returns true if all or part of an MDAT body has been added.
pub fn have_mdat(&self) -> bool {
if self.buf.len() > 8 {
let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap());
atom_type == ATOM_TYPE_MDAT
} else {
false
}
}
pub fn pop_atom(&mut self) -> Option<Mp4Atom> {
if self.buf.len() >= 8 {
let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize;
let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap());
if self.buf.len() >= atom_size {
let mut atom_bytes = Vec::with_capacity(atom_size);
// TODO: Swap vectors?
atom_bytes.extend_from_slice(&self.buf[0..atom_size]);
// assert_eq!(self.buf.len(), atom_size);
self.buf.clear();
Some(Mp4Atom {
atom_type,
atom_bytes,
})
} else {
None
}
} else {
None
}
}
}
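// Hedged sketch, not part of this commit: `pop_atom` above clears the whole buffer,
// discarding any bytes that belong to the next atom. A hypothetical drain-based
// variant (the name `pop_atom_draining` is illustrative only) would keep the remainder
// buffered for the next call. Largesize (64-bit) and size==0 atoms are not handled.
impl Mp4Parser {
    pub fn pop_atom_draining(&mut self) -> Option<Mp4Atom> {
        if self.buf.len() < 8 {
            return None;
        }
        let atom_size = u32::from_be_bytes(self.buf[0..4].try_into().unwrap()) as usize;
        let atom_type = u32::from_be_bytes(self.buf[4..8].try_into().unwrap());
        if atom_size < 8 || self.buf.len() < atom_size {
            return None;
        }
        // drain() removes exactly `atom_size` bytes and leaves any following atom intact.
        let atom_bytes: Vec<u8> = self.buf.drain(0..atom_size).collect();
        Some(Mp4Atom { atom_type, atom_bytes })
    }
}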
use mp4::{self, BoxHeader};
struct State {
// Atoms in init sequence that must be repeated at each key frame.
ftype_atom: Option<Mp4Atom>,
moov_atom: Option<Mp4Atom>,
// The members below track the current fragment (moof, mdat).
/// Minimum PTS in fragment.
fragment_pts: Option<ClockTime>,
/// Minimum DTS in fragment.
fragment_dts: Option<ClockTime>,
/// Maximum PTS + duration in fragment.
fragment_max_pts_plus_duration: Option<ClockTime>,
/// Minimum offset in fragment.
fragment_offset: Option<u64>,
/// Maximum offset_end in fragment.
fragment_offset_end: Option<u64>,
fragment_buffer_flags: gst::BufferFlags,
ftyp_atom: Option<Vec<u8>>,
moov_atom: Option<Vec<u8>>,
bitrate: u64,
width: u64,
height: u64,
// wave: String,
mp4_parser: Mp4Parser,
// We hold on to the publisher so we don't close it while media is still being published.
broadcast: broadcast::Publisher,
@@ -129,120 +46,23 @@ impl GST {
gstfmp4::plugin_register_static()?;
gstmp4::plugin_register_static().unwrap();
// let pipeline = gst::Pipeline::default();
let state = Arc::new(Mutex::new(State {
ftyp_atom: Some(Vec::new()),
moov_atom: Some(Vec::new()),
bitrate: 2_048_000,
width: 1280,
height: 720,
broadcast: broadcast.to_owned(),
catalog: None,
init: None,
// let state = Arc::new(Mutex::new(State {
// ftype_atom: None,
// moov_atom: None,
// fragment_pts: None,
// fragment_dts: None,
// fragment_max_pts_plus_duration: None,
// fragment_offset: None,
// fragment_offset_end: None,
// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT,
// bitrate: 2_048_000,
// width: 1280,
// height: 720,
// broadcast: broadcast.to_owned(),
// mp4_parser: Mp4Parser::new(),
// // wave: "sine".to_string(),
// catalog: None,
// init: None,
// // Tracks based on their track ID.
// tracks: None,
// current: None,
// }));
// Tracks based on their track ID.
tracks: None,
current: None,
}));
// let state_lock = state.lock().unwrap();
// let video_src = gst::ElementFactory::make("v4l2src")
// // .property("is-live", true)
// .property("num-buffers", 500i32)
// .build()?;
// let raw_capsfilter = gst::ElementFactory::make("capsfilter")
// .property(
// "caps",
// gst_video::VideoCapsBuilder::new()
// .format(gst_video::VideoFormat::I420)
// .width(state_lock.width as i32)
// .height(state_lock.height as i32)
// .framerate(30.into())
// .build(),
// )
// .build()?;
// // let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
// let videoconvert = gst::ElementFactory::make("videoconvert").build()?;
// // let video_enc = gst::ElementFactory::make("x264enc")
// // // .property("bframes", 0u32)
// // .property("key-int-max", 60u32)
// // .property("bitrate", state_lock.bitrate as u32 / 1000u32)
// // .property_from_str("tune", "zerolatency")
// // .build()?;
// // let h264_capsfilter = gst::ElementFactory::make("capsfilter")
// // .property(
// // "caps",
// // gst::Caps::builder("video/x-h264")
// // .field("profile", "main")
// // .build(),
// // )
// // .build()?;
// // let audio_src = gst::ElementFactory::make("audiotestsrc")
// // .property("is-live", true)
// // .property_from_str("wave", &state.wave)
// // .build()?;
// // let audio_enc = gst::ElementFactory::make("avenc_aac").build()?;
// // let mux = gst::ElementFactory::make("cmafmux")
// // .property_from_str("header-update-mode", "update")
// // .property("write-mehd", false)
// // .property("fragment-duration", 1.mseconds())
// // .build()?;
// let mux = gst::ElementFactory::make("qtmux")
// .property_from_str("streamable", "true")
// // .property("fragment-duration", 1u32 )
// .build()?;
// let appsink = gst_app::AppSink::builder().buffer_list(true).build();
// pipeline.add_many([
// &video_src,
// &raw_capsfilter,
// // &timeoverlay,
// &videoconvert,
// // &video_enc,
// // &h264_capsfilter,
// // &audio_src,
// // &audio_enc,
// &mux,
// appsink.upcast_ref(),
// ])?;
// gst::Element::link_many([
// &video_src,
// &raw_capsfilter,
// // &timeoverlay,
// &videoconvert,
// // &video_enc,
// // &h264_capsfilter,
// // &audio_src,
// // &audio_enc,
// &mux,
// appsink.upcast_ref(),
// ])?;
//drop the choke hold here
// drop(state_lock);
// let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=1 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
// let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! x264enc ! isomp4mux ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
//FIXME: run one pipeline at a time, then let downstream push accordingly
let pipeline = gst::parse::launch(
"
@@ -253,9 +73,9 @@ impl GST {
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap(); //interleave-time=1 movie-timescale=1
// audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \
//videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \
let appsink = pipeline
.by_name("sink")
@@ -283,7 +103,6 @@ impl GST {
match query.view_mut() {
gst::QueryViewMut::Seeking(q) => {
let format = q.format();
use gst::Format::Bytes;
//https://github.com/Kurento/gstreamer/blob/f2553fb153edeeecc2f4f74fca996c74dc8210df/plugins/elements/gstfilesink.c#L494
match format {
gst::Format::Bytes | gst::Format::Default => {
@@ -317,6 +136,7 @@ impl GST {
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Got error from /GstPipeline:pipeline0/GstAppSink:sink: Panicked: no buffer list ()
*/
let mut state = state.lock().unwrap();
let sample = sink
.pull_sample()
@@ -324,7 +144,7 @@ impl GST {
.map_err(|e| gst::FlowError::Eos)?;
// The muxer only outputs non-empty buffer lists
let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
let buffer_list = sample.buffer_list_owned().expect("no buffer list");
assert!(!buffer_list.is_empty());
@@ -341,33 +161,33 @@ impl GST {
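// Hedged sketch, assumption: the elided lines in this hunk presumably flatten the
// pulled buffer list into the `data` bytes parsed below; one plausible way, using
// read-only buffer maps (variable names are illustrative only):
let mut data: Vec<u8> = Vec::new();
for buffer in buffer_list.iter() {
    let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
    data.extend_from_slice(map.as_slice());
}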
let mut cursor = Cursor::new(data.to_vec());
while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) {
let box_data = vec![0u8; header.size as usize];
match header.name {
mp4::BoxType::FtypBox => {
println!("Found 'ftyp' box");
state.ftyp_atom = Some(box_data); // Store the entire 'ftyp' atom.
}
mp4::BoxType::MoovBox => {
println!("Found 'moov' box");
let mut init_segment = Vec::new(); // Buffer to store the concatenated 'ftyp' and 'moov' atoms.
match state.ftyp_atom.as_ref() {
Some(ftyp_atom) => {
// Concatenate 'ftyp' and 'moov' atoms.
init_segment.extend_from_slice(&ftyp_atom);
init_segment.extend_from_slice(&box_data);
state.moov_atom = Some(init_segment)
}
None => {}
}
}
mp4::BoxType::MoofBox => {
println!("Found 'moof' box");
// Process 'moof' box
}
mp4::BoxType::MdatBox => {
println!("Found 'mdat' box");
// Process 'mdat' box
}
mp4::BoxType::EmsgBox => {
println!("Found 'emsg' box");
// Process 'emsg' box
}
mp4::BoxType::FreeBox => {
println!("Found 'free' box");
// Process 'free' box
}
mp4::BoxType::FtypBox => {
println!("Found 'ftyp' box");
// Process 'ftyp' box
}
mp4::BoxType::MoovBox => {
println!("Found 'moov' box");
// Process 'moov' box
}
// Handle other boxes if needed
_ => {}
}
cursor
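// Hedged sketch, not part of this commit: `BoxHeader::read(&mut cursor.clone())` above
// reads from a temporary clone, so the real cursor never advances, and `box_data` is
// allocated as zeroes rather than filled with the box's bytes. A hypothetical helper
// (the name `walk_boxes` is illustrative only) that walks the boxes by offset and
// returns (type, bytes) pairs could look like this; largesize (64-bit) and size==0
// boxes are not handled:
fn walk_boxes(data: &[u8]) -> Vec<(mp4::BoxType, Vec<u8>)> {
    let mut boxes = Vec::new();
    let mut offset = 0usize;
    while offset + 8 <= data.len() {
        let mut cursor = std::io::Cursor::new(&data[offset..]);
        let header = match mp4::BoxHeader::read(&mut cursor) {
            Ok(h) => h,
            Err(_) => break,
        };
        let size = header.size as usize;
        if size < 8 || offset + size > data.len() {
            break; // malformed or truncated box
        }
        // Keep the whole box (size + type + payload) so ftyp/moov can be replayed later.
        boxes.push((header.name, data[offset..offset + size].to_vec()));
        offset += size;
    }
    boxes
}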