Mirror of https://github.com/nestriness/warp.git
Refactor the codebase
gst-warp-sink/Cargo.lock (generated, 7 lines changed)
@@ -1081,6 +1081,12 @@ dependencies = [
"m3u8-rs",
"moq-transport",
"mp4",
"quinn",
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"url",
"webtransport-quinn",
]

[[package]]

@@ -1526,6 +1532,7 @@ version = "0.21.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
dependencies = [
"log",
"ring 0.17.7",
"rustls-webpki",
"sct",
@@ -20,6 +20,17 @@ chrono = "0.4.31"
m3u8-rs = "5.0.4"
isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" }
bytes = "1.5.0"

# QUIC
quinn = "0.10"
webtransport-quinn = "0.6.1"
url = "2"

# Crypto
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"

mp4 = "0.14.0"
moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.2.0" }
@@ -1,4 +1,5 @@
mod moqsink;
mod relayurl;

fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    moqsink::register(plugin)?;
@@ -1,9 +1,12 @@
use crate::relayurl::*;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::ClockTime;
#[allow(unused_imports)]
use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::VarInt;
use once_cell::sync::Lazy;
use std::convert::TryInto;
use std::sync::Mutex;
@@ -20,6 +23,7 @@ const ATOM_TYPE_FTYPE: u32 = 1718909296;
const ATOM_TYPE_MOOV: u32 = 1836019574;
const ATOM_TYPE_MOOF: u32 = 1836019558;
const ATOM_TYPE_MDAT: u32 = 1835295092;
const DEFAULT_PORT: u16 = 4443;

#[derive(Debug)]
struct Mp4Atom {
@@ -81,6 +85,33 @@ impl Mp4Parser {
}
}

struct Settings {
host: Option<String>,
port: Option<u16>,
name: Option<String>,
}

impl Settings {
fn to_uri(&self) -> String {
RelayUrl {
host: self.host.clone(),
port: self.port.clone().unwrap(),
name: self.name.clone().unwrap(),
}
.to_string()
}
}

impl Default for Settings {
fn default() -> Self {
Settings {
host: None,
port: None,
name: None,
}
}
}

#[derive(Debug)]
struct StartedState {
mp4_parser: Mp4Parser,
@@ -102,7 +133,6 @@ struct StartedState {
fragment_offset_end: Option<u64>,
fragment_buffer_flags: gst::BufferFlags,
}

enum State {
Started { state: StartedState },
}
@@ -128,7 +158,9 @@ impl Default for State {

pub struct MoqSink {
state: Mutex<State>,
url: Mutex<Option<RelayUrl>>,
srcpad: gst::Pad,
settings: Mutex<Settings>,
}

static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@@ -140,197 +172,199 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});

impl MoqSink {
fn sink_chain(
&self,
pad: &gst::Pad,
element: &super::MoqSink,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
// fn sink_chain(
// &self,
// pad: &gst::Pad,
// element: &super::MoqSink,
// buffer: gst::Buffer,
// ) -> Result<gst::FlowSuccess, gst::FlowError> {
// gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);

let mut state = self.state.lock().unwrap();
// let mut state = self.state.lock().unwrap();

let state = match *state {
State::Started { ref mut state, .. } => state,
};
// let state = match *state {
// State::Started { ref mut state, .. } => state,
// };

let map = buffer.map_readable().map_err(|_| {
gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
let input_buf = map.as_ref();
// let map = buffer.map_readable().map_err(|_| {
// gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
// gst::FlowError::Error
// })?;
// let input_buf = map.as_ref();

state.mp4_parser.add(input_buf);
// state.mp4_parser.add(input_buf);

// Update cummulative fragment variables.
// Buffer PTS, etc. are only valid if this buffer contains MDAT data.
if state.mp4_parser.have_mdat() {
assert!(buffer.pts().is_some());
if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() {
state.fragment_pts = buffer.pts();
}
if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() {
state.fragment_dts = buffer.dts();
}
let pts_plus_duration = buffer.pts() + buffer.duration();
if state.fragment_max_pts_plus_duration.is_none()
|| state.fragment_max_pts_plus_duration < pts_plus_duration
{
state.fragment_max_pts_plus_duration = pts_plus_duration;
}
if buffer.offset() != gst::BUFFER_OFFSET_NONE
&& (state.fragment_offset.is_none()
|| state.fragment_offset.unwrap() > buffer.offset())
{
state.fragment_offset = Some(buffer.offset());
}
if buffer.offset_end() != gst::BUFFER_OFFSET_NONE
&& (state.fragment_offset_end.is_none()
|| state.fragment_offset_end.unwrap() < buffer.offset_end())
{
state.fragment_offset_end = Some(buffer.offset_end());
}
if state
.fragment_buffer_flags
.contains(gst::BufferFlags::DELTA_UNIT)
&& !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
{
state
.fragment_buffer_flags
.remove(gst::BufferFlags::DELTA_UNIT);
}
if buffer.flags().contains(gst::BufferFlags::DISCONT) {
state
.fragment_buffer_flags
.insert(gst::BufferFlags::DISCONT);
}
gst_trace!(CAT, obj: pad, "Updated state={:?}", state);
}
// // Update cummulative fragment variables.
// // Buffer PTS, etc. are only valid if this buffer contains MDAT data.
// if state.mp4_parser.have_mdat() {
// assert!(buffer.pts().is_some());
// if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() {
// state.fragment_pts = buffer.pts();
// }
// if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() {
// state.fragment_dts = buffer.dts();
// }
// let pts_plus_duration = buffer.pts() + buffer.duration();
// if state.fragment_max_pts_plus_duration.is_none()
// || state.fragment_max_pts_plus_duration < pts_plus_duration
// {
// state.fragment_max_pts_plus_duration = pts_plus_duration;
// }
// if buffer.offset() != gst::BUFFER_OFFSET_NONE
// && (state.fragment_offset.is_none()
// || state.fragment_offset.unwrap() > buffer.offset())
// {
// state.fragment_offset = Some(buffer.offset());
// }
// if buffer.offset_end() != gst::BUFFER_OFFSET_NONE
// && (state.fragment_offset_end.is_none()
// || state.fragment_offset_end.unwrap() < buffer.offset_end())
// {
// state.fragment_offset_end = Some(buffer.offset_end());
// }
// if state
// .fragment_buffer_flags
// .contains(gst::BufferFlags::DELTA_UNIT)
// && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
// {
// state
// .fragment_buffer_flags
// .remove(gst::BufferFlags::DELTA_UNIT);
// }
// if buffer.flags().contains(gst::BufferFlags::DISCONT) {
// state
// .fragment_buffer_flags
// .insert(gst::BufferFlags::DISCONT);
// }
// gst_trace!(CAT, obj: pad, "Updated state={:?}", state);
// }

loop {
match state.mp4_parser.pop_atom() {
Some(atom) => {
gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type);
match atom.atom_type {
ATOM_TYPE_FTYPE => {
state.ftype_atom = Some(atom);
gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom);
}
ATOM_TYPE_MOOV => {
state.moov_atom = Some(atom);
gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom);
}
ATOM_TYPE_MOOF => {
state.moof_atom = Some(atom);
gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom);
}
ATOM_TYPE_MDAT => {
let mdat_atom = atom;
match (
state.ftype_atom.as_ref(),
state.moov_atom.as_ref(),
state.moof_atom.as_ref(),
) {
(Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => {
let include_header = !state
.fragment_buffer_flags
.contains(gst::BufferFlags::DELTA_UNIT);
let header_len = if include_header {
ftype_atom.len() + moov_atom.len()
} else {
0
};
let output_buf_len =
header_len + moof_atom.len() + mdat_atom.len();
gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}",
include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len());
let mut gst_buffer =
gst::Buffer::with_size(output_buf_len).unwrap();
{
let buffer_ref = gst_buffer.get_mut().unwrap();
buffer_ref.set_pts(state.fragment_pts);
buffer_ref.set_dts(state.fragment_dts);
let duration = state.fragment_max_pts_plus_duration
- state.fragment_pts;
buffer_ref.set_duration(duration);
buffer_ref.set_offset(
state
.fragment_offset
.unwrap_or(gst::BUFFER_OFFSET_NONE),
);
buffer_ref.set_offset_end(
state
.fragment_offset_end
.unwrap_or(gst::BUFFER_OFFSET_NONE),
);
buffer_ref.set_flags(state.fragment_buffer_flags);
let mut buffer_map = buffer_ref.map_writable().unwrap();
let slice = buffer_map.as_mut_slice();
let mut pos = 0;
if include_header {
slice[pos..pos + ftype_atom.len()]
.copy_from_slice(&ftype_atom.atom_bytes);
pos += ftype_atom.len();
slice[pos..pos + moov_atom.len()]
.copy_from_slice(&moov_atom.atom_bytes);
pos += moov_atom.len();
}
slice[pos..pos + moof_atom.len()]
.copy_from_slice(&moof_atom.atom_bytes);
pos += moof_atom.len();
slice[pos..pos + mdat_atom.len()]
.copy_from_slice(&mdat_atom.atom_bytes);
pos += mdat_atom.len();
assert_eq!(pos, output_buf_len);
// loop {
// match state.mp4_parser.pop_atom() {
// Some(atom) => {
// gst_log!(CAT, obj: pad, "atom_size={}, atom_type={}", atom.len(), atom.atom_type);
// match atom.atom_type {
// ATOM_TYPE_FTYPE => {
// state.ftype_atom = Some(atom);
// gst_log!(CAT, obj: pad, "ftype_atom={:?}", state.ftype_atom);
// }
// ATOM_TYPE_MOOV => {
// state.moov_atom = Some(atom);
// gst_log!(CAT, obj: pad, "moov_atom={:?}", state.moov_atom);
// }
// ATOM_TYPE_MOOF => {
// state.moof_atom = Some(atom);
// gst_log!(CAT, obj: pad, "moof_atom={:?}", state.moof_atom);
// }
// ATOM_TYPE_MDAT => {
// let mdat_atom = atom;
// match (
// state.ftype_atom.as_ref(),
// state.moov_atom.as_ref(),
// state.moof_atom.as_ref(),
// ) {
// (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => {
// let include_header = !state
// .fragment_buffer_flags
// .contains(gst::BufferFlags::DELTA_UNIT);
// let header_len = if include_header {
// ftype_atom.len() + moov_atom.len()
// } else {
// 0
// };
// let output_buf_len =
// header_len + moof_atom.len() + mdat_atom.len();

//FIXME: Work on the Json here, instead of redoing it in a new method.
}
// Clear fragment variables.
state.fragment_pts = ClockTime::none();
state.fragment_dts = ClockTime::none();
state.fragment_max_pts_plus_duration = ClockTime::none();
state.fragment_offset = None;
state.fragment_offset_end = None;
state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT;
// Push new buffer.
gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer);
// let _ = self.srcpad.push(gst_buffer)?;
// self.send_to_network(gst_buffer)?;
}
_ => {
gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof");
}
}
}
_ => {
gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom);
}
}
}
None => break,
}
}
gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state);
Ok(gst::FlowSuccess::Ok)
}
// gst_log!(CAT, obj: pad, "Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}",
// include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len());

fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool {
self.srcpad.push_event(event)
}
// let mut gst_buffer =
// gst::Buffer::with_size(output_buf_len).unwrap();
// {
// let buffer_ref = gst_buffer.get_mut().unwrap();
// buffer_ref.set_pts(state.fragment_pts);
// buffer_ref.set_dts(state.fragment_dts);
// let duration = state.fragment_max_pts_plus_duration
// - state.fragment_pts;
// buffer_ref.set_duration(duration);
// buffer_ref.set_offset(
// state
// .fragment_offset
// .unwrap_or(gst::BUFFER_OFFSET_NONE),
// );
// buffer_ref.set_offset_end(
// state
// .fragment_offset_end
// .unwrap_or(gst::BUFFER_OFFSET_NONE),
// );
// buffer_ref.set_flags(state.fragment_buffer_flags);
// let mut buffer_map = buffer_ref.map_writable().unwrap();
// let slice = buffer_map.as_mut_slice();
// let mut pos = 0;
// if include_header {
// slice[pos..pos + ftype_atom.len()]
// .copy_from_slice(&ftype_atom.atom_bytes);
// pos += ftype_atom.len();
// slice[pos..pos + moov_atom.len()]
// .copy_from_slice(&moov_atom.atom_bytes);
// pos += moov_atom.len();
// }
// slice[pos..pos + moof_atom.len()]
// .copy_from_slice(&moof_atom.atom_bytes);
// pos += moof_atom.len();
// slice[pos..pos + mdat_atom.len()]
// .copy_from_slice(&mdat_atom.atom_bytes);
// pos += mdat_atom.len();
// assert_eq!(pos, output_buf_len);

fn sink_query(
&self,
_pad: &gst::Pad,
_element: &super::MoqSink,
query: &mut gst::QueryRef,
) -> bool {
self.srcpad.peer_query(query)
}
// //FIXME: Work on the Json here, instead of redoing it in a new method.
// }
// // Clear fragment variables.
// state.fragment_pts = ClockTime::none();
// state.fragment_dts = ClockTime::none();
// state.fragment_max_pts_plus_duration = ClockTime::none();
// state.fragment_offset = None;
// state.fragment_offset_end = None;
// state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT;
// // Push new buffer.
// gst_log!(CAT, obj: pad, "Pushing buffer {:?}", gst_buffer);
// // let _ = self.srcpad.push(gst_buffer)?;
// // self.send_to_network(gst_buffer)?;
// }
// _ => {
// gst_warning!(CAT, obj: pad, "Received mdat without ftype, moov, or moof");
// }
// }
// }
// _ => {
// gst_warning!(CAT, obj: pad, "Unknown atom type {:?}", atom);
// }
// }
// }
// None => break,
// }
// }
// gst_trace!(CAT, obj: element, "sink_chain: END: state={:?}", state);
// Ok(gst::FlowSuccess::Ok)
// }

fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box<dyn std::error::Error>> {
//Let this be our publisher
Ok(())
}
// fn sink_event(&self, _pad: &gst::Pad, _element: &super::MoqSink, event: gst::Event) -> bool {
// self.srcpad.push_event(event)
// }

// fn sink_query(
// &self,
// _pad: &gst::Pad,
// _element: &super::MoqSink,
// query: &mut gst::QueryRef,
// ) -> bool {
// self.srcpad.peer_query(query)
// }

// fn send_to_network(&self, buffer: gst::Buffer) -> Result<(), Box<dyn std::error::Error>> {
// //Let this be our publisher
// Ok(())
// }
}

#[glib::object_subclass]
@@ -339,32 +373,7 @@ impl ObjectSubclass for MoqSink {
type Type = super::MoqSink;
type ParentType = gst::Element;

fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
.event_function(|pad, parent, event| {
MoqSink::catch_panic_pad_function(
parent,
|| false,
|identity, element| identity.src_event(pad, element, event),
)
})
.query_function(|pad, parent, query| {
MoqSink::catch_panic_pad_function(
parent,
|| false,
|identity, element| identity.src_query(pad, element, query),
)
})
.build();

Self {
state: Mutex::new(Default::default()),
srcpad,
}
}

type Interfaces;
type Interfaces = (gst::URIHandler);

type Instance;
@@ -374,10 +383,82 @@ impl ObjectSubclass for MoqSink {
impl ObjectImpl for MoqSink {
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed();
obj.add_pad(&self.srcpad).unwrap();
self.obj().set_sync(false);
}

fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("host")
.nick("Url Host")
.blurb("The host of the relay server to connect to, e.g. example.com")
.mutable_ready()
.build(),
glib::ParamSpecString::builder("port")
.nick("Url Port")
.blurb("The port of the relay server to connect to, most probably this is a 4443")
.default_value(DEFAULT_PORT)
.build(),
glib::ParamSpecString::builder("name")
.nick("Url Name")
.blurb("This is a very long random string to identify your stream on the relay server")
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}

fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();

gst::debug!(
CAT,
imp: self,
"Setting property '{}' to '{:?}'",
pspec.name(),
value
);

match pspec.name() {
"host" => {
settings.host = value.get::<String>().expect("type checked upstream");
if settings.port.is_some() && settings.name.is_some() {
let _ = self.set_uri(Some(&settings.to_uri()));
}
}
"port" => {
settings.port = value.get::<Option<u16>>().expect("type checked upstream");
if settings.host.is_some() && settings.name.is_some() {
let _ = self.set_uri(Some(&settings.to_uri()));
}
}
"name" => {
settings.name = value
.get::<Option<String>>()
.expect("type checked upstream");
if settings.host.is_some() && settings.port.is_some() {
let _ = self.set_uri(Some(&settings.to_uri()));
}
}
_ => unimplemented!(),
}
}

fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();

match pspec.name() {
"host" => settings.host.to_value(),
"name" => settings.name.to_value(),
"port" => settings.port.to_value(),
_ => unimplemented!(),
}
}
}

impl GstObjectImpl for MoqSink {}

impl ElementImpl for MoqSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
@@ -394,216 +475,31 @@ impl ElementImpl for MoqSink {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}

//Moq
pub struct Publisher {
track: track::Publisher,
}
impl URIHandlerImpl for MoqSink {
const URI_TYPE: gst::URIType = gst::URIType::Sink;

impl Publisher {
pub fn new(track: track::Publisher) -> Self {
Self { track }
fn protocols() -> &'static [&'static str] {
&["https"]
}

pub async fn run(mut self) -> anyhow::Result<()> {
let start = Utc::now();
let mut now = start;

// Just for fun, don't start at zero.
let mut sequence = start.minute();

loop {
let segment = self
.track
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;

sequence += 1;

tokio::spawn(async move {
if let Err(err) = Self::send_segment(segment, now).await {
log::warn!("failed to send minute: {:?}", err);
}
});

let next = now + chrono::Duration::minutes(1);
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();

let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;

now = next; // just assume we didn't undersleep
}
fn uri(&self) -> Option<String> {
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
}

async fn send_segment(
mut segment: segment::Publisher,
mut now: DateTime<Utc>,
) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);

segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;

println!("{}{}", base, delta);

let next = now + chrono::Duration::seconds(1);
let next = next.with_nanosecond(0).unwrap();

let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;

// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}

now = next;
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
self.set_uri(Some(uri))
}
}

//Sends to remote
pub struct Sender {
track: track::Publisher,
}

impl Sender {
pub fn new(track: track::Publisher) -> Self {
Self { track }
}

pub async fn start(mut self) -> anyhow::Result<()> {
// Disable tracing so we don't get a bunch of Quinn spam.
let tracer = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(tracer).unwrap();

let config = cli::Config::parse();

// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();

if config.tls_root.is_empty() {
// Add the platform's native root certificates.
for cert in
rustls_native_certs::load_native_certs().context("could not load platform certs")?
{
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
} else {
// Add the specified root certificates.
for root in &config.tls_root {
let root = fs::File::open(root).context("failed to open root cert file")?;
let mut root = io::BufReader::new(root);

let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
anyhow::ensure!(root.len() == 1, "expected a single root cert");
let root = rustls::Certificate(root[0].to_owned());

roots.add(&root).context("failed to add root cert")?;
}
}

let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();

// Allow disabling TLS verification altogether.
// if config.tls_disable_verify {
// let noop = NoCertificateVerification {};
// tls_config
// .dangerous()
// .set_certificate_verifier(Arc::new(noop));
// }
Ok(())
}

async fn send_segment(
mut segment: segment::Publisher,
mut now: DateTime<Utc>,
) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
let sequence = VarInt::from_u32(now.second() + 1);

segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;

println!("{}{}", base, delta);

let next = now + chrono::Duration::seconds(1);
let next = next.with_nanosecond(0).unwrap();

let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;

// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}

now = next;
}
}
}

pub struct NoCertificateVerification {}

impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
}
@@ -1,85 +0,0 @@
In the code example you provided, the Mp4Parser struct is responsible for parsing and extracting MP4 atoms from the incoming buffer. When the pop_atom method is called, it checks if there is enough data to form an atom and then extracts it.
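For intuition on why pop_atom can tell when a complete atom has arrived, note that every ISO BMFF box starts with a 4-byte big-endian size followed by a 4-byte type (which is why ATOM_TYPE_FTYPE is 1718909296, the big-endian value of "ftyp"). A minimal sketch of that check, using illustrative names rather than the parser's actual API, and ignoring 64-bit "largesize" boxes:

fn peek_atom(buf: &[u8]) -> Option<(u32, usize)> {
    // Need at least the 8-byte header: 4-byte size + 4-byte type.
    if buf.len() < 8 {
        return None;
    }
    let size = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    let atom_type = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
    // The atom is only complete once `size` bytes have been buffered.
    if size < 8 || buf.len() < size {
        return None;
    }
    Some((atom_type, size))
}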

To extract precise atoms like mdat, ftyp, moof, and moov, you can follow the pattern already established in the loop in the sink_chain function. This loop calls pop_atom repeatedly to extract atoms from the incoming buffer. When an atom is extracted, it checks the atom's type and processes it accordingly.

Here's a simplified version of the loop that extracts and processes the atoms:

loop {
    match state.mp4_parser.pop_atom() {
        Some(atom) => {
            match atom.atom_type {
                ATOM_TYPE_FTYPE => {
                    state.ftype_atom = Some(atom);
                    // ... process ftype atom
                },
                ATOM_TYPE_MOOV => {
                    state.moov_atom = Some(atom);
                    // ... process moov atom
                },
                ATOM_TYPE_MOOF => {
                    state.moof_atom = Some(atom);
                    // ... process moof atom
                },
                ATOM_TYPE_MDAT => {
                    // ... process mdat atom
                },
                _ => {
                    // Unknown or unhandled atom type
                },
            }
        },
        None => break, // No more atoms to pop, exit the loop
    }
}
In your send_to_network method, you will need to send these atoms over the network. This means you should collect the necessary atoms and then send them in the correct order, depending on whether it's a keyframe or not.

To do this, you can create a FragmentData struct that includes the atoms you need:

struct FragmentData {
    ftype_atom: Option<Mp4Atom>,
    moov_atom: Option<Mp4Atom>,
    moof_atom: Mp4Atom,
    mdat_atom: Mp4Atom,
    // ... include other metadata like timestamps
}

Then, in your loop, when you have all the necessary atoms (e.g., after you've processed the mdat atom), you can create an instance of FragmentData and pass it to send_to_network:

// Inside the loop, after processing the mdat atom
let fragment_data = FragmentData {
    ftype_atom: state.ftype_atom.clone(),
    moov_atom: state.moov_atom.clone(),
    moof_atom: state.moof_atom.take().unwrap(),
    mdat_atom: mdat_atom,
    // ... set other metadata
};

// Send the fragment data over the network
self.send_to_network(fragment_data)?;

Finally, in your send_to_network method, you can construct the buffer to send based on the FragmentData:

impl MoqSink {
    // ...
    fn send_to_network(&self, fragment_data: FragmentData) -> Result<(), Box<dyn std::error::Error>> {
        // Construct the buffer to send
        let mut buffer_to_send = Vec::new();

        // If it's a keyframe, include the ftype and moov atoms
        if let (Some(ftype_atom), Some(moov_atom)) = (fragment_data.ftype_atom, fragment_data.moov_atom) {
            buffer_to_send.extend_from_slice(&ftype_atom.atom_bytes);
            buffer_to_send.extend_from_slice(&moov_atom.atom_bytes);
        }

        // Always include the moof and mdat atoms
        buffer_to_send.extend_from_slice(&fragment_data.moof_atom.atom_bytes);
        buffer_to_send.extend_from_slice(&fragment_data.mdat_atom.atom_bytes);

        // Send the buffer over the network using your QUIC implementation
        // ...

        Ok(())
    }
    // ...
}

Remember to handle cases where the ftype and moov atoms might not be present, as they're only included for keyframes. The send_to_network method will need to take this into account and only include these atoms when necessary.
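A compact way to express that decision, mirroring the include_header logic already used in sink_chain (the helper name here is illustrative, not part of the element):

fn needs_init_header(fragment_flags: gst::BufferFlags) -> bool {
    // A fragment that is not a delta unit starts at a keyframe, so the
    // ftyp + moov initialization atoms should be prepended to it.
    !fragment_flags.contains(gst::BufferFlags::DELTA_UNIT)
}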
The actual network transmission code will depend on the QUIC library you're using, and you'll need to implement the details of establishing connections, creating streams, and sending data accordingly.
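As one concrete possibility, the hand-off could go through the moq-transport cache API this commit already depends on; the create_segment / fragment / chunk calls below mirror the Publisher::run and send_segment code elsewhere in this crate, while the helper name, the sequence counter, and the expiry value are illustrative assumptions rather than code from the repository:

// Assumes `use anyhow::Context;` plus the moq_transport imports already
// present in moqsink/imp.rs (cache::{segment, track}, VarInt).
async fn publish_fragment(
    track: &mut track::Publisher,
    sequence: u32,
    payload: bytes::Bytes,
) -> anyhow::Result<()> {
    // One segment per fMP4 fragment, mirroring the segment::Info usage above.
    let mut segment = track
        .create_segment(segment::Info {
            sequence: VarInt::from_u32(sequence),
            priority: 0,
            expires: Some(std::time::Duration::from_secs(10)),
        })
        .context("failed to create segment")?;

    // Write the whole assembled fragment as a single chunk.
    segment
        .fragment(VarInt::ZERO, payload.len())?
        .chunk(payload)
        .context("failed to write fragment")?;

    Ok(())
}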
gst-warp-sink/src/relayurl.rs (new file, 12 lines)

@@ -0,0 +1,12 @@
#[derive(Clone)]
pub struct RelayUrl {
    pub host: String,
    pub port: u16,
    pub name: String,
}

impl ToString for RelayUrl {
    fn to_string(&self) -> String {
        format!("https://{}:{}/{}", self.host, self.port, self.name)
    }
}
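For reference, a quick check of the resulting format with hypothetical values:

let url = RelayUrl {
    host: "relay.example.com".to_string(),
    port: 4443,
    name: "abc123".to_string(),
};
assert_eq!(url.to_string(), "https://relay.example.com:4443/abc123");

This is the string that Settings::to_uri in moqsink/imp.rs builds and hands to set_uri.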