fix(sink): add `isomp4mux` as it produces the mp4 boxes we need (plus can work with video and audio in parallel)

This commit is contained in:
Wanjohi
2024-01-18 06:49:07 +03:00
parent c6b8eb795b
commit a86067ab35
4 changed files with 263 additions and 192 deletions

59
Cargo.lock generated
View File

@@ -711,7 +711,7 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "gio-sys"
version = "0.19.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#df9d809aae072d6aead9400a03cdf92bd68e7c07"
source = "git+https://github.com/gtk-rs/gtk-rs-core#5613c9d139aa8ad7f142be782c6c75f21f1880f2"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -723,7 +723,7 @@ dependencies = [
[[package]]
name = "glib"
version = "0.19.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#df9d809aae072d6aead9400a03cdf92bd68e7c07"
source = "git+https://github.com/gtk-rs/gtk-rs-core#5613c9d139aa8ad7f142be782c6c75f21f1880f2"
dependencies = [
"bitflags 2.4.1",
"futures-channel",
@@ -745,7 +745,7 @@ dependencies = [
[[package]]
name = "glib-macros"
version = "0.19.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#df9d809aae072d6aead9400a03cdf92bd68e7c07"
source = "git+https://github.com/gtk-rs/gtk-rs-core#5613c9d139aa8ad7f142be782c6c75f21f1880f2"
dependencies = [
"heck",
"proc-macro-crate",
@@ -758,7 +758,7 @@ dependencies = [
[[package]]
name = "glib-sys"
version = "0.19.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#df9d809aae072d6aead9400a03cdf92bd68e7c07"
source = "git+https://github.com/gtk-rs/gtk-rs-core#5613c9d139aa8ad7f142be782c6c75f21f1880f2"
dependencies = [
"libc",
"system-deps",
@@ -779,7 +779,7 @@ dependencies = [
[[package]]
name = "gobject-sys"
version = "0.19.0"
source = "git+https://github.com/gtk-rs/gtk-rs-core#df9d809aae072d6aead9400a03cdf92bd68e7c07"
source = "git+https://github.com/gtk-rs/gtk-rs-core#5613c9d139aa8ad7f142be782c6c75f21f1880f2"
dependencies = [
"glib-sys",
"libc",
@@ -789,7 +789,21 @@ dependencies = [
[[package]]
name = "gst-plugin-fmp4"
version = "0.12.0-alpha.1"
source = "git+https://github.com/sdroege/gst-plugin-rs#d9397ef1743ac92e84784d00b93dc0877d44f966"
source = "git+https://github.com/sdroege/gst-plugin-rs#80b58f3b45d2c3adee5684888937a3aa30e30cd7"
dependencies = [
"anyhow",
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-audio",
"gstreamer-base",
"gstreamer-pbutils",
"gstreamer-video",
]
[[package]]
name = "gst-plugin-mp4"
version = "0.12.0-alpha.1"
source = "git+https://github.com/sdroege/gst-plugin-rs#80b58f3b45d2c3adee5684888937a3aa30e30cd7"
dependencies = [
"anyhow",
"gst-plugin-version-helper",
@@ -803,7 +817,7 @@ dependencies = [
[[package]]
name = "gst-plugin-version-helper"
version = "0.8.0"
source = "git+https://github.com/sdroege/gst-plugin-rs#d9397ef1743ac92e84784d00b93dc0877d44f966"
source = "git+https://github.com/sdroege/gst-plugin-rs#80b58f3b45d2c3adee5684888937a3aa30e30cd7"
dependencies = [
"chrono",
"toml_edit 0.21.0",
@@ -812,7 +826,7 @@ dependencies = [
[[package]]
name = "gstreamer"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"cfg-if",
"futures-channel",
@@ -861,7 +875,7 @@ dependencies = [
[[package]]
name = "gstreamer-audio"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"cfg-if",
"glib",
@@ -875,7 +889,7 @@ dependencies = [
[[package]]
name = "gstreamer-audio-sys"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -888,7 +902,7 @@ dependencies = [
[[package]]
name = "gstreamer-base"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"atomic_refcell",
"cfg-if",
@@ -901,7 +915,7 @@ dependencies = [
[[package]]
name = "gstreamer-base-sys"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -913,7 +927,7 @@ dependencies = [
[[package]]
name = "gstreamer-pbutils"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib",
"gstreamer",
@@ -927,7 +941,7 @@ dependencies = [
[[package]]
name = "gstreamer-pbutils-sys"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -941,7 +955,7 @@ dependencies = [
[[package]]
name = "gstreamer-sys"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -952,7 +966,7 @@ dependencies = [
[[package]]
name = "gstreamer-video"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"cfg-if",
"futures-channel",
@@ -967,7 +981,7 @@ dependencies = [
[[package]]
name = "gstreamer-video-sys"
version = "0.22.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#0b4c602c6fc96d530dee4b53c21980f40609975e"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#c376bfac4d388562063fd7ea269a1857def3033b"
dependencies = [
"glib-sys",
"gobject-sys",
@@ -1444,11 +1458,11 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-crate"
version = "2.0.0"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8366a6159044a37876a2b9817124296703c586a5c92e2c53751fa06d8d43e8"
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
dependencies = [
"toml_edit 0.20.2",
"toml_edit 0.21.0",
]
[[package]]
@@ -1860,9 +1874,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.11.2"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e"
[[package]]
name = "socket2"
@@ -2218,6 +2232,7 @@ dependencies = [
"clap_mangen",
"env_logger",
"gst-plugin-fmp4",
"gst-plugin-mp4",
"gstreamer",
"gstreamer-app",
"gstreamer-pbutils",

View File

@@ -31,6 +31,7 @@ moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.2.0"
serde_json = "1"
rfc6381-codec = "0.1"
clap = { version = "4", features = ["derive"] }
gst-plugin-mp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" }
[build-dependencies]
clap = { version = "4", features = ["derive"] }

View File

@@ -2,6 +2,7 @@
#![allow(unused_imports)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_variables)]
use anyhow::{self, Context};
use gst::prelude::*;
use gst::ClockTime;
@@ -132,38 +133,39 @@ impl GST {
//FIXME: add audio pipeline
gstfmp4::plugin_register_static()?;
gstmp4::plugin_register_static().unwrap();
let pipeline = gst::Pipeline::default();
// let pipeline = gst::Pipeline::default();
let state = Arc::new(Mutex::new(State {
ftype_atom: None,
moov_atom: None,
fragment_pts: None,
fragment_dts: None,
fragment_max_pts_plus_duration: None,
fragment_offset: None,
fragment_offset_end: None,
fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT,
bitrate: 2_048_000,
width: 1280,
height: 720,
broadcast: broadcast.to_owned(),
mp4_parser: Mp4Parser::new(),
// wave: "sine".to_string(),
catalog: None,
init: None,
// let state = Arc::new(Mutex::new(State {
// ftype_atom: None,
// moov_atom: None,
// fragment_pts: None,
// fragment_dts: None,
// fragment_max_pts_plus_duration: None,
// fragment_offset: None,
// fragment_offset_end: None,
// fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT,
// bitrate: 2_048_000,
// width: 1280,
// height: 720,
// broadcast: broadcast.to_owned(),
// mp4_parser: Mp4Parser::new(),
// // wave: "sine".to_string(),
// catalog: None,
// init: None,
// Tracks based on their track ID.
tracks: None,
current: None,
}));
// // Tracks based on their track ID.
// tracks: None,
// current: None,
// }));
let state_lock = state.lock().unwrap();
// let state_lock = state.lock().unwrap();
let video_src = gst::ElementFactory::make("videotestsrc")
.property("is-live", true)
.property("num-buffers", 360i32)
.build()?;
// let video_src = gst::ElementFactory::make("v4l2src")
// // .property("is-live", true)
// .property("num-buffers", 500i32)
// .build()?;
// let raw_capsfilter = gst::ElementFactory::make("capsfilter")
// .property(
@@ -177,168 +179,221 @@ impl GST {
// )
// .build()?;
// let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
// // let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?;
// let videoconvert = gst::ElementFactory::make("videoconvert").build()?;
let video_enc = gst::ElementFactory::make("x264enc")
// .property("bframes", 0u32)
.property("key-int-max", 60u32)
.property("bitrate", state_lock.bitrate as u32 / 1000u32)
.property_from_str("tune", "zerolatency")
.build()?;
// // let video_enc = gst::ElementFactory::make("x264enc")
// // // .property("bframes", 0u32)
// // .property("key-int-max", 60u32)
// // .property("bitrate", state_lock.bitrate as u32 / 1000u32)
// // .property_from_str("tune", "zerolatency")
// // .build()?;
// let h264_capsfilter = gst::ElementFactory::make("capsfilter")
// .property(
// "caps",
// gst::Caps::builder("video/x-h264")
// .field("profile", "main")
// .build(),
// )
// .build()?;
// // let h264_capsfilter = gst::ElementFactory::make("capsfilter")
// // .property(
// // "caps",
// // gst::Caps::builder("video/x-h264")
// // .field("profile", "main")
// // .build(),
// // )
// // .build()?;
// let audio_src = gst::ElementFactory::make("audiotestsrc")
// .property("is-live", true)
// .property_from_str("wave", &state.wave)
// .build()?;
// // let audio_src = gst::ElementFactory::make("audiotestsrc")
// // .property("is-live", true)
// // .property_from_str("wave", &state.wave)
// // .build()?;
// let audio_enc = gst::ElementFactory::make("avenc_aac").build()?;
// // let audio_enc = gst::ElementFactory::make("avenc_aac").build()?;
let mux = gst::ElementFactory::make("cmafmux")
.property_from_str("header-update-mode", "update")
.property("write-mehd", true)
.property("fragment-duration", 1.mseconds())
.build()?;
// let mux = gst::ElementFactory::make("mp4mux")
// // let mux = gst::ElementFactory::make("cmafmux")
// // .property_from_str("header-update-mode", "update")
// // .property("write-mehd", false)
// // .property("fragment-duration", 1.mseconds())
// // .build()?;
// let mux = gst::ElementFactory::make("qtmux")
// .property_from_str("streamable", "true")
// .property("fragment-duration", 1u32 )
// // .property("fragment-duration", 1u32 )
// .build()?;
let appsink = gst_app::AppSink::builder().buffer_list(true).build();
// let appsink = gst_app::AppSink::builder().buffer_list(true).build();
pipeline.add_many([
&video_src,
// &raw_capsfilter,
// &timeoverlay,
// &videoconvert,
&video_enc,
// &h264_capsfilter,
// &audio_src,
// &audio_enc,
&mux,
appsink.upcast_ref(),
])?;
// pipeline.add_many([
// &video_src,
// &raw_capsfilter,
// // &timeoverlay,
// &videoconvert,
// // &video_enc,
// // &h264_capsfilter,
// // &audio_src,
// // &audio_enc,
// &mux,
// appsink.upcast_ref(),
// ])?;
gst::Element::link_many([
&video_src,
// &raw_capsfilter,
// &timeoverlay,
// &videoconvert,
&video_enc,
// &h264_capsfilter,
// &audio_src,
// &audio_enc,
&mux,
appsink.upcast_ref(),
])?;
// gst::Element::link_many([
// &video_src,
// &raw_capsfilter,
// // &timeoverlay,
// &videoconvert,
// // &video_enc,
// // &h264_capsfilter,
// // &audio_src,
// // &audio_enc,
// &mux,
// appsink.upcast_ref(),
// ])?;
//drop the choke hold here
drop(state_lock);
// drop(state_lock);
// let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=1 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
// let pipeline = gst::parse::launch("videotestsrc num-buffers=2500 ! x264enc ! isomp4mux ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();
let pipeline = gst::parse::launch(
"videotestsrc num-buffers=99 ! x264enc ! mux. \
audiotestsrc num-buffers=140 ! avenc_aac ! mux. \
isomp4mux name=mux ! filesink location=test.mp4 name=sink \
",
).unwrap().downcast::<gst::Pipeline>().unwrap();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink
.pull_sample()
.with_context(|| "Error pulling sample")
.map_err(|e| {
eprintln!("{:?}", e);
gst::FlowError::Eos
})?;
// The muxer only outputs non-empty buffer lists
let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
// let appsink = pipeline
// .by_name("sink")
// .unwrap()
// .dynamic_cast::<gst_app::AppSink>()
// .unwrap();
println!("buffer is empty {:?}", buffer_list.is_empty());
assert!(!buffer_list.is_empty());
// appsink.set_buffer_list(true);
println!("bufferlist is this long {:?}", buffer_list.len());
let mut first = buffer_list.get(0).unwrap();
// appsink.set_callbacks(
// gst_app::AppSinkCallbacks::builder()
// .new_sample(move |sink| {
// let sample = sink
// .pull_sample()
// .with_context(|| "Error pulling sample")
// .map_err(|e| {
// eprintln!("{:?}", e);
// gst::FlowError::Eos
// })?;
// // The muxer only outputs non-empty buffer lists
// let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
// Each list contains a full segment, i.e. does not start with a DELTA_UNIT
println!(
"first buffer has a delta unit {:?}",
first.flags().contains(gst::BufferFlags::DELTA_UNIT)
);
assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
// println!("buffer is empty {:?}", buffer_list.is_empty());
// assert!(!buffer_list.is_empty());
let mut state = state.lock().unwrap();
// println!("bufferlist is this long {:?}", buffer_list.len());
// let mut first = buffer_list.get(0).unwrap();
//FIXME: The mp4_parser fails because we are parsing too many mp4_atoms in parallel, so let us do it sequentially.
// // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
// println!(
// "first buffer has a delta unit {:?}",
// first.flags().contains(gst::BufferFlags::DELTA_UNIT)
// );
// assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));
// If the buffer has the DISCONT and HEADER flag set then it contains the media
// header, i.e. the `ftyp`, `moov` and other media boxes.
//
// This might be the initial header or the updated header at the end of the stream.
if first
.flags()
.contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
{
println!("writing header");
let map = first
.map_readable()
.with_context(|| "Error mapping buffer to readable")
.map_err(|e| {
eprintln!("{:?}", e);
// // let mut state = state.lock().unwrap();
gst::FlowError::Error
})?;
// // let mut mp4_parser = Mp4Parser::new();
state.mp4_parser.add(map.as_ref());
loop {
match state.mp4_parser.pop_atom() {
Some(atom) => {
println!(
"atom_size={}, atom_type={}",
atom.len(),
atom.atom_type
);
match atom.atom_type {
ATOM_TYPE_FTYPE => {
println!("FTYP");
},
ATOM_TYPE_MOOV => {
println!("MOOV");
},
ATOM_TYPE_MOOF => {
println!("MOOF");
},
ATOM_TYPE_MDAT => {
println!("MDAT");
},
_ => {
log::warn!("Unknown atom type {:?}", atom);
println!("Unknown atom type {:?}", atom);
}
}
},
None => break,
}
}
drop(map);
// //FIXME: The mp4_parser fails because we are parsing too many mp4_atoms in parallel, so let us do it sequentially.
// Remove the header from the buffer list
buffer_list.make_mut().remove(0, 1);
// // If the buffer has the DISCONT and HEADER flag set then it contains the media
// // header, i.e. the `ftyp`, `moov` and other media boxes.
// //
// // This might be the initial header or the updated header at the end of the stream.
// if first
// .flags()
// .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
// {
// println!("writing header");
// let map = first
// .map_readable()
// .with_context(|| "Error mapping buffer to readable")
// .map_err(|e| {
// eprintln!("{:?}", e);
// If the list is now empty then it only contained the media header and nothing
// else.
if buffer_list.is_empty() {
return Ok(gst::FlowSuccess::Ok);
}
// gst::FlowError::Error
// })?;
// // Create a a Vec<u8> object from the data slice
// let bytes = map.as_slice().to_vec();
// Otherwise get the next buffer and continue working with that.
first = buffer_list.get(0).unwrap();
}
// // We're going to parse the moov box.
// // We have to read the moov box header to correctly advance the cursor for the mp4 crate.
// let mut moov_reader = Cursor::new(bytes.clone());
// let header = mp4::BoxHeader::read(&mut moov_reader)
// .map_err(|_| gst::FlowError::Error)?;
// match header.name {
// mp4::BoxType::MoofBox => {
// println!("Moof")
// }
// mp4::BoxType::MdatBox => {
// println!("Mdat")
// }
// mp4::BoxType::MoovBox => {
// println!("Moov")
// }
// mp4::BoxType::FtypBox => {
// println!("Ftyp")
// }
// _ => {
// // Skip unknown atoms
// }
// }
// drop(map);
// // Remove the header from the buffer list
// buffer_list.make_mut().remove(0, 1);
// // If the list is now empty then it only contained the media header and nothing
// // else.
// if buffer_list.is_empty() {
// return Ok(gst::FlowSuccess::Ok);
// }
// // Otherwise get the next buffer and continue working with that.
// first = buffer_list.get(0).unwrap();
// }
// let map = first
// .map_readable()
// .with_context(|| "Error mapping buffer to readable")
// .map_err(|e| {
// eprintln!("{:?}", e);
// gst::FlowError::Error
// })?;
// // Create a a Vec<u8> object from the data slice
// let bytes = map.as_slice().to_vec();
// // We're going to parse the moov box.
// // We have to read the moov box header to correctly advance the cursor for the mp4 crate.
// let mut moov_reader = Cursor::new(bytes.clone());
// let header = mp4::BoxHeader::read(&mut moov_reader)
// .map_err(|_| gst::FlowError::Error)?;
// match header.name {
// mp4::BoxType::MoofBox => {
// println!("Moof")
// }
// mp4::BoxType::MdatBox => {
// println!("Mdat")
// }
// mp4::BoxType::MoovBox => {
// println!("Moov")
// }
// mp4::BoxType::FtypBox => {
// println!("Ftyp")
// }
// _ => {
// // Skip unknown atoms
// println!("Unknown atom")
// }
// }
// If the buffer only has the HEADER flag set then this is a segment header that is
// followed by one or more actual media buffers.
// assert!(first.flags().contains(gst::BufferFlags::HEADER));
// for buffer in &*buffer_list {
// let map = buffer
@@ -658,13 +713,13 @@ impl GST {
// }
// }
Ok(gst::FlowSuccess::Ok)
})
.eos(move |_sink| {
unreachable!();
})
.build(),
);
// Ok(gst::FlowSuccess::Ok)
// })
// .eos(move |_sink| {
// unreachable!();
// })
// .build(),
// );
pipeline.set_state(gst::State::Playing)?;

BIN
test.mp4 Normal file

Binary file not shown.