Files
netris-nestri/packages/relay/internal/ingest.go
Wanjohi de80f3e6ab feat(maitred): Update maitred - hookup to the API (#198)
## Description
We are attempting to hookup maitred to the API
Maitred duties will be:
- [ ] Hookup to the API
- [ ]  Wait for signal (from the API) to start Steam
- [ ] Stop signal to stop the gaming session, clean up Steam... and
maybe do the backup

## Summary by CodeRabbit

- **New Features**
- Introduced Docker-based deployment configurations for both the main
and relay applications.
- Added new API endpoints enabling real-time machine messaging and
enhanced IoT operations.
- Expanded database schema and actor types to support improved machine
tracking.

- **Improvements**
- Enhanced real-time communication and relay management with streamlined
room handling.
- Upgraded dependencies, logging, and error handling for greater
stability and performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
2025-04-07 23:23:53 +03:00

218 lines
7.0 KiB
Go

package internal
import (
"encoding/json"
"errors"
"fmt"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"io"
"log/slog"
"relay/internal/common"
"relay/internal/connections"
"strings"
)
func IngestHandler(room *Room) {
relay := GetRelay()
// Callback for closing PeerConnection
onPCClose := func() {
slog.Debug("ingest PeerConnection closed", "room", room.Name)
room.Online = false
room.signalParticipantsOffline()
relay.DeleteRoomIfEmpty(room)
}
var err error
room.PeerConnection, err = common.CreatePeerConnection(onPCClose)
if err != nil {
slog.Error("Failed to create ingest PeerConnection", "room", room.Name, "err", err)
return
}
room.PeerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
localTrack, err := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, remoteTrack.Kind().String(), fmt.Sprintf("nestri-%s-%s", room.Name, remoteTrack.Kind().String()))
if err != nil {
slog.Error("Failed to create local track for room", "room", room.Name, "kind", remoteTrack.Kind(), "err", err)
return
}
slog.Debug("Received track for room", "room", room.Name, "kind", remoteTrack.Kind())
// Set track and let Room handle state
room.SetTrack(remoteTrack.Kind(), localTrack)
// Prepare PlayoutDelayExtension so we don't need to recreate it for each packet
playoutExt := &rtp.PlayoutDelayExtension{
MinDelay: 0,
MaxDelay: 0,
}
playoutPayload, err := playoutExt.Marshal()
if err != nil {
slog.Error("Failed to marshal PlayoutDelayExtension for room", "room", room.Name, "err", err)
return
}
for {
rtpPacket, _, err := remoteTrack.ReadRTP()
if err != nil {
if !errors.Is(err, io.EOF) {
slog.Error("Failed to read RTP from remote track for room", "room", room.Name, "err", err)
}
break
}
// Use PlayoutDelayExtension for low latency, only for video tracks
if err := rtpPacket.SetExtension(common.ExtensionMap[common.ExtensionPlayoutDelay], playoutPayload); err != nil {
slog.Error("Failed to set PlayoutDelayExtension for room", "room", room.Name, "err", err)
continue
}
err = localTrack.WriteRTP(rtpPacket)
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
slog.Error("Failed to write RTP to local track for room", "room", room.Name, "err", err)
break
}
}
slog.Debug("Track closed for room", "room", room.Name, "kind", remoteTrack.Kind())
// Clear track when done
room.SetTrack(remoteTrack.Kind(), nil)
})
room.PeerConnection.OnDataChannel(func(dc *webrtc.DataChannel) {
room.DataChannel = connections.NewNestriDataChannel(dc)
slog.Debug("Ingest received DataChannel for room", "room", room.Name)
room.DataChannel.RegisterOnOpen(func() {
slog.Debug("ingest DataChannel opened for room", "room", room.Name)
})
room.DataChannel.OnClose(func() {
slog.Debug("ingest DataChannel closed for room", "room", room.Name)
})
// We do not handle any messages from ingest via DataChannel yet
})
room.PeerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
return
}
slog.Debug("ingest received ICECandidate for room", "room", room.Name)
err = room.WebSocket.SendICECandidateMessageWS(candidate.ToJSON())
if err != nil {
slog.Error("Failed to send ICE candidate message to ingest for room", "room", room.Name, "err", err)
}
})
iceHolder := make([]webrtc.ICECandidateInit, 0)
// ICE callback
room.WebSocket.RegisterMessageCallback("ice", func(data []byte) {
var iceMsg connections.MessageICECandidate
if err = json.Unmarshal(data, &iceMsg); err != nil {
slog.Error("Failed to decode ICE candidate message from ingest for room", "room", room.Name, "err", err)
return
}
if room.PeerConnection != nil {
if room.PeerConnection.RemoteDescription() != nil {
if err = room.PeerConnection.AddICECandidate(iceMsg.Candidate); err != nil {
slog.Error("Failed to add ICE candidate for room", "room", room.Name, "err", err)
}
for _, heldCandidate := range iceHolder {
if err = room.PeerConnection.AddICECandidate(heldCandidate); err != nil {
slog.Error("Failed to add held ICE candidate for room", "room", room.Name, "err", err)
}
}
iceHolder = make([]webrtc.ICECandidateInit, 0)
} else {
iceHolder = append(iceHolder, iceMsg.Candidate)
}
} else {
slog.Error("ICE candidate received but PeerConnection is nil for room", "room", room.Name)
}
})
// SDP offer callback
room.WebSocket.RegisterMessageCallback("sdp", func(data []byte) {
var sdpMsg connections.MessageSDP
if err = json.Unmarshal(data, &sdpMsg); err != nil {
slog.Error("Failed to decode SDP message from ingest for room", "room", room.Name, "err", err)
return
}
answer := handleIngestSDP(room, sdpMsg)
if answer != nil {
if err = room.WebSocket.SendSDPMessageWS(*answer); err != nil {
slog.Error("Failed to send SDP answer message to ingest for room", "room", room.Name, "err", err)
}
} else {
slog.Error("Failed to handle ingest SDP message for room", "room", room.Name)
}
})
// Log callback
room.WebSocket.RegisterMessageCallback("log", func(data []byte) {
var logMsg connections.MessageLog
if err = json.Unmarshal(data, &logMsg); err != nil {
slog.Error("Failed to decode log message from ingest for room", "room", room.Name, "err", err)
return
}
// TODO: Handle log message sending to metrics server
})
// Metrics callback
room.WebSocket.RegisterMessageCallback("metrics", func(data []byte) {
var metricsMsg connections.MessageMetrics
if err = json.Unmarshal(data, &metricsMsg); err != nil {
slog.Error("Failed to decode metrics message from ingest for room", "room", room.Name, "err", err)
return
}
// TODO: Handle metrics message sending to metrics server
})
room.WebSocket.RegisterOnClose(func() {
slog.Debug("ingest WebSocket closed for room", "room", room.Name)
room.Online = false
room.signalParticipantsOffline()
relay.DeleteRoomIfEmpty(room)
})
slog.Info("Room is ready, sending OK answer to ingest", "room", room.Name)
if err = room.WebSocket.SendAnswerMessageWS(connections.AnswerOK); err != nil {
slog.Error("Failed to send OK answer message to ingest for room", "room", room.Name, "err", err)
}
}
// SDP offer handler, returns SDP answer
func handleIngestSDP(room *Room, offerMsg connections.MessageSDP) *webrtc.SessionDescription {
var err error
sdpOffer := offerMsg.SDP.SDP
sdpOffer = strings.Replace(sdpOffer, ";sprop-maxcapturerate=24000", "", -1)
err = room.PeerConnection.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdpOffer,
})
if err != nil {
slog.Error("Failed to set remote description for room", "room", room.Name, "err", err)
return nil
}
answer, err := room.PeerConnection.CreateAnswer(nil)
if err != nil {
slog.Error("Failed to create SDP answer for room", "room", room.Name, "err", err)
return nil
}
err = room.PeerConnection.SetLocalDescription(answer)
if err != nil {
slog.Error("Failed to set local description for room", "room", room.Name, "err", err)
return nil
}
return &answer
}