mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-12 08:45:38 +02:00
Some rabbit nitpick fixes
This commit is contained in:
53
packages/relay/internal/common/ice_helper.go
Normal file
53
packages/relay/internal/common/ice_helper.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// ICEHelper holds webrtc.ICECandidateInit(s) until remote candidate is set for given webrtc.PeerConnection
|
||||
// Held candidates should be flushed at the end of negotiation to ensure all are available for connection
|
||||
type ICEHelper struct {
|
||||
candidates []webrtc.ICECandidateInit
|
||||
pc *webrtc.PeerConnection
|
||||
}
|
||||
|
||||
func NewICEHelper(pc *webrtc.PeerConnection) *ICEHelper {
|
||||
return &ICEHelper{
|
||||
pc: pc,
|
||||
candidates: make([]webrtc.ICECandidateInit, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (ice *ICEHelper) SetPeerConnection(pc *webrtc.PeerConnection) {
|
||||
ice.pc = pc
|
||||
}
|
||||
|
||||
func (ice *ICEHelper) AddCandidate(c webrtc.ICECandidateInit) {
|
||||
if ice.pc != nil {
|
||||
if ice.pc.RemoteDescription() != nil {
|
||||
// Add immediately if remote is set
|
||||
if err := ice.pc.AddICECandidate(c); err != nil {
|
||||
slog.Error("Failed to add ICE candidate", "err", err)
|
||||
}
|
||||
// Also flush held candidates automatically
|
||||
ice.FlushHeldCandidates()
|
||||
} else {
|
||||
// Hold in slice until remote is set
|
||||
ice.candidates = append(ice.candidates, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ice *ICEHelper) FlushHeldCandidates() {
|
||||
if ice.pc != nil && len(ice.candidates) > 0 {
|
||||
for _, heldCandidate := range ice.candidates {
|
||||
if err := ice.pc.AddICECandidate(heldCandidate); err != nil {
|
||||
slog.Error("Failed to add held ICE candidate", "err", err)
|
||||
}
|
||||
}
|
||||
// Clear the held candidates
|
||||
ice.candidates = make([]webrtc.ICECandidateInit, 0)
|
||||
}
|
||||
}
|
||||
@@ -71,7 +71,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
safeBRW := common.NewSafeBufioRW(brw)
|
||||
|
||||
var currentRoomName string // Track the current room for this stream
|
||||
iceHolder := make([]webrtc.ICECandidateInit, 0)
|
||||
iceHelper := common.NewICEHelper(nil)
|
||||
for {
|
||||
var msgWrapper gen.ProtoMessage
|
||||
err := safeBRW.ReceiveProto(&msgWrapper)
|
||||
@@ -177,6 +177,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
|
||||
// Assign peer connection
|
||||
participant.PeerConnection = pc
|
||||
iceHelper.SetPeerConnection(pc)
|
||||
|
||||
// Add audio/video tracks
|
||||
{
|
||||
@@ -344,29 +345,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
SDPMLineIndex: &smollified,
|
||||
UsernameFragment: iceMsg.Candidate.UsernameFragment,
|
||||
}
|
||||
// Use currentRoomName to get the connection from nested map
|
||||
if len(currentRoomName) > 0 {
|
||||
if roomMap, ok := sp.servedConns.Get(currentRoomName); ok {
|
||||
if conn, ok := roomMap.Get(stream.Conn().RemotePeer()); ok && conn.pc.RemoteDescription() != nil {
|
||||
if err = conn.pc.AddICECandidate(cand); err != nil {
|
||||
slog.Error("Failed to add ICE candidate", "err", err)
|
||||
}
|
||||
for _, heldIce := range iceHolder {
|
||||
if err := conn.pc.AddICECandidate(heldIce); err != nil {
|
||||
slog.Error("Failed to add held ICE candidate", "err", err)
|
||||
}
|
||||
}
|
||||
// Clear the held candidates
|
||||
iceHolder = make([]webrtc.ICECandidateInit, 0)
|
||||
} else {
|
||||
// Hold the candidate until remote description is set
|
||||
iceHolder = append(iceHolder, cand)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Hold the candidate until remote description is set
|
||||
iceHolder = append(iceHolder, cand)
|
||||
}
|
||||
iceHelper.AddCandidate(cand)
|
||||
} else {
|
||||
slog.Error("Could not GetIce from ice-candidate")
|
||||
}
|
||||
@@ -386,6 +365,8 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
continue
|
||||
}
|
||||
slog.Debug("Set remote description for answer")
|
||||
// Flush held candidates now if missed before (race-condition)
|
||||
iceHelper.FlushHeldCandidates()
|
||||
} else {
|
||||
slog.Warn("Received answer without active PeerConnection")
|
||||
}
|
||||
@@ -406,7 +387,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
safeBRW := common.NewSafeBufioRW(brw)
|
||||
|
||||
var room *shared.Room
|
||||
iceHolder := make([]webrtc.ICECandidateInit, 0)
|
||||
iceHelper := common.NewICEHelper(nil)
|
||||
for {
|
||||
var msgWrapper gen.ProtoMessage
|
||||
err := safeBRW.ReceiveProto(&msgWrapper)
|
||||
@@ -483,21 +464,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
SDPMLineIndex: &smollified,
|
||||
UsernameFragment: iceMsg.Candidate.UsernameFragment,
|
||||
}
|
||||
if conn, ok := sp.incomingConns.Get(room.Name); ok && conn.pc.RemoteDescription() != nil {
|
||||
if err = conn.pc.AddICECandidate(cand); err != nil {
|
||||
slog.Error("Failed to add ICE candidate for pushed stream", "err", err)
|
||||
}
|
||||
for _, heldIce := range iceHolder {
|
||||
if err = conn.pc.AddICECandidate(heldIce); err != nil {
|
||||
slog.Error("Failed to add held ICE candidate for pushed stream", "err", err)
|
||||
}
|
||||
}
|
||||
// Clear the held candidates
|
||||
iceHolder = make([]webrtc.ICECandidateInit, 0)
|
||||
} else {
|
||||
// Hold the candidate until remote description is set
|
||||
iceHolder = append(iceHolder, cand)
|
||||
}
|
||||
iceHelper.AddCandidate(cand)
|
||||
} else {
|
||||
slog.Error("Failed to GetIce in pushed stream ice-candidate")
|
||||
}
|
||||
@@ -529,6 +496,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
|
||||
// Assign room peer connection
|
||||
room.PeerConnection = pc
|
||||
iceHelper.SetPeerConnection(pc)
|
||||
|
||||
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
|
||||
// TODO: Is this the best way to handle DataChannel? Should we just use the map directly?
|
||||
@@ -689,6 +657,8 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
continue
|
||||
}
|
||||
slog.Debug("Set remote description for pushed stream", "room", room.Name)
|
||||
// Flush candidates now if they weren't before (race-condition)
|
||||
iceHelper.FlushHeldCandidates()
|
||||
|
||||
// Create an answer
|
||||
answer, err := pc.CreateAnswer(nil)
|
||||
|
||||
@@ -63,13 +63,13 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac
|
||||
p.AudioTrack = track
|
||||
_, err := p.PeerConnection.AddTrack(track)
|
||||
if err != nil {
|
||||
slog.Error("Failed to add Participant audio track", err)
|
||||
slog.Error("Failed to add Participant audio track", "participant", p.ID, "err", err)
|
||||
}
|
||||
case webrtc.RTPCodecTypeVideo:
|
||||
p.VideoTrack = track
|
||||
_, err := p.PeerConnection.AddTrack(track)
|
||||
if err != nil {
|
||||
slog.Error("Failed to add Participant video track", err)
|
||||
slog.Error("Failed to add Participant video track", "participant", p.ID, "err", err)
|
||||
}
|
||||
default:
|
||||
slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType)
|
||||
@@ -78,6 +78,9 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac
|
||||
|
||||
// Close cleans up participant resources
|
||||
func (p *Participant) Close() {
|
||||
p.closeOnce.Do(func() {
|
||||
close(p.packetQueue)
|
||||
})
|
||||
if p.DataChannel != nil {
|
||||
err := p.DataChannel.Close()
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user