diff --git a/src/media.rs b/src/media.rs index a2dba9a..fe40ee9 100644 --- a/src/media.rs +++ b/src/media.rs @@ -84,8 +84,6 @@ struct State { // Atoms in init sequence that must be repeated at each key frame. ftype_atom: Option, moov_atom: Option, - // These atoms that must be buffered and pushed as a single buffer. - moof_atom: Option, // Below members that track current fragment (moof, mdat). /// Minimum PTS in fragment. fragment_pts: Option, @@ -114,6 +112,7 @@ struct State { // Tracks based on their track ID. tracks: Option>, + current: Option, } pub struct GST {} @@ -131,7 +130,6 @@ impl GST { let state = Arc::new(Mutex::new(State { ftype_atom: None, moov_atom: None, - moof_atom: None, fragment_pts: None, fragment_dts: None, fragment_max_pts_plus_duration: None, @@ -151,6 +149,7 @@ impl GST { // Tracks based on their track ID. tracks: None, + current: None, })); let mut state_lock = state.lock().unwrap(); @@ -310,8 +309,6 @@ impl GST { loop { let mut state = state.lock().unwrap(); - let mut current = None; - match state.mp4_parser.pop_atom() { Some(atom) => { log::info!("atom_size={}, atom_type={}", atom.len(), atom.atom_type); @@ -356,8 +353,11 @@ impl GST { pos += ftype_atom.len(); slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes); pos += moov_atom.len(); + log::info!("real size of the header atom={:?}", pos); + log::info!("expected size of the header atom={:?}", output_buf_len); assert_eq!(pos, output_buf_len); }; + log::info!("pushing the header atom={:?}", gst_buffer); // Create the catalog track with a single segment. let mut init_track = state.broadcast.clone().create_track("0.mp4").map_err(|_| gst::FlowError::Error)?; @@ -414,7 +414,9 @@ impl GST { } }, ATOM_TYPE_MOOF => { - log::info!("moof_atom={:?}", state.moof_atom); + log::info!("pushing the moof_atom={:?}", atom); + + let mut current = state.current; let tracks = if let Some(tracks) = &mut state.tracks { tracks @@ -434,78 +436,33 @@ impl GST { let track = tracks.get_mut(&fragment.track).context("failed to find track").map_err(|_| gst::FlowError::Error)?; // Save the track ID for the next iteration, which must be a mdat. - if current.is_none(){ + if current.clone().is_none(){ log::info!("multiple moof atoms") } + current.replace(fragment.track); + // Publish the moof header, creating a new segment if it's a keyframe. track.header(atom.atom_bytes.clone(), fragment).context("failed to publish moof").map_err(|_| gst::FlowError::Error)?; - - state.moof_atom = Some(atom); - + }, ATOM_TYPE_MDAT => { - let mdat_atom = atom; - match (state.ftype_atom.as_ref(), state.moov_atom.as_ref(), state.moof_atom.as_ref()) { - (Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => { - let include_header = !state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT); - let header_len = if include_header { - ftype_atom.len() + moov_atom.len() - } else { - 0 - }; - let output_buf_len = header_len + moof_atom.len() + mdat_atom.len(); - log::info!("Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}", - include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len()); - let mut gst_buffer = gst::Buffer::with_size(output_buf_len).unwrap(); - { - let buffer_ref = gst_buffer.get_mut().unwrap(); - buffer_ref.set_pts(state.fragment_pts); - buffer_ref.set_dts(state.fragment_dts); + log::info!("pushing the mdat_atom={:?}", atom); - let pts_plus_duration =state.fragment_max_pts_plus_duration.clone(); - let fragment_pts = state.fragment_pts.clone(); + // Get the track ID from the previous moof. + let track = state.current.take().context("missing moof").map_err(|_| gst::FlowError::Error)?; + + let tracks = if let Some(tracks) = &mut state.tracks { + tracks + } else { + log::warn!("Tracks are not set up yet"); + return Err(gst::FlowError::Error); + }; - let duration = match (pts_plus_duration, fragment_pts) { - (Some(pts_plus_duration), Some(fragment_pts)) => { - Some(pts_plus_duration - fragment_pts) - } - // Handle the case where one or both values are `None` - _ => None, - }; buffer_ref.set_duration(duration); - buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE)); - buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE)); - buffer_ref.set_flags(state.fragment_buffer_flags); - let mut buffer_map = buffer_ref.map_writable().unwrap(); - let slice = buffer_map.as_mut_slice(); - let mut pos = 0; - if include_header { - slice[pos..pos+ftype_atom.len()].copy_from_slice(&ftype_atom.atom_bytes); - pos += ftype_atom.len(); - slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes); - pos += moov_atom.len(); - } - slice[pos..pos+moof_atom.len()].copy_from_slice(&moof_atom.atom_bytes); - pos += moof_atom.len(); - slice[pos..pos+mdat_atom.len()].copy_from_slice(&mdat_atom.atom_bytes); - pos += mdat_atom.len(); - assert_eq!(pos, output_buf_len); - } - // Clear fragment variables. - state.fragment_pts = None; - state.fragment_dts = None; - state.fragment_max_pts_plus_duration = None; - state.fragment_offset = None; - state.fragment_offset_end = None; - state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT; - // Push new buffer. - log::info!("Pushing buffer {:?}", gst_buffer); - // let _ = self.srcpad.push(gst_buffer)?; - }, - _ => { - log::warn!("Received mdat without ftype, moov, or moof"); - }, - } + let track = tracks.get_mut(&track).context("failed to find track").map_err(|_| gst::FlowError::Error)?; + + // Publish the mdat atom. + track.data(atom.atom_bytes.clone()).context("failed to publish mdat").map_err(|_| gst::FlowError::Error)?; }, _ => { log::warn!("Unknown atom type {:?}", atom);