2 Commits

Author SHA1 Message Date
Kristian Ollikainen
650ddaa601 Merge 8d5895fc5e into 32341574dc 2025-11-01 05:02:29 +02:00
DatCaptainHorse
8d5895fc5e Some rabbit nitpick fixes 2025-11-01 05:02:23 +02:00
5 changed files with 95 additions and 61 deletions

View File

@@ -56,7 +56,7 @@ export class Controller {
// Polling configuration // Polling configuration
private readonly FULL_RATE_MS = 10; // 100 UPS private readonly FULL_RATE_MS = 10; // 100 UPS
private readonly IDLE_THRESHOLD = 100; // ms before considering idle/hands off controller private readonly IDLE_THRESHOLD = 100; // ms before considering idle/hands off controller
private readonly FULL_INTERVAL= 250; // ms before sending full state occassionally, to verify inputs are synced private readonly FULL_INTERVAL = 250; // ms before sending full state occassionally, to verify inputs are synced
// Polling state // Polling state
private pollingState: PollState = PollState.IDLE; private pollingState: PollState = PollState.IDLE;
@@ -230,7 +230,7 @@ export class Controller {
// Changing from running to idle.. // Changing from running to idle..
if (this.pollingState === PollState.RUNNING) { if (this.pollingState === PollState.RUNNING) {
// Send full state on idle assumption // Send full state on idle assumption
this.sendBatchedState(0xFF, 0); this.sendBatchedState(0xff, 0);
this.pollingState = PollState.IDLE; this.pollingState = PollState.IDLE;
} }
} }
@@ -364,12 +364,15 @@ export class Controller {
// For FULL_STATE, include everything // For FULL_STATE, include everything
if (updateType === 0) { if (updateType === 0) {
message.changedFields = 0xFF; message.changedFields = 0xff;
message.buttonChangedMask = Object.fromEntries( message.buttonChangedMask = Object.fromEntries(
Array.from(this.state.buttonState).map(([key, value]) => { Array.from(this.state.buttonState)
return [this.controllerButtonToVirtualKeyCode(key), value]; .map(
}), ([key, value]) =>
[this.controllerButtonToVirtualKeyCode(key), value] as const,
)
.filter(([code]) => code !== undefined),
); );
message.leftStickX = this.state.leftX; message.leftStickX = this.state.leftX;
message.leftStickY = this.state.leftY; message.leftStickY = this.state.leftY;
@@ -402,9 +405,12 @@ export class Controller {
}) })
.map((key) => { .map((key) => {
const newValue = currentStateMap.get(key) ?? false; const newValue = currentStateMap.get(key) ?? false;
return [
return [this.controllerButtonToVirtualKeyCode(key), newValue]; this.controllerButtonToVirtualKeyCode(key),
}), newValue,
] as const;
})
.filter(([code]) => code !== undefined),
); );
} }
if (changedFields & this.CHANGED_LEFT_STICK_X) { if (changedFields & this.CHANGED_LEFT_STICK_X) {
@@ -468,14 +474,16 @@ export class Controller {
this.wrtc.removeDataChannelCallback(this._dcHandler); this.wrtc.removeDataChannelCallback(this._dcHandler);
this._dcHandler = null; this._dcHandler = null;
} }
// Gamepad disconnected if (this.gamepad) {
const detachMsg = createMessage( // Gamepad disconnected
create(ProtoControllerDetachSchema, { const detachMsg = createMessage(
sessionSlot: this.gamepad.index, create(ProtoControllerDetachSchema, {
}), sessionSlot: this.gamepad.index,
"controllerInput", }),
); "controllerInput",
this.wrtc.sendBinary(toBinary(ProtoMessageSchema, detachMsg)); );
this.wrtc.sendBinary(toBinary(ProtoMessageSchema, detachMsg));
}
} }
private controllerButtonToVirtualKeyCode(code: number): number | undefined { private controllerButtonToVirtualKeyCode(code: number): number | undefined {
@@ -487,7 +495,7 @@ export class Controller {
// Check if this rumble is for us // Check if this rumble is for us
if ( if (
rumbleMsg.sessionId !== this.wrtc.getSessionID() && rumbleMsg.sessionId !== this.wrtc.getSessionID() ||
rumbleMsg.sessionSlot !== this.gamepad.index rumbleMsg.sessionSlot !== this.gamepad.index
) )
return; return;

View File

@@ -172,7 +172,7 @@ export class WebRTCStream {
const requestMsg = createMessage( const requestMsg = createMessage(
create(ProtoClientRequestRoomStreamSchema, { create(ProtoClientRequestRoomStreamSchema, {
roomName: roomName, roomName: roomName,
sessionId: clientId, sessionId: clientId ?? "",
}), }),
"request-stream-room", "request-stream-room",
); );

View File

@@ -0,0 +1,53 @@
package common
import (
"log/slog"
"github.com/pion/webrtc/v4"
)
// ICEHelper holds webrtc.ICECandidateInit(s) until remote candidate is set for given webrtc.PeerConnection
// Held candidates should be flushed at the end of negotiation to ensure all are available for connection
type ICEHelper struct {
candidates []webrtc.ICECandidateInit
pc *webrtc.PeerConnection
}
func NewICEHelper(pc *webrtc.PeerConnection) *ICEHelper {
return &ICEHelper{
pc: pc,
candidates: make([]webrtc.ICECandidateInit, 0),
}
}
func (ice *ICEHelper) SetPeerConnection(pc *webrtc.PeerConnection) {
ice.pc = pc
}
func (ice *ICEHelper) AddCandidate(c webrtc.ICECandidateInit) {
if ice.pc != nil {
if ice.pc.RemoteDescription() != nil {
// Add immediately if remote is set
if err := ice.pc.AddICECandidate(c); err != nil {
slog.Error("Failed to add ICE candidate", "err", err)
}
// Also flush held candidates automatically
ice.FlushHeldCandidates()
} else {
// Hold in slice until remote is set
ice.candidates = append(ice.candidates, c)
}
}
}
func (ice *ICEHelper) FlushHeldCandidates() {
if ice.pc != nil && len(ice.candidates) > 0 {
for _, heldCandidate := range ice.candidates {
if err := ice.pc.AddICECandidate(heldCandidate); err != nil {
slog.Error("Failed to add held ICE candidate", "err", err)
}
}
// Clear the held candidates
ice.candidates = make([]webrtc.ICECandidateInit, 0)
}
}

View File

@@ -71,7 +71,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
safeBRW := common.NewSafeBufioRW(brw) safeBRW := common.NewSafeBufioRW(brw)
var currentRoomName string // Track the current room for this stream var currentRoomName string // Track the current room for this stream
iceHolder := make([]webrtc.ICECandidateInit, 0) iceHelper := common.NewICEHelper(nil)
for { for {
var msgWrapper gen.ProtoMessage var msgWrapper gen.ProtoMessage
err := safeBRW.ReceiveProto(&msgWrapper) err := safeBRW.ReceiveProto(&msgWrapper)
@@ -177,6 +177,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
// Assign peer connection // Assign peer connection
participant.PeerConnection = pc participant.PeerConnection = pc
iceHelper.SetPeerConnection(pc)
// Add audio/video tracks // Add audio/video tracks
{ {
@@ -344,29 +345,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
SDPMLineIndex: &smollified, SDPMLineIndex: &smollified,
UsernameFragment: iceMsg.Candidate.UsernameFragment, UsernameFragment: iceMsg.Candidate.UsernameFragment,
} }
// Use currentRoomName to get the connection from nested map iceHelper.AddCandidate(cand)
if len(currentRoomName) > 0 {
if roomMap, ok := sp.servedConns.Get(currentRoomName); ok {
if conn, ok := roomMap.Get(stream.Conn().RemotePeer()); ok && conn.pc.RemoteDescription() != nil {
if err = conn.pc.AddICECandidate(cand); err != nil {
slog.Error("Failed to add ICE candidate", "err", err)
}
for _, heldIce := range iceHolder {
if err := conn.pc.AddICECandidate(heldIce); err != nil {
slog.Error("Failed to add held ICE candidate", "err", err)
}
}
// Clear the held candidates
iceHolder = make([]webrtc.ICECandidateInit, 0)
} else {
// Hold the candidate until remote description is set
iceHolder = append(iceHolder, cand)
}
}
} else {
// Hold the candidate until remote description is set
iceHolder = append(iceHolder, cand)
}
} else { } else {
slog.Error("Could not GetIce from ice-candidate") slog.Error("Could not GetIce from ice-candidate")
} }
@@ -386,6 +365,8 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
continue continue
} }
slog.Debug("Set remote description for answer") slog.Debug("Set remote description for answer")
// Flush held candidates now if missed before (race-condition)
iceHelper.FlushHeldCandidates()
} else { } else {
slog.Warn("Received answer without active PeerConnection") slog.Warn("Received answer without active PeerConnection")
} }
@@ -406,7 +387,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
safeBRW := common.NewSafeBufioRW(brw) safeBRW := common.NewSafeBufioRW(brw)
var room *shared.Room var room *shared.Room
iceHolder := make([]webrtc.ICECandidateInit, 0) iceHelper := common.NewICEHelper(nil)
for { for {
var msgWrapper gen.ProtoMessage var msgWrapper gen.ProtoMessage
err := safeBRW.ReceiveProto(&msgWrapper) err := safeBRW.ReceiveProto(&msgWrapper)
@@ -483,21 +464,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
SDPMLineIndex: &smollified, SDPMLineIndex: &smollified,
UsernameFragment: iceMsg.Candidate.UsernameFragment, UsernameFragment: iceMsg.Candidate.UsernameFragment,
} }
if conn, ok := sp.incomingConns.Get(room.Name); ok && conn.pc.RemoteDescription() != nil { iceHelper.AddCandidate(cand)
if err = conn.pc.AddICECandidate(cand); err != nil {
slog.Error("Failed to add ICE candidate for pushed stream", "err", err)
}
for _, heldIce := range iceHolder {
if err = conn.pc.AddICECandidate(heldIce); err != nil {
slog.Error("Failed to add held ICE candidate for pushed stream", "err", err)
}
}
// Clear the held candidates
iceHolder = make([]webrtc.ICECandidateInit, 0)
} else {
// Hold the candidate until remote description is set
iceHolder = append(iceHolder, cand)
}
} else { } else {
slog.Error("Failed to GetIce in pushed stream ice-candidate") slog.Error("Failed to GetIce in pushed stream ice-candidate")
} }
@@ -529,6 +496,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
// Assign room peer connection // Assign room peer connection
room.PeerConnection = pc room.PeerConnection = pc
iceHelper.SetPeerConnection(pc)
pc.OnDataChannel(func(dc *webrtc.DataChannel) { pc.OnDataChannel(func(dc *webrtc.DataChannel) {
// TODO: Is this the best way to handle DataChannel? Should we just use the map directly? // TODO: Is this the best way to handle DataChannel? Should we just use the map directly?
@@ -689,6 +657,8 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
continue continue
} }
slog.Debug("Set remote description for pushed stream", "room", room.Name) slog.Debug("Set remote description for pushed stream", "room", room.Name)
// Flush candidates now if they weren't before (race-condition)
iceHelper.FlushHeldCandidates()
// Create an answer // Create an answer
answer, err := pc.CreateAnswer(nil) answer, err := pc.CreateAnswer(nil)

View File

@@ -63,13 +63,13 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac
p.AudioTrack = track p.AudioTrack = track
_, err := p.PeerConnection.AddTrack(track) _, err := p.PeerConnection.AddTrack(track)
if err != nil { if err != nil {
slog.Error("Failed to add Participant audio track", err) slog.Error("Failed to add Participant audio track", "participant", p.ID, "err", err)
} }
case webrtc.RTPCodecTypeVideo: case webrtc.RTPCodecTypeVideo:
p.VideoTrack = track p.VideoTrack = track
_, err := p.PeerConnection.AddTrack(track) _, err := p.PeerConnection.AddTrack(track)
if err != nil { if err != nil {
slog.Error("Failed to add Participant video track", err) slog.Error("Failed to add Participant video track", "participant", p.ID, "err", err)
} }
default: default:
slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType) slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType)
@@ -78,6 +78,9 @@ func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.Trac
// Close cleans up participant resources // Close cleans up participant resources
func (p *Participant) Close() { func (p *Participant) Close() {
p.closeOnce.Do(func() {
close(p.packetQueue)
})
if p.DataChannel != nil { if p.DataChannel != nil {
err := p.DataChannel.Close() err := p.DataChannel.Close()
if err != nil { if err != nil {