From 8d5895fc5e23acdbd73283e4da1cd3361b0dce1e Mon Sep 17 00:00:00 2001 From: DatCaptainHorse Date: Sat, 1 Nov 2025 05:02:23 +0200 Subject: [PATCH] Some rabbit nitpick fixes --- packages/input/src/controller.ts | 44 ++++++++------- packages/input/src/webrtc-stream.ts | 2 +- packages/relay/internal/common/ice_helper.go | 53 +++++++++++++++++++ .../relay/internal/core/protocol_stream.go | 50 ++++------------- packages/relay/internal/shared/participant.go | 7 ++- 5 files changed, 95 insertions(+), 61 deletions(-) create mode 100644 packages/relay/internal/common/ice_helper.go diff --git a/packages/input/src/controller.ts b/packages/input/src/controller.ts index ba71c69d..e1760945 100644 --- a/packages/input/src/controller.ts +++ b/packages/input/src/controller.ts @@ -56,7 +56,7 @@ export class Controller { // Polling configuration private readonly FULL_RATE_MS = 10; // 100 UPS private readonly IDLE_THRESHOLD = 100; // ms before considering idle/hands off controller - private readonly FULL_INTERVAL= 250; // ms before sending full state occassionally, to verify inputs are synced + private readonly FULL_INTERVAL = 250; // ms before sending full state occassionally, to verify inputs are synced // Polling state private pollingState: PollState = PollState.IDLE; @@ -230,7 +230,7 @@ export class Controller { // Changing from running to idle.. if (this.pollingState === PollState.RUNNING) { // Send full state on idle assumption - this.sendBatchedState(0xFF, 0); + this.sendBatchedState(0xff, 0); this.pollingState = PollState.IDLE; } } @@ -364,12 +364,15 @@ export class Controller { // For FULL_STATE, include everything if (updateType === 0) { - message.changedFields = 0xFF; + message.changedFields = 0xff; message.buttonChangedMask = Object.fromEntries( - Array.from(this.state.buttonState).map(([key, value]) => { - return [this.controllerButtonToVirtualKeyCode(key), value]; - }), + Array.from(this.state.buttonState) + .map( + ([key, value]) => + [this.controllerButtonToVirtualKeyCode(key), value] as const, + ) + .filter(([code]) => code !== undefined), ); message.leftStickX = this.state.leftX; message.leftStickY = this.state.leftY; @@ -402,9 +405,12 @@ export class Controller { }) .map((key) => { const newValue = currentStateMap.get(key) ?? false; - - return [this.controllerButtonToVirtualKeyCode(key), newValue]; - }), + return [ + this.controllerButtonToVirtualKeyCode(key), + newValue, + ] as const; + }) + .filter(([code]) => code !== undefined), ); } if (changedFields & this.CHANGED_LEFT_STICK_X) { @@ -468,14 +474,16 @@ export class Controller { this.wrtc.removeDataChannelCallback(this._dcHandler); this._dcHandler = null; } - // Gamepad disconnected - const detachMsg = createMessage( - create(ProtoControllerDetachSchema, { - sessionSlot: this.gamepad.index, - }), - "controllerInput", - ); - this.wrtc.sendBinary(toBinary(ProtoMessageSchema, detachMsg)); + if (this.gamepad) { + // Gamepad disconnected + const detachMsg = createMessage( + create(ProtoControllerDetachSchema, { + sessionSlot: this.gamepad.index, + }), + "controllerInput", + ); + this.wrtc.sendBinary(toBinary(ProtoMessageSchema, detachMsg)); + } } private controllerButtonToVirtualKeyCode(code: number): number | undefined { @@ -487,7 +495,7 @@ export class Controller { // Check if this rumble is for us if ( - rumbleMsg.sessionId !== this.wrtc.getSessionID() && + rumbleMsg.sessionId !== this.wrtc.getSessionID() || rumbleMsg.sessionSlot !== this.gamepad.index ) return; diff --git a/packages/input/src/webrtc-stream.ts b/packages/input/src/webrtc-stream.ts index 8b43627b..0b90faf2 100644 --- a/packages/input/src/webrtc-stream.ts +++ b/packages/input/src/webrtc-stream.ts @@ -172,7 +172,7 @@ export class WebRTCStream { const requestMsg = createMessage( create(ProtoClientRequestRoomStreamSchema, { roomName: roomName, - sessionId: clientId, + sessionId: clientId ?? "", }), "request-stream-room", ); diff --git a/packages/relay/internal/common/ice_helper.go b/packages/relay/internal/common/ice_helper.go new file mode 100644 index 00000000..d519fd1c --- /dev/null +++ b/packages/relay/internal/common/ice_helper.go @@ -0,0 +1,53 @@ +package common + +import ( + "log/slog" + + "github.com/pion/webrtc/v4" +) + +// ICEHelper holds webrtc.ICECandidateInit(s) until remote candidate is set for given webrtc.PeerConnection +// Held candidates should be flushed at the end of negotiation to ensure all are available for connection +type ICEHelper struct { + candidates []webrtc.ICECandidateInit + pc *webrtc.PeerConnection +} + +func NewICEHelper(pc *webrtc.PeerConnection) *ICEHelper { + return &ICEHelper{ + pc: pc, + candidates: make([]webrtc.ICECandidateInit, 0), + } +} + +func (ice *ICEHelper) SetPeerConnection(pc *webrtc.PeerConnection) { + ice.pc = pc +} + +func (ice *ICEHelper) AddCandidate(c webrtc.ICECandidateInit) { + if ice.pc != nil { + if ice.pc.RemoteDescription() != nil { + // Add immediately if remote is set + if err := ice.pc.AddICECandidate(c); err != nil { + slog.Error("Failed to add ICE candidate", "err", err) + } + // Also flush held candidates automatically + ice.FlushHeldCandidates() + } else { + // Hold in slice until remote is set + ice.candidates = append(ice.candidates, c) + } + } +} + +func (ice *ICEHelper) FlushHeldCandidates() { + if ice.pc != nil && len(ice.candidates) > 0 { + for _, heldCandidate := range ice.candidates { + if err := ice.pc.AddICECandidate(heldCandidate); err != nil { + slog.Error("Failed to add held ICE candidate", "err", err) + } + } + // Clear the held candidates + ice.candidates = make([]webrtc.ICECandidateInit, 0) + } +} diff --git a/packages/relay/internal/core/protocol_stream.go b/packages/relay/internal/core/protocol_stream.go index e7566ed9..925eb4bf 100644 --- a/packages/relay/internal/core/protocol_stream.go +++ b/packages/relay/internal/core/protocol_stream.go @@ -71,7 +71,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { safeBRW := common.NewSafeBufioRW(brw) var currentRoomName string // Track the current room for this stream - iceHolder := make([]webrtc.ICECandidateInit, 0) + iceHelper := common.NewICEHelper(nil) for { var msgWrapper gen.ProtoMessage err := safeBRW.ReceiveProto(&msgWrapper) @@ -177,6 +177,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { // Assign peer connection participant.PeerConnection = pc + iceHelper.SetPeerConnection(pc) // Add audio/video tracks { @@ -344,29 +345,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { SDPMLineIndex: &smollified, UsernameFragment: iceMsg.Candidate.UsernameFragment, } - // Use currentRoomName to get the connection from nested map - if len(currentRoomName) > 0 { - if roomMap, ok := sp.servedConns.Get(currentRoomName); ok { - if conn, ok := roomMap.Get(stream.Conn().RemotePeer()); ok && conn.pc.RemoteDescription() != nil { - if err = conn.pc.AddICECandidate(cand); err != nil { - slog.Error("Failed to add ICE candidate", "err", err) - } - for _, heldIce := range iceHolder { - if err := conn.pc.AddICECandidate(heldIce); err != nil { - slog.Error("Failed to add held ICE candidate", "err", err) - } - } - // Clear the held candidates - iceHolder = make([]webrtc.ICECandidateInit, 0) - } else { - // Hold the candidate until remote description is set - iceHolder = append(iceHolder, cand) - } - } - } else { - // Hold the candidate until remote description is set - iceHolder = append(iceHolder, cand) - } + iceHelper.AddCandidate(cand) } else { slog.Error("Could not GetIce from ice-candidate") } @@ -386,6 +365,8 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { continue } slog.Debug("Set remote description for answer") + // Flush held candidates now if missed before (race-condition) + iceHelper.FlushHeldCandidates() } else { slog.Warn("Received answer without active PeerConnection") } @@ -406,7 +387,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { safeBRW := common.NewSafeBufioRW(brw) var room *shared.Room - iceHolder := make([]webrtc.ICECandidateInit, 0) + iceHelper := common.NewICEHelper(nil) for { var msgWrapper gen.ProtoMessage err := safeBRW.ReceiveProto(&msgWrapper) @@ -483,21 +464,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { SDPMLineIndex: &smollified, UsernameFragment: iceMsg.Candidate.UsernameFragment, } - if conn, ok := sp.incomingConns.Get(room.Name); ok && conn.pc.RemoteDescription() != nil { - if err = conn.pc.AddICECandidate(cand); err != nil { - slog.Error("Failed to add ICE candidate for pushed stream", "err", err) - } - for _, heldIce := range iceHolder { - if err = conn.pc.AddICECandidate(heldIce); err != nil { - slog.Error("Failed to add held ICE candidate for pushed stream", "err", err) - } - } - // Clear the held candidates - iceHolder = make([]webrtc.ICECandidateInit, 0) - } else { - // Hold the candidate until remote description is set - iceHolder = append(iceHolder, cand) - } + iceHelper.AddCandidate(cand) } else { slog.Error("Failed to GetIce in pushed stream ice-candidate") } @@ -529,6 +496,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { // Assign room peer connection room.PeerConnection = pc + iceHelper.SetPeerConnection(pc) pc.OnDataChannel(func(dc *webrtc.DataChannel) { // TODO: Is this the best way to handle DataChannel? Should we just use the map directly? @@ -689,6 +657,8 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { continue } slog.Debug("Set remote description for pushed stream", "room", room.Name) + // Flush candidates now if they weren't before (race-condition) + iceHelper.FlushHeldCandidates() // Create an answer answer, err := pc.CreateAnswer(nil) diff --git a/packages/relay/internal/shared/participant.go b/packages/relay/internal/shared/participant.go index 74b87d60..9fef6ad1 100644 --- a/packages/relay/internal/shared/participant.go +++ b/packages/relay/internal/shared/participant.go @@ -63,13 +63,13 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac p.AudioTrack = track _, err := p.PeerConnection.AddTrack(track) if err != nil { - slog.Error("Failed to add Participant audio track", err) + slog.Error("Failed to add Participant audio track", "participant", p.ID, "err", err) } case webrtc.RTPCodecTypeVideo: p.VideoTrack = track _, err := p.PeerConnection.AddTrack(track) if err != nil { - slog.Error("Failed to add Participant video track", err) + slog.Error("Failed to add Participant video track", "participant", p.ID, "err", err) } default: slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType) @@ -78,6 +78,9 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac // Close cleans up participant resources func (p *Participant) Close() { + p.closeOnce.Do(func() { + close(p.packetQueue) + }) if p.DataChannel != nil { err := p.DataChannel.Close() if err != nil {