mirror of
https://github.com/nestriness/warp.git
synced 2025-12-11 09:25:39 +02:00
Serve catalog, add Tracks, add Fragments
This commit is contained in:
34
Cargo.lock
generated
34
Cargo.lock
generated
@@ -464,6 +464,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "four-cc"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3958af68a31b1d1384d3f39b6aa33eb14b6009065b5ca305ddd9712a4237124f"
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.30"
|
||||
@@ -1106,6 +1112,21 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mp4ra-rust"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be9daf03b43bf3842962947c62ba40f411e46a58774c60838038f04a67d17626"
|
||||
dependencies = [
|
||||
"four-cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mpeg4-audio-const"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96a1fe2275b68991faded2c80aa4a33dba398b77d276038b8f50701a22e55918"
|
||||
|
||||
[[package]]
|
||||
name = "muldiv"
|
||||
version = "1.0.1"
|
||||
@@ -1484,6 +1505,17 @@ version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
|
||||
|
||||
[[package]]
|
||||
name = "rfc6381-codec"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4395f46a67f0d57c57f6a5361f3a9a0c0183a19cab3998892ecdc003de6d8037"
|
||||
dependencies = [
|
||||
"four-cc",
|
||||
"mp4ra-rust",
|
||||
"mpeg4-audio-const",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.16.20"
|
||||
@@ -2071,9 +2103,11 @@ dependencies = [
|
||||
"moq-transport",
|
||||
"mp4",
|
||||
"quinn",
|
||||
"rfc6381-codec",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"toml_datetime",
|
||||
"tracing",
|
||||
|
||||
@@ -28,3 +28,5 @@ tracing-subscriber = "0.3"
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
mp4 = "0.13"
|
||||
moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.2.0" }
|
||||
serde_json = "1"
|
||||
rfc6381-codec = "0.1"
|
||||
|
||||
288
src/media.rs
288
src/media.rs
@@ -1,3 +1,4 @@
|
||||
use anyhow::{self,Context};
|
||||
use gst::prelude::*;
|
||||
use gst::ClockTime;
|
||||
|
||||
@@ -8,6 +9,9 @@ use moq_transport::cache::{broadcast, fragment, segment, track};
|
||||
use moq_transport::VarInt;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Cursor;
|
||||
use serde_json::json;
|
||||
use std::cmp::max;
|
||||
use std::time;
|
||||
|
||||
use mp4::{self, ReadBox};
|
||||
|
||||
@@ -107,32 +111,6 @@ struct State {
|
||||
mp4_parser: Mp4Parser,
|
||||
}
|
||||
|
||||
fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element, is_video: bool) {
|
||||
enc.static_pad("src").unwrap().add_probe(
|
||||
gst::PadProbeType::EVENT_DOWNSTREAM,
|
||||
move |_pad, info| {
|
||||
let Some(ev) = info.event() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
let gst::EventView::Caps(ev) = ev.view() else {
|
||||
return gst::PadProbeReturn::Ok;
|
||||
};
|
||||
|
||||
let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps());
|
||||
|
||||
let mut state = state.lock().unwrap();
|
||||
|
||||
if is_video {
|
||||
state.video_enc = Some(mime.unwrap().into())
|
||||
} else {
|
||||
// state.audio_enc
|
||||
}
|
||||
|
||||
gst::PadProbeReturn::Remove
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
pub struct GST {}
|
||||
|
||||
impl GST {
|
||||
@@ -354,9 +332,11 @@ impl GST {
|
||||
}
|
||||
// Handle the case where one or both values are `None`
|
||||
_ => None,
|
||||
}; buffer_ref.set_duration(duration);
|
||||
};
|
||||
buffer_ref.set_duration(duration);
|
||||
buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE));
|
||||
buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE));
|
||||
//this is a header
|
||||
buffer_ref.set_flags(gst::BufferFlags::HEADER);
|
||||
let mut buffer_map = buffer_ref.map_writable().unwrap();
|
||||
let slice = buffer_map.as_mut_slice();
|
||||
@@ -408,6 +388,11 @@ impl GST {
|
||||
tracks.insert(id, track);
|
||||
}
|
||||
|
||||
let mut catalog = broadcast.create_track(".catalog").map_err(|_| gst::FlowError::Error)?;
|
||||
|
||||
// Create the catalog track
|
||||
Self::serve_catalog(&mut catalog, &init_track.name, &moov).map_err(|_| gst::FlowError::Error)?;
|
||||
|
||||
}
|
||||
_ => {
|
||||
log::warn!("Received moov without ftype");
|
||||
@@ -500,6 +485,107 @@ impl GST {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//Copied from https://github.com/kixelated/moq-rs/blob/f64c2e894e4f09873aeef4620dbf20aaabe5d12e/moq-pub/src/media.rs#L127
|
||||
fn serve_catalog(
|
||||
track: &mut track::Publisher,
|
||||
init_track_name: &str,
|
||||
moov: &mp4::MoovBox,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let segment = track.create_segment(segment::Info {
|
||||
sequence: VarInt::ZERO,
|
||||
priority: 0,
|
||||
expires: None,
|
||||
})?;
|
||||
|
||||
let mut tracks = Vec::new();
|
||||
|
||||
for trak in &moov.traks {
|
||||
let mut track = json!({
|
||||
"container": "mp4",
|
||||
"init_track": init_track_name,
|
||||
"data_track": format!("{}.m4s", trak.tkhd.track_id),
|
||||
});
|
||||
|
||||
let stsd = &trak.mdia.minf.stbl.stsd;
|
||||
if let Some(avc1) = &stsd.avc1 {
|
||||
// avc1[.PPCCLL]
|
||||
//
|
||||
// let profile = 0x64;
|
||||
// let constraints = 0x00;
|
||||
// let level = 0x1f;
|
||||
let profile = avc1.avcc.avc_profile_indication;
|
||||
let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video
|
||||
let level = avc1.avcc.avc_level_indication;
|
||||
|
||||
let width = avc1.width;
|
||||
let height = avc1.height;
|
||||
|
||||
let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
|
||||
let codec_str = codec.to_string();
|
||||
|
||||
track["kind"] = json!("video");
|
||||
track["codec"] = json!(codec_str);
|
||||
track["width"] = json!(width);
|
||||
track["height"] = json!(height);
|
||||
} else if let Some(_hev1) = &stsd.hev1 {
|
||||
// TODO https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L106
|
||||
anyhow::bail!("HEVC not yet supported")
|
||||
} else if let Some(mp4a) = &stsd.mp4a {
|
||||
let desc = &mp4a
|
||||
.esds
|
||||
.as_ref()
|
||||
.context("missing esds box for MP4a")?
|
||||
.es_desc
|
||||
.dec_config;
|
||||
|
||||
let codec_str = format!("mp4a.{:02x}.{}", desc.object_type_indication, desc.dec_specific.profile);
|
||||
|
||||
track["kind"] = json!("audio");
|
||||
track["codec"] = json!(codec_str);
|
||||
track["channel_count"] = json!(mp4a.channelcount);
|
||||
track["sample_rate"] = json!(mp4a.samplerate.value());
|
||||
track["sample_size"] = json!(mp4a.samplesize);
|
||||
|
||||
let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
|
||||
if bitrate > 0 {
|
||||
track["bit_rate"] = json!(bitrate);
|
||||
}
|
||||
} else if let Some(vp09) = &stsd.vp09 {
|
||||
// https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238
|
||||
let vpcc = &vp09.vpcc;
|
||||
let codec_str = format!("vp09.0.{:02x}.{:02x}.{:02x}", vpcc.profile, vpcc.level, vpcc.bit_depth);
|
||||
|
||||
track["kind"] = json!("video");
|
||||
track["codec"] = json!(codec_str);
|
||||
track["width"] = json!(vp09.width); // no idea if this needs to be multiplied
|
||||
track["height"] = json!(vp09.height); // no idea if this needs to be multiplied
|
||||
|
||||
// TODO Test if this actually works; I'm just guessing based on mp4box.js
|
||||
anyhow::bail!("VP9 not yet supported")
|
||||
} else {
|
||||
// TODO add av01 support: https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L251
|
||||
anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
|
||||
}
|
||||
|
||||
tracks.push(track);
|
||||
}
|
||||
|
||||
let catalog = json!({
|
||||
"tracks": tracks
|
||||
});
|
||||
|
||||
let catalog_str = serde_json::to_string_pretty(&catalog)?;
|
||||
log::info!("catalog: {}", catalog_str);
|
||||
|
||||
// Create a single fragment for the segment.
|
||||
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
|
||||
|
||||
// Add the segment and add the fragment.
|
||||
fragment.chunk(catalog_str.into())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct Track {
|
||||
@@ -526,56 +612,128 @@ impl Track {
|
||||
}
|
||||
}
|
||||
|
||||
// pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
|
||||
// if let Some(current) = self.current.as_mut() {
|
||||
// if !fragment.keyframe {
|
||||
// // Use the existing segment
|
||||
// current.chunk(raw.into())?;
|
||||
// return Ok(());
|
||||
// }
|
||||
// }
|
||||
pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
|
||||
if let Some(current) = self.current.as_mut() {
|
||||
if !fragment.keyframe {
|
||||
// Use the existing segment
|
||||
current.chunk(raw.into())?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// // Otherwise make a new segment
|
||||
// Otherwise make a new segment
|
||||
|
||||
// // Compute the timestamp in milliseconds.
|
||||
// // Overflows after 583 million years, so we're fine.
|
||||
// let timestamp: u32 = fragment
|
||||
// .timestamp(self.timescale)
|
||||
// .as_millis()
|
||||
// .try_into()
|
||||
// .context("timestamp too large")?;
|
||||
// Compute the timestamp in milliseconds.
|
||||
// Overflows after 583 million years, so we're fine.
|
||||
let timestamp: u32 = fragment
|
||||
.timestamp(self.timescale)
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.context("timestamp too large")?;
|
||||
|
||||
// // Create a new segment.
|
||||
// let segment = self.track.create_segment(segment::Info {
|
||||
// sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
|
||||
// Create a new segment.
|
||||
let segment = self.track.create_segment(segment::Info {
|
||||
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
|
||||
|
||||
// // Newer segments are higher priority
|
||||
// priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,
|
||||
// Newer segments are higher priority
|
||||
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,
|
||||
|
||||
// // Delete segments after 10s.
|
||||
// expires: Some(time::Duration::from_secs(10)),
|
||||
// })?;
|
||||
// Delete segments after 10s.
|
||||
expires: Some(time::Duration::from_secs(10)),
|
||||
})?;
|
||||
|
||||
// // Create a single fragment for the segment that we will keep appending.
|
||||
// let mut fragment = segment.final_fragment(VarInt::ZERO)?;
|
||||
// Create a single fragment for the segment that we will keep appending.
|
||||
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
|
||||
|
||||
// self.sequence += 1;
|
||||
self.sequence += 1;
|
||||
|
||||
// // Insert the raw atom into the segment.
|
||||
// fragment.chunk(raw.into())?;
|
||||
// Insert the raw atom into the segment.
|
||||
fragment.chunk(raw.into())?;
|
||||
|
||||
// // Save for the next iteration
|
||||
// self.current = Some(fragment);
|
||||
// Save for the next iteration
|
||||
self.current = Some(fragment);
|
||||
|
||||
// Ok(())
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
|
||||
// let fragment = self.current.as_mut().context("missing current fragment")?;
|
||||
// fragment.chunk(raw.into())?;
|
||||
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
|
||||
let fragment = self.current.as_mut().context("missing current fragment")?;
|
||||
fragment.chunk(raw.into())?;
|
||||
|
||||
// Ok(())
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct Fragment {
|
||||
// The track for this fragment.
|
||||
track: u32,
|
||||
|
||||
// The timestamp of the first sample in this fragment, in timescale units.
|
||||
timestamp: u64,
|
||||
|
||||
// True if this fragment is a keyframe.
|
||||
keyframe: bool,
|
||||
}
|
||||
|
||||
impl Fragment {
|
||||
fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
|
||||
// We can't split the mdat atom, so this is impossible to support
|
||||
anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
|
||||
let track = moof.trafs[0].tfhd.track_id;
|
||||
|
||||
// Parse the moof to get some timing information to sleep.
|
||||
let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
|
||||
|
||||
// Detect if we should start a new segment.
|
||||
let keyframe = sample_keyframe(&moof);
|
||||
|
||||
Ok(Self {
|
||||
track,
|
||||
timestamp,
|
||||
keyframe,
|
||||
})
|
||||
}
|
||||
|
||||
// Convert from timescale units to a duration.
|
||||
fn timestamp(&self, timescale: u64) -> time::Duration {
|
||||
time::Duration::from_millis(1000 * self.timestamp / timescale)
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
|
||||
Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
|
||||
}
|
||||
|
||||
fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
|
||||
for traf in &moof.trafs {
|
||||
// TODO trak default flags if this is None
|
||||
let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
|
||||
let trun = match &traf.trun {
|
||||
Some(t) => t,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
for i in 0..trun.sample_count {
|
||||
let mut flags = match trun.sample_flags.get(i as usize) {
|
||||
Some(f) => *f,
|
||||
None => default_flags,
|
||||
};
|
||||
|
||||
if i == 0 && trun.first_sample_flags.is_some() {
|
||||
flags = trun.first_sample_flags.unwrap();
|
||||
}
|
||||
|
||||
// https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
|
||||
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
|
||||
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
|
||||
|
||||
if keyframe && !non_sync {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
// Find the timescale for the given track.
|
||||
|
||||
Reference in New Issue
Block a user