From 63f730a36d7c9534d130b85ec178a446d6a733f1 Mon Sep 17 00:00:00 2001 From: Wanjohi <71614375+wanjohiryan@users.noreply.github.com> Date: Sat, 2 Dec 2023 21:13:50 +0300 Subject: [PATCH] Changes, MVP --- .gitignore | 2 +- Cargo.lock | 160 +++++++++++++ Cargo.toml | 8 + src/main.rs.test.txt | 542 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 711 insertions(+), 1 deletion(-) create mode 100644 src/main.rs.test.txt diff --git a/.gitignore b/.gitignore index 0725822..bc8e85f 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ # already existing elements were commented out #/target - +/hls* # Added by cargo # diff --git a/Cargo.lock b/Cargo.lock index bd10e49..9f445a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,18 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + [[package]] name = "cc" version = "1.0.83" @@ -80,7 +92,9 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-targets 0.48.5", ] @@ -139,6 +153,12 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "futures-sink" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" + [[package]] name = "futures-task" version = "0.3.29" @@ -272,6 +292,32 @@ dependencies = [ "thiserror", ] +[[package]] +name = "gstreamer-app" +version = "0.22.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" +dependencies = [ + "futures-core", + "futures-sink", + "glib", + "gstreamer", + "gstreamer-app-sys", + "gstreamer-base", + "libc", +] + +[[package]] +name = "gstreamer-app-sys" +version = "0.22.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" +dependencies = [ + "glib-sys", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + [[package]] name = "gstreamer-audio" version = "0.22.0" @@ -323,6 +369,28 @@ dependencies = [ "system-deps", ] +[[package]] +name = "gstreamer-check" +version = "0.22.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" +dependencies = [ + "glib", + "gstreamer", + "gstreamer-check-sys", +] + +[[package]] +name = "gstreamer-check-sys" +version = "0.22.0" +source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#7f234c88acec5f5233342fa01e0c7ffde51bd049" +dependencies = [ + "glib-sys", + "gobject-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + [[package]] name = "gstreamer-gl" version = "0.22.0" @@ -462,6 +530,14 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "isobmff" +version = "0.1.0" +source = "git+https://github.com/LMinJae/isobmff-rs#fbfff435d99bf909766ad738b66fb974f285eee3" +dependencies = [ + "bytes", +] + [[package]] name = "itertools" version = "0.12.0" @@ -471,6 +547,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" + [[package]] name = "js-sys" version = "0.3.66" @@ -492,16 +574,34 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "m3u8-rs" +version = "5.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d39af8845edca961e3286dcbafeb9e6407d3df6a616ef086847162d46f438d75" +dependencies = [ + "chrono", + "nom", +] + [[package]] name = "me" version = "0.1.0" dependencies = [ "anyhow", + "bytes", + "chrono", "gst-plugin-fmp4", "gstreamer", + "gstreamer-app", "gstreamer-base", + "gstreamer-check", "gstreamer-gl", + "gstreamer-pbutils", "gstreamer-video", + "isobmff", + "m3u8-rs", + "mp4", ] [[package]] @@ -510,12 +610,53 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "mp4" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9ef834d5ed55e494a2ae350220314dc4aacd1c43a9498b00e320e0ea352a5c3" +dependencies = [ + "byteorder", + "bytes", + "num-rational", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "muldiv" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -533,8 +674,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" dependencies = [ "autocfg", + "num-bigint", "num-integer", "num-traits", + "serde", ] [[package]] @@ -642,6 +785,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + [[package]] name = "serde" version = "1.0.193" @@ -662,6 +811,17 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "serde_json" +version = "1.0.108" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.4" diff --git a/Cargo.toml b/Cargo.toml index 32ccd09..ebbcf3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,15 @@ edition = "2021" anyhow = "1" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-plugin-fmp4 = { git = "https://github.com/sdroege/gst-plugin-rs", version = "0.12.0-alpha.1" } +chrono = "0.4.31" +m3u8-rs = "5.0.4" +isobmff = { git = "https://github.com/LMinJae/isobmff-rs", version = "0.1.0" } +bytes = "1.5.0" +mp4 = "0.14.0" diff --git a/src/main.rs.test.txt b/src/main.rs.test.txt new file mode 100644 index 0000000..5aadaa5 --- /dev/null +++ b/src/main.rs.test.txt @@ -0,0 +1,542 @@ +// Copyright (C) 2022 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a live HLS stream with one video playlist and two video playlists. +// Basic trimming is implemented + +use bytes::BytesMut; +use gst::prelude::*; + +use std::collections::VecDeque; +use std::io::Cursor; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use anyhow::Error; +use chrono::{DateTime, Duration, Utc}; +use isobmff::IO; +use m3u8_rs::{ + AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaSegment, + VariantStream, +}; + +struct State { + video_streams: Vec, + audio_streams: Vec, + all_mimes: Vec, + path: PathBuf, + wrote_manifest: bool, +} + +impl State { + fn maybe_write_manifest(&mut self) { + if self.wrote_manifest { + return; + } + + if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { + return; + } + + let mut all_mimes = self.all_mimes.clone(); + all_mimes.sort(); + all_mimes.dedup(); + + let playlist = MasterPlaylist { + version: Some(7), + variants: self + .video_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + + path.push(&stream.name); + path.push("manifest.m3u8"); + + VariantStream { + uri: path.as_path().display().to_string(), + bandwidth: stream.bitrate, + codecs: Some(all_mimes.join(",")), + resolution: Some(m3u8_rs::Resolution { + width: stream.width, + height: stream.height, + }), + audio: Some("audio".to_string()), + ..Default::default() + } + }) + .collect(), + alternatives: self + .audio_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + path.push(&stream.name); + path.push("manifest.m3u8"); + + AlternativeMedia { + media_type: AlternativeMediaType::Audio, + uri: Some(path.as_path().display().to_string()), + group_id: "audio".to_string(), + language: Some(stream.lang.clone()), + name: stream.name.clone(), + default: stream.default, + autoselect: stream.default, + channels: Some("2".to_string()), + ..Default::default() + } + }) + .collect(), + independent_segments: true, + ..Default::default() + }; + + println!("Writing master manifest to {}", self.path.display()); + + let mut file = std::fs::File::create(&self.path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write master playlist"); + + self.wrote_manifest = true; + } +} + +struct Segment { + date_time: DateTime, + duration: gst::ClockTime, + path: String, +} + +struct UnreffedSegment { + removal_time: DateTime, + path: String, +} + +struct StreamState { + path: PathBuf, + segments: VecDeque, + trimmed_segments: VecDeque, + start_date_time: Option>, + start_time: Option, + media_sequence: u64, + segment_index: u32, +} + +struct VideoStream { + name: String, + bitrate: u64, + width: u64, + height: u64, +} + +struct AudioStream { + name: String, + lang: String, + default: bool, + wave: String, +} + +fn trim_segments(state: &mut StreamState) { + // Arbitrary 5 segments window + while state.segments.len() > 5 { + let segment = state.segments.pop_front().unwrap(); + + state.media_sequence += 1; + + state.trimmed_segments.push_back(UnreffedSegment { + // HLS spec mandates that segments are removed from the filesystem no sooner + // than the duration of the longest playlist + duration of the segment. + // This is 15 seconds (12.5 + 2.5) in our case, we use 20 seconds to be on the + // safe side + removal_time: segment + .date_time + .checked_add_signed(Duration::seconds(20)) + .unwrap(), + path: segment.path.clone(), + }); + } + + while let Some(segment) = state.trimmed_segments.front() { + if segment.removal_time < state.segments.front().unwrap().date_time { + let segment = state.trimmed_segments.pop_front().unwrap(); + + let mut path = state.path.clone(); + path.push(segment.path); + println!("Removing {}", path.display()); + std::fs::remove_file(path).expect("Failed to remove old segment"); + } else { + break; + } + } +} + +fn update_manifest(state: &mut StreamState) { + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.m3u8"); + + println!("writing manifest to {}", path.display()); + + trim_segments(state); + + let playlist = MediaPlaylist { + version: Some(7), + target_duration: 2.5, + media_sequence: state.media_sequence, + segments: state + .segments + .iter() + .enumerate() + .map(|(idx, segment)| MediaSegment { + uri: segment.path.to_string(), + duration: (segment.duration.nseconds() as f64 + / gst::ClockTime::SECOND.nseconds() as f64) as f32, + map: Some(m3u8_rs::Map { + uri: "init.cmfi".into(), + ..Default::default() + }), + program_date_time: if idx == 0 { + Some(segment.date_time.into()) + } else { + None + }, + ..Default::default() + }) + .collect(), + end_list: false, + playlist_type: None, + i_frames_only: false, + start: None, + independent_segments: true, + ..Default::default() + }; + + let mut file = std::fs::File::create(path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write media playlist"); +} + +fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { + let mut path: PathBuf = path.into(); + path.push(name); + + let state = Arc::new(Mutex::new(StreamState { + segments: VecDeque::new(), + trimmed_segments: VecDeque::new(), + path, + start_date_time: None, + start_time: gst::ClockTime::NONE, + media_sequence: 0, + segment_index: 0, + })); + + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + // let mut state = state.lock().unwrap(); + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. + if first + .flags() + .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) + { + // let mut path = state.path.clone(); + // std::fs::create_dir_all(&path).expect("failed to create directory"); + // path.push("init.cmfi"); + + // println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + let mut cursor = Cursor::new(&*map); + + let header = mp4::BoxHeader::read(&mut cursor).unwrap(); + println!("header name {}", header.name); + + match header.name { + mp4::BoxType::MoofBox => { + println!("writing manifest to moof"); + } + mp4::BoxType::MdatBox => { + println!("writing manifest to mdat"); + } + + _ => { + // Skip unknown atoms + } + } + + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. + assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let map = first.map_readable().unwrap(); + let mut cursor = Cursor::new(&*map); + + let header = mp4::BoxHeader::read(&mut cursor).unwrap(); + println!("header name 2 {}", header.name); + + match header.name { + mp4::BoxType::MoofBox => { + println!("writing manifest to moof"); + } + mp4::BoxType::MdatBox => { + println!("writing manifest to mdat"); + } + + _ => { + // Skip unknown atoms + } + } + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + unreachable!(); + }) + .build(), + ); +} + +fn probe_encoder(state: Arc>, enc: gst::Element) { + enc.static_pad("src").unwrap().add_probe( + gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + let gst::EventView::Caps(ev) = ev.view() else { + return gst::PadProbeReturn::Ok; + }; + + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + + gst::PadProbeReturn::Remove + }, + ); +} + +impl VideoStream { + fn setup( + &self, + state: Arc>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("videotestsrc") + .property("is-live", true) + .build()?; + + let raw_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst_video::VideoCapsBuilder::new() + .format(gst_video::VideoFormat::I420) + .width(self.width as i32) + .height(self.height as i32) + .framerate(30.into()) + .build(), + ) + .build()?; + let timeoverlay = gst::ElementFactory::make("timeoverlay").build()?; + let enc = gst::ElementFactory::make("x264enc") + .property("bframes", 0u32) + .property("bitrate", self.bitrate as u32 / 1000u32) + .property_from_str("tune", "zerolatency") + .build()?; + let h264_capsfilter = gst::ElementFactory::make("capsfilter") + .property( + "caps", + gst::Caps::builder("video/x-h264") + .field("profile", "main") + .build(), + ) + .build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property("movie-timescale", 0) + .property("fragment-duration", 1.mseconds()) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many([ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + gst::Element::link_many([ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + appsink.upcast_ref(), + ])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, true); + + Ok(()) + } +} + +impl AudioStream { + fn setup( + &self, + state: Arc>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = gst::ElementFactory::make("audiotestsrc") + .property("is-live", true) + .property_from_str("wave", &self.wave) + .build()?; + let enc = gst::ElementFactory::make("avenc_aac").build()?; + let mux = gst::ElementFactory::make("cmafmux") + .property("fragment-duration", 1.mseconds()) + .property("movie-timescale", 0) + .build()?; + let appsink = gst_app::AppSink::builder().buffer_list(true).build(); + + pipeline.add_many([&src, &enc, &mux, appsink.upcast_ref()])?; + + gst::Element::link_many([&src, &enc, &mux, appsink.upcast_ref()])?; + + probe_encoder(state, enc); + + setup_appsink(&appsink, &self.name, path, false); + + Ok(()) + } +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let path = PathBuf::from("hls_live_stream"); + + let pipeline = gst::Pipeline::default(); + + std::fs::create_dir_all(&path).expect("failed to create directory"); + + let mut manifest_path = path.clone(); + manifest_path.push("manifest.m3u8"); + + let state = Arc::new(Mutex::new(State { + video_streams: vec![VideoStream { + name: "video_0".to_string(), + bitrate: 2_048_000, + width: 1280, + height: 720, + }], + audio_streams: vec![ + AudioStream { + name: "audio_0".to_string(), + lang: "eng".to_string(), + default: true, + wave: "sine".to_string(), + }, + AudioStream { + name: "audio_1".to_string(), + lang: "fre".to_string(), + default: false, + wave: "white-noise".to_string(), + }, + ], + all_mimes: vec![], + path: manifest_path.clone(), + wrote_manifest: false, + })); + + { + let state_lock = state.lock().unwrap(); + + for stream in &state_lock.video_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + + for stream in &state_lock.audio_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + } + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +}