Add `moq-transport`

This commit is contained in:
Wanjohi
2023-12-13 09:16:21 +03:00
parent 19a61e8717
commit 5f42cb26c2
4 changed files with 1438 additions and 49 deletions

View File

@@ -1,9 +1,9 @@
use glib::subclass::prelude::*;
use gst::ClockTime;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::ClockTime;
#[allow(unused_imports)]
use gst::{gst_debug, gst_error, gst_warning, gst_info, gst_log, gst_trace};
use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning};
use once_cell::sync::Lazy;
use std::convert::TryInto;
use std::sync::Mutex;
@@ -11,7 +11,8 @@ use std::sync::Mutex;
pub const ELEMENT_NAME: &str = "MoqSink";
const ELEMENT_CLASS_NAME: &str = "MoqSink";
const ELEMENT_LONG_NAME: &str = "Media Over Quic Sink";
const ELEMENT_DESCRIPTION: &str = "This element accepts fragmented MP4 input from mp4mux and publishes them to a Moq-Relay.";
const ELEMENT_DESCRIPTION: &str =
"This element accepts fragmented MP4 input from mp4mux and publishes them to a Moq-Relay.";
const ELEMENT_AUTHOR: &str = "Wanjohi Ryan <wanjohiryan33@gmail.com>";
const DEBUG_CATEGORY: &str = ELEMENT_NAME;
@@ -40,9 +41,7 @@ struct Mp4Parser {
impl Mp4Parser {
pub fn new() -> Mp4Parser {
Mp4Parser {
buf: Vec::new(),
}
Mp4Parser { buf: Vec::new() }
}
pub fn add(&mut self, buf: &[u8]) {
@@ -105,9 +104,7 @@ struct StartedState {
}
enum State {
Started {
state: StartedState,
}
Started { state: StartedState },
}
impl Default for State {
@@ -124,7 +121,7 @@ impl Default for State {
fragment_offset: None,
fragment_offset_end: None,
fragment_buffer_flags: gst::BufferFlags::DELTA_UNIT,
}
},
}
}
}
@@ -154,10 +151,7 @@ impl MoqSink {
let mut state = self.state.lock().unwrap();
let state = match *state {
State::Started {
ref mut state,
..
} => state,
State::Started { ref mut state, .. } => state,
};
let map = buffer.map_readable().map_err(|_| {
@@ -179,20 +173,36 @@ impl MoqSink {
state.fragment_dts = buffer.dts();
}
let pts_plus_duration = buffer.pts() + buffer.duration();
if state.fragment_max_pts_plus_duration.is_none() || state.fragment_max_pts_plus_duration < pts_plus_duration {
if state.fragment_max_pts_plus_duration.is_none()
|| state.fragment_max_pts_plus_duration < pts_plus_duration
{
state.fragment_max_pts_plus_duration = pts_plus_duration;
}
if buffer.offset() != gst::BUFFER_OFFSET_NONE && (state.fragment_offset.is_none() || state.fragment_offset.unwrap() > buffer.offset()) {
if buffer.offset() != gst::BUFFER_OFFSET_NONE
&& (state.fragment_offset.is_none()
|| state.fragment_offset.unwrap() > buffer.offset())
{
state.fragment_offset = Some(buffer.offset());
}
if buffer.offset_end() != gst::BUFFER_OFFSET_NONE && (state.fragment_offset_end.is_none() || state.fragment_offset_end.unwrap() < buffer.offset_end()) {
if buffer.offset_end() != gst::BUFFER_OFFSET_NONE
&& (state.fragment_offset_end.is_none()
|| state.fragment_offset_end.unwrap() < buffer.offset_end())
{
state.fragment_offset_end = Some(buffer.offset_end());
}
if state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT) && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
state.fragment_buffer_flags.remove(gst::BufferFlags::DELTA_UNIT);
if state
.fragment_buffer_flags
.contains(gst::BufferFlags::DELTA_UNIT)
&& !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
{
state
.fragment_buffer_flags
.remove(gst::BufferFlags::DELTA_UNIT);
}
if buffer.flags().contains(gst::BufferFlags::DISCONT) {
state.fragment_buffer_flags.insert(gst::BufferFlags::DISCONT);
state
.fragment_buffer_flags
.insert(gst::BufferFlags::DISCONT);
}
gst_trace!(CAT, obj: pad, "Updated state={:?}", state);
}
@@ -205,52 +215,75 @@ impl MoqSink {
ATOM_TYPE_FTYPE => {
state.ftype_atom = Some(atom);
gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom);
},
}
ATOM_TYPE_MOOV => {
state.moov_atom = Some(atom);
gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom);
},
}
ATOM_TYPE_MOOF => {
state.moof_atom = Some(atom);
gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom);
},
}
ATOM_TYPE_MDAT => {
let mdat_atom = atom;
match (state.ftype_atom.as_ref(), state.moov_atom.as_ref(), state.moof_atom.as_ref()) {
match (
state.ftype_atom.as_ref(),
state.moov_atom.as_ref(),
state.moof_atom.as_ref(),
) {
(Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => {
let include_header = !state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT);
let include_header = !state
.fragment_buffer_flags
.contains(gst::BufferFlags::DELTA_UNIT);
let header_len = if include_header {
ftype_atom.len() + moov_atom.len()
} else {
0
};
let output_buf_len = header_len + moof_atom.len() + mdat_atom.len();
let output_buf_len =
header_len + moof_atom.len() + mdat_atom.len();
gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}",
include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len());
let mut gst_buffer = gst::Buffer::with_size(output_buf_len).unwrap();
let mut gst_buffer =
gst::Buffer::with_size(output_buf_len).unwrap();
{
let buffer_ref = gst_buffer.get_mut().unwrap();
buffer_ref.set_pts(state.fragment_pts);
buffer_ref.set_dts(state.fragment_dts);
let duration = state.fragment_max_pts_plus_duration - state.fragment_pts;
let duration = state.fragment_max_pts_plus_duration
- state.fragment_pts;
buffer_ref.set_duration(duration);
buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst::BUFFER_OFFSET_NONE));
buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst::BUFFER_OFFSET_NONE));
buffer_ref.set_offset(
state
.fragment_offset
.unwrap_or(gst::BUFFER_OFFSET_NONE),
);
buffer_ref.set_offset_end(
state
.fragment_offset_end
.unwrap_or(gst::BUFFER_OFFSET_NONE),
);
buffer_ref.set_flags(state.fragment_buffer_flags);
let mut buffer_map = buffer_ref.map_writable().unwrap();
let slice = buffer_map.as_mut_slice();
let mut pos = 0;
if include_header {
slice[pos..pos+ftype_atom.len()].copy_from_slice(&ftype_atom.atom_bytes);
slice[pos..pos + ftype_atom.len()]
.copy_from_slice(&ftype_atom.atom_bytes);
pos += ftype_atom.len();
slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes);
slice[pos..pos + moov_atom.len()]
.copy_from_slice(&moov_atom.atom_bytes);
pos += moov_atom.len();
}
slice[pos..pos+moof_atom.len()].copy_from_slice(&moof_atom.atom_bytes);
slice[pos..pos + moof_atom.len()]
.copy_from_slice(&moof_atom.atom_bytes);
pos += moof_atom.len();
slice[pos..pos+mdat_atom.len()].copy_from_slice(&mdat_atom.atom_bytes);
slice[pos..pos + mdat_atom.len()]
.copy_from_slice(&mdat_atom.atom_bytes);
pos += mdat_atom.len();
assert_eq!(pos, output_buf_len);
//FIXME: Work on the Json here, instead of redoing it in a new method.
}
// Clear fragment variables.
state.fragment_pts = ClockTime::none();
@@ -262,21 +295,21 @@ impl MoqSink {
// Push new buffer.
gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer);
// let _ = self.srcpad.push(gst_buffer)?;
self.send_to_network(gst_buffer)?;
},
// self.send_to_network(gst_buffer)?;
}
_ => {
gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof");
},
}
}
},
}
_ => {
gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom);
},
}
}
},
}
None => break,
}
};
}
gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state);
Ok(gst::FlowSuccess::Ok)
}
@@ -285,12 +318,17 @@ impl MoqSink {
self.srcpad.push_event(event)
}
fn sink_query(&self, _pad: &gst::Pad, _element: &super::MoqSink, query: &mut gst::QueryRef) -> bool {
fn sink_query(
&self,
_pad: &gst::Pad,
_element: &super::MoqSink,
query: &mut gst::QueryRef,
) -> bool {
self.srcpad.peer_query(query)
}
fn send_to_network(&self, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box<dyn std::error::Error>> {
//Let this be our publisher
}
}
@@ -301,7 +339,6 @@ impl ObjectSubclass for MoqSink {
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
.event_function(|pad, parent, event| {
@@ -342,7 +379,7 @@ impl ElementImpl for MoqSink {
"Generic",
ELEMENT_DESCRIPTION,
ELEMENT_AUTHOR,
)
)
});
Some(&*ELEMENT_METADATA)
}
@@ -361,4 +398,204 @@ impl ElementImpl for MoqSink {
});
PAD_TEMPLATES.as_ref()
}
}
}
//Moq
pub struct Publisher {
track: track::Publisher,
}
impl Publisher {
pub fn new(track: track::Publisher) -> Self {
Self { track }
}
pub async fn run(mut self) -> anyhow::Result<()> {
let start = Utc::now();
let mut now = start;
// Just for fun, don't start at zero.
let mut sequence = start.minute();
loop {
let segment = self
.track
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;
sequence += 1;
tokio::spawn(async move {
if let Err(err) = Self::send_segment(segment, now).await {
log::warn!("failed to send minute: {:?}", err);
}
});
let next = now + chrono::Duration::minutes(1);
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();
let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;
now = next; // just assume we didn't undersleep
}
}
async fn send_segment(
mut segment: segment::Publisher,
mut now: DateTime<Utc>,
) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();
segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);
segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
println!("{}{}", base, delta);
let next = now + chrono::Duration::seconds(1);
let next = next.with_nanosecond(0).unwrap();
let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;
// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}
now = next;
}
}
}
//Sends to remote
pub struct Sender {
track: track::Publisher,
}
impl Sender {
pub fn new(track: track::Publisher) -> Self {
Self { track }
}
pub async fn start(mut self) -> anyhow::Result<()> {
// Disable tracing so we don't get a bunch of Quinn spam.
let tracer = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(tracer).unwrap();
let config = cli::Config::parse();
// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();
if config.tls_root.is_empty() {
// Add the platform's native root certificates.
for cert in
rustls_native_certs::load_native_certs().context("could not load platform certs")?
{
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
} else {
// Add the specified root certificates.
for root in &config.tls_root {
let root = fs::File::open(root).context("failed to open root cert file")?;
let mut root = io::BufReader::new(root);
let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
anyhow::ensure!(root.len() == 1, "expected a single root cert");
let root = rustls::Certificate(root[0].to_owned());
roots.add(&root).context("failed to add root cert")?;
}
}
let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
// Allow disabling TLS verification altogether.
if config.tls_disable_verify {
let noop = NoCertificateVerification {};
tls_config
.dangerous()
.set_certificate_verifier(Arc::new(noop));
}
}
async fn send_segment(
mut segment: segment::Publisher,
mut now: DateTime<Utc>,
) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();
segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);
segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
println!("{}{}", base, delta);
let next = now + chrono::Duration::seconds(1);
let next = next.with_nanosecond(0).unwrap();
let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;
// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}
now = next;
}
}
}
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}