Create a new non-blocking thread to run a QUINN session from.

This commit is contained in:
Wanjohi
2023-12-23 04:39:35 -08:00
parent d890aa0011
commit 8d5ca938f2
3 changed files with 123 additions and 89 deletions

View File

@@ -1070,16 +1070,6 @@ dependencies = [
"value-bag",
]
[[package]]
name = "m3u8-rs"
version = "5.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1d7ba86f7ea62f17f4310c55e93244619ddc7dadfc7e565de1967e4e41e6e7"
dependencies = [
"chrono",
"nom",
]
[[package]]
name = "me"
version = "0.1.0"
@@ -1091,13 +1081,11 @@ dependencies = [
"gst-plugin-version-helper 0.8.0 (git+https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs)",
"gstreamer",
"gstreamer-app",
"gstreamer-base",
"gstreamer-check",
"gstreamer-gl",
"gstreamer-pbutils",
"gstreamer-video",
"isobmff",
"m3u8-rs",
"moq-transport",
"mp4",
"once_cell",
@@ -1119,12 +1107,6 @@ version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.7.1"
@@ -1181,16 +1163,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"

View File

@@ -8,16 +8,14 @@ edition = "2021"
[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"}
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"}
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-plugin-fmp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" }
chrono = "0.4.31"
m3u8-rs = "5.0.4"
isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" }
bytes = "1.5.0"
once_cell = "1"

View File

@@ -2,13 +2,13 @@ use gst::glib;
use gst::glib::once_cell::sync::Lazy;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use crate::relayurl::*;
use crate::RUNTIME;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use moq_transport::cache::{broadcast, fragment, segment, track};
@@ -90,65 +90,128 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
#[derive(Default)]
#[derive(Default, object_subclass)]
pub struct MoqSink {
state: Mutex<State>,
state: Mutex<Option<State>>,
url: Mutex<Option<Url>>,
settings: Mutex<Settings>,
}
impl Default for MoqSink {
fn default() -> Self {
MoqSink {
state: Mutex::new(None),
url: Mutex::new(None),
settings: Mutex::new(Settings::default()),
}
}
}
impl MoqSink {
fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().map_err(|e| {
gst::error_msg!(
gst::ResourceError::Settings,
["Failed to acquire state lock: {}", e]
)
})?;
if let State::Started { .. } = *state {
unreachable!("Element already started");
}
let relay_url = {
let url = self.url.lock().unwrap();
match *url {
Some(ref url) => url.clone(),
None => {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["Cannot start without a URL being set"]
));
}
}
};
// // Initialize shared state and channels
// let (sender, receiver) = mpsc::channel(32);
// // self.sender = Some(sender);
// // Spawn a new thread to run the Tokio runtime
// std::thread::spawn(move || {
// // Create a new Tokio runtime
// let rt = Runtime::new().unwrap();
// // Block on the async code within the Tokio runtime
// rt.block_on(async move {
// // Set up your async tasks here
// let session_task = session.run();
// let media_task = media.run();
// tokio::select! {
// res = session_task => {
// // Handle session result
// // You might want to send a message to the GStreamer element using the channel
// },
// res = media_task => {
// // Handle media result
// // You might want to send a message to the GStreamer element using the channel
// },
// msg = receiver.recv() => {
// // Handle messages sent from the GStreamer element to the Tokio runtime
// // These might be control commands or data to process
// }
// let relay_url = {
// let url = self.url.lock().unwrap();
// match *url {
// Some(ref url) => url.clone(),
// None => {
// return Err(gst::error_msg!(
// gst::ResourceError::Settings,
// ["Cannot start without a URL being set"]
// ));
// }
// });
// }
// };
let relay_url = self
.url
.lock()
.map_err(|e| {
gst::error_msg!(
gst::ResourceError::Settings,
["Failed to acquire URL lock: {}", e]
)
})?
.clone()
.ok_or_else(|| {
gst::error_msg!(
gst::ResourceError::Settings,
["Cannot start without a URL being set"]
)
})?;
gst::trace!(
CAT,
imp: self,
"connecting to relay: url={}",
relay_url
);
// Initialize shared state and channels
let (sender, receiver) = mpsc::channel(32);
// self.sender = Some(sender);
// Spawn a new thread to run the Moq server
RUNTIME.spawn(async move {
let tracer = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(tracer).unwrap();
let (publisher, subscriber) = broadcast::new("");
// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();
// Add the platform's native root certificates.
for cert in
rustls_native_certs::load_native_certs().context("could not load platform certs")?
{
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
// this one is important
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()];
let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
let mut endpoint =
quinn::Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0))?;
endpoint.set_default_client_config(quinn_client_config);
let session = webtransport_quinn::connect(&endpoint, &relay_url)
.await
.context("failed to create WebTransport session")?;
let session = moq_transport::session::Client::publisher(session, subscriber)
.await
.context("failed to create MoQ Transport session")?;
session.run().await.context("session error")?;
Ok::<(), anyhow::Error>(())
});
// Update the state to indicate the element has started
// *state = State::Started(StartedState {
// broadcast: publisher,
// });
Ok(())
@@ -179,27 +242,28 @@ impl MoqSink {
*url = Some(relay_url);
Ok(())
}
Err(_) => Err(glib::Error::new(
gst::URIError::BadUri,
"Could not parse URI",
)),
Err(err) => Err(glib::Error::new(gst::URIError::BadUri, &err)),
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for MoqSink {
const NAME: &'static str = ELEMENT_CLASS_NAME;
type Type = super::MoqSink;
type ParentType = gst_base::BaseSink;
type Interfaces = (gst::URIHandler,);
type Instance;
type Class;
}
impl GstObjectImpl for WaylandDisplaySrc {}
impl ObjectImpl for MoqSink {
fn constructed(&self) {
self.parent_constructed();
self.obj().set_sync(false);
}
@@ -208,7 +272,7 @@ impl ObjectImpl for MoqSink {
vec![
glib::ParamSpecString::builder("host")
.nick("Host")
.blurb("The host of the relay server to connect tom, this can be a web url")
.blurb("The host of the relay server to connect to, this can be a web url")
.default_value(DEFAULT_ADDRESS)
.build(),
glib::ParamSpecInt::builder("port")
@@ -339,4 +403,4 @@ impl BaseSinkImpl for MoqSink {
fn start(&self) -> Result<(), gst::ErrorMessage> {
self.start()
}
}
}