feat(runner): Container detection and handling, video bit-depth flags and script updates (#303)

## Description

Works in apptainer now.. podman is still the goat since apptainer needs
docker treatment and even more..

- Added container detection so podman can be used to it's fullest, the
non-sane ones are handled separately..
- Added video bit-depth option, cuz AV1 and 10-bit encoding go well
together.
- Some other package updates to nestri-server.
- General tidying up of scripts to make multi-container-engine handling
less of a pain.
- Updated old wireplumber lua script to new json format.

Further changes:

- Removed unused debug arg from nestri-server.
- Moved configs to config file folder rather than keeping them in
containerfile.
- Improved audio configs, moved some into wireplumber to keep things
tidy.
- Bit better arg handling in nestri-server.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Optional 10‑bit video support and auto‑launch of an app after display
setup.

* **Changes**
* Standardized runtime/user env to NESTRI_* with updated home/cache
paths and explicit LANG; password generation now logged.
* Improved container/GPU detection and startup logging; reduced blanket
root usage during startup; SSH setup surfaced.
* WirePlumber/PipeWire moved to JSON configs; low‑latency clock and
loopback audio policies added; audio capture defaults to PipeWire.
  
* **Chores**
* GStreamer/libp2p dependency upgrades and Rust toolchain pinned; NVIDIA
driver capability exposed.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
This commit is contained in:
Kristian Ollikainen
2025-09-24 20:08:04 +03:00
committed by GitHub
parent aba0bc3be1
commit 590fe5e196
26 changed files with 1508 additions and 1804 deletions

View File

@@ -1,5 +1,6 @@
use crate::args::encoding_args::AudioCaptureMethod;
use crate::enc_helper::{AudioCodec, EncoderType, VideoCodec};
use clap::builder::TypedValueParser;
use clap::builder::{BoolishValueParser, NonEmptyStringValueParser};
use clap::{Arg, Command, value_parser};
@@ -25,15 +26,6 @@ impl Args {
.value_parser(BoolishValueParser::new())
.default_value("false"),
)
.arg(
Arg::new("debug")
.short('d')
.long("debug")
.env("DEBUG")
.help("Enable additional debugging features")
.value_parser(BoolishValueParser::new())
.default_value("false"),
)
.arg(
Arg::new("relay-url")
.short('u')
@@ -88,8 +80,8 @@ impl Args {
.long("gpu-index")
.env("GPU_INDEX")
.help("GPU to use by index")
.value_parser(value_parser!(i32).range(-1..))
.default_value("-1"),
.value_parser(value_parser!(u32).range(0..))
.required(false),
)
.arg(
Arg::new("gpu-card-path")
@@ -154,13 +146,24 @@ impl Args {
.value_parser(value_parser!(EncoderType))
.default_value("hardware"),
)
.arg(
Arg::new("video-bit-depth")
.long("video-bit-depth")
.env("VIDEO_BIT_DEPTH")
.help("Video bit depth (8 or 10), only with DMA-BUF and non-H264 codec")
.value_parser(
clap::builder::PossibleValuesParser::new(["8", "10"])
.map(|s| s.parse::<u32>().unwrap()),
)
.default_value("8"),
)
.arg(
Arg::new("audio-capture-method")
.long("audio-capture-method")
.env("AUDIO_CAPTURE_METHOD")
.help("Audio capture method")
.value_parser(value_parser!(AudioCaptureMethod))
.default_value("pulseaudio"),
.default_value("pipewire"),
)
.arg(
Arg::new("audio-codec")

View File

@@ -1,8 +1,6 @@
pub struct AppArgs {
/// Verbose output mode
pub verbose: bool,
/// Enable additional debug information and features, may affect performance
pub debug: bool,
/// Virtual display resolution
pub resolution: (u32, u32),
@@ -15,13 +13,13 @@ pub struct AppArgs {
pub room: String,
/// Experimental DMA-BUF support
/// TODO: Move to video encoding flags
pub dma_buf: bool,
}
impl AppArgs {
pub fn from_matches(matches: &clap::ArgMatches) -> Self {
Self {
verbose: matches.get_one::<bool>("verbose").unwrap_or(&false).clone(),
debug: matches.get_one::<bool>("debug").unwrap_or(&false).clone(),
resolution: {
let res = matches
.get_one::<String>("resolution")
@@ -54,7 +52,6 @@ impl AppArgs {
pub fn debug_print(&self) {
tracing::info!("AppArgs:");
tracing::info!("> verbose: {}", self.verbose);
tracing::info!("> debug: {}", self.debug);
tracing::info!(
"> resolution: '{}x{}'",
self.resolution.0,

View File

@@ -1,37 +1,48 @@
pub struct DeviceArgs {
/// GPU vendor (e.g. "intel")
pub gpu_vendor: String,
pub gpu_vendor: Option<String>,
/// GPU name (e.g. "a770")
pub gpu_name: String,
/// GPU index, if multiple same GPUs are present, -1 for auto-selection
pub gpu_index: i32,
pub gpu_name: Option<String>,
/// GPU index, if multiple same GPUs are present, None for auto-selection
pub gpu_index: Option<u32>,
/// GPU card/render path, sets card explicitly from such path
pub gpu_card_path: String,
pub gpu_card_path: Option<String>,
}
impl DeviceArgs {
pub fn from_matches(matches: &clap::ArgMatches) -> Self {
Self {
gpu_vendor: matches
.get_one::<String>("gpu-vendor")
.unwrap_or(&"".to_string())
.clone(),
.cloned(),
gpu_name: matches
.get_one::<String>("gpu-name")
.unwrap_or(&"".to_string())
.clone(),
gpu_index: matches.get_one::<i32>("gpu-index").unwrap_or(&-1).clone(),
.cloned(),
gpu_index: matches
.get_one::<u32>("gpu-index")
.cloned(),
gpu_card_path: matches
.get_one::<String>("gpu-card-path")
.unwrap_or(&"".to_string())
.clone(),
.cloned(),
}
}
pub fn debug_print(&self) {
tracing::info!("DeviceArgs:");
tracing::info!("> gpu_vendor: '{}'", self.gpu_vendor);
tracing::info!("> gpu_name: '{}'", self.gpu_name);
tracing::info!("> gpu_index: {}", self.gpu_index);
tracing::info!("> gpu_card_path: '{}'", self.gpu_card_path);
tracing::info!(
"> gpu_vendor: '{}'",
self.gpu_vendor.as_deref().unwrap_or("auto")
);
tracing::info!(
"> gpu_name: '{}'",
self.gpu_name.as_deref().unwrap_or("auto")
);
tracing::info!(
"> gpu_index: {}",
self.gpu_index.map_or("auto".to_string(), |i| i.to_string())
);
tracing::info!(
"> gpu_card_path: '{}'",
self.gpu_card_path.as_deref().unwrap_or("auto")
);
}
}

View File

@@ -64,14 +64,14 @@ pub struct EncodingOptionsBase {
/// Codec (e.g. "h264", "opus" etc.)
pub codec: Codec,
/// Overridable encoder (e.g. "vah264lpenc", "opusenc" etc.)
pub encoder: String,
pub encoder: Option<String>,
/// Rate control method (e.g. "cqp", "vbr", "cbr")
pub rate_control: RateControl,
}
impl EncodingOptionsBase {
pub fn debug_print(&self) {
tracing::info!("> Codec: '{}'", self.codec.as_str());
tracing::info!("> Encoder: '{}'", self.encoder);
tracing::info!("> Encoder: '{}'", self.encoder.as_deref().unwrap_or("auto"));
match &self.rate_control {
RateControl::CQP(cqp) => {
tracing::info!("> Rate Control: CQP");
@@ -93,6 +93,7 @@ impl EncodingOptionsBase {
pub struct VideoEncodingOptions {
pub base: EncodingOptionsBase,
pub encoder_type: EncoderType,
pub bit_depth: u32,
}
impl VideoEncodingOptions {
pub fn from_matches(matches: &clap::ArgMatches) -> Self {
@@ -104,10 +105,7 @@ impl VideoEncodingOptions {
.unwrap_or(&VideoCodec::H264)
.clone(),
),
encoder: matches
.get_one::<String>("video-encoder")
.unwrap_or(&"".to_string())
.clone(),
encoder: matches.get_one::<String>("video-encoder").cloned(),
rate_control: match matches
.get_one::<RateControlMethod>("video-rate-control")
.unwrap_or(&RateControlMethod::CBR)
@@ -132,6 +130,10 @@ impl VideoEncodingOptions {
.get_one::<EncoderType>("video-encoder-type")
.unwrap_or(&EncoderType::HARDWARE)
.clone(),
bit_depth: matches
.get_one::<u32>("video-bit-depth")
.copied()
.unwrap_or(8),
}
}
@@ -139,6 +141,7 @@ impl VideoEncodingOptions {
tracing::info!("Video Encoding Options:");
self.base.debug_print();
tracing::info!("> Encoder Type: {}", self.encoder_type.as_str());
tracing::info!("> Bit Depth: {}", self.bit_depth);
}
}
impl Deref for VideoEncodingOptions {
@@ -191,10 +194,7 @@ impl AudioEncodingOptions {
.unwrap_or(&AudioCodec::OPUS)
.clone(),
),
encoder: matches
.get_one::<String>("audio-encoder")
.unwrap_or(&"".to_string())
.clone(),
encoder: matches.get_one::<String>("audio-encoder").cloned(),
rate_control: match matches
.get_one::<RateControlMethod>("audio-rate-control")
.unwrap_or(&RateControlMethod::CBR)

View File

@@ -1,5 +1,5 @@
use crate::args::encoding_args::RateControl;
use crate::gpu::{GPUInfo, get_gpu_by_card_path, get_gpus_by_vendor, get_nvidia_gpu_by_cuda_id};
use crate::gpu::{GPUInfo, GPUVendor, get_gpu_by_card_path, get_gpus_by_vendor};
use clap::ValueEnum;
use gstreamer::prelude::*;
use std::error::Error;
@@ -148,7 +148,7 @@ impl VideoEncoderInfo {
pub fn apply_parameters(&self, element: &gstreamer::Element, verbose: bool) {
for (key, value) in &self.parameters {
if element.has_property(key, None) {
if element.has_property(key) {
if verbose {
tracing::debug!("Setting property {} to {}", key, value);
}
@@ -273,7 +273,7 @@ pub fn encoder_gop_params(encoder: &VideoEncoderInfo, gop_size: u32) -> VideoEnc
pub fn encoder_low_latency_params(
encoder: &VideoEncoderInfo,
rate_control: &RateControl,
_rate_control: &RateControl,
framerate: u32,
) -> VideoEncoderInfo {
// 2 second GOP size, maybe lower to 1 second for fast recovery, if needed?
@@ -375,9 +375,9 @@ pub fn get_compatible_encoders(gpus: &Vec<GPUInfo>) -> Vec<VideoEncoderInfo> {
match api {
EncoderAPI::QSV | EncoderAPI::VAAPI => {
// Safe property access with panic protection, gstreamer-rs is fun
let path = if element.has_property("device-path", None) {
let path = if element.has_property("device-path") {
Some(element.property::<String>("device-path"))
} else if element.has_property("device", None) {
} else if element.has_property("device") {
Some(element.property::<String>("device"))
} else {
None
@@ -385,15 +385,46 @@ pub fn get_compatible_encoders(gpus: &Vec<GPUInfo>) -> Vec<VideoEncoderInfo> {
path.and_then(|p| get_gpu_by_card_path(&gpus, &p))
}
EncoderAPI::NVENC if element.has_property("cuda-device-id", None) => {
let cuda_id = element.property::<u32>("cuda-device-id");
get_nvidia_gpu_by_cuda_id(&gpus, cuda_id as usize)
EncoderAPI::NVENC => {
if encoder_name.contains("device") {
// Parse by element name's index (i.e. "nvh264device{N}enc")
let re = regex::Regex::new(r"device(\d+)").unwrap();
if let Some(caps) = re.captures(encoder_name.as_str()) {
if let Some(m) = caps.get(1) {
if let Ok(id) = m.as_str().parse::<usize>() {
return get_gpus_by_vendor(&gpus, GPUVendor::NVIDIA)
.get(id)
.cloned();
}
}
}
None
} else if element.has_property("cuda-device-id") {
let device_id =
match element.property_value("cuda-device-id").get::<i32>() {
Ok(v) if v >= 0 => Some(v as usize),
_ => None,
};
// We'll just treat cuda-device-id as an index
device_id.and_then(|id| {
get_gpus_by_vendor(&gpus, GPUVendor::NVIDIA)
.get(id)
.cloned()
})
} else {
None
}
}
EncoderAPI::AMF if element.has_property("device", None) => {
let device_id = element.property::<u32>("device");
get_gpus_by_vendor(&gpus, "amd")
.get(device_id as usize)
.cloned()
EncoderAPI::AMF if element.has_property("device") => {
let device_id = match element.property_value("device").get::<u32>() {
Ok(v) => Some(v as usize),
Err(_) => None,
};
device_id.and_then(|id| {
get_gpus_by_vendor(&gpus, GPUVendor::AMD).get(id).cloned()
})
}
_ => None,
}

View File

@@ -1,14 +1,53 @@
use regex::Regex;
use std::error::Error;
use std::fs;
use std::process::Command;
use std::str;
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub enum GPUVendor {
UNKNOWN,
INTEL,
NVIDIA,
AMD,
UNKNOWN = 0x0000,
INTEL = 0x8086,
NVIDIA = 0x10de,
AMD = 0x1002,
}
impl From<u16> for GPUVendor {
fn from(value: u16) -> Self {
match value {
0x8086 => GPUVendor::INTEL,
0x10de => GPUVendor::NVIDIA,
0x1002 => GPUVendor::AMD,
_ => GPUVendor::UNKNOWN,
}
}
}
impl From<&str> for GPUVendor {
fn from(value: &str) -> Self {
match value.to_lowercase().as_str() {
"intel" => GPUVendor::INTEL,
"nvidia" => GPUVendor::NVIDIA,
"amd" => GPUVendor::AMD,
_ => GPUVendor::UNKNOWN,
}
}
}
impl From<String> for GPUVendor {
fn from(value: String) -> Self {
GPUVendor::from(value.as_str())
}
}
impl GPUVendor {
pub fn as_str(&self) -> &str {
match self {
GPUVendor::INTEL => "Intel",
GPUVendor::NVIDIA => "NVIDIA",
GPUVendor::AMD => "AMD",
GPUVendor::UNKNOWN => "Unknown",
}
}
}
impl std::fmt::Display for GPUVendor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
@@ -19,21 +58,11 @@ pub struct GPUInfo {
device_name: String,
pci_bus_id: String,
}
impl GPUInfo {
pub fn vendor(&self) -> &GPUVendor {
&self.vendor
}
pub fn vendor_string(&self) -> &str {
match self.vendor {
GPUVendor::INTEL => "Intel",
GPUVendor::NVIDIA => "NVIDIA",
GPUVendor::AMD => "AMD",
GPUVendor::UNKNOWN => "Unknown",
}
}
pub fn card_path(&self) -> &str {
&self.card_path
}
@@ -49,73 +78,122 @@ impl GPUInfo {
pub fn pci_bus_id(&self) -> &str {
&self.pci_bus_id
}
}
fn get_gpu_vendor(vendor_id: &str) -> GPUVendor {
match vendor_id {
"8086" => GPUVendor::INTEL,
"10de" => GPUVendor::NVIDIA,
"1002" => GPUVendor::AMD,
_ => GPUVendor::UNKNOWN,
pub fn as_str(&self) -> String {
format!(
"{} (Vendor: {}, Card Path: {}, Render Path: {}, PCI Bus ID: {})",
self.device_name, self.vendor, self.card_path, self.render_path, self.pci_bus_id
)
}
}
impl std::fmt::Display for GPUInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Retrieves a list of GPUs available on the system.
/// # Returns
/// * `Vec<GPUInfo>` - A vector containing information about each GPU.
pub fn get_gpus() -> Vec<GPUInfo> {
let output = Command::new("lspci")
.args(["-mm", "-nn"])
.output()
.expect("Failed to execute lspci");
pub fn get_gpus() -> Result<Vec<GPUInfo>, Box<dyn Error>> {
// Use "/sys/class/drm/card{}" to find all GPU devices
let mut gpus = Vec::new();
let re = regex::Regex::new(r"^card(\d+)$")?;
for entry in fs::read_dir("/sys/class/drm")? {
let entry = entry?;
let file_name = entry.file_name();
let file_name_str = file_name.to_string_lossy();
str::from_utf8(&output.stdout)
.unwrap()
.lines()
.filter_map(|line| parse_pci_device(line))
.filter(|(class_id, _, _, _)| matches!(class_id.as_str(), "0300" | "0302" | "0380"))
.filter_map(|(_, vendor_id, device_name, pci_addr)| {
get_dri_device_path(&pci_addr)
.map(|(card, render)| (vendor_id, card, render, device_name, pci_addr))
})
.map(
|(vid, card_path, render_path, device_name, pci_bus_id)| GPUInfo {
vendor: get_gpu_vendor(&vid),
// We are only interested in entries that match "cardN", and getting the minor number
let caps = match re.captures(&file_name_str) {
Some(caps) => caps,
None => continue,
};
let minor = &caps[1];
// Read vendor and device ID
let vendor_str = fs::read_to_string(format!("/sys/class/drm/card{}/device/vendor", minor))?;
let vendor_str = vendor_str.trim_start_matches("0x").trim_end_matches('\n');
let vendor = u16::from_str_radix(vendor_str, 16)?;
let device_str = fs::read_to_string(format!("/sys/class/drm/card{}/device/device", minor))?;
let device_str = device_str.trim_start_matches("0x").trim_end_matches('\n');
// Look up in hwdata PCI database
let device_name = match fs::read_to_string("/usr/share/hwdata/pci.ids") {
Ok(pci_ids) => parse_pci_ids(&pci_ids, vendor_str, device_str).unwrap_or("".to_owned()),
Err(e) => {
tracing::warn!("Failed to read /usr/share/hwdata/pci.ids: {}", e);
"".to_owned()
}
};
// Read PCI bus ID
let pci_bus_id = fs::read_to_string(format!("/sys/class/drm/card{}/device/uevent", minor))?;
let pci_bus_id = pci_bus_id
.lines()
.find_map(|line| {
if line.starts_with("PCI_SLOT_NAME=") {
Some(line.trim_start_matches("PCI_SLOT_NAME=").to_owned())
} else {
None
}
})
.ok_or("PCI_SLOT_NAME not found")?;
// Get DRI device paths
if let Some((card_path, render_path)) = get_dri_device_path(pci_bus_id.as_str()) {
gpus.push(GPUInfo {
vendor: vendor.into(),
card_path,
render_path,
device_name,
pci_bus_id,
},
)
.collect()
});
}
}
Ok(gpus)
}
fn parse_pci_device(line: &str) -> Option<(String, String, String, String)> {
let re = Regex::new(
r#"^(?P<pci_addr>\S+)\s+"[^\[]*\[(?P<class_id>[0-9a-f]{4})\].*?"\s+"[^"]*?\[(?P<vendor_id>[0-9a-f]{4})\][^"]*?"\s+"(?P<device_name>[^"]+?)""#,
).unwrap();
fn parse_pci_ids(pci_data: &str, vendor_id: &str, device_id: &str) -> Option<String> {
let mut current_vendor = String::new();
let vendor_id = vendor_id.to_lowercase();
let device_id = device_id.to_lowercase();
let caps = re.captures(line)?;
for line in pci_data.lines() {
// Skip comments and empty lines
if line.starts_with('#') || line.is_empty() {
continue;
}
// Clean device name by removing only the trailing device ID
let device_name = caps.name("device_name")?.as_str().trim();
let clean_re = Regex::new(r"\s+\[[0-9a-f]{4}\]$").unwrap();
let cleaned_name = clean_re.replace(device_name, "").trim().to_string();
// Check for vendor lines (no leading whitespace)
if !line.starts_with(['\t', ' ']) {
let mut parts = line.splitn(2, ' ');
if let (Some(vendor), Some(_)) = (parts.next(), parts.next()) {
current_vendor = vendor.to_lowercase();
}
continue;
}
Some((
caps.name("class_id")?.as_str().to_lowercase(),
caps.name("vendor_id")?.as_str().to_lowercase(),
cleaned_name,
caps.name("pci_addr")?.as_str().to_string(),
))
// Check for device lines (leading whitespace)
let line = line.trim_start();
let mut parts = line.splitn(2, ' ');
if let (Some(dev_id), Some(desc)) = (parts.next(), parts.next()) {
if dev_id.to_lowercase() == device_id && current_vendor == vendor_id {
return Some(desc.trim().to_owned());
}
}
}
None
}
fn get_dri_device_path(pci_addr: &str) -> Option<(String, String)> {
let target_dir = format!("0000:{}", pci_addr);
let entries = fs::read_dir("/sys/bus/pci/devices").ok()?;
for entry in entries.flatten() {
if !entry.path().to_string_lossy().contains(&target_dir) {
if !entry.path().to_string_lossy().contains(&pci_addr) {
continue;
}
@@ -145,10 +223,9 @@ fn get_dri_device_path(pci_addr: &str) -> Option<(String, String)> {
None
}
pub fn get_gpus_by_vendor(gpus: &[GPUInfo], vendor: &str) -> Vec<GPUInfo> {
let target = vendor.to_lowercase();
pub fn get_gpus_by_vendor(gpus: &[GPUInfo], vendor: GPUVendor) -> Vec<GPUInfo> {
gpus.iter()
.filter(|gpu| gpu.vendor_string().to_lowercase() == target)
.filter(|gpu| *gpu.vendor() == vendor)
.cloned()
.collect()
}
@@ -169,42 +246,22 @@ pub fn get_gpu_by_card_path(gpus: &[GPUInfo], path: &str) -> Option<GPUInfo> {
.cloned()
}
pub fn get_nvidia_gpu_by_cuda_id(gpus: &[GPUInfo], cuda_device_id: usize) -> Option<GPUInfo> {
// Check if nvidia-smi is available
if Command::new("nvidia-smi").arg("--help").output().is_err() {
tracing::warn!("nvidia-smi is not available");
return None;
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[ignore = "requires access to /sys/class/drm and a GPU; not suitable for default CI"]
fn test_get_gpus() {
let gpus = get_gpus().unwrap();
// Environment-dependent; just print for manual runs.
if gpus.is_empty() {
eprintln!("No GPUs found; skipping assertions");
return;
}
// Print the GPUs found for manual verification
for gpu in &gpus {
println!("{}", gpu);
}
}
// Run nvidia-smi to get information about the CUDA device
let output = Command::new("nvidia-smi")
.args([
"--query-gpu=pci.bus_id",
"--format=csv,noheader",
"-i",
&cuda_device_id.to_string(),
])
.output()
.ok()?;
if !output.status.success() {
return None;
}
// Parse the output to get the PCI bus ID
let pci_bus_id = str::from_utf8(&output.stdout).ok()?.trim().to_uppercase(); // nvidia-smi returns uppercase PCI IDs
// Convert from 00000000:05:00.0 to 05:00.0 if needed
let pci_bus_id = if pci_bus_id.starts_with("00000000:") {
pci_bus_id[9..].to_string() // Skip the domain part
} else if pci_bus_id.starts_with("0000:") {
pci_bus_id[5..].to_string() // Alternate check for older nvidia-smi versions
} else {
pci_bus_id
};
// Find the GPU with the matching PCI bus ID
gpus.iter()
.find(|gpu| gpu.vendor == GPUVendor::NVIDIA && gpu.pci_bus_id.to_uppercase() == pci_bus_id)
.cloned()
}

View File

@@ -25,43 +25,42 @@ use tracing_subscriber::filter::LevelFilter;
// Handles gathering GPU information and selecting the most suitable GPU
fn handle_gpus(args: &args::Args) -> Result<Vec<gpu::GPUInfo>, Box<dyn Error>> {
tracing::info!("Gathering GPU information..");
let mut gpus = gpu::get_gpus();
let mut gpus = gpu::get_gpus()?;
if gpus.is_empty() {
return Err("No GPUs found".into());
}
for (i, gpu) in gpus.iter().enumerate() {
tracing::info!(
"> [GPU:{}] Vendor: '{}', Card Path: '{}', Render Path: '{}', Device Name: '{}'",
i,
gpu.vendor_string(),
gpu.card_path(),
gpu.render_path(),
gpu.device_name()
);
tracing::info!("> [GPU:{}] {}", i, gpu);
}
// Additional GPU filtering
if !args.device.gpu_card_path.is_empty() {
if let Some(gpu) = gpu::get_gpu_by_card_path(&gpus, &args.device.gpu_card_path) {
return Ok(Vec::from([gpu]));
}
if let Some(gpu_card_path) = &args.device.gpu_card_path {
return match gpu::get_gpu_by_card_path(&gpus, gpu_card_path.as_str()) {
Some(gpu) => Ok(Vec::from([gpu])),
None => Err(format!(
"No GPU found with the specified card path: '{}'",
gpu_card_path
)
.into()),
};
} else {
// Run all filters that are not empty
let mut filtered_gpus = gpus.clone();
if !args.device.gpu_vendor.is_empty() {
filtered_gpus = gpu::get_gpus_by_vendor(&filtered_gpus, &args.device.gpu_vendor);
if let Some(gpu_vendor) = &args.device.gpu_vendor {
filtered_gpus =
gpu::get_gpus_by_vendor(&filtered_gpus, GPUVendor::from(gpu_vendor.clone()));
}
if !args.device.gpu_name.is_empty() {
filtered_gpus = gpu::get_gpus_by_device_name(&filtered_gpus, &args.device.gpu_name);
if let Some(gpu_name) = &args.device.gpu_name {
filtered_gpus = gpu::get_gpus_by_device_name(&filtered_gpus, gpu_name.as_str());
}
if args.device.gpu_index > -1 {
if let Some(gpu_index) = &args.device.gpu_index {
// get single GPU by index
let gpu_index = args.device.gpu_index as usize;
let gpu_index = *gpu_index as usize;
if gpu_index >= filtered_gpus.len() {
return Err(format!(
"GPU index {} is out of bounds for available GPUs (0-{})",
gpu_index,
filtered_gpus.len() - 1
filtered_gpus.len().saturating_sub(1)
)
.into());
}
@@ -77,10 +76,10 @@ fn handle_gpus(args: &args::Args) -> Result<Vec<gpu::GPUInfo>, Box<dyn Error>> {
if gpus.is_empty() {
return Err(format!(
"No GPU(s) found with the specified parameters: vendor='{}', name='{}', index='{}', card_path='{}'",
args.device.gpu_vendor,
args.device.gpu_name,
args.device.gpu_index,
args.device.gpu_card_path
args.device.gpu_vendor.as_deref().unwrap_or("auto"),
args.device.gpu_name.as_deref().unwrap_or("auto"),
args.device.gpu_index.map_or("auto".to_string(), |i| i.to_string()),
args.device.gpu_card_path.as_deref().unwrap_or("auto")
).into());
}
Ok(gpus)
@@ -112,9 +111,8 @@ fn handle_encoder_video(
}
// Pick most suitable video encoder based on given arguments
let video_encoder;
if !args.encoding.video.encoder.is_empty() {
video_encoder =
enc_helper::get_encoder_by_name(&video_encoders, &args.encoding.video.encoder)?;
if let Some(wanted_encoder) = &args.encoding.video.encoder {
video_encoder = enc_helper::get_encoder_by_name(&video_encoders, wanted_encoder.as_str())?;
} else {
video_encoder = enc_helper::get_best_working_encoder(
&video_encoders,
@@ -164,11 +162,12 @@ fn handle_encoder_video_settings(
// Handles picking audio encoder
// TODO: Expand enc_helper with audio types, for now just opus
fn handle_encoder_audio(args: &args::Args) -> String {
let audio_encoder = if args.encoding.audio.encoder.is_empty() {
"opusenc".to_string()
} else {
args.encoding.audio.encoder.clone()
};
let audio_encoder = args
.encoding
.audio
.encoder
.clone()
.unwrap_or_else(|| "opusenc".to_string());
tracing::info!("Selected audio encoder: '{}'", audio_encoder);
audio_encoder
}
@@ -197,10 +196,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Get relay URL from arguments
let relay_url = args.app.relay_url.trim();
// Initialize libp2p (logically the sink should handle the connection to be independent)
let nestri_p2p = Arc::new(NestriP2P::new().await?);
let p2p_conn = nestri_p2p.connect(relay_url).await?;
gstreamer::init()?;
let _ = gstrswebrtc::plugin_register_static(); // Might be already registered, so we'll pass..
@@ -239,6 +234,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Handle audio encoder selection
let audio_encoder = handle_encoder_audio(&args);
// Initialize libp2p (logically the sink should handle the connection to be independent)
let nestri_p2p = Arc::new(NestriP2P::new().await?);
let p2p_conn = nestri_p2p.connect(relay_url).await?;
/*** PIPELINE CREATION ***/
// Create the pipeline
let pipeline = Arc::new(gstreamer::Pipeline::new());
@@ -250,7 +249,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
gstreamer::ElementFactory::make("pulsesrc").build()?
}
encoding_args::AudioCaptureMethod::PIPEWIRE => {
gstreamer::ElementFactory::make("pipewiresrc").build()?
let pw_element = gstreamer::ElementFactory::make("pipewiresrc").build()?;
pw_element.set_property("use-bufferpool", &false); // false for audio
pw_element
}
encoding_args::AudioCaptureMethod::ALSA => {
gstreamer::ElementFactory::make("alsasrc").build()?
@@ -279,7 +280,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
},
);
// If has "frame-size" (opus), set to 10 for lower latency (below 10 seems to be too low?)
if audio_encoder.has_property("frame-size", None) {
if audio_encoder.has_property("frame-size") {
audio_encoder.set_property_from_str("frame-size", "10");
}
@@ -313,6 +314,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
))?;
caps_filter.set_property("caps", &caps);
// Get bit-depth and choose appropriate format (NV12 or P010_10LE)
// H.264 does not support above 8-bit. Also we require DMA-BUF.
let video_format = if args.encoding.video.bit_depth == 10
&& args.app.dma_buf
&& video_encoder_info.codec != enc_helper::VideoCodec::H264
{
"P010_10LE"
} else {
"NV12"
};
// GL and CUDA elements (NVIDIA only..)
let mut glupload = None;
let mut glconvert = None;
@@ -325,7 +337,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
glconvert = Some(gstreamer::ElementFactory::make("glcolorconvert").build()?);
// GL color convert caps
let caps_filter = gstreamer::ElementFactory::make("capsfilter").build()?;
let gl_caps = gstreamer::Caps::from_str("video/x-raw(memory:GLMemory),format=NV12")?;
let gl_caps = gstreamer::Caps::from_str(
format!("video/x-raw(memory:GLMemory),format={video_format}").as_str(),
)?;
caps_filter.set_property("caps", &gl_caps);
gl_caps_filter = Some(caps_filter);
// CUDA upload element
@@ -341,7 +355,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
vapostproc = Some(gstreamer::ElementFactory::make("vapostproc").build()?);
// VA caps filter
let caps_filter = gstreamer::ElementFactory::make("capsfilter").build()?;
let va_caps = gstreamer::Caps::from_str("video/x-raw(memory:VAMemory),format=NV12")?;
let va_caps = gstreamer::Caps::from_str(
format!("video/x-raw(memory:VAMemory),format={video_format}").as_str(),
)?;
caps_filter.set_property("caps", &va_caps);
va_caps_filter = Some(caps_filter);
}

View File

@@ -9,7 +9,7 @@ use atomic_refcell::AtomicRefCell;
use glib::subclass::prelude::*;
use gstreamer::glib;
use gstreamer::prelude::*;
use gstreamer_webrtc::{gst_sdp, WebRTCSDPType, WebRTCSessionDescription};
use gstreamer_webrtc::{WebRTCSDPType, WebRTCSessionDescription, gst_sdp};
use gstrswebrtc::signaller::{Signallable, SignallableImpl};
use parking_lot::RwLock as PLRwLock;
use prost::Message;

View File

@@ -1,3 +1,3 @@
pub mod p2p;
pub mod p2p_protocol_stream;
pub mod p2p_safestream;
pub mod p2p_protocol_stream;

View File

@@ -1,10 +1,16 @@
use libp2p::futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::{
Multiaddr, PeerId, Swarm, identify, noise, ping,
Multiaddr, PeerId, Swarm, identity,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
};
use libp2p_autonat as autonat;
use libp2p_identify as identify;
use libp2p_noise as noise;
use libp2p_ping as ping;
use libp2p_stream as stream;
use libp2p_tcp as tcp;
use libp2p_yamux as yamux;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -12,15 +18,28 @@ use tokio::sync::Mutex;
#[derive(Clone)]
pub struct NestriConnection {
pub peer_id: PeerId,
pub control: libp2p_stream::Control,
pub control: stream::Control,
}
#[derive(NetworkBehaviour)]
struct NestriBehaviour {
identify: identify::Behaviour,
ping: ping::Behaviour,
stream: libp2p_stream::Behaviour,
autonatv2: libp2p::autonat::v2::client::Behaviour,
stream: stream::Behaviour,
autonatv2: autonat::v2::client::Behaviour,
}
impl NestriBehaviour {
fn new(key: identity::PublicKey) -> Self {
Self {
identify: identify::Behaviour::new(identify::Config::new(
"/ipfs/id/1.0.0".to_string(),
key,
)),
ping: ping::Behaviour::default(),
stream: stream::Behaviour::default(),
autonatv2: autonat::v2::client::Behaviour::default(),
}
}
}
pub struct NestriP2P {
@@ -39,22 +58,7 @@ impl NestriP2P {
.with_dns()?
.with_websocket(noise::Config::new, yamux::Config::default)
.await?
.with_behaviour(|key| {
let identify_behaviour = identify::Behaviour::new(identify::Config::new(
"/ipfs/id/1.0.0".to_string(),
key.public(),
));
let ping_behaviour = ping::Behaviour::default();
let stream_behaviour = libp2p_stream::Behaviour::default();
let autonatv2_behaviour = libp2p::autonat::v2::client::Behaviour::default();
Ok(NestriBehaviour {
identify: identify_behaviour,
ping: ping_behaviour,
stream: stream_behaviour,
autonatv2: autonatv2_behaviour,
})
})?
.with_behaviour(|key| NestriBehaviour::new(key.public()))?
.build(),
));
@@ -62,12 +66,6 @@ impl NestriP2P {
let swarm_clone = swarm.clone();
tokio::spawn(swarm_loop(swarm_clone));
{
let mut swarm_lock = swarm.lock().await;
swarm_lock.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // IPv4 - TCP Raw
swarm_lock.listen_on("/ip6/::/tcp/0".parse()?)?; // IPv6 - TCP Raw
}
Ok(NestriP2P { swarm })
}
@@ -95,6 +93,55 @@ async fn swarm_loop(swarm: Arc<Mutex<Swarm<NestriBehaviour>>>) {
swarm_lock.select_next_some().await
};
match event {
/* Ping Events */
SwarmEvent::Behaviour(NestriBehaviourEvent::Ping(ping::Event {
peer,
connection,
result,
})) => {
if let Ok(latency) = result {
tracing::debug!(
"Ping event - peer: {}, connection: {:?}, latency: {} us",
peer,
connection,
latency.as_micros()
);
} else if let Err(err) = result {
tracing::warn!(
"Ping event - peer: {}, connection: {:?}, error: {:?}",
peer,
connection,
err
);
}
}
/* Autonat (v2) Events */
SwarmEvent::Behaviour(NestriBehaviourEvent::Autonatv2(
autonat::v2::client::Event {
server,
tested_addr,
bytes_sent,
result,
},
)) => {
if let Ok(()) = result {
tracing::debug!(
"AutonatV2 event - test server '{}' verified address '{}' with {} bytes sent",
server,
tested_addr,
bytes_sent
);
} else if let Err(err) = result {
tracing::warn!(
"AutonatV2 event - test server '{}' failed to verify address '{}' with {} bytes sent: {:?}",
server,
tested_addr,
bytes_sent,
err
);
}
}
/* Swarm Events */
SwarmEvent::NewListenAddr { address, .. } => {
tracing::info!("Listening on: '{}'", address);
}
@@ -130,6 +177,13 @@ async fn swarm_loop(swarm: Arc<Mutex<Swarm<NestriBehaviour>>>) {
tracing::error!("Failed to connect: {}", error);
}
}
SwarmEvent::ExternalAddrConfirmed { address } => {
tracing::info!("Confirmed external address: {}", address);
}
/* Unhandled Events */
SwarmEvent::Behaviour(event) => {
tracing::warn!("Unhandled Behaviour event: {:?}", event);
}
_ => {}
}
}

View File

@@ -1,9 +1,6 @@
use byteorder::{BigEndian, ByteOrder};
use libp2p::futures::io::{ReadHalf, WriteHalf};
use libp2p::futures::{AsyncReadExt, AsyncWriteExt};
use prost::Message;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -22,37 +19,6 @@ impl SafeStream {
}
}
pub async fn send_json<T: Serialize>(
&self,
data: &T,
) -> Result<(), Box<dyn std::error::Error>> {
let json_data = serde_json::to_vec(data)?;
tracing::info!("Sending JSON");
let e = self.send_with_length_prefix(&json_data).await;
tracing::info!("Sent JSON");
e
}
pub async fn receive_json<T: DeserializeOwned>(&self) -> Result<T, Box<dyn std::error::Error>> {
let data = self.receive_with_length_prefix().await?;
let msg = serde_json::from_slice(&data)?;
Ok(msg)
}
pub async fn send_proto<M: Message>(&self, msg: &M) -> Result<(), Box<dyn std::error::Error>> {
let mut proto_data = Vec::new();
msg.encode(&mut proto_data)?;
self.send_with_length_prefix(&proto_data).await
}
pub async fn receive_proto<M: Message + Default>(
&self,
) -> Result<M, Box<dyn std::error::Error>> {
let data = self.receive_with_length_prefix().await?;
let msg = M::decode(&*data)?;
Ok(msg)
}
pub async fn send_raw(&self, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
self.send_with_length_prefix(data).await
}

View File

@@ -3,17 +3,17 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoTimestampEntry {
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub stage: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
#[prost(message, optional, tag = "2")]
pub time: ::core::option::Option<::prost_types::Timestamp>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoLatencyTracker {
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub sequence_id: ::prost::alloc::string::String,
#[prost(message, repeated, tag="2")]
#[prost(message, repeated, tag = "2")]
pub timestamps: ::prost::alloc::vec::Vec<ProtoTimestampEntry>,
}
/// MouseMove message
@@ -21,11 +21,11 @@ pub struct ProtoLatencyTracker {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMouseMove {
/// Fixed value "MouseMove"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub x: i32,
#[prost(int32, tag="3")]
#[prost(int32, tag = "3")]
pub y: i32,
}
/// MouseMoveAbs message
@@ -33,11 +33,11 @@ pub struct ProtoMouseMove {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMouseMoveAbs {
/// Fixed value "MouseMoveAbs"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub x: i32,
#[prost(int32, tag="3")]
#[prost(int32, tag = "3")]
pub y: i32,
}
/// MouseWheel message
@@ -45,11 +45,11 @@ pub struct ProtoMouseMoveAbs {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMouseWheel {
/// Fixed value "MouseWheel"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub x: i32,
#[prost(int32, tag="3")]
#[prost(int32, tag = "3")]
pub y: i32,
}
/// MouseKeyDown message
@@ -57,9 +57,9 @@ pub struct ProtoMouseWheel {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMouseKeyDown {
/// Fixed value "MouseKeyDown"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub key: i32,
}
/// MouseKeyUp message
@@ -67,9 +67,9 @@ pub struct ProtoMouseKeyDown {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMouseKeyUp {
/// Fixed value "MouseKeyUp"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub key: i32,
}
/// KeyDown message
@@ -77,9 +77,9 @@ pub struct ProtoMouseKeyUp {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoKeyDown {
/// Fixed value "KeyDown"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub key: i32,
}
/// KeyUp message
@@ -87,53 +87,53 @@ pub struct ProtoKeyDown {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoKeyUp {
/// Fixed value "KeyUp"
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub r#type: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
#[prost(int32, tag = "2")]
pub key: i32,
}
/// Union of all Input types
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoInput {
#[prost(oneof="proto_input::InputType", tags="1, 2, 3, 4, 5, 6, 7")]
#[prost(oneof = "proto_input::InputType", tags = "1, 2, 3, 4, 5, 6, 7")]
pub input_type: ::core::option::Option<proto_input::InputType>,
}
/// Nested message and enum types in `ProtoInput`.
pub mod proto_input {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum InputType {
#[prost(message, tag="1")]
#[prost(message, tag = "1")]
MouseMove(super::ProtoMouseMove),
#[prost(message, tag="2")]
#[prost(message, tag = "2")]
MouseMoveAbs(super::ProtoMouseMoveAbs),
#[prost(message, tag="3")]
#[prost(message, tag = "3")]
MouseWheel(super::ProtoMouseWheel),
#[prost(message, tag="4")]
#[prost(message, tag = "4")]
MouseKeyDown(super::ProtoMouseKeyDown),
#[prost(message, tag="5")]
#[prost(message, tag = "5")]
MouseKeyUp(super::ProtoMouseKeyUp),
#[prost(message, tag="6")]
#[prost(message, tag = "6")]
KeyDown(super::ProtoKeyDown),
#[prost(message, tag="7")]
#[prost(message, tag = "7")]
KeyUp(super::ProtoKeyUp),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMessageBase {
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub payload_type: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
#[prost(message, optional, tag = "2")]
pub latency: ::core::option::Option<ProtoLatencyTracker>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoMessageInput {
#[prost(message, optional, tag="1")]
#[prost(message, optional, tag = "1")]
pub message_base: ::core::option::Option<ProtoMessageBase>,
#[prost(message, optional, tag="2")]
#[prost(message, optional, tag = "2")]
pub data: ::core::option::Option<ProtoInput>,
}
// @@protoc_insertion_point(module)