Last but not least, parse and push the Mdat atom

This commit is contained in:
Wanjohi
2024-01-01 15:07:03 +03:00
parent a6d44d585c
commit 22d2641d8b

View File

@@ -84,8 +84,6 @@ struct State {
// Atoms in init sequence that must be repeated at each key frame.
ftype_atom: Option<Mp4Atom>,
moov_atom: Option<Mp4Atom>,
// These atoms that must be buffered and pushed as a single buffer.
moof_atom: Option<Mp4Atom>,
// Below members that track current fragment (moof, mdat).
/// Minimum PTS in fragment.
fragment_pts: Option<ClockTime>,
@@ -114,6 +112,7 @@ struct State {
// Tracks based on their track ID.
tracks: Option<HashMap<u32, Track>>,
current: Option<u32>,
}
pub struct GST {}
@@ -131,7 +130,6 @@ impl GST {
let state = Arc::new(Mutex::new(State {
ftype_atom: None,
moov_atom: None,
moof_atom: None,
fragment_pts: None,
fragment_dts: None,
fragment_max_pts_plus_duration: None,
@@ -151,6 +149,7 @@ impl GST {
// Tracks based on their track ID.
tracks: None,
current: None,
}));
let mut state_lock = state.lock().unwrap();
@@ -310,8 +309,6 @@ impl GST {
loop {
let mut state = state.lock().unwrap();
let mut current = None;
match state.mp4_parser.pop_atom() {
Some(atom) => {
log::info!("atom_size={}, atom_type={}", atom.len(), atom.atom_type);
@@ -356,8 +353,11 @@ impl GST {
pos += ftype_atom.len();
slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes);
pos += moov_atom.len();
log::info!("real size of the header atom={:?}", pos);
log::info!("expected size of the header atom={:?}", output_buf_len);
assert_eq!(pos, output_buf_len);
};
log::info!("pushing the header atom={:?}", gst_buffer);
// Create the catalog track with a single segment.
let mut init_track = state.broadcast.clone().create_track("0.mp4").map_err(|_| gst::FlowError::Error)?;
@@ -414,7 +414,9 @@ impl GST {
}
},
ATOM_TYPE_MOOF => {
log::info!("moof_atom={:?}", state.moof_atom);
log::info!("pushing the moof_atom={:?}", atom);
let mut current = state.current;
let tracks = if let Some(tracks) = &mut state.tracks {
tracks
@@ -434,78 +436,33 @@ impl GST {
let track = tracks.get_mut(&fragment.track).context("failed to find track").map_err(|_| gst::FlowError::Error)?;
// Save the track ID for the next iteration, which must be a mdat.
if current.is_none(){
if current.clone().is_none(){
log::info!("multiple moof atoms")
}
current.replace(fragment.track);
// Publish the moof header, creating a new segment if it's a keyframe.
track.header(atom.atom_bytes.clone(), fragment).context("failed to publish moof").map_err(|_| gst::FlowError::Error)?;
state.moof_atom = Some(atom);
},
ATOM_TYPE_MDAT => {
let mdat_atom = atom;
match (state.ftype_atom.as_ref(), state.moov_atom.as_ref(), state.moof_atom.as_ref()) {
(Some(ftype_atom), Some(moov_atom), Some(moof_atom)) => {
let include_header = !state.fragment_buffer_flags.contains(gst::BufferFlags::DELTA_UNIT);
let header_len = if include_header {
ftype_atom.len() + moov_atom.len()
} else {
0
};
let output_buf_len = header_len + moof_atom.len() + mdat_atom.len();
log::info!("Pushing buffer; include_header={}, ftype.len={}, moov.len={}, moof.len={}, mdat.len={}",
include_header, ftype_atom.len(), moov_atom.len(), moof_atom.len(), mdat_atom.len());
let mut gst_buffer = gst::Buffer::with_size(output_buf_len).unwrap();
{
let buffer_ref = gst_buffer.get_mut().unwrap();
buffer_ref.set_pts(state.fragment_pts);
buffer_ref.set_dts(state.fragment_dts);
log::info!("pushing the mdat_atom={:?}", atom);
let pts_plus_duration =state.fragment_max_pts_plus_duration.clone();
let fragment_pts = state.fragment_pts.clone();
// Get the track ID from the previous moof.
let track = state.current.take().context("missing moof").map_err(|_| gst::FlowError::Error)?;
let tracks = if let Some(tracks) = &mut state.tracks {
tracks
} else {
log::warn!("Tracks are not set up yet");
return Err(gst::FlowError::Error);
};
let duration = match (pts_plus_duration, fragment_pts) {
(Some(pts_plus_duration), Some(fragment_pts)) => {
Some(pts_plus_duration - fragment_pts)
}
// Handle the case where one or both values are `None`
_ => None,
}; buffer_ref.set_duration(duration);
buffer_ref.set_offset(state.fragment_offset.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE));
buffer_ref.set_offset_end(state.fragment_offset_end.unwrap_or(gst_sys::GST_BUFFER_OFFSET_NONE));
buffer_ref.set_flags(state.fragment_buffer_flags);
let mut buffer_map = buffer_ref.map_writable().unwrap();
let slice = buffer_map.as_mut_slice();
let mut pos = 0;
if include_header {
slice[pos..pos+ftype_atom.len()].copy_from_slice(&ftype_atom.atom_bytes);
pos += ftype_atom.len();
slice[pos..pos+moov_atom.len()].copy_from_slice(&moov_atom.atom_bytes);
pos += moov_atom.len();
}
slice[pos..pos+moof_atom.len()].copy_from_slice(&moof_atom.atom_bytes);
pos += moof_atom.len();
slice[pos..pos+mdat_atom.len()].copy_from_slice(&mdat_atom.atom_bytes);
pos += mdat_atom.len();
assert_eq!(pos, output_buf_len);
}
// Clear fragment variables.
state.fragment_pts = None;
state.fragment_dts = None;
state.fragment_max_pts_plus_duration = None;
state.fragment_offset = None;
state.fragment_offset_end = None;
state.fragment_buffer_flags = gst::BufferFlags::DELTA_UNIT;
// Push new buffer.
log::info!("Pushing buffer {:?}", gst_buffer);
// let _ = self.srcpad.push(gst_buffer)?;
},
_ => {
log::warn!("Received mdat without ftype, moov, or moof");
},
}
let track = tracks.get_mut(&track).context("failed to find track").map_err(|_| gst::FlowError::Error)?;
// Publish the mdat atom.
track.data(atom.atom_bytes.clone()).context("failed to publish mdat").map_err(|_| gst::FlowError::Error)?;
},
_ => {
log::warn!("Unknown atom type {:?}", atom);