diff --git a/src/media.rs b/src/media.rs index b166f3a..59eb036 100644 --- a/src/media.rs +++ b/src/media.rs @@ -6,7 +6,7 @@ use anyhow::{self, Context}; use gst::prelude::*; use gst::ClockTime; - +use gst_app::glib; use std::sync::{Arc, Mutex}; use moq_transport::cache::{broadcast, fragment, segment, track}; @@ -250,19 +250,85 @@ impl GST { let pipeline = gst::parse::launch( "videotestsrc num-buffers=99 ! x264enc ! mux. \ audiotestsrc num-buffers=140 ! avenc_aac ! mux. \ - isomp4mux name=mux ! filesink location=test.mp4 name=sink \ + isomp4mux name=mux ! appsink name=sink \ ", - ).unwrap().downcast::().unwrap(); + ) + .unwrap() + .downcast::() + .unwrap(); //TODO: Create a sink that is "seekable", probably with a really good EOS https://github.com/sdroege/gst-plugin-rs/blob/80b58f3b45d2c3adee5684888937a3aa30e30cd7/mux/mp4/src/mp4mux/imp.rs#L1252 - // let appsink = pipeline - // .by_name("sink") - // .unwrap() - // .dynamic_cast::() - // .unwrap(); + let appsink = pipeline + .by_name("sink") + .unwrap() + .dynamic_cast::() + .unwrap(); - // appsink.set_buffer_list(true); + appsink.set_buffer_list(true); + + // Set the `emit-signals` property to `true` to receive signals + appsink.set_property("emit-signals", &true); + + // Set up a callback for the `new-sample` signal + // appsink.connect_new_sample(move |sink| { + // // Handle the new sample + // let sample = sink.pull_sample().map_err(|_| gst::FlowError::Error)?; + // let buffer = sample.buffer().ok_or_else(|| gst::FlowError::Error)?; + // let _map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + + // // Perform any necessary operations on the buffer data + + // // Return Ok to indicate successful handling of the new sample + // Ok(gst::FlowSuccess::Ok) + // }); + + // Set up a pad probe on the sink pad to intercept queries + let sink_pad = appsink.static_pad("sink").unwrap(); + sink_pad.add_probe(gst::PadProbeType::QUERY_DOWNSTREAM, |pad, info| { + if let Some(ref query) = info.query_mut() { + //https://github.com/Kurento/gstreamer/blob/f2553fb153edeeecc2f4f74fca996c74dc8210df/plugins/elements/gstfilesink.c#L496C51-L496C69 + use gst::QueryViewMut; + + match query.view_mut() { + QueryViewMut::Seeking(q) => { + // We don't support any seeking at all + println!("Handling query {:?}", q); + + let format = q.format(); + if format == gst::Format::Bytes || format == gst::Format::Default { + q.set( + true, + gst::GenericFormattedValue::none_for_format(format), + gst::GenericFormattedValue::none_for_format(format), + ); + } else { + q.set( + false, + gst::GenericFormattedValue::none_for_format(format), + gst::GenericFormattedValue::none_for_format(format), + ); + + } + + } + _ => (), + } + + // if let gst::QueryView::Seeking(mut seeking) = query.view() { + // println!("Handling query {:?}", query); + + // if seeking.format() == gst::Format::Bytes || seeking.format() == gst::Format::Default { + // seeking.to_owned().set(true, 0.bytes(), 1.bytes()); + // return gst::PadProbeReturn::Handled; + // } else { + // seeking.to_owned().set(false, 0.bytes(), 1.bytes()); + // return gst::PadProbeReturn::Handled; + // } + // } + } + gst::PadProbeReturn::Pass + }); // appsink.set_callbacks( // gst_app::AppSinkCallbacks::builder() @@ -277,6 +343,27 @@ impl GST { // // The muxer only outputs non-empty buffer lists // let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + // Ok(gst::FlowSuccess::Ok) + // }) + // .query(move |sink, query| { + // // Handle the seeking query + // if let Some(seeking) = query.downcast_mut::() { + // if seeking.format() == gst::Format::Bytes { + // // Set the seekable flag based on your custom sink's seekability + // seeking.set_seekable(true); // Replace with your own logic to determine if the sink is seekable + // return true; + // } + // } + + // // Handle other queries if needed + // // ... + + // // Call the default query handler for unhandled queries + // sink.parent_query(query) + // }) + // .build(), + // ); + // println!("buffer is empty {:?}", buffer_list.is_empty()); // assert!(!buffer_list.is_empty()); @@ -391,327 +478,327 @@ impl GST { // } // } - // If the buffer only has the HEADER flag set then this is a segment header that is - // followed by one or more actual media buffers. - // assert!(first.flags().contains(gst::BufferFlags::HEADER)); + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. + // assert!(first.flags().contains(gst::BufferFlags::HEADER)); - // for buffer in &*buffer_list { - // let map = buffer - // .map_readable() - // .with_context(|| "Error mapping buffer to readable") - // .map_err(|e| { - // eprintln!("{:?}", e); + // for buffer in &*buffer_list { + // let map = buffer + // .map_readable() + // .with_context(|| "Error mapping buffer to readable") + // .map_err(|e| { + // eprintln!("{:?}", e); - // gst::FlowError::Error - // })?; - // let input_buf = map.as_ref(); + // gst::FlowError::Error + // })?; + // let input_buf = map.as_ref(); - // let mut state = state.lock().unwrap(); + // let mut state = state.lock().unwrap(); - // //FIXME: The buffer in the mp4parser is "overflowing" - // //TODO: Find another way to "slice" a buffer at a time, probably use this method https://github.com/sdroege/gst-plugin-rs/blob/d9397ef1743ac92e84784d00b93dc0877d44f966/mux/fmp4/examples/hls_live.rs#L256C17-L280C18 or this + // //FIXME: The buffer in the mp4parser is "overflowing" + // //TODO: Find another way to "slice" a buffer at a time, probably use this method https://github.com/sdroege/gst-plugin-rs/blob/d9397ef1743ac92e84784d00b93dc0877d44f966/mux/fmp4/examples/hls_live.rs#L256C17-L280C18 or this - // state.mp4_parser.add(input_buf); + // state.mp4_parser.add(input_buf); - // // Update cummulative fragment variables. - // // Buffer PTS, etc. are only valid if this buffer contains MDAT data. - // if state.mp4_parser.have_mdat() { - // println!("buffer has pts {:?}", buffer.pts().is_some()); - // assert!(buffer.pts().is_some()); - // if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { - // state.fragment_pts = buffer.pts(); - // } - // if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { - // state.fragment_dts = buffer.dts(); - // } - // let pts = buffer.pts(); - // let duration = buffer.duration(); + // // Update cummulative fragment variables. + // // Buffer PTS, etc. are only valid if this buffer contains MDAT data. + // if state.mp4_parser.have_mdat() { + // println!("buffer has pts {:?}", buffer.pts().is_some()); + // assert!(buffer.pts().is_some()); + // if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { + // state.fragment_pts = buffer.pts(); + // } + // if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { + // state.fragment_dts = buffer.dts(); + // } + // let pts = buffer.pts(); + // let duration = buffer.duration(); - // let pts_plus_duration = match (pts, duration) { - // (Some(pts), Some(duration)) => Some(pts + duration), - // // Handle the case where one or both values are `None` - // _ => None, - // }; - // if state.fragment_max_pts_plus_duration.is_none() - // || state.fragment_max_pts_plus_duration < pts_plus_duration - // { - // state.fragment_max_pts_plus_duration = pts_plus_duration; - // } - // if buffer.offset() != gst_sys::GST_BUFFER_OFFSET_NONE - // && (state.fragment_offset.is_none() - // || state.fragment_offset.unwrap() > buffer.offset()) - // { - // state.fragment_offset = Some(buffer.offset()); - // } - // if buffer.offset_end() != gst_sys::GST_BUFFER_OFFSET_NONE - // && (state.fragment_offset_end.is_none() - // || state.fragment_offset_end.unwrap() < buffer.offset_end()) - // { - // state.fragment_offset_end = Some(buffer.offset_end()); - // } - // if state - // .fragment_buffer_flags - // .contains(gst::BufferFlags::DELTA_UNIT) - // && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) - // { - // state - // .fragment_buffer_flags - // .remove(gst::BufferFlags::DELTA_UNIT); - // } - // if buffer.flags().contains(gst::BufferFlags::DISCONT) { - // state - // .fragment_buffer_flags - // .insert(gst::BufferFlags::DISCONT); - // } - // log::info!("finished updating fragment time stamps"); - // } - // } + // let pts_plus_duration = match (pts, duration) { + // (Some(pts), Some(duration)) => Some(pts + duration), + // // Handle the case where one or both values are `None` + // _ => None, + // }; + // if state.fragment_max_pts_plus_duration.is_none() + // || state.fragment_max_pts_plus_duration < pts_plus_duration + // { + // state.fragment_max_pts_plus_duration = pts_plus_duration; + // } + // if buffer.offset() != gst_sys::GST_BUFFER_OFFSET_NONE + // && (state.fragment_offset.is_none() + // || state.fragment_offset.unwrap() > buffer.offset()) + // { + // state.fragment_offset = Some(buffer.offset()); + // } + // if buffer.offset_end() != gst_sys::GST_BUFFER_OFFSET_NONE + // && (state.fragment_offset_end.is_none() + // || state.fragment_offset_end.unwrap() < buffer.offset_end()) + // { + // state.fragment_offset_end = Some(buffer.offset_end()); + // } + // if state + // .fragment_buffer_flags + // .contains(gst::BufferFlags::DELTA_UNIT) + // && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + // { + // state + // .fragment_buffer_flags + // .remove(gst::BufferFlags::DELTA_UNIT); + // } + // if buffer.flags().contains(gst::BufferFlags::DISCONT) { + // state + // .fragment_buffer_flags + // .insert(gst::BufferFlags::DISCONT); + // } + // log::info!("finished updating fragment time stamps"); + // } + // } - // loop { - // let mut state = state.lock().unwrap(); - // match state.mp4_parser.pop_atom() { - // Some(atom) => { - // log::info!( - // "atom_size={}, atom_type={}", - // atom.len(), - // atom.atom_type - // ); - // match atom.atom_type { - // ATOM_TYPE_FTYPE => { - // state.ftype_atom = Some(atom); - // log::info!("ftype_atom={:?}", state.ftype_atom); - // } - // ATOM_TYPE_MOOV => { - // state.moov_atom = Some(atom); + // loop { + // let mut state = state.lock().unwrap(); + // match state.mp4_parser.pop_atom() { + // Some(atom) => { + // log::info!( + // "atom_size={}, atom_type={}", + // atom.len(), + // atom.atom_type + // ); + // match atom.atom_type { + // ATOM_TYPE_FTYPE => { + // state.ftype_atom = Some(atom); + // log::info!("ftype_atom={:?}", state.ftype_atom); + // } + // ATOM_TYPE_MOOV => { + // state.moov_atom = Some(atom); - // log::info!("moov_atom={:?}", state.moov_atom); + // log::info!("moov_atom={:?}", state.moov_atom); - // match (state.ftype_atom.as_ref(), state.moov_atom.as_ref()) - // { - // (Some(ftype_atom), Some(moov_atom)) => { - // let output_buf_len = - // ftype_atom.len() + moov_atom.len(); - // let mut gst_buffer = - // gst::Buffer::with_size(output_buf_len).unwrap(); - // { - // let buffer_ref = gst_buffer.get_mut().unwrap(); - // buffer_ref.set_pts(state.fragment_pts); - // buffer_ref.set_dts(state.fragment_dts); + // match (state.ftype_atom.as_ref(), state.moov_atom.as_ref()) + // { + // (Some(ftype_atom), Some(moov_atom)) => { + // let output_buf_len = + // ftype_atom.len() + moov_atom.len(); + // let mut gst_buffer = + // gst::Buffer::with_size(output_buf_len).unwrap(); + // { + // let buffer_ref = gst_buffer.get_mut().unwrap(); + // buffer_ref.set_pts(state.fragment_pts); + // buffer_ref.set_dts(state.fragment_dts); - // let pts_plus_duration = state - // .fragment_max_pts_plus_duration - // .clone(); - // let fragment_pts = state.fragment_pts.clone(); + // let pts_plus_duration = state + // .fragment_max_pts_plus_duration + // .clone(); + // let fragment_pts = state.fragment_pts.clone(); - // let duration = - // match (pts_plus_duration, fragment_pts) { - // ( - // Some(pts_plus_duration), - // Some(fragment_pts), - // ) => Some( - // pts_plus_duration - fragment_pts, - // ), - // // Handle the case where one or both values are `None` - // _ => None, - // }; - // buffer_ref.set_duration(duration); - // buffer_ref.set_offset( - // state.fragment_offset.unwrap_or( - // gst_sys::GST_BUFFER_OFFSET_NONE, - // ), - // ); - // buffer_ref.set_offset_end( - // state.fragment_offset_end.unwrap_or( - // gst_sys::GST_BUFFER_OFFSET_NONE, - // ), - // ); - // //this is a header - // buffer_ref.set_flags(gst::BufferFlags::HEADER); - // let mut buffer_map = - // buffer_ref.map_writable().unwrap(); - // let slice = buffer_map.as_mut_slice(); - // let mut pos = 0; - // slice[pos..pos + ftype_atom.len()] - // .copy_from_slice(&ftype_atom.atom_bytes); - // pos += ftype_atom.len(); - // slice[pos..pos + moov_atom.len()] - // .copy_from_slice(&moov_atom.atom_bytes); - // pos += moov_atom.len(); - // println!( - // "real size of the header atom={:?}", - // pos - // ); - // println!( - // "expected size of the header atom={:?}", - // output_buf_len - // ); - // assert_eq!(pos, output_buf_len); - // }; - // log::info!( - // "pushing the header atom={:?}", - // gst_buffer - // ); + // let duration = + // match (pts_plus_duration, fragment_pts) { + // ( + // Some(pts_plus_duration), + // Some(fragment_pts), + // ) => Some( + // pts_plus_duration - fragment_pts, + // ), + // // Handle the case where one or both values are `None` + // _ => None, + // }; + // buffer_ref.set_duration(duration); + // buffer_ref.set_offset( + // state.fragment_offset.unwrap_or( + // gst_sys::GST_BUFFER_OFFSET_NONE, + // ), + // ); + // buffer_ref.set_offset_end( + // state.fragment_offset_end.unwrap_or( + // gst_sys::GST_BUFFER_OFFSET_NONE, + // ), + // ); + // //this is a header + // buffer_ref.set_flags(gst::BufferFlags::HEADER); + // let mut buffer_map = + // buffer_ref.map_writable().unwrap(); + // let slice = buffer_map.as_mut_slice(); + // let mut pos = 0; + // slice[pos..pos + ftype_atom.len()] + // .copy_from_slice(&ftype_atom.atom_bytes); + // pos += ftype_atom.len(); + // slice[pos..pos + moov_atom.len()] + // .copy_from_slice(&moov_atom.atom_bytes); + // pos += moov_atom.len(); + // println!( + // "real size of the header atom={:?}", + // pos + // ); + // println!( + // "expected size of the header atom={:?}", + // output_buf_len + // ); + // assert_eq!(pos, output_buf_len); + // }; + // log::info!( + // "pushing the header atom={:?}", + // gst_buffer + // ); - // // Create the catalog track with a single segment. - // let mut init_track = state - // .broadcast - // .clone() - // .create_track("0.mp4") - // .map_err(|_| gst::FlowError::Error)?; - // let init_segment = init_track - // .create_segment(segment::Info { - // sequence: VarInt::ZERO, - // priority: 0, - // expires: None, - // }) - // .map_err(|_| gst::FlowError::Error)?; + // // Create the catalog track with a single segment. + // let mut init_track = state + // .broadcast + // .clone() + // .create_track("0.mp4") + // .map_err(|_| gst::FlowError::Error)?; + // let init_segment = init_track + // .create_segment(segment::Info { + // sequence: VarInt::ZERO, + // priority: 0, + // expires: None, + // }) + // .map_err(|_| gst::FlowError::Error)?; - // // Create a single fragment, optionally setting the size - // let mut init_fragment = init_segment - // .final_fragment(VarInt::ZERO) - // .map_err(|_| gst::FlowError::Error)?; + // // Create a single fragment, optionally setting the size + // let mut init_fragment = init_segment + // .final_fragment(VarInt::ZERO) + // .map_err(|_| gst::FlowError::Error)?; - // let buffer_map = gst_buffer - // .map_readable() - // .map_err(|_| gst::FlowError::Error)?; + // let buffer_map = gst_buffer + // .map_readable() + // .map_err(|_| gst::FlowError::Error)?; - // // Create a a Vec object from the data slice - // let bytes = buffer_map.as_slice().to_vec(); + // // Create a a Vec object from the data slice + // let bytes = buffer_map.as_slice().to_vec(); - // init_fragment - // .chunk(bytes.into()) - // .map_err(|_| gst::FlowError::Error)?; + // init_fragment + // .chunk(bytes.into()) + // .map_err(|_| gst::FlowError::Error)?; - // // We're going to parse the moov box. - // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut moov_reader = - // Cursor::new(moov_atom.atom_bytes.clone()); - // let moov_header = - // mp4::BoxHeader::read(&mut moov_reader) - // .map_err(|_| gst::FlowError::Error)?; + // // We're going to parse the moov box. + // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. + // let mut moov_reader = + // Cursor::new(moov_atom.atom_bytes.clone()); + // let moov_header = + // mp4::BoxHeader::read(&mut moov_reader) + // .map_err(|_| gst::FlowError::Error)?; - // // Parse the moov box so we can detect the timescales for each track. - // let moov = mp4::MoovBox::read_box( - // &mut moov_reader, - // moov_header.size, - // ) - // .map_err(|_| gst::FlowError::Error)?; + // // Parse the moov box so we can detect the timescales for each track. + // let moov = mp4::MoovBox::read_box( + // &mut moov_reader, + // moov_header.size, + // ) + // .map_err(|_| gst::FlowError::Error)?; - // let mut tracks = HashMap::new(); + // let mut tracks = HashMap::new(); - // for trak in &moov.traks { - // let id = trak.tkhd.track_id; - // let name = format!("{}.m4s", id); + // for trak in &moov.traks { + // let id = trak.tkhd.track_id; + // let name = format!("{}.m4s", id); - // let timescale = track_timescale(&moov, id); + // let timescale = track_timescale(&moov, id); - // // Store the track publisher in a map so we can update it later. - // let track = broadcast - // .create_track(&name) - // .map_err(|_| gst::FlowError::Error)?; - // let track = Track::new(track, timescale); - // tracks.insert(id, track); - // } + // // Store the track publisher in a map so we can update it later. + // let track = broadcast + // .create_track(&name) + // .map_err(|_| gst::FlowError::Error)?; + // let track = Track::new(track, timescale); + // tracks.insert(id, track); + // } - // let mut catalog = broadcast - // .create_track(".catalog") - // .map_err(|_| gst::FlowError::Error)?; + // let mut catalog = broadcast + // .create_track(".catalog") + // .map_err(|_| gst::FlowError::Error)?; - // // Create the catalog track - // Self::serve_catalog( - // &mut catalog, - // &init_track.name, - // &moov, - // ) - // .map_err(|_| gst::FlowError::Error)?; + // // Create the catalog track + // Self::serve_catalog( + // &mut catalog, + // &init_track.name, + // &moov, + // ) + // .map_err(|_| gst::FlowError::Error)?; - // state.tracks = Some(tracks); - // state.init = Some(init_track); - // state.catalog = Some(catalog); - // } - // _ => { - // log::warn!("Received moov without ftype"); - // } - // } - // } - // ATOM_TYPE_MOOF => { - // log::info!("pushing the moof_atom={:?}", atom); + // state.tracks = Some(tracks); + // state.init = Some(init_track); + // state.catalog = Some(catalog); + // } + // _ => { + // log::warn!("Received moov without ftype"); + // } + // } + // } + // ATOM_TYPE_MOOF => { + // log::info!("pushing the moof_atom={:?}", atom); - // let mut current = state.current; + // let mut current = state.current; - // let tracks = if let Some(tracks) = &mut state.tracks { - // tracks - // } else { - // log::warn!("Tracks are not set up yet"); - // return Err(gst::FlowError::Error); - // }; + // let tracks = if let Some(tracks) = &mut state.tracks { + // tracks + // } else { + // log::warn!("Tracks are not set up yet"); + // return Err(gst::FlowError::Error); + // }; - // let mut reader = Cursor::new(atom.atom_bytes.clone()); - // let header = mp4::BoxHeader::read(&mut reader) - // .map_err(|_| gst::FlowError::Error)?; - // let moof = mp4::MoofBox::read_box(&mut reader, header.size) - // .map_err(|_| gst::FlowError::Error)?; + // let mut reader = Cursor::new(atom.atom_bytes.clone()); + // let header = mp4::BoxHeader::read(&mut reader) + // .map_err(|_| gst::FlowError::Error)?; + // let moof = mp4::MoofBox::read_box(&mut reader, header.size) + // .map_err(|_| gst::FlowError::Error)?; - // // Process the moof. - // let fragment = Fragment::new(moof) - // .map_err(|_| gst::FlowError::Error)?; + // // Process the moof. + // let fragment = Fragment::new(moof) + // .map_err(|_| gst::FlowError::Error)?; - // // Get the track for this moof. - // let track = tracks - // .get_mut(&fragment.track) - // .context("failed to find track") - // .map_err(|_| gst::FlowError::Error)?; + // // Get the track for this moof. + // let track = tracks + // .get_mut(&fragment.track) + // .context("failed to find track") + // .map_err(|_| gst::FlowError::Error)?; - // // Save the track ID for the next iteration, which must be a mdat. - // if current.clone().is_none() { - // log::error!("multiple moof atoms"); - // return Err(gst::FlowError::Error); - // } + // // Save the track ID for the next iteration, which must be a mdat. + // if current.clone().is_none() { + // log::error!("multiple moof atoms"); + // return Err(gst::FlowError::Error); + // } - // current.replace(fragment.track); + // current.replace(fragment.track); - // // Publish the moof header, creating a new segment if it's a keyframe. - // track - // .header(atom.atom_bytes.clone(), fragment) - // .context("failed to publish moof") - // .map_err(|_| gst::FlowError::Error)?; - // } - // ATOM_TYPE_MDAT => { - // log::info!("pushing the mdat_atom={:?}", atom); + // // Publish the moof header, creating a new segment if it's a keyframe. + // track + // .header(atom.atom_bytes.clone(), fragment) + // .context("failed to publish moof") + // .map_err(|_| gst::FlowError::Error)?; + // } + // ATOM_TYPE_MDAT => { + // log::info!("pushing the mdat_atom={:?}", atom); - // // Get the track ID from the previous moof. - // let track = state - // .current - // .take() - // .context("missing moof") - // .map_err(|_| gst::FlowError::Error)?; + // // Get the track ID from the previous moof. + // let track = state + // .current + // .take() + // .context("missing moof") + // .map_err(|_| gst::FlowError::Error)?; - // let tracks = if let Some(tracks) = &mut state.tracks { - // tracks - // } else { - // log::warn!("Tracks are not set up yet"); - // return Err(gst::FlowError::Error); - // }; + // let tracks = if let Some(tracks) = &mut state.tracks { + // tracks + // } else { + // log::warn!("Tracks are not set up yet"); + // return Err(gst::FlowError::Error); + // }; - // let track = tracks - // .get_mut(&track) - // .context("failed to find track") - // .map_err(|_| gst::FlowError::Error)?; + // let track = tracks + // .get_mut(&track) + // .context("failed to find track") + // .map_err(|_| gst::FlowError::Error)?; - // // Publish the mdat atom. - // track - // .data(atom.atom_bytes.clone()) - // .context("failed to publish mdat") - // .map_err(|_| gst::FlowError::Error)?; - // } - // _ => { - // log::warn!("Unknown atom type {:?}", atom); - // } - // } - // } - // None => break, - // } - // } + // // Publish the mdat atom. + // track + // .data(atom.atom_bytes.clone()) + // .context("failed to publish mdat") + // .map_err(|_| gst::FlowError::Error)?; + // } + // _ => { + // log::warn!("Unknown atom type {:?}", atom); + // } + // } + // } + // None => break, + // } + // } // Ok(gst::FlowSuccess::Ok) // })