diff --git a/src/media.rs b/src/media.rs index 178ae59..4aa108f 100644 --- a/src/media.rs +++ b/src/media.rs @@ -158,7 +158,11 @@ impl GST { data.extend_from_slice(map.as_slice()); } + // The current track name + let mut current = None; + let mut cursor = Cursor::new(data.to_vec()); + //TODO: Research how the mp4::BoxHeader works... while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) { let box_data = vec![0u8; header.size as usize]; @@ -173,8 +177,13 @@ impl GST { let mut init = Vec::new(); // Buffer to store the concatenated 'ftyp' and 'moov' atoms. let mut broadcast = state.broadcast.clone(); // Parse the moov box so we can detect the timescales for each track. - let moov_box_cursor = Cursor::new(box_data.clone()); // Create a cursor for the 'moov' box data. - let moov = mp4::MoovBox::read_box(&mut moov_box_cursor, header.size).expect("could not read moov box"); + // let moov_box_cursor = Cursor::new(box_data.clone()); // Create a cursor for the 'moov' box data. + let moov = mp4::MoovBox::read_box( + &mut cursor.clone(), + // &mut moov_box_cursor.clone(), + header.size, + ) + .expect("could not read moov box"); match state.ftyp_atom.as_ref() { Some(ftyp_atom) => { @@ -199,7 +208,7 @@ impl GST { // Create a single fragment, optionally setting the size let mut init_fragment = init_segment .final_fragment(VarInt::ZERO) - .expect("coild not create a single fragment"); + .expect("could not create a single fragment"); init_fragment .chunk(init.into()) @@ -214,16 +223,54 @@ impl GST { let timescale = track_timescale(&moov, id); // Store the track publisher in a map so we can update it later. - let track = broadcast.create_track(&name)?; + let track = broadcast + .create_track(&name) + .expect("could not broadcast the track"); let track = Track::new(track, timescale); tracks.insert(id, track); } + + let mut catalog = broadcast + .create_track(".catalog") + .expect("could not create a catalog"); + + // Create the catalog track + Self::serve_catalog(&mut catalog, &init_track.name, &moov) + .expect("could not serve catalog"); + + state.catalog = Some(catalog); + state.init = Some(init_track); + state.tracks = Some(tracks); + state.broadcast = broadcast; } None => {} } } mp4::BoxType::MoofBox => { println!("Found 'moof' box"); + + let moof = mp4::MoofBox::read_box(&mut cursor.clone(), header.size) + .expect("failed to read MP4 moof box"); + + // Process the moof. + let fragment = Fragment::new(moof).expect("failed to create a fragment"); + + // Get the track for this moof. + let track = state + .tracks + .unwrap() + .get_mut(&fragment.track) + .context("failed to find track") + .expect("could not get the track from moof atom"); + + // Save the track ID for the next iteration, which must be a mdat. + assert!(current.is_none(), "multiple moof atoms"); + current.replace(fragment.track); + + // Publish the moof header, creating a new segment if it's a keyframe. + track + .header(, fragment) + .context("failed to publish moof")?; // Process 'moof' box } mp4::BoxType::MdatBox => { @@ -236,165 +283,6 @@ impl GST { .expect("Seeking failed"); } - // let header = mp4::BoxHeader::read(&mut cursor.clone()) - // .map_err(|_| gst::FlowError::Error)?; - - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Found 'moof' box"); - // // Process 'moof' box - // } - // mp4::BoxType::MdatBox => { - // // println!("Found 'mdat' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::EmsgBox => { - // println!("Found 'emsg' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FreeBox => { - // println!("Found 'free' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FtypBox => { - // println!("Found 'ftyp' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::MoovBox => { - // println!("Found 'moov' box"); - // // Process 'mdat' box - // } - // // Handle other boxes if needed - // _ => {} - // } - - // cursor - // .seek(SeekFrom::Current(header.size as i64)) - // // .map_err(|e| gst::FlowError::Eos)?; - // .expect("Seeking failed"); - - // let mut data = Vec::new(); - - // for buffer in &*buffer_list { - // let map = buffer - // .map_readable() - // .with_context(|| "Error mapping buffer to readable") - // .map_err(|e| { - // eprintln!("{:?}", e); - - // gst::FlowError::Error - // })?; - - // let mut cursor = Cursor::new(map.as_slice().to_vec()); - - // // while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) { - // loop { - // let header = mp4::BoxHeader::read(&mut cursor.clone()) - // .map_err(|_| gst::FlowError::Error)?; - - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Found 'moof' box"); - // // Process 'moof' box - // } - // mp4::BoxType::MdatBox => { - // // println!("Found 'mdat' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::EmsgBox => { - // println!("Found 'emsg' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FreeBox => { - // println!("Found 'free' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FtypBox => { - // println!("Found 'ftyp' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::MoovBox => { - // println!("Found 'moov' box"); - // // Process 'mdat' box - // } - // // Handle other boxes if needed - // _ => {} - // } - // cursor - // .seek(SeekFrom::Current(header.size as i64)) - // // .map_err(|e| gst::FlowError::Eos)?; - // .expect("Seeking failed"); - // } - - // // data.extend_from_slice(map.as_slice()); - // } - - // let mut cursor = Cursor::new(data.to_vec()); - - // while let Ok(header) = mp4::BoxHeader::read(&mut cursor.clone()) { - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Found 'moof' box"); - // // Process 'moof' box - // } - // mp4::BoxType::MdatBox => { - // // println!("Found 'mdat' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::EmsgBox => { - // println!("Found 'emsg' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FreeBox => { - // println!("Found 'free' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::FtypBox => { - // println!("Found 'ftyp' box"); - // // Process 'mdat' box - // } - // mp4::BoxType::MoovBox => { - // println!("Found 'moov' box"); - // // Process 'mdat' box - // } - // // Handle other boxes if needed - // _ => {} - // } - // cursor - // .seek(SeekFrom::Current(header.size as i64)) - // // .map_err(|e| gst::FlowError::Eos)?; - // .expect("Seeking failed"); - // } - // Advance the cursor to skip the current box contents. - // This positions the cursor right at the start of the next box. - - // // Create a a Vec object from the data slice - // let bytes = map.as_slice().to_vec(); - - // // // We're going to parse the moov box. - // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut moov_reader = Cursor::new(bytes.clone()); - // let header = mp4::BoxHeader::read(&mut moov_reader) - // .map_err(|_| gst::FlowError::Error)?; - - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Moof body") - // } - // mp4::BoxType::MdatBox => { - // println!("Mdat body") - // } - // mp4::BoxType::MoovBox => { - // println!("Moov body") - // } - // mp4::BoxType::FtypBox => { - // println!("Ftyp body") - // } - // _ => { - // // Skip unknown atoms - // } - // } - Ok(gst::FlowSuccess::Ok) }) .eos(move |_sink| { @@ -405,450 +293,6 @@ impl GST { .build(), ); - // println!("buffer is empty {:?}", buffer_list.is_empty()); - // assert!(!buffer_list.is_empty()); - - // println!("bufferlist is this long {:?}", buffer_list.len()); - // let mut first = buffer_list.get(0).unwrap(); - - // // Each list contains a full segment, i.e. does not start with a DELTA_UNIT - // println!( - // "first buffer has a delta unit {:?}", - // first.flags().contains(gst::BufferFlags::DELTA_UNIT) - // ); - // assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); - - // // let mut state = state.lock().unwrap(); - - // // let mut mp4_parser = Mp4Parser::new(); - - // //FIXME: The mp4_parser fails because we are parsing too many mp4_atoms in parallel, so let us do it sequentially. - - // // If the buffer has the DISCONT and HEADER flag set then it contains the media - // // header, i.e. the `ftyp`, `moov` and other media boxes. - // // - // // This might be the initial header or the updated header at the end of the stream. - // if first - // .flags() - // .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) - // { - // println!("writing header"); - // let map = first - // .map_readable() - // .with_context(|| "Error mapping buffer to readable") - // .map_err(|e| { - // eprintln!("{:?}", e); - - // gst::FlowError::Error - // })?; - // // Create a a Vec object from the data slice - // let bytes = map.as_slice().to_vec(); - - // // We're going to parse the moov box. - // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut moov_reader = Cursor::new(bytes.clone()); - // let header = mp4::BoxHeader::read(&mut moov_reader) - // .map_err(|_| gst::FlowError::Error)?; - - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Moof") - // } - // mp4::BoxType::MdatBox => { - // println!("Mdat") - // } - // mp4::BoxType::MoovBox => { - // println!("Moov") - // } - // mp4::BoxType::FtypBox => { - // println!("Ftyp") - // } - // _ => { - // // Skip unknown atoms - // } - // } - - // drop(map); - - // // Remove the header from the buffer list - // buffer_list.make_mut().remove(0, 1); - - // // If the list is now empty then it only contained the media header and nothing - // // else. - // if buffer_list.is_empty() { - // return Ok(gst::FlowSuccess::Ok); - // } - - // // Otherwise get the next buffer and continue working with that. - // first = buffer_list.get(0).unwrap(); - // } - - // let map = first - // .map_readable() - // .with_context(|| "Error mapping buffer to readable") - // .map_err(|e| { - // eprintln!("{:?}", e); - - // gst::FlowError::Error - // })?; - // // Create a a Vec object from the data slice - // let bytes = map.as_slice().to_vec(); - - // // We're going to parse the moov box. - // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut moov_reader = Cursor::new(bytes.clone()); - // let header = mp4::BoxHeader::read(&mut moov_reader) - // .map_err(|_| gst::FlowError::Error)?; - - // match header.name { - // mp4::BoxType::MoofBox => { - // println!("Moof") - // } - // mp4::BoxType::MdatBox => { - // println!("Mdat") - // } - // mp4::BoxType::MoovBox => { - // println!("Moov") - // } - // mp4::BoxType::FtypBox => { - // println!("Ftyp") - // } - // _ => { - // // Skip unknown atoms - // println!("Unknown atom") - // } - // } - - // If the buffer only has the HEADER flag set then this is a segment header that is - // followed by one or more actual media buffers. - // assert!(first.flags().contains(gst::BufferFlags::HEADER)); - - // for buffer in &*buffer_list { - // let map = buffer - // .map_readable() - // .with_context(|| "Error mapping buffer to readable") - // .map_err(|e| { - // eprintln!("{:?}", e); - - // gst::FlowError::Error - // })?; - // let input_buf = map.as_ref(); - - // let mut state = state.lock().unwrap(); - - // //FIXME: The buffer in the mp4parser is "overflowing" - // //TODO: Find another way to "slice" a buffer at a time, probably use this method https://github.com/sdroege/gst-plugin-rs/blob/d9397ef1743ac92e84784d00b93dc0877d44f966/mux/fmp4/examples/hls_live.rs#L256C17-L280C18 or this - - // state.mp4_parser.add(input_buf); - - // // Update cummulative fragment variables. - // // Buffer PTS, etc. are only valid if this buffer contains MDAT data. - // if state.mp4_parser.have_mdat() { - // println!("buffer has pts {:?}", buffer.pts().is_some()); - // assert!(buffer.pts().is_some()); - // if state.fragment_pts.is_none() || state.fragment_pts > buffer.pts() { - // state.fragment_pts = buffer.pts(); - // } - // if state.fragment_dts.is_none() || state.fragment_dts > buffer.dts() { - // state.fragment_dts = buffer.dts(); - // } - // let pts = buffer.pts(); - // let duration = buffer.duration(); - - // let pts_plus_duration = match (pts, duration) { - // (Some(pts), Some(duration)) => Some(pts + duration), - // // Handle the case where one or both values are `None` - // _ => None, - // }; - // if state.fragment_max_pts_plus_duration.is_none() - // || state.fragment_max_pts_plus_duration < pts_plus_duration - // { - // state.fragment_max_pts_plus_duration = pts_plus_duration; - // } - // if buffer.offset() != gst_sys::GST_BUFFER_OFFSET_NONE - // && (state.fragment_offset.is_none() - // || state.fragment_offset.unwrap() > buffer.offset()) - // { - // state.fragment_offset = Some(buffer.offset()); - // } - // if buffer.offset_end() != gst_sys::GST_BUFFER_OFFSET_NONE - // && (state.fragment_offset_end.is_none() - // || state.fragment_offset_end.unwrap() < buffer.offset_end()) - // { - // state.fragment_offset_end = Some(buffer.offset_end()); - // } - // if state - // .fragment_buffer_flags - // .contains(gst::BufferFlags::DELTA_UNIT) - // && !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) - // { - // state - // .fragment_buffer_flags - // .remove(gst::BufferFlags::DELTA_UNIT); - // } - // if buffer.flags().contains(gst::BufferFlags::DISCONT) { - // state - // .fragment_buffer_flags - // .insert(gst::BufferFlags::DISCONT); - // } - // log::info!("finished updating fragment time stamps"); - // } - // } - - // loop { - // let mut state = state.lock().unwrap(); - // match state.mp4_parser.pop_atom() { - // Some(atom) => { - // log::info!( - // "atom_size={}, atom_type={}", - // atom.len(), - // atom.atom_type - // ); - // match atom.atom_type { - // ATOM_TYPE_FTYPE => { - // state.ftype_atom = Some(atom); - // log::info!("ftype_atom={:?}", state.ftype_atom); - // } - // ATOM_TYPE_MOOV => { - // state.moov_atom = Some(atom); - - // log::info!("moov_atom={:?}", state.moov_atom); - - // match (state.ftype_atom.as_ref(), state.moov_atom.as_ref()) - // { - // (Some(ftype_atom), Some(moov_atom)) => { - // let output_buf_len = - // ftype_atom.len() + moov_atom.len(); - // let mut gst_buffer = - // gst::Buffer::with_size(output_buf_len).unwrap(); - // { - // let buffer_ref = gst_buffer.get_mut().unwrap(); - // buffer_ref.set_pts(state.fragment_pts); - // buffer_ref.set_dts(state.fragment_dts); - - // let pts_plus_duration = state - // .fragment_max_pts_plus_duration - // .clone(); - // let fragment_pts = state.fragment_pts.clone(); - - // let duration = - // match (pts_plus_duration, fragment_pts) { - // ( - // Some(pts_plus_duration), - // Some(fragment_pts), - // ) => Some( - // pts_plus_duration - fragment_pts, - // ), - // // Handle the case where one or both values are `None` - // _ => None, - // }; - // buffer_ref.set_duration(duration); - // buffer_ref.set_offset( - // state.fragment_offset.unwrap_or( - // gst_sys::GST_BUFFER_OFFSET_NONE, - // ), - // ); - // buffer_ref.set_offset_end( - // state.fragment_offset_end.unwrap_or( - // gst_sys::GST_BUFFER_OFFSET_NONE, - // ), - // ); - // //this is a header - // buffer_ref.set_flags(gst::BufferFlags::HEADER); - // let mut buffer_map = - // buffer_ref.map_writable().unwrap(); - // let slice = buffer_map.as_mut_slice(); - // let mut pos = 0; - // slice[pos..pos + ftype_atom.len()] - // .copy_from_slice(&ftype_atom.atom_bytes); - // pos += ftype_atom.len(); - // slice[pos..pos + moov_atom.len()] - // .copy_from_slice(&moov_atom.atom_bytes); - // pos += moov_atom.len(); - // println!( - // "real size of the header atom={:?}", - // pos - // ); - // println!( - // "expected size of the header atom={:?}", - // output_buf_len - // ); - // assert_eq!(pos, output_buf_len); - // }; - // log::info!( - // "pushing the header atom={:?}", - // gst_buffer - // ); - - // // Create the catalog track with a single segment. - // let mut init_track = state - // .broadcast - // .clone() - // .create_track("0.mp4") - // .map_err(|_| gst::FlowError::Error)?; - // let init_segment = init_track - // .create_segment(segment::Info { - // sequence: VarInt::ZERO, - // priority: 0, - // expires: None, - // }) - // .map_err(|_| gst::FlowError::Error)?; - - // // Create a single fragment, optionally setting the size - // let mut init_fragment = init_segment - // .final_fragment(VarInt::ZERO) - // .map_err(|_| gst::FlowError::Error)?; - - // let buffer_map = gst_buffer - // .map_readable() - // .map_err(|_| gst::FlowError::Error)?; - - // // Create a a Vec object from the data slice - // let bytes = buffer_map.as_slice().to_vec(); - - // init_fragment - // .chunk(bytes.into()) - // .map_err(|_| gst::FlowError::Error)?; - - // // We're going to parse the moov box. - // // We have to read the moov box header to correctly advance the cursor for the mp4 crate. - // let mut moov_reader = - // Cursor::new(moov_atom.atom_bytes.clone()); - // let moov_header = - // mp4::BoxHeader::read(&mut moov_reader) - // .map_err(|_| gst::FlowError::Error)?; - - // // Parse the moov box so we can detect the timescales for each track. - // let moov = mp4::MoovBox::read_box( - // &mut moov_reader, - // moov_header.size, - // ) - // .map_err(|_| gst::FlowError::Error)?; - - // let mut tracks = HashMap::new(); - - // for trak in &moov.traks { - // let id = trak.tkhd.track_id; - // let name = format!("{}.m4s", id); - - // let timescale = track_timescale(&moov, id); - - // // Store the track publisher in a map so we can update it later. - // let track = broadcast - // .create_track(&name) - // .map_err(|_| gst::FlowError::Error)?; - // let track = Track::new(track, timescale); - // tracks.insert(id, track); - // } - - // let mut catalog = broadcast - // .create_track(".catalog") - // .map_err(|_| gst::FlowError::Error)?; - - // // Create the catalog track - // Self::serve_catalog( - // &mut catalog, - // &init_track.name, - // &moov, - // ) - // .map_err(|_| gst::FlowError::Error)?; - - // state.tracks = Some(tracks); - // state.init = Some(init_track); - // state.catalog = Some(catalog); - // } - // _ => { - // log::warn!("Received moov without ftype"); - // } - // } - // } - // ATOM_TYPE_MOOF => { - // log::info!("pushing the moof_atom={:?}", atom); - - // let mut current = state.current; - - // let tracks = if let Some(tracks) = &mut state.tracks { - // tracks - // } else { - // log::warn!("Tracks are not set up yet"); - // return Err(gst::FlowError::Error); - // }; - - // let mut reader = Cursor::new(atom.atom_bytes.clone()); - // let header = mp4::BoxHeader::read(&mut reader) - // .map_err(|_| gst::FlowError::Error)?; - // let moof = mp4::MoofBox::read_box(&mut reader, header.size) - // .map_err(|_| gst::FlowError::Error)?; - - // // Process the moof. - // let fragment = Fragment::new(moof) - // .map_err(|_| gst::FlowError::Error)?; - - // // Get the track for this moof. - // let track = tracks - // .get_mut(&fragment.track) - // .context("failed to find track") - // .map_err(|_| gst::FlowError::Error)?; - - // // Save the track ID for the next iteration, which must be a mdat. - // if current.clone().is_none() { - // log::error!("multiple moof atoms"); - // return Err(gst::FlowError::Error); - // } - - // current.replace(fragment.track); - - // // Publish the moof header, creating a new segment if it's a keyframe. - // track - // .header(atom.atom_bytes.clone(), fragment) - // .context("failed to publish moof") - // .map_err(|_| gst::FlowError::Error)?; - // } - // ATOM_TYPE_MDAT => { - // log::info!("pushing the mdat_atom={:?}", atom); - - // // Get the track ID from the previous moof. - // let track = state - // .current - // .take() - // .context("missing moof") - // .map_err(|_| gst::FlowError::Error)?; - - // let tracks = if let Some(tracks) = &mut state.tracks { - // tracks - // } else { - // log::warn!("Tracks are not set up yet"); - // return Err(gst::FlowError::Error); - // }; - - // let track = tracks - // .get_mut(&track) - // .context("failed to find track") - // .map_err(|_| gst::FlowError::Error)?; - - // // Publish the mdat atom. - // track - // .data(atom.atom_bytes.clone()) - // .context("failed to publish mdat") - // .map_err(|_| gst::FlowError::Error)?; - // } - // _ => { - // log::warn!("Unknown atom type {:?}", atom); - // } - // } - // } - // None => break, - // } - // } - - // Ok(gst::FlowSuccess::Ok) - // }) - // .eos(move |_sink| { - // unreachable!(); - // }) - // .build(), - // ); - pipeline.set_state(gst::State::Playing)?; let bus = pipeline