feat: add broadcaster

This commit is contained in:
Wanjohi
2024-02-02 09:29:33 +03:00
parent f7a380399e
commit 77aae6b403
3 changed files with 155 additions and 32 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
/target
**/target
*.mp4

Binary file not shown.

View File

@@ -1,12 +1,11 @@
#![allow(dead_code)]
#![allow(unused_imports)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
#![allow(unused_variables)]
// #![allow(dead_code)]
// #![allow(unused_imports)]
// #![allow(unused_mut)]
// #![allow(unused_assignments)]
// #![allow(unused_variables)]
use anyhow::{self, Context};
use gst::prelude::*;
use std::io::SeekFrom;
use std::sync::{Arc, Mutex};
use moq_transport::cache::{broadcast, fragment, segment, track};
@@ -14,7 +13,7 @@ use moq_transport::VarInt;
use serde_json::json;
use std::cmp::max;
use std::collections::HashMap;
use std::io::{Cursor, Seek};
use std::io::Cursor;
use std::time;
use mp4::{self, ReadBox};
@@ -72,12 +71,7 @@ impl Mp4Parser {
struct State {
// Atoms in init sequence that must be repeated at each key frame.
ftyp_atom: Option<Vec<u8>>,
moov_atom: Option<Vec<u8>>,
// bitrate: u64,
// width: u64,
// height: u64,
ftyp_atom: Option<Mp4Atom>,
// We hold on to publisher so we don't close then while media is still being published.
broadcast: broadcast::Publisher,
@@ -100,8 +94,7 @@ impl GST {
gstmp4::plugin_register_static().unwrap();
let state = Arc::new(Mutex::new(State {
ftyp_atom: Some(Vec::new()),
moov_atom: Some(Vec::new()),
ftyp_atom: None,
// bitrate: 2_048_000,
// width: 1280,
// height: 720,
@@ -120,17 +113,18 @@ impl GST {
//FIXME: run one pipeline at a time, then let downstream push accordingly
let pipeline = gst::parse::launch(
"
videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \
audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \
mp4mux fragment-duration=1 name=mux ! appsink name=sink \
filesrc location=meh.mp4 \
! qtdemux \
! h264parse \
! mp4mux fragment-duration=1 streamable=true name=mux ! appsink name=sink \
",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap(); //interleave-time=1 movie-timescale=1
// audiotestsrc num-buffers=60 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \
//videotestsrc num-buffers=60 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \
.unwrap();
//mp4mux fragment-duration=1 name=mux ! filesink location=test.mp4 \
//audiotestsrc num-buffers=6000000 ! audio/x-raw,rate=48000 ! avenc_aac ! queue ! mux. \
//videotestsrc num-buffers=6000000 ! video/x-raw,framerate=30/1 ! x264enc ! queue ! mux. \
let appsink = pipeline
.by_name("sink")
@@ -217,25 +211,152 @@ impl GST {
// We're going to parse the moov box.
// We have to read the moov box header to correctly advance the cursor for the mp4 crate.
// let mut reader = Cursor::new(&atom.atom_bytes);
// let header = mp4::BoxHeader::read(&mut reader)
// .expect("could not read box header");
let mut reader = Cursor::new(&atom.atom_bytes);
let header = mp4::BoxHeader::read(&mut reader)
.expect("could not read box header");
match atom.atom_type {
ATOM_TYPE_FTYP => {
// save for later
println!("ftyp_atom");
// state.ftyp_atom = Some(atom);
// println!("ftyp_atom");
state.ftyp_atom = Some(atom);
}
ATOM_TYPE_MOOV => {
println!("moov_atom");
// println!("moov_atom");
let ftyp = state.ftyp_atom.as_ref().unwrap();
let mut init = ftyp.atom_bytes.clone();
init.extend(&atom.atom_bytes);
// Parse the moov box so we can detect the timescales for each track.
let moov = mp4::MoovBox::read_box(&mut reader, header.size)
.expect("could not read the moov box");
// Create the catalog track with a single segment.
if !state.init.is_some() {
let mut init_track = broadcast
.create_track("0.mp4")
.expect("could not create the init broadcast track");
let init_segment = init_track
.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})
.expect("could not create init segment");
// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment
.final_fragment(VarInt::ZERO)
.expect("could not create the init fragment");
init_fragment.chunk(init.into()).expect("could not insert the moov+ftyp box into the init fragment");
let mut tracks = HashMap::new();
for trak in &moov.traks {
let id = trak.tkhd.track_id;
let name = format!("{}.m4s", id);
// let name = format!("{}.m4s", id);
let timescale = track_timescale(&moov, id);
// Store the track publisher in a map so we can update it later.
let track = broadcast
.create_track(&name)
.expect("could not create a broadcast track");
let track = Track::new(track, timescale);
tracks.insert(id, track);
}
let mut catalog = broadcast
.create_track(".catalog")
.expect("could not create a catalog");
// Create the catalog track
Self::serve_catalog(&mut catalog, &init_track.name, &moov)
.expect("could not serve the catalog");
state.broadcast = broadcast.clone();
state.catalog = Some(catalog);
state.init = Some(init_track);
state.tracks = Some(tracks);
}
}
ATOM_TYPE_MOOF => {
// state.moof_atom = Some(atom);
println!("moof_atom")
// The current track name
let mut current = None;
// println!("moof_atom");
let moof = mp4::MoofBox::read_box(&mut reader, header.size).expect("failed to read MP4");
// Process the moof.
let fragment = Fragment::new(moof)
.expect("failed to create a new fragment for moof atom");
// // Get the track for this moof.
// let track = state
// .tracks
// .as_mut()
// .unwrap()
// .get_mut(&fragment.track)
// .expect("failed to find track");
// // Save the track ID for the next iteration, which must be a mdat.
// assert!(current.is_none());
// current.replace(fragment.track);
let id = {
// Get the track for this moof.
let track = state
.tracks
.as_mut()
.unwrap()
.get_mut(&fragment.track)
.expect("failed to find track");
// Process the track here if necessary.
// ...
// Save the track ID for the next iteration, which must be a mdat.
assert!(current.is_none());
current.replace(fragment.track);
// Publish the moof header, creating a new segment if it's a keyframe.
track
.header(atom.atom_bytes, fragment)
.expect("failed to publish moof");
// The borrow of `state` ends here, so `track` goes out of scope and the mutable borrow ends.
current.clone()
};
// println!("current moof {:?}", id);
state.current = id;
}
ATOM_TYPE_MDAT => {
println!("mdat_atom");
// println!("mdat_atom");
let mut current = state.current.clone();
// println!("current mdat {:?}", current);
// Get the track ID from the previous moof.
let track = current.take().expect("missing moof");
let track = state
.tracks
.as_mut()
.unwrap()
.get_mut(&track)
.expect("failed to find track");
// Publish the mdat atom.
track.data(atom.atom_bytes).expect("failed to publish mdat");
}
_ => {
//Skip unkown atoms
@@ -297,7 +418,7 @@ impl GST {
init_track_name: &str,
moov: &mp4::MoovBox,
) -> Result<(), anyhow::Error> {
println!("serving the catalog");
// println!("serving the catalog");
let segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,