From a54cf759fa63f75efee19695c1e774ef5e0c45b3 Mon Sep 17 00:00:00 2001 From: DatCaptainHorse Date: Sat, 25 Oct 2025 03:57:26 +0300 Subject: [PATCH] Fixed multi-controllers, optimize and improve code in relay and nestri-server --- containerfiles/runner-builder.Containerfile | 20 +- packages/input/src/controller.ts | 119 ++++++--- .../input/src/proto/latency_tracker_pb.ts | 2 +- packages/input/src/proto/messages_pb.ts | 2 +- packages/input/src/proto/types_pb.ts | 114 ++++++--- packages/input/src/webrtc-stream.ts | 17 +- .../play-standalone/src/pages/[room].astro | 6 +- packages/relay/internal/common/common.go | 2 +- .../relay/internal/core/protocol_stream.go | 211 ++++++++-------- packages/relay/internal/core/room.go | 2 +- packages/relay/internal/core/state.go | 6 +- packages/relay/internal/proto/types.pb.go | 186 +++++++++----- packages/relay/internal/shared/participant.go | 110 +++++++-- packages/relay/internal/shared/room.go | 228 +++++++++--------- packages/scripts/entrypoint.sh | 28 ++- packages/scripts/entrypoint_nestri.sh | 2 +- packages/server/Cargo.lock | 128 +++++----- packages/server/Cargo.toml | 2 +- packages/server/src/args.rs | 8 + packages/server/src/args/app_args.rs | 8 + packages/server/src/enc_helper.rs | 62 +---- packages/server/src/input/controller.rs | 44 ++-- packages/server/src/main.rs | 29 +-- packages/server/src/nestrisink/imp.rs | 13 +- packages/server/src/nestrisink/mod.rs | 2 +- packages/server/src/proto/proto.rs | 84 ++++--- protobufs/types.proto | 46 ++-- 27 files changed, 837 insertions(+), 644 deletions(-) diff --git a/containerfiles/runner-builder.Containerfile b/containerfiles/runner-builder.Containerfile index 68528fdf..e6613bb4 100644 --- a/containerfiles/runner-builder.Containerfile +++ b/containerfiles/runner-builder.Containerfile @@ -41,7 +41,7 @@ RUN --mount=type=cache,target=/var/cache/pacman/pkg \ pacman -Sy --noconfirm lib32-gcc-libs # Clone repository -RUN git clone --depth 1 --rev "9e8bfd0217eeab011c5afc368d3ea67a4c239e81" https://github.com/DatCaptainHorse/vimputti.git +RUN git clone --depth 1 --rev "f2f21561ddcb814d74455311969d3e8934b052c6" https://github.com/DatCaptainHorse/vimputti.git #-------------------------------------------------------------------- FROM vimputti-manager-deps AS vimputti-manager-planner @@ -129,23 +129,8 @@ RUN --mount=type=cache,target=/var/cache/pacman/pkg \ RUN --mount=type=cache,target=${CARGO_HOME}/registry \ cargo install cargo-c -# Grab cudart from NVIDIA.. -RUN wget https://developer.download.nvidia.com/compute/cuda/redist/cuda_cudart/linux-x86_64/cuda_cudart-linux-x86_64-13.0.96-archive.tar.xz -O cuda_cudart.tar.xz && \ - mkdir cuda_cudart && tar -xf cuda_cudart.tar.xz -C cuda_cudart --strip-components=1 && \ - cp cuda_cudart/lib/libcudart.so cuda_cudart/lib/libcudart.so.* /usr/lib/ && \ - rm -r cuda_cudart && \ - rm cuda_cudart.tar.xz - -# Grab cuda lib from NVIDIA (it's in driver package of all things..) -RUN wget https://developer.download.nvidia.com/compute/cuda/redist/nvidia_driver/linux-x86_64/nvidia_driver-linux-x86_64-580.95.05-archive.tar.xz -O nvidia_driver.tar.xz && \ - mkdir nvidia_driver && tar -xf nvidia_driver.tar.xz -C nvidia_driver --strip-components=1 && \ - cp nvidia_driver/lib/libcuda.so.* /usr/lib/libcuda.so && \ - ln -s /usr/lib/libcuda.so /usr/lib/libcuda.so.1 && \ - rm -r nvidia_driver && \ - rm nvidia_driver.tar.xz - # Clone repository -RUN git clone --depth 1 --rev "afa853fa03e8403c83bbb3bc0cf39147ad46c266" https://github.com/games-on-whales/gst-wayland-display.git +RUN git clone --depth 1 --rev "a4abcfe2cffe2d33b564d1308b58504a5e3012b1" https://github.com/games-on-whales/gst-wayland-display.git #-------------------------------------------------------------------- FROM gst-wayland-deps AS gst-wayland-planner @@ -214,5 +199,4 @@ COPY --from=gst-wayland-cached-builder /artifacts/include/ /artifacts/include/ COPY --from=vimputti-manager-cached-builder /artifacts/vimputti-manager /artifacts/bin/ COPY --from=vimputti-manager-cached-builder /artifacts/libvimputti_shim_64.so /artifacts/lib64/libvimputti_shim.so COPY --from=vimputti-manager-cached-builder /artifacts/libvimputti_shim_32.so /artifacts/lib32/libvimputti_shim.so -COPY --from=gst-wayland-deps /usr/lib/libcuda.so /usr/lib/libcuda.so.* /artifacts/lib/ COPY --from=bubblewrap-builder /artifacts/bin/bwrap /artifacts/bin/ diff --git a/packages/input/src/controller.ts b/packages/input/src/controller.ts index d1c5316d..6742f6e5 100644 --- a/packages/input/src/controller.ts +++ b/packages/input/src/controller.ts @@ -32,7 +32,6 @@ interface GamepadState { export class Controller { protected wrtc: WebRTCStream; - protected slotMap: Map = new Map(); // local slot to server slot protected connected: boolean = false; protected gamepad: Gamepad | null = null; protected lastState: GamepadState = { @@ -50,6 +49,14 @@ export class Controller { protected stickDeadzone: number = 2048; // 2048 / 32768 = ~0.06 (6% of stick range) private updateInterval = 10.0; // 100 updates per second + private isIdle: boolean = true; + private lastInputTime: number = Date.now(); + private idleUpdateInterval: number = 150.0; // ~6-7 updates per second for keep-alive packets + private inputDetected: boolean = false; + private lastFullStateSend: number = Date.now(); + private fullStateSendInterval: number = 500.0; // send full state every 0.5 seconds (helps packet loss) + private forceFullStateSend: boolean = false; + private _dcHandler: ((data: ArrayBuffer) => void) | null = null; constructor({ webrtc, e }: Props) { @@ -79,9 +86,8 @@ export class Controller { const attachMsg = messageWrapper.payload.value; // Gamepad connected succesfully this.gamepad = e.gamepad; - this.slotMap.set(e.gamepad.index, attachMsg.slot); console.log( - `Gamepad connected: ${e.gamepad.id} assigned to slot ${attachMsg.slot} on server, local slot ${e.gamepad.index}`, + `Gamepad connected: ${e.gamepad.id}, local slot ${e.gamepad.index}, msg: ${attachMsg.sessionSlot}`, ); } } catch (err) { @@ -93,6 +99,7 @@ export class Controller { const attachMsg = createMessage( create(ProtoControllerAttachSchema, { id: this.vendor_id_to_controller(vendorId, productId), + sessionSlot: e.gamepad.index, sessionId: this.wrtc.getSessionID(), }), "controllerInput", @@ -102,6 +109,10 @@ export class Controller { this.run(); } + public getSlot(): number { + return this.gamepad.index; + } + // Maps vendor id and product id to supported controller type // Currently supported: Sony (ps4, ps5), Microsoft (xbox360, xboxone), Nintendo (switchpro) // Default fallback to xbox360 @@ -154,6 +165,13 @@ export class Controller { private pollGamepad() { // Get updated gamepad state const gamepads = navigator.getGamepads(); + + // Periodically force send full state to clear stuck inputs + if (Date.now() - this.lastFullStateSend > this.fullStateSendInterval) { + this.forceFullStateSend = true; + this.lastFullStateSend = Date.now(); + } + if (this.gamepad) { if (gamepads[this.gamepad.index]) { this.gamepad = gamepads[this.gamepad!.index]; @@ -164,7 +182,7 @@ export class Controller { // ignore trigger buttons (6-7) as we handle those as axis if (index === 6 || index === 7) return; // If state differs, send - if (button.pressed !== this.lastState.buttonState.get(index)) { + if (button.pressed !== this.lastState.buttonState.get(index) || this.forceFullStateSend) { const linuxCode = this.controllerButtonToVirtualKeyCode(index); if (linuxCode === undefined) { // Skip unmapped button index @@ -174,13 +192,15 @@ export class Controller { const buttonMessage = createMessage( create(ProtoControllerButtonSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), button: linuxCode, pressed: button.pressed, }), "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, buttonMessage)); + this.inputDetected = true; // Store button state this.lastState.buttonState.set(index, button.pressed); } @@ -198,16 +218,18 @@ export class Controller { ), ); // If state differs, send - if (leftTrigger !== this.lastState.leftTrigger) { + if (leftTrigger !== this.lastState.leftTrigger || this.forceFullStateSend) { const triggerMessage = createMessage( create(ProtoControllerTriggerSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), trigger: 0, // 0 = left, 1 = right value: leftTrigger, }), "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, triggerMessage)); + this.inputDetected = true; this.lastState.leftTrigger = leftTrigger; } const rightTrigger = Math.round( @@ -220,16 +242,18 @@ export class Controller { ), ); // If state differs, send - if (rightTrigger !== this.lastState.rightTrigger) { + if (rightTrigger !== this.lastState.rightTrigger || this.forceFullStateSend) { const triggerMessage = createMessage( create(ProtoControllerTriggerSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), trigger: 1, // 0 = left, 1 = right value: rightTrigger, }), "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, triggerMessage)); + this.inputDetected = true; this.lastState.rightTrigger = rightTrigger; } @@ -238,32 +262,36 @@ export class Controller { const dpadLeft = this.gamepad.buttons[14]?.pressed ? 1 : 0; const dpadRight = this.gamepad.buttons[15]?.pressed ? 1 : 0; const dpadX = dpadLeft ? -1 : dpadRight ? 1 : 0; - if (dpadX !== this.lastState.dpadX) { + if (dpadX !== this.lastState.dpadX || this.forceFullStateSend) { const dpadMessage = createMessage( create(ProtoControllerAxisSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), axis: 0, // 0 = dpadX, 1 = dpadY value: dpadX, }), "controllerInput", ); - this.lastState.dpadX = dpadX; this.wrtc.sendBinary(toBinary(ProtoMessageSchema, dpadMessage)); + this.inputDetected = true; + this.lastState.dpadX = dpadX; } const dpadUp = this.gamepad.buttons[12]?.pressed ? 1 : 0; const dpadDown = this.gamepad.buttons[13]?.pressed ? 1 : 0; const dpadY = dpadUp ? -1 : dpadDown ? 1 : 0; - if (dpadY !== this.lastState.dpadY) { + if (dpadY !== this.lastState.dpadY || this.forceFullStateSend) { const dpadMessage = createMessage( create(ProtoControllerAxisSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), axis: 1, // 0 = dpadX, 1 = dpadY value: dpadY, }), "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, dpadMessage)); + this.inputDetected = true; this.lastState.dpadY = dpadY; } @@ -292,10 +320,12 @@ export class Controller { // if moves inside deadzone, zero it if not inside deadzone last time if ( sendLeftX !== this.lastState.leftX || - sendLeftY !== this.lastState.leftY + sendLeftY !== this.lastState.leftY || this.forceFullStateSend ) { const stickMessage = createMessage( create(ProtoControllerStickSchema, { + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), stick: 0, // 0 = left, 1 = right x: sendLeftX, y: sendLeftY, @@ -303,6 +333,7 @@ export class Controller { "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, stickMessage)); + this.inputDetected = true; this.lastState.leftX = sendLeftX; this.lastState.leftY = sendLeftY; } @@ -328,10 +359,12 @@ export class Controller { Math.abs(rightY) > this.stickDeadzone ? Math.round(rightY) : 0; if ( sendRightX !== this.lastState.rightX || - sendRightY !== this.lastState.rightY + sendRightY !== this.lastState.rightY || this.forceFullStateSend ) { const stickMessage = createMessage( create(ProtoControllerStickSchema, { + sessionSlot: this.gamepad.index, + sessionId: this.wrtc.getSessionID(), stick: 1, // 0 = left, 1 = right x: sendRightX, y: sendRightY, @@ -339,11 +372,14 @@ export class Controller { "controllerInput", ); this.wrtc.sendBinary(toBinary(ProtoMessageSchema, stickMessage)); + this.inputDetected = true; this.lastState.rightX = sendRightX; this.lastState.rightY = sendRightY; } } } + + this.forceFullStateSend = false; } private loopInterval: any = null; @@ -352,10 +388,34 @@ export class Controller { if (this.connected) this.stop(); this.connected = true; - // Poll gamepads in setInterval loop + this.isIdle = true; + this.lastInputTime = Date.now(); + this.loopInterval = setInterval(() => { - if (this.connected) this.pollGamepad(); - }, this.updateInterval); + if (this.connected) { + this.inputDetected = false; // Reset before poll + this.pollGamepad(); + + // Switch polling rate based on input + if (this.inputDetected) { + this.lastInputTime = Date.now(); + if (this.isIdle) { + this.isIdle = false; + clearInterval(this.loopInterval); + this.loopInterval = setInterval(() => { + if (this.connected) this.pollGamepad(); + }, this.updateInterval); + } + } else if (!this.isIdle && Date.now() - this.lastInputTime > 200) { + // Switch to idle polling after 200ms of no input + this.isIdle = true; + clearInterval(this.loopInterval); + this.loopInterval = setInterval(() => { + if (this.connected) this.pollGamepad(); + }, this.idleUpdateInterval); + } + } + }, this.isIdle ? this.idleUpdateInterval : this.updateInterval); } public stop() { @@ -366,21 +426,6 @@ export class Controller { this.connected = false; } - public getLocalSlot(): number { - if (this.gamepad) { - return this.gamepad.index; - } - return -1; - } - - public getServerSlot(): number { - if (this.gamepad) { - const slot = this.slotMap.get(this.gamepad.index); - if (slot !== undefined) return slot; - } - return -1; - } - public dispose() { this.stop(); // Remove callback @@ -391,7 +436,7 @@ export class Controller { // Gamepad disconnected const detachMsg = createMessage( create(ProtoControllerDetachSchema, { - slot: this.getServerSlot(), + sessionSlot: this.gamepad.index, }), "controllerInput", ); @@ -407,7 +452,9 @@ export class Controller { if (!this.connected) return; // Check if aimed at this controller slot - if (rumbleMsg.slot !== this.getServerSlot()) return; + if (rumbleMsg.sessionId !== this.wrtc.getSessionID() && + rumbleMsg.sessionSlot !== this.gamepad.index) + return; // Trigger actual rumble // Need to remap from 0-65535 to 0.0-1.0 ranges diff --git a/packages/input/src/proto/latency_tracker_pb.ts b/packages/input/src/proto/latency_tracker_pb.ts index 27eefd2a..2d35d78e 100644 --- a/packages/input/src/proto/latency_tracker_pb.ts +++ b/packages/input/src/proto/latency_tracker_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v2.9.0 with parameter "target=ts" +// @generated by protoc-gen-es v2.10.0 with parameter "target=ts" // @generated from file latency_tracker.proto (package proto, syntax proto3) /* eslint-disable */ diff --git a/packages/input/src/proto/messages_pb.ts b/packages/input/src/proto/messages_pb.ts index 8c33ee30..4e8c70de 100644 --- a/packages/input/src/proto/messages_pb.ts +++ b/packages/input/src/proto/messages_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v2.9.0 with parameter "target=ts" +// @generated by protoc-gen-es v2.10.0 with parameter "target=ts" // @generated from file messages.proto (package proto, syntax proto3) /* eslint-disable */ diff --git a/packages/input/src/proto/types_pb.ts b/packages/input/src/proto/types_pb.ts index 6538b8f6..6da4bf1e 100644 --- a/packages/input/src/proto/types_pb.ts +++ b/packages/input/src/proto/types_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v2.9.0 with parameter "target=ts" +// @generated by protoc-gen-es v2.10.0 with parameter "target=ts" // @generated from file types.proto (package proto, syntax proto3) /* eslint-disable */ @@ -10,7 +10,7 @@ import type { Message } from "@bufbuild/protobuf"; * Describes the file types.proto. */ export const file_types: GenFile = /*@__PURE__*/ - fileDesc("Cgt0eXBlcy5wcm90bxIFcHJvdG8iJgoOUHJvdG9Nb3VzZU1vdmUSCQoBeBgBIAEoBRIJCgF5GAIgASgFIikKEVByb3RvTW91c2VNb3ZlQWJzEgkKAXgYASABKAUSCQoBeRgCIAEoBSInCg9Qcm90b01vdXNlV2hlZWwSCQoBeBgBIAEoBRIJCgF5GAIgASgFIiAKEVByb3RvTW91c2VLZXlEb3duEgsKA2tleRgBIAEoBSIeCg9Qcm90b01vdXNlS2V5VXASCwoDa2V5GAEgASgFIhsKDFByb3RvS2V5RG93bhILCgNrZXkYASABKAUiGQoKUHJvdG9LZXlVcBILCgNrZXkYASABKAUiRQoVUHJvdG9Db250cm9sbGVyQXR0YWNoEgoKAmlkGAEgASgJEgwKBHNsb3QYAiABKAUSEgoKc2Vzc2lvbl9pZBgDIAEoCSIlChVQcm90b0NvbnRyb2xsZXJEZXRhY2gSDAoEc2xvdBgBIAEoBSJGChVQcm90b0NvbnRyb2xsZXJCdXR0b24SDAoEc2xvdBgBIAEoBRIOCgZidXR0b24YAiABKAUSDwoHcHJlc3NlZBgDIAEoCCJGChZQcm90b0NvbnRyb2xsZXJUcmlnZ2VyEgwKBHNsb3QYASABKAUSDwoHdHJpZ2dlchgCIAEoBRINCgV2YWx1ZRgDIAEoBSJJChRQcm90b0NvbnRyb2xsZXJTdGljaxIMCgRzbG90GAEgASgFEg0KBXN0aWNrGAIgASgFEgkKAXgYAyABKAUSCQoBeRgEIAEoBSJAChNQcm90b0NvbnRyb2xsZXJBeGlzEgwKBHNsb3QYASABKAUSDAoEYXhpcxgCIAEoBRINCgV2YWx1ZRgDIAEoBSJmChVQcm90b0NvbnRyb2xsZXJSdW1ibGUSDAoEc2xvdBgBIAEoBRIVCg1sb3dfZnJlcXVlbmN5GAIgASgFEhYKDmhpZ2hfZnJlcXVlbmN5GAMgASgFEhAKCGR1cmF0aW9uGAQgASgFIqoBChNSVENJY2VDYW5kaWRhdGVJbml0EhEKCWNhbmRpZGF0ZRgBIAEoCRIaCg1zZHBNTGluZUluZGV4GAIgASgNSACIAQESEwoGc2RwTWlkGAMgASgJSAGIAQESHQoQdXNlcm5hbWVGcmFnbWVudBgEIAEoCUgCiAEBQhAKDl9zZHBNTGluZUluZGV4QgkKB19zZHBNaWRCEwoRX3VzZXJuYW1lRnJhZ21lbnQiNgoZUlRDU2Vzc2lvbkRlc2NyaXB0aW9uSW5pdBILCgNzZHAYASABKAkSDAoEdHlwZRgCIAEoCSI5CghQcm90b0lDRRItCgljYW5kaWRhdGUYASABKAsyGi5wcm90by5SVENJY2VDYW5kaWRhdGVJbml0IjkKCFByb3RvU0RQEi0KA3NkcBgBIAEoCzIgLnByb3RvLlJUQ1Nlc3Npb25EZXNjcmlwdGlvbkluaXQiGAoIUHJvdG9SYXcSDAoEZGF0YRgBIAEoCSJFChxQcm90b0NsaWVudFJlcXVlc3RSb29tU3RyZWFtEhEKCXJvb21fbmFtZRgBIAEoCRISCgpzZXNzaW9uX2lkGAIgASgJIkcKF1Byb3RvQ2xpZW50RGlzY29ubmVjdGVkEhIKCnNlc3Npb25faWQYASABKAkSGAoQY29udHJvbGxlcl9zbG90cxgCIAMoBSIqChVQcm90b1NlcnZlclB1c2hTdHJlYW0SEQoJcm9vbV9uYW1lGAEgASgJQhZaFHJlbGF5L2ludGVybmFsL3Byb3RvYgZwcm90bzM"); + fileDesc("Cgt0eXBlcy5wcm90bxIFcHJvdG8iJgoOUHJvdG9Nb3VzZU1vdmUSCQoBeBgBIAEoBRIJCgF5GAIgASgFIikKEVByb3RvTW91c2VNb3ZlQWJzEgkKAXgYASABKAUSCQoBeRgCIAEoBSInCg9Qcm90b01vdXNlV2hlZWwSCQoBeBgBIAEoBRIJCgF5GAIgASgFIiAKEVByb3RvTW91c2VLZXlEb3duEgsKA2tleRgBIAEoBSIeCg9Qcm90b01vdXNlS2V5VXASCwoDa2V5GAEgASgFIhsKDFByb3RvS2V5RG93bhILCgNrZXkYASABKAUiGQoKUHJvdG9LZXlVcBILCgNrZXkYASABKAUiTQoVUHJvdG9Db250cm9sbGVyQXR0YWNoEgoKAmlkGAEgASgJEhQKDHNlc3Npb25fc2xvdBgCIAEoBRISCgpzZXNzaW9uX2lkGAMgASgJIkEKFVByb3RvQ29udHJvbGxlckRldGFjaBIUCgxzZXNzaW9uX3Nsb3QYASABKAUSEgoKc2Vzc2lvbl9pZBgCIAEoCSJiChVQcm90b0NvbnRyb2xsZXJCdXR0b24SFAoMc2Vzc2lvbl9zbG90GAEgASgFEhIKCnNlc3Npb25faWQYAiABKAkSDgoGYnV0dG9uGAMgASgFEg8KB3ByZXNzZWQYBCABKAgiYgoWUHJvdG9Db250cm9sbGVyVHJpZ2dlchIUCgxzZXNzaW9uX3Nsb3QYASABKAUSEgoKc2Vzc2lvbl9pZBgCIAEoCRIPCgd0cmlnZ2VyGAMgASgFEg0KBXZhbHVlGAQgASgFImUKFFByb3RvQ29udHJvbGxlclN0aWNrEhQKDHNlc3Npb25fc2xvdBgBIAEoBRISCgpzZXNzaW9uX2lkGAIgASgJEg0KBXN0aWNrGAMgASgFEgkKAXgYBCABKAUSCQoBeRgFIAEoBSJcChNQcm90b0NvbnRyb2xsZXJBeGlzEhQKDHNlc3Npb25fc2xvdBgBIAEoBRISCgpzZXNzaW9uX2lkGAIgASgJEgwKBGF4aXMYAyABKAUSDQoFdmFsdWUYBCABKAUiggEKFVByb3RvQ29udHJvbGxlclJ1bWJsZRIUCgxzZXNzaW9uX3Nsb3QYASABKAUSEgoKc2Vzc2lvbl9pZBgCIAEoCRIVCg1sb3dfZnJlcXVlbmN5GAMgASgFEhYKDmhpZ2hfZnJlcXVlbmN5GAQgASgFEhAKCGR1cmF0aW9uGAUgASgFIqoBChNSVENJY2VDYW5kaWRhdGVJbml0EhEKCWNhbmRpZGF0ZRgBIAEoCRIaCg1zZHBNTGluZUluZGV4GAIgASgNSACIAQESEwoGc2RwTWlkGAMgASgJSAGIAQESHQoQdXNlcm5hbWVGcmFnbWVudBgEIAEoCUgCiAEBQhAKDl9zZHBNTGluZUluZGV4QgkKB19zZHBNaWRCEwoRX3VzZXJuYW1lRnJhZ21lbnQiNgoZUlRDU2Vzc2lvbkRlc2NyaXB0aW9uSW5pdBILCgNzZHAYASABKAkSDAoEdHlwZRgCIAEoCSI5CghQcm90b0lDRRItCgljYW5kaWRhdGUYASABKAsyGi5wcm90by5SVENJY2VDYW5kaWRhdGVJbml0IjkKCFByb3RvU0RQEi0KA3NkcBgBIAEoCzIgLnByb3RvLlJUQ1Nlc3Npb25EZXNjcmlwdGlvbkluaXQiGAoIUHJvdG9SYXcSDAoEZGF0YRgBIAEoCSJFChxQcm90b0NsaWVudFJlcXVlc3RSb29tU3RyZWFtEhEKCXJvb21fbmFtZRgBIAEoCRISCgpzZXNzaW9uX2lkGAIgASgJIkcKF1Byb3RvQ2xpZW50RGlzY29ubmVjdGVkEhIKCnNlc3Npb25faWQYASABKAkSGAoQY29udHJvbGxlcl9zbG90cxgCIAMoBSIqChVQcm90b1NlcnZlclB1c2hTdHJlYW0SEQoJcm9vbV9uYW1lGAEgASgJQhZaFHJlbGF5L2ludGVybmFsL3Byb3RvYgZwcm90bzM"); /** * MouseMove message @@ -174,14 +174,14 @@ export type ProtoControllerAttach = Message<"proto.ProtoControllerAttach"> & { id: string; /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 2; + * @generated from field: int32 session_slot = 2; */ - slot: number; + sessionSlot: number; /** - * Session ID of the client attaching the controller + * Session ID of the client * * @generated from field: string session_id = 3; */ @@ -202,11 +202,18 @@ export const ProtoControllerAttachSchema: GenMessage = /* */ export type ProtoControllerDetach = Message<"proto.ProtoControllerDetach"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; }; /** @@ -223,23 +230,30 @@ export const ProtoControllerDetachSchema: GenMessage = /* */ export type ProtoControllerButton = Message<"proto.ProtoControllerButton"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; /** * Button code (linux input event code) * - * @generated from field: int32 button = 2; + * @generated from field: int32 button = 3; */ button: number; /** * true if pressed, false if released * - * @generated from field: bool pressed = 3; + * @generated from field: bool pressed = 4; */ pressed: boolean; }; @@ -258,23 +272,30 @@ export const ProtoControllerButtonSchema: GenMessage = /* */ export type ProtoControllerTrigger = Message<"proto.ProtoControllerTrigger"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; /** * Trigger number (0 for left, 1 for right) * - * @generated from field: int32 trigger = 2; + * @generated from field: int32 trigger = 3; */ trigger: number; /** * trigger value (-32768 to 32767) * - * @generated from field: int32 value = 3; + * @generated from field: int32 value = 4; */ value: number; }; @@ -293,30 +314,37 @@ export const ProtoControllerTriggerSchema: GenMessage = */ export type ProtoControllerStick = Message<"proto.ProtoControllerStick"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; /** * Stick number (0 for left, 1 for right) * - * @generated from field: int32 stick = 2; + * @generated from field: int32 stick = 3; */ stick: number; /** * X axis value (-32768 to 32767) * - * @generated from field: int32 x = 3; + * @generated from field: int32 x = 4; */ x: number; /** * Y axis value (-32768 to 32767) * - * @generated from field: int32 y = 4; + * @generated from field: int32 y = 5; */ y: number; }; @@ -335,23 +363,30 @@ export const ProtoControllerStickSchema: GenMessage = /*@_ */ export type ProtoControllerAxis = Message<"proto.ProtoControllerAxis"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; /** * Axis number (0 for d-pad horizontal, 1 for d-pad vertical) * - * @generated from field: int32 axis = 2; + * @generated from field: int32 axis = 3; */ axis: number; /** * axis value (-1 to 1) * - * @generated from field: int32 value = 3; + * @generated from field: int32 value = 4; */ value: number; }; @@ -370,30 +405,37 @@ export const ProtoControllerAxisSchema: GenMessage = /*@__P */ export type ProtoControllerRumble = Message<"proto.ProtoControllerRumble"> & { /** - * Slot number (0-3) + * Session specific slot number (0-3) * - * @generated from field: int32 slot = 1; + * @generated from field: int32 session_slot = 1; */ - slot: number; + sessionSlot: number; + + /** + * Session ID of the client + * + * @generated from field: string session_id = 2; + */ + sessionId: string; /** * Low frequency rumble (0-65535) * - * @generated from field: int32 low_frequency = 2; + * @generated from field: int32 low_frequency = 3; */ lowFrequency: number; /** * High frequency rumble (0-65535) * - * @generated from field: int32 high_frequency = 3; + * @generated from field: int32 high_frequency = 4; */ highFrequency: number; /** * Duration in milliseconds * - * @generated from field: int32 duration = 4; + * @generated from field: int32 duration = 5; */ duration: number; }; diff --git a/packages/input/src/webrtc-stream.ts b/packages/input/src/webrtc-stream.ts index b91e5b3c..8b43627b 100644 --- a/packages/input/src/webrtc-stream.ts +++ b/packages/input/src/webrtc-stream.ts @@ -22,6 +22,7 @@ import { P2PMessageStream } from "./streamwrapper"; const NESTRI_PROTOCOL_STREAM_REQUEST = "/nestri-relay/stream-request/1.0.0"; export class WebRTCStream { + private _sessionId: string | null = null; private _p2p: Libp2p | undefined = undefined; private _p2pConn: Connection | undefined = undefined; private _msgStream: P2PMessageStream | undefined = undefined; @@ -128,9 +129,9 @@ export class WebRTCStream { }); this._msgStream.on("session-assigned", (data: ProtoClientRequestRoomStream) => { - const sessionId = data.sessionId; - localStorage.setItem("nestri-session-id", sessionId); - console.log("Session ID assigned:", sessionId, "for room:", data.roomName); + this._sessionId = data.sessionId; + localStorage.setItem("nestri-session-id", this._sessionId); + console.log("Session ID assigned:", this._sessionId, "for room:", data.roomName); }); this._msgStream.on("offer", async (data: ProtoSDP) => { @@ -162,7 +163,7 @@ export class WebRTCStream { this._onConnected?.(null); }); - const clientId = localStorage.getItem("nestri-session-id"); + const clientId = this.getSessionID(); if (clientId) { console.debug("Using existing session ID:", clientId); } @@ -180,8 +181,10 @@ export class WebRTCStream { } } - public getSessionID(): string { - return localStorage.getItem("nestri-session-id") || ""; + public getSessionID(): string | null { + if (this._sessionId === null) + this._sessionId = localStorage.getItem("nestri-session-id"); + return this._sessionId; } // Forces opus to stereo in Chromium browsers, because of course @@ -298,7 +301,7 @@ export class WebRTCStream { // @ts-ignore receiver.jitterBufferTarget = receiver.jitterBufferDelayHint = receiver.playoutDelayHint = 0; } - }, 15); + }, 50); }); } } diff --git a/packages/play-standalone/src/pages/[room].astro b/packages/play-standalone/src/pages/[room].astro index 22e5305b..5556cab3 100644 --- a/packages/play-standalone/src/pages/[room].astro +++ b/packages/play-standalone/src/pages/[room].astro @@ -90,11 +90,7 @@ if (envs_map.size > 0) { let nestriControllers: Controller[] = []; window.addEventListener("gamepadconnected", (e) => { - // Ignore gamepads with id including "nestri" console.log("Gamepad connected:", e.gamepad); - if (e.gamepad.id.toLowerCase().includes("nestri")) - return; - const controller = new Controller({ webrtc: stream, e: e, @@ -106,7 +102,7 @@ if (envs_map.size > 0) { if (e.gamepad.id.toLowerCase().includes("nestri")) return; - let disconnected = nestriControllers.find((c) => c.getLocalSlot() === e.gamepad.index); + let disconnected = nestriControllers.find((c) => c.getSlot() === e.gamepad.index); if (disconnected) { disconnected.dispose(); nestriControllers = nestriControllers.filter((c) => c !== disconnected); diff --git a/packages/relay/internal/common/common.go b/packages/relay/internal/common/common.go index a1c86b77..0a462d56 100644 --- a/packages/relay/internal/common/common.go +++ b/packages/relay/internal/common/common.go @@ -26,7 +26,7 @@ func InitWebRTCAPI() error { mediaEngine := &webrtc.MediaEngine{} // Register our extensions - if err := RegisterExtensions(mediaEngine); err != nil { + if err = RegisterExtensions(mediaEngine); err != nil { return fmt.Errorf("failed to register extensions: %w", err) } diff --git a/packages/relay/internal/core/protocol_stream.go b/packages/relay/internal/core/protocol_stream.go index bff77ccc..19f3934f 100644 --- a/packages/relay/internal/core/protocol_stream.go +++ b/packages/relay/internal/core/protocol_stream.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log/slog" + "math" "relay/internal/common" "relay/internal/connections" "relay/internal/shared" @@ -176,7 +177,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { // Create participant for this viewer participant, err := shared.NewParticipant( - "", // session ID will be set if this is a client session + "", stream.Conn().RemotePeer(), ) if err != nil { @@ -189,102 +190,37 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { participant.SessionID = session.SessionID } + // Assign peer connection participant.PeerConnection = pc - // Create per-participant tracks - if room.VideoTrack != nil { - participant.VideoTrack, err = webrtc.NewTrackLocalStaticRTP( - room.VideoTrack.Codec(), - "video-"+participant.ID.String(), - "nestri-"+reqMsg.RoomName+"-video", + // Add audio/video tracks + { + localTrack, err := webrtc.NewTrackLocalStaticRTP( + room.AudioCodec, + "participant-"+participant.ID.String(), + "participant-"+participant.ID.String()+"-audio", ) if err != nil { - slog.Error("Failed to create participant video track", "room", reqMsg.RoomName, "err", err) - continue + slog.Error("Failed to create track for stream request", "err", err) + return } - - rtpSender, err := pc.AddTrack(participant.VideoTrack) - if err != nil { - slog.Error("Failed to add participant video track", "room", reqMsg.RoomName, "err", err) - continue - } - - slog.Info("Added video track for participant", - "room", reqMsg.RoomName, - "participant", participant.ID, - "sender_id", fmt.Sprintf("%p", rtpSender)) - - // Relay packets from channel to track (VIDEO) - go func() { - for pkt := range participant.VideoChan { - // Use a context with timeout - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - - done := make(chan error, 1) - go func() { - done <- participant.VideoTrack.WriteRTP(pkt) - }() - - select { - case err := <-done: - cancel() - if err != nil { - if !errors.Is(err, io.ErrClosedPipe) { - slog.Debug("Failed to write video", "room", reqMsg.RoomName, "err", err) - } - return - } - case <-ctx.Done(): - cancel() - slog.Error("WriteRTP BLOCKED for >100ms!", - "participant", participant.ID, - "room", reqMsg.RoomName) - // Don't return, continue processing - } - } - }() + participant.SetTrack(webrtc.RTPCodecTypeAudio, localTrack) + slog.Debug("Set audio track for requested stream", "room", room.Name) } - if room.AudioTrack != nil { - participant.AudioTrack, err = webrtc.NewTrackLocalStaticRTP( - room.AudioTrack.Codec(), - "audio-"+participant.ID.String(), - "nestri-"+reqMsg.RoomName+"-audio", + { + localTrack, err := webrtc.NewTrackLocalStaticRTP( + room.VideoCodec, + "participant-"+participant.ID.String(), + "participant-"+participant.ID.String()+"-video", ) if err != nil { - slog.Error("Failed to create participant audio track", "room", reqMsg.RoomName, "err", err) - continue + slog.Error("Failed to create track for stream request", "err", err) + return } - - _, err := pc.AddTrack(participant.AudioTrack) - if err != nil { - slog.Error("Failed to add participant audio track", "room", reqMsg.RoomName, "err", err) - continue - } - - // Relay packets from channel to track (AUDIO) - go func() { - for pkt := range participant.AudioChan { - start := time.Now() - if err := participant.AudioTrack.WriteRTP(pkt); err != nil { - if !errors.Is(err, io.ErrClosedPipe) { - slog.Debug("Failed to write audio to participant", "room", reqMsg.RoomName, "err", err) - } - return - } - duration := time.Since(start) - if duration > 50*time.Millisecond { - slog.Warn("Slow audio WriteRTP detected", - "duration", duration, - "participant", participant.ID, - "room", reqMsg.RoomName) - } - } - }() + participant.SetTrack(webrtc.RTPCodecTypeVideo, localTrack) + slog.Debug("Set video track for requested stream", "room", room.Name) } - // Add participant to room - room.AddParticipant(participant) - // Cleanup on disconnect cleanupParticipantID := participant.ID pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { @@ -294,6 +230,9 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { slog.Info("Participant disconnected from room", "room", reqMsg.RoomName, "participant", cleanupParticipantID) room.RemoveParticipantByID(cleanupParticipantID) participant.Close() + } else if state == webrtc.PeerConnectionStateConnected { + // Add participant to room when connection is established + room.AddParticipant(participant) } }) @@ -334,33 +273,33 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { peerID := stream.Conn().RemotePeer() // Check if it's a controller attach with assigned slot - if attach := msgWrapper.GetControllerAttach(); attach != nil && attach.Slot >= 0 { + if attach := msgWrapper.GetControllerAttach(); attach != nil && attach.SessionSlot >= 0 { if session, ok := sp.relay.ClientSessions.Get(peerID); ok { // Check if slot already tracked hasSlot := false for _, slot := range session.ControllerSlots { - if slot == attach.Slot { + if slot == attach.SessionSlot { hasSlot = true break } } if !hasSlot { - session.ControllerSlots = append(session.ControllerSlots, attach.Slot) + session.ControllerSlots = append(session.ControllerSlots, attach.SessionSlot) session.LastActivity = time.Now() slog.Info("Controller slot assigned to client session", "session", session.SessionID, - "slot", attach.Slot, + "slot", attach.SessionSlot, "total_slots", len(session.ControllerSlots)) } } } // Check if it's a controller detach - if detach := msgWrapper.GetControllerDetach(); detach != nil && detach.Slot >= 0 { + if detach := msgWrapper.GetControllerDetach(); detach != nil && detach.SessionSlot >= 0 { if session, ok := sp.relay.ClientSessions.Get(peerID); ok { newSlots := make([]int32, 0, len(session.ControllerSlots)) for _, slot := range session.ControllerSlots { - if slot != detach.Slot { + if slot != detach.SessionSlot { newSlots = append(newSlots, slot) } } @@ -368,7 +307,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) { session.LastActivity = time.Now() slog.Info("Controller slot removed from client session", "session", session.SessionID, - "slot", detach.Slot, + "slot", detach.SessionSlot, "remaining_slots", len(session.ControllerSlots)) } } @@ -537,19 +476,25 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { if err != nil { if errors.Is(err, io.EOF) || errors.Is(err, network.ErrReset) { slog.Debug("Stream push connection closed by peer", "peer", stream.Conn().RemotePeer(), "error", err) + if room != nil { + room.Close() + sp.incomingConns.Set(room.Name, nil) + } return } slog.Error("Failed to receive data for stream push", "err", err) _ = stream.Reset() - + if room != nil { + room.Close() + sp.incomingConns.Set(room.Name, nil) + } return } if msgWrapper.MessageBase == nil { slog.Error("No MessageBase in stream push") - _ = stream.Reset() - return + continue } switch msgWrapper.MessageBase.PayloadType { @@ -606,7 +551,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { slog.Error("Failed to add ICE candidate for pushed stream", "err", err) } for _, heldIce := range iceHolder { - if err := conn.pc.AddICECandidate(heldIce); err != nil { + if err = conn.pc.AddICECandidate(heldIce); err != nil { slog.Error("Failed to add held ICE candidate for pushed stream", "err", err) } } @@ -645,6 +590,9 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { continue } + // Assign room peer connection + room.PeerConnection = pc + pc.OnDataChannel(func(dc *webrtc.DataChannel) { // TODO: Is this the best way to handle DataChannel? Should we just use the map directly? room.DataChannel = connections.NewNestriDataChannel(dc) @@ -708,17 +656,6 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { }) pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - localTrack, err := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, remoteTrack.Kind().String(), fmt.Sprintf("nestri-%s-%s", room.Name, remoteTrack.Kind().String())) - if err != nil { - slog.Error("Failed to create local track for pushed stream", "room", room.Name, "track_kind", remoteTrack.Kind().String(), "err", err) - return - } - - slog.Debug("Received track for pushed stream", "room", room.Name, "track_kind", remoteTrack.Kind().String()) - - // Set track for Room - room.SetTrack(remoteTrack.Kind(), localTrack) - // Prepare PlayoutDelayExtension so we don't need to recreate it for each packet playoutExt := &rtp.PlayoutDelayExtension{ MinDelay: 0, @@ -730,6 +667,12 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { return } + if remoteTrack.Kind() == webrtc.RTPCodecTypeAudio { + room.AudioCodec = remoteTrack.Codec().RTPCodecCapability + } else if remoteTrack.Kind() == webrtc.RTPCodecTypeVideo { + room.VideoCodec = remoteTrack.Codec().RTPCodecCapability + } + for { rtpPacket, _, err := remoteTrack.ReadRTP() if err != nil { @@ -741,19 +684,61 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) { // Use PlayoutDelayExtension for low latency, if set for this track kind if extID, ok := common.GetExtension(remoteTrack.Kind(), common.ExtensionPlayoutDelay); ok { - if err := rtpPacket.SetExtension(extID, playoutPayload); err != nil { + if err = rtpPacket.SetExtension(extID, playoutPayload); err != nil { slog.Error("Failed to set PlayoutDelayExtension for room", "room", room.Name, "err", err) continue } } - room.BroadcastPacket(remoteTrack.Kind(), rtpPacket) + // Calculate differences + var timeDiff int64 + var sequenceDiff int + + if remoteTrack.Kind() == webrtc.RTPCodecTypeVideo { + timeDiff = int64(rtpPacket.Timestamp) - int64(room.LastVideoTimestamp) + if !room.VideoTimestampSet { + timeDiff = 0 + room.VideoTimestampSet = true + } else if timeDiff < -(math.MaxUint32 / 10) { + timeDiff += math.MaxUint32 + 1 + } + + sequenceDiff = int(rtpPacket.SequenceNumber) - int(room.LastVideoSequenceNumber) + if !room.VideoSequenceSet { + sequenceDiff = 0 + room.VideoSequenceSet = true + } else if sequenceDiff < -(math.MaxUint16 / 10) { + sequenceDiff += math.MaxUint16 + 1 + } + + room.LastVideoTimestamp = rtpPacket.Timestamp + room.LastVideoSequenceNumber = rtpPacket.SequenceNumber + } else { // Audio + timeDiff = int64(rtpPacket.Timestamp) - int64(room.LastAudioTimestamp) + if !room.AudioTimestampSet { + timeDiff = 0 + room.AudioTimestampSet = true + } else if timeDiff < -(math.MaxUint32 / 10) { + timeDiff += math.MaxUint32 + 1 + } + + sequenceDiff = int(rtpPacket.SequenceNumber) - int(room.LastAudioSequenceNumber) + if !room.AudioSequenceSet { + sequenceDiff = 0 + room.AudioSequenceSet = true + } else if sequenceDiff < -(math.MaxUint16 / 10) { + sequenceDiff += math.MaxUint16 + 1 + } + + room.LastAudioTimestamp = rtpPacket.Timestamp + room.LastAudioSequenceNumber = rtpPacket.SequenceNumber + } + + // Broadcast with differences + room.BroadcastPacketRetimed(remoteTrack.Kind(), rtpPacket, timeDiff, sequenceDiff) } slog.Debug("Track closed for room", "room", room.Name, "track_kind", remoteTrack.Kind().String()) - - // Cleanup the track from the room - room.SetTrack(remoteTrack.Kind(), nil) }) // Set the remote description diff --git a/packages/relay/internal/core/room.go b/packages/relay/internal/core/room.go index 78e5e5c4..c4a5b2a9 100644 --- a/packages/relay/internal/core/room.go +++ b/packages/relay/internal/core/room.go @@ -45,7 +45,7 @@ func (r *Relay) DeleteRoomIfEmpty(room *shared.Room) { if room == nil { return } - if room.Participants.Len() == 0 && r.LocalRooms.Has(room.ID) { + if len(room.Participants) <= 0 && r.LocalRooms.Has(room.ID) { slog.Debug("Deleting empty room without participants", "room", room.Name) r.LocalRooms.Delete(room.ID) err := room.PeerConnection.Close() diff --git a/packages/relay/internal/core/state.go b/packages/relay/internal/core/state.go index d4d088c2..9323eadc 100644 --- a/packages/relay/internal/core/state.go +++ b/packages/relay/internal/core/state.go @@ -195,18 +195,18 @@ func (r *Relay) updateMeshRoomStates(peerID peer.ID, states []shared.RoomInfo) { } // If previously did not exist, but does now, request a connection if participants exist for our room - existed := r.Rooms.Has(state.ID.String()) + /*existed := r.Rooms.Has(state.ID.String()) if !existed { // Request connection to this peer if we have participants in our local room if room, ok := r.LocalRooms.Get(state.ID); ok { - if room.Participants.Len() > 0 { + if len(room.Participants) > 0 { slog.Debug("Got new remote room state, we locally have participants for, requesting stream", "room_name", room.Name, "peer", peerID) if err := r.StreamProtocol.RequestStream(context.Background(), room, peerID); err != nil { slog.Error("Failed to request stream for new remote room state", "room_name", room.Name, "peer", peerID, "err", err) } } } - } + }*/ r.Rooms.Set(state.ID.String(), state) } diff --git a/packages/relay/internal/proto/types.pb.go b/packages/relay/internal/proto/types.pb.go index dae068ac..e71fa071 100644 --- a/packages/relay/internal/proto/types.pb.go +++ b/packages/relay/internal/proto/types.pb.go @@ -363,9 +363,9 @@ func (x *ProtoKeyUp) GetKey() int32 { // ControllerAttach message type ProtoControllerAttach struct { state protoimpl.MessageState `protogen:"open.v1"` - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // One of the following enums: "ps", "xbox" or "switch" - Slot int32 `protobuf:"varint,2,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - SessionId string `protobuf:"bytes,3,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client attaching the controller + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // One of the following enums: "ps", "xbox" or "switch" + SessionSlot int32 `protobuf:"varint,2,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,3,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -407,9 +407,9 @@ func (x *ProtoControllerAttach) GetId() string { return "" } -func (x *ProtoControllerAttach) GetSlot() int32 { +func (x *ProtoControllerAttach) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } @@ -424,7 +424,8 @@ func (x *ProtoControllerAttach) GetSessionId() string { // ControllerDetach message type ProtoControllerDetach struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -459,19 +460,27 @@ func (*ProtoControllerDetach) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{8} } -func (x *ProtoControllerDetach) GetSlot() int32 { +func (x *ProtoControllerDetach) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerDetach) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + // ControllerButton message type ProtoControllerButton struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - Button int32 `protobuf:"varint,2,opt,name=button,proto3" json:"button,omitempty"` // Button code (linux input event code) - Pressed bool `protobuf:"varint,3,opt,name=pressed,proto3" json:"pressed,omitempty"` // true if pressed, false if released + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client + Button int32 `protobuf:"varint,3,opt,name=button,proto3" json:"button,omitempty"` // Button code (linux input event code) + Pressed bool `protobuf:"varint,4,opt,name=pressed,proto3" json:"pressed,omitempty"` // true if pressed, false if released unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -506,13 +515,20 @@ func (*ProtoControllerButton) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{9} } -func (x *ProtoControllerButton) GetSlot() int32 { +func (x *ProtoControllerButton) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerButton) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + func (x *ProtoControllerButton) GetButton() int32 { if x != nil { return x.Button @@ -530,9 +546,10 @@ func (x *ProtoControllerButton) GetPressed() bool { // ControllerTriggers message type ProtoControllerTrigger struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - Trigger int32 `protobuf:"varint,2,opt,name=trigger,proto3" json:"trigger,omitempty"` // Trigger number (0 for left, 1 for right) - Value int32 `protobuf:"varint,3,opt,name=value,proto3" json:"value,omitempty"` // trigger value (-32768 to 32767) + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client + Trigger int32 `protobuf:"varint,3,opt,name=trigger,proto3" json:"trigger,omitempty"` // Trigger number (0 for left, 1 for right) + Value int32 `protobuf:"varint,4,opt,name=value,proto3" json:"value,omitempty"` // trigger value (-32768 to 32767) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -567,13 +584,20 @@ func (*ProtoControllerTrigger) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{10} } -func (x *ProtoControllerTrigger) GetSlot() int32 { +func (x *ProtoControllerTrigger) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerTrigger) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + func (x *ProtoControllerTrigger) GetTrigger() int32 { if x != nil { return x.Trigger @@ -591,10 +615,11 @@ func (x *ProtoControllerTrigger) GetValue() int32 { // ControllerSticks message type ProtoControllerStick struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - Stick int32 `protobuf:"varint,2,opt,name=stick,proto3" json:"stick,omitempty"` // Stick number (0 for left, 1 for right) - X int32 `protobuf:"varint,3,opt,name=x,proto3" json:"x,omitempty"` // X axis value (-32768 to 32767) - Y int32 `protobuf:"varint,4,opt,name=y,proto3" json:"y,omitempty"` // Y axis value (-32768 to 32767) + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client + Stick int32 `protobuf:"varint,3,opt,name=stick,proto3" json:"stick,omitempty"` // Stick number (0 for left, 1 for right) + X int32 `protobuf:"varint,4,opt,name=x,proto3" json:"x,omitempty"` // X axis value (-32768 to 32767) + Y int32 `protobuf:"varint,5,opt,name=y,proto3" json:"y,omitempty"` // Y axis value (-32768 to 32767) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -629,13 +654,20 @@ func (*ProtoControllerStick) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{11} } -func (x *ProtoControllerStick) GetSlot() int32 { +func (x *ProtoControllerStick) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerStick) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + func (x *ProtoControllerStick) GetStick() int32 { if x != nil { return x.Stick @@ -660,9 +692,10 @@ func (x *ProtoControllerStick) GetY() int32 { // ControllerAxis message type ProtoControllerAxis struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - Axis int32 `protobuf:"varint,2,opt,name=axis,proto3" json:"axis,omitempty"` // Axis number (0 for d-pad horizontal, 1 for d-pad vertical) - Value int32 `protobuf:"varint,3,opt,name=value,proto3" json:"value,omitempty"` // axis value (-1 to 1) + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client + Axis int32 `protobuf:"varint,3,opt,name=axis,proto3" json:"axis,omitempty"` // Axis number (0 for d-pad horizontal, 1 for d-pad vertical) + Value int32 `protobuf:"varint,4,opt,name=value,proto3" json:"value,omitempty"` // axis value (-1 to 1) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -697,13 +730,20 @@ func (*ProtoControllerAxis) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{12} } -func (x *ProtoControllerAxis) GetSlot() int32 { +func (x *ProtoControllerAxis) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerAxis) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + func (x *ProtoControllerAxis) GetAxis() int32 { if x != nil { return x.Axis @@ -721,10 +761,11 @@ func (x *ProtoControllerAxis) GetValue() int32 { // ControllerRumble message type ProtoControllerRumble struct { state protoimpl.MessageState `protogen:"open.v1"` - Slot int32 `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty"` // Slot number (0-3) - LowFrequency int32 `protobuf:"varint,2,opt,name=low_frequency,json=lowFrequency,proto3" json:"low_frequency,omitempty"` // Low frequency rumble (0-65535) - HighFrequency int32 `protobuf:"varint,3,opt,name=high_frequency,json=highFrequency,proto3" json:"high_frequency,omitempty"` // High frequency rumble (0-65535) - Duration int32 `protobuf:"varint,4,opt,name=duration,proto3" json:"duration,omitempty"` // Duration in milliseconds + SessionSlot int32 `protobuf:"varint,1,opt,name=session_slot,json=sessionSlot,proto3" json:"session_slot,omitempty"` // Session specific slot number (0-3) + SessionId string `protobuf:"bytes,2,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` // Session ID of the client + LowFrequency int32 `protobuf:"varint,3,opt,name=low_frequency,json=lowFrequency,proto3" json:"low_frequency,omitempty"` // Low frequency rumble (0-65535) + HighFrequency int32 `protobuf:"varint,4,opt,name=high_frequency,json=highFrequency,proto3" json:"high_frequency,omitempty"` // High frequency rumble (0-65535) + Duration int32 `protobuf:"varint,5,opt,name=duration,proto3" json:"duration,omitempty"` // Duration in milliseconds unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -759,13 +800,20 @@ func (*ProtoControllerRumble) Descriptor() ([]byte, []int) { return file_types_proto_rawDescGZIP(), []int{13} } -func (x *ProtoControllerRumble) GetSlot() int32 { +func (x *ProtoControllerRumble) GetSessionSlot() int32 { if x != nil { - return x.Slot + return x.SessionSlot } return 0 } +func (x *ProtoControllerRumble) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + func (x *ProtoControllerRumble) GetLowFrequency() int32 { if x != nil { return x.LowFrequency @@ -1215,36 +1263,48 @@ const file_types_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\x05R\x03key\"\x1e\n" + "\n" + "ProtoKeyUp\x12\x10\n" + - "\x03key\x18\x01 \x01(\x05R\x03key\"Z\n" + + "\x03key\x18\x01 \x01(\x05R\x03key\"i\n" + "\x15ProtoControllerAttach\x12\x0e\n" + - "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + - "\x04slot\x18\x02 \x01(\x05R\x04slot\x12\x1d\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12!\n" + + "\fsession_slot\x18\x02 \x01(\x05R\vsessionSlot\x12\x1d\n" + "\n" + - "session_id\x18\x03 \x01(\tR\tsessionId\"+\n" + - "\x15ProtoControllerDetach\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\"]\n" + - "\x15ProtoControllerButton\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\x12\x16\n" + - "\x06button\x18\x02 \x01(\x05R\x06button\x12\x18\n" + - "\apressed\x18\x03 \x01(\bR\apressed\"\\\n" + - "\x16ProtoControllerTrigger\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\x12\x18\n" + - "\atrigger\x18\x02 \x01(\x05R\atrigger\x12\x14\n" + - "\x05value\x18\x03 \x01(\x05R\x05value\"\\\n" + - "\x14ProtoControllerStick\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\x12\x14\n" + - "\x05stick\x18\x02 \x01(\x05R\x05stick\x12\f\n" + - "\x01x\x18\x03 \x01(\x05R\x01x\x12\f\n" + - "\x01y\x18\x04 \x01(\x05R\x01y\"S\n" + - "\x13ProtoControllerAxis\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\x12\x12\n" + - "\x04axis\x18\x02 \x01(\x05R\x04axis\x12\x14\n" + - "\x05value\x18\x03 \x01(\x05R\x05value\"\x93\x01\n" + - "\x15ProtoControllerRumble\x12\x12\n" + - "\x04slot\x18\x01 \x01(\x05R\x04slot\x12#\n" + - "\rlow_frequency\x18\x02 \x01(\x05R\flowFrequency\x12%\n" + - "\x0ehigh_frequency\x18\x03 \x01(\x05R\rhighFrequency\x12\x1a\n" + - "\bduration\x18\x04 \x01(\x05R\bduration\"\xde\x01\n" + + "session_id\x18\x03 \x01(\tR\tsessionId\"Y\n" + + "\x15ProtoControllerDetach\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\"\x8b\x01\n" + + "\x15ProtoControllerButton\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\x12\x16\n" + + "\x06button\x18\x03 \x01(\x05R\x06button\x12\x18\n" + + "\apressed\x18\x04 \x01(\bR\apressed\"\x8a\x01\n" + + "\x16ProtoControllerTrigger\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\x12\x18\n" + + "\atrigger\x18\x03 \x01(\x05R\atrigger\x12\x14\n" + + "\x05value\x18\x04 \x01(\x05R\x05value\"\x8a\x01\n" + + "\x14ProtoControllerStick\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\x12\x14\n" + + "\x05stick\x18\x03 \x01(\x05R\x05stick\x12\f\n" + + "\x01x\x18\x04 \x01(\x05R\x01x\x12\f\n" + + "\x01y\x18\x05 \x01(\x05R\x01y\"\x81\x01\n" + + "\x13ProtoControllerAxis\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\x12\x12\n" + + "\x04axis\x18\x03 \x01(\x05R\x04axis\x12\x14\n" + + "\x05value\x18\x04 \x01(\x05R\x05value\"\xc1\x01\n" + + "\x15ProtoControllerRumble\x12!\n" + + "\fsession_slot\x18\x01 \x01(\x05R\vsessionSlot\x12\x1d\n" + + "\n" + + "session_id\x18\x02 \x01(\tR\tsessionId\x12#\n" + + "\rlow_frequency\x18\x03 \x01(\x05R\flowFrequency\x12%\n" + + "\x0ehigh_frequency\x18\x04 \x01(\x05R\rhighFrequency\x12\x1a\n" + + "\bduration\x18\x05 \x01(\x05R\bduration\"\xde\x01\n" + "\x13RTCIceCandidateInit\x12\x1c\n" + "\tcandidate\x18\x01 \x01(\tR\tcandidate\x12)\n" + "\rsdpMLineIndex\x18\x02 \x01(\rH\x00R\rsdpMLineIndex\x88\x01\x01\x12\x1b\n" + diff --git a/packages/relay/internal/shared/participant.go b/packages/relay/internal/shared/participant.go index 6916be95..74b87d60 100644 --- a/packages/relay/internal/shared/participant.go +++ b/packages/relay/internal/shared/participant.go @@ -1,14 +1,16 @@ package shared import ( + "errors" "fmt" + "io" "log/slog" "relay/internal/common" "relay/internal/connections" + "sync" "github.com/libp2p/go-libp2p/core/peer" "github.com/oklog/ulid/v2" - "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) @@ -22,8 +24,15 @@ type Participant struct { // Per-viewer tracks and channels VideoTrack *webrtc.TrackLocalStaticRTP AudioTrack *webrtc.TrackLocalStaticRTP - VideoChan chan *rtp.Packet - AudioChan chan *rtp.Packet + + // Per-viewer RTP state for retiming + VideoSequenceNumber uint16 + VideoTimestamp uint32 + AudioSequenceNumber uint16 + AudioTimestamp uint32 + + packetQueue chan *participantPacket + closeOnce sync.Once } func NewParticipant(sessionID string, peerID peer.ID) (*Participant, error) { @@ -31,24 +40,50 @@ func NewParticipant(sessionID string, peerID peer.ID) (*Participant, error) { if err != nil { return nil, fmt.Errorf("failed to create ULID for Participant: %w", err) } - return &Participant{ - ID: id, - SessionID: sessionID, - PeerID: peerID, - VideoChan: make(chan *rtp.Packet, 500), - AudioChan: make(chan *rtp.Packet, 100), - }, nil + p := &Participant{ + ID: id, + SessionID: sessionID, + PeerID: peerID, + VideoSequenceNumber: 0, + VideoTimestamp: 0, + AudioSequenceNumber: 0, + AudioTimestamp: 0, + packetQueue: make(chan *participantPacket, 1000), + } + + go p.packetWriter() + + return p, nil +} + +// SetTrack sets audio/video track for Participant +func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalStaticRTP) { + switch trackType { + case webrtc.RTPCodecTypeAudio: + p.AudioTrack = track + _, err := p.PeerConnection.AddTrack(track) + if err != nil { + slog.Error("Failed to add Participant audio track", err) + } + case webrtc.RTPCodecTypeVideo: + p.VideoTrack = track + _, err := p.PeerConnection.AddTrack(track) + if err != nil { + slog.Error("Failed to add Participant video track", err) + } + default: + slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType) + } } // Close cleans up participant resources func (p *Participant) Close() { - if p.VideoChan != nil { - close(p.VideoChan) - p.VideoChan = nil - } - if p.AudioChan != nil { - close(p.AudioChan) - p.AudioChan = nil + if p.DataChannel != nil { + err := p.DataChannel.Close() + if err != nil { + slog.Error("Failed to close Participant DataChannel", err) + } + p.DataChannel = nil } if p.PeerConnection != nil { err := p.PeerConnection.Close() @@ -57,4 +92,45 @@ func (p *Participant) Close() { } p.PeerConnection = nil } + if p.VideoTrack != nil { + p.VideoTrack = nil + } + if p.AudioTrack != nil { + p.AudioTrack = nil + } +} + +func (p *Participant) packetWriter() { + for pkt := range p.packetQueue { + var track *webrtc.TrackLocalStaticRTP + var sequenceNumber uint16 + var timestamp uint32 + + // No mutex needed - only this goroutine modifies these + if pkt.kind == webrtc.RTPCodecTypeAudio { + track = p.AudioTrack + p.AudioSequenceNumber = uint16(int(p.AudioSequenceNumber) + pkt.sequenceDiff) + p.AudioTimestamp = uint32(int64(p.AudioTimestamp) + pkt.timeDiff) + sequenceNumber = p.AudioSequenceNumber + timestamp = p.AudioTimestamp + } else { + track = p.VideoTrack + p.VideoSequenceNumber = uint16(int(p.VideoSequenceNumber) + pkt.sequenceDiff) + p.VideoTimestamp = uint32(int64(p.VideoTimestamp) + pkt.timeDiff) + sequenceNumber = p.VideoSequenceNumber + timestamp = p.VideoTimestamp + } + + if track != nil { + pkt.packet.SequenceNumber = sequenceNumber + pkt.packet.Timestamp = timestamp + + if err := track.WriteRTP(pkt.packet); err != nil && !errors.Is(err, io.ErrClosedPipe) { + slog.Error("WriteRTP failed", "participant", p.ID, "kind", pkt.kind, "err", err) + } + } + + // Return packet struct to pool + participantPacketPool.Put(pkt) + } } diff --git a/packages/relay/internal/shared/room.go b/packages/relay/internal/shared/room.go index 5ef09361..516fdc92 100644 --- a/packages/relay/internal/shared/room.go +++ b/packages/relay/internal/shared/room.go @@ -2,9 +2,9 @@ package shared import ( "log/slog" - "relay/internal/common" "relay/internal/connections" - "time" + "sync" + "sync/atomic" "github.com/libp2p/go-libp2p/core/peer" "github.com/oklog/ulid/v2" @@ -12,6 +12,19 @@ import ( "github.com/pion/webrtc/v4" ) +var participantPacketPool = sync.Pool{ + New: func() interface{} { + return &participantPacket{} + }, +} + +type participantPacket struct { + kind webrtc.RTPCodecType + packet *rtp.Packet + timeDiff int64 + sequenceDiff int +} + type RoomInfo struct { ID ulid.ULID `json:"id"` Name string `json:"name"` @@ -20,16 +33,27 @@ type RoomInfo struct { type Room struct { RoomInfo + AudioCodec webrtc.RTPCodecCapability + VideoCodec webrtc.RTPCodecCapability PeerConnection *webrtc.PeerConnection - AudioTrack *webrtc.TrackLocalStaticRTP - VideoTrack *webrtc.TrackLocalStaticRTP DataChannel *connections.NestriDataChannel - Participants *common.SafeMap[ulid.ULID, *Participant] - // Broadcast queues (unbuffered, fan-out happens async) - videoBroadcastChan chan *rtp.Packet - audioBroadcastChan chan *rtp.Packet - broadcastStop chan struct{} + // Atomic pointer to slice of participant channels + participantChannels atomic.Pointer[[]chan<- *participantPacket] + participantsMtx sync.Mutex // Use only for add/remove + + Participants map[ulid.ULID]*Participant // Keep general track of Participant(s) + + // Track last seen values to calculate diffs + LastVideoTimestamp uint32 + LastVideoSequenceNumber uint16 + LastAudioTimestamp uint32 + LastAudioSequenceNumber uint16 + + VideoTimestampSet bool + VideoSequenceSet bool + AudioTimestampSet bool + AudioSequenceSet bool } func NewRoom(name string, roomID ulid.ULID, ownerID peer.ID) *Room { @@ -39,133 +63,109 @@ func NewRoom(name string, roomID ulid.ULID, ownerID peer.ID) *Room { Name: name, OwnerID: ownerID, }, - Participants: common.NewSafeMap[ulid.ULID, *Participant](), - videoBroadcastChan: make(chan *rtp.Packet, 1000), // Large buffer for incoming packets - audioBroadcastChan: make(chan *rtp.Packet, 500), - broadcastStop: make(chan struct{}), + PeerConnection: nil, + DataChannel: nil, + Participants: make(map[ulid.ULID]*Participant), } - // Start async broadcasters - go r.videoBroadcaster() - go r.audioBroadcaster() + emptyChannels := make([]chan<- *participantPacket, 0) + r.participantChannels.Store(&emptyChannels) return r } +// Close closes up Room (stream ended) +func (r *Room) Close() { + if r.DataChannel != nil { + err := r.DataChannel.Close() + if err != nil { + slog.Error("Failed to close Room DataChannel", err) + } + r.DataChannel = nil + } + if r.PeerConnection != nil { + err := r.PeerConnection.Close() + if err != nil { + slog.Error("Failed to close Room PeerConnection", err) + } + r.PeerConnection = nil + } +} + // AddParticipant adds a Participant to a Room func (r *Room) AddParticipant(participant *Participant) { - slog.Debug("Adding participant to room", "participant", participant.ID, "room", r.Name) - r.Participants.Set(participant.ID, participant) + r.participantsMtx.Lock() + defer r.participantsMtx.Unlock() + + r.Participants[participant.ID] = participant + + // Update channel slice atomically + current := r.participantChannels.Load() + newChannels := make([]chan<- *participantPacket, len(*current)+1) + copy(newChannels, *current) + newChannels[len(*current)] = participant.packetQueue + + r.participantChannels.Store(&newChannels) + + slog.Debug("Added participant", "participant", participant.ID, "room", r.Name) } // RemoveParticipantByID removes a Participant from a Room by participant's ID func (r *Room) RemoveParticipantByID(pID ulid.ULID) { - if _, ok := r.Participants.Get(pID); ok { - r.Participants.Delete(pID) + r.participantsMtx.Lock() + defer r.participantsMtx.Unlock() + + participant, ok := r.Participants[pID] + if !ok { + return } + + delete(r.Participants, pID) + + // Update channel slice + current := r.participantChannels.Load() + newChannels := make([]chan<- *participantPacket, 0, len(*current)-1) + for _, ch := range *current { + if ch != participant.packetQueue { + newChannels = append(newChannels, ch) + } + } + + r.participantChannels.Store(&newChannels) + + slog.Debug("Removed participant", "participant", pID, "room", r.Name) } -// IsOnline checks if the room is online (has both audio and video tracks) +// IsOnline checks if the room is online func (r *Room) IsOnline() bool { - return r.AudioTrack != nil && r.VideoTrack != nil + return r.PeerConnection != nil } -func (r *Room) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalStaticRTP) { - switch trackType { - case webrtc.RTPCodecTypeAudio: - r.AudioTrack = track - case webrtc.RTPCodecTypeVideo: - r.VideoTrack = track - default: - slog.Warn("Unknown track type", "room", r.Name, "trackType", trackType) +func (r *Room) BroadcastPacketRetimed(kind webrtc.RTPCodecType, pkt *rtp.Packet, timeDiff int64, sequenceDiff int) { + // Lock-free load of channel slice + channels := r.participantChannels.Load() + + // no participants.. + if len(*channels) == 0 { + return } -} -// BroadcastPacket enqueues packet for async broadcast (non-blocking) -func (r *Room) BroadcastPacket(kind webrtc.RTPCodecType, pkt *rtp.Packet) { - start := time.Now() - if kind == webrtc.RTPCodecTypeVideo { + // Send to each participant channel (non-blocking) + for i, ch := range *channels { + // Get packet struct from pool + pp := participantPacketPool.Get().(*participantPacket) + pp.kind = kind + pp.packet = pkt.Clone() + pp.timeDiff = timeDiff + pp.sequenceDiff = sequenceDiff + select { - case r.videoBroadcastChan <- pkt: - duration := time.Since(start) - if duration > 10*time.Millisecond { - slog.Warn("Slow video broadcast enqueue", "duration", duration, "room", r.Name) - } + case ch <- pp: + // Sent successfully default: - // Broadcast queue full - system overload, drop packet globally - slog.Warn("Video broadcast queue full, dropping packet", "room", r.Name) - } - } else { - select { - case r.audioBroadcastChan <- pkt: - duration := time.Since(start) - if duration > 10*time.Millisecond { - slog.Warn("Slow audio broadcast enqueue", "duration", duration, "room", r.Name) - } - default: - slog.Warn("Audio broadcast queue full, dropping packet", "room", r.Name) - } - } -} - -// Close stops the broadcasters -func (r *Room) Close() { - close(r.broadcastStop) - close(r.videoBroadcastChan) - close(r.audioBroadcastChan) -} - -// videoBroadcaster runs async fan-out for video packets -func (r *Room) videoBroadcaster() { - for { - select { - case pkt := <-r.videoBroadcastChan: - // Fan out to all participants without blocking - r.Participants.Range(func(_ ulid.ULID, participant *Participant) bool { - if participant.VideoChan != nil { - // Clone packet for each participant to avoid shared pointer issues - clonedPkt := pkt.Clone() - select { - case participant.VideoChan <- clonedPkt: - // Sent - default: - // Participant slow, drop packet - slog.Debug("Dropped video packet for slow participant", - "room", r.Name, - "participant", participant.ID) - } - } - return true - }) - case <-r.broadcastStop: - return - } - } -} - -// audioBroadcaster runs async fan-out for audio packets -func (r *Room) audioBroadcaster() { - for { - select { - case pkt := <-r.audioBroadcastChan: - r.Participants.Range(func(_ ulid.ULID, participant *Participant) bool { - if participant.AudioChan != nil { - // Clone packet for each participant to avoid shared pointer issues - clonedPkt := pkt.Clone() - select { - case participant.AudioChan <- clonedPkt: - // Sent - default: - // Participant slow, drop packet - slog.Debug("Dropped audio packet for slow participant", - "room", r.Name, - "participant", participant.ID) - } - } - return true - }) - case <-r.broadcastStop: - return + // Channel full, drop packet, log? + slog.Warn("Channel full, dropping packet", "channel_index", i) + participantPacketPool.Put(pp) } } } diff --git a/packages/scripts/entrypoint.sh b/packages/scripts/entrypoint.sh index 8e753d42..3342035d 100644 --- a/packages/scripts/entrypoint.sh +++ b/packages/scripts/entrypoint.sh @@ -15,13 +15,13 @@ NVIDIA_INSTALLER_DIR="/tmp" TIMEOUT_SECONDS=10 ENTCMD_PREFIX="" -# Ensures user directory ownership -chown_user_directory() { +# Ensures user ownership across directories +handle_user_permissions() { if ! $ENTCMD_PREFIX chown "${NESTRI_USER}:${NESTRI_USER}" "${NESTRI_HOME}" 2>/dev/null; then echo "Error: Failed to change ownership of ${NESTRI_HOME} to ${NESTRI_USER}:${NESTRI_USER}" >&2 return 1 fi - # Also apply to .cache separately + # Also apply to .cache if [[ -d "${NESTRI_HOME}/.cache" ]]; then if ! $ENTCMD_PREFIX chown "${NESTRI_USER}:${NESTRI_USER}" "${NESTRI_HOME}/.cache" 2>/dev/null; then echo "Error: Failed to change ownership of ${NESTRI_HOME}/.cache to ${NESTRI_USER}:${NESTRI_USER}" >&2 @@ -324,9 +324,23 @@ main() { log "Skipping CAP_SYS_NICE for gamescope, capability not available" fi - # Handle user directory permissions - log "Ensuring user directory permissions..." - chown_user_directory || exit 1 + # Make sure /tmp/.X11-unix exists.. + if [[ ! -d "/tmp/.X11-unix" ]]; then + log "Creating /tmp/.X11-unix directory.." + $ENTCMD_PREFIX mkdir -p /tmp/.X11-unix || { + log "Error: Failed to create /tmp/.X11-unix directory" + exit 1 + } + # Set required perms.. + $ENTCMD_PREFIX chmod 1777 /tmp/.X11-unix || { + log "Error: Failed to chmod /tmp/.X11-unix to 1777" + exit 1 + } + fi + + # Handle user permissions + log "Ensuring user permissions..." + handle_user_permissions || exit 1 # Setup namespaceless env if needed for container runtime if [[ "$container_runtime" != "podman" ]]; then @@ -336,7 +350,7 @@ main() { # Make sure /run/udev/ directory exists with /run/udev/control, needed for virtual controller support if [[ ! -d "/run/udev" || ! -e "/run/udev/control" ]]; then - log "Creating /run/udev directory and control file..." + log "Creating /run/udev directory and control file.." $ENTCMD_PREFIX mkdir -p /run/udev || { log "Error: Failed to create /run/udev directory" exit 1 diff --git a/packages/scripts/entrypoint_nestri.sh b/packages/scripts/entrypoint_nestri.sh index fb5e127c..abbb95fd 100644 --- a/packages/scripts/entrypoint_nestri.sh +++ b/packages/scripts/entrypoint_nestri.sh @@ -187,7 +187,7 @@ start_compositor() { if [[ -n "${NESTRI_LAUNCH_CMD}" ]]; then log "Starting application: $NESTRI_LAUNCH_CMD" - WAYLAND_DISPLAY=wayland-0 /bin/bash -c "$NESTRI_LAUNCH_CMD" & + WAYLAND_DISPLAY="$COMPOSITOR_SOCKET" /bin/bash -c "$NESTRI_LAUNCH_CMD" & APP_PID=$! fi else diff --git a/packages/server/Cargo.lock b/packages/server/Cargo.lock index d8aec525..ef4f4763 100644 --- a/packages/server/Cargo.lock +++ b/packages/server/Cargo.lock @@ -181,7 +181,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "synstructure", ] @@ -193,7 +193,7 @@ checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "synstructure", ] @@ -205,7 +205,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -246,7 +246,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -257,7 +257,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -395,7 +395,7 @@ version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "cexpr", "clang-sys", "itertools 0.13.0", @@ -406,7 +406,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -417,9 +417,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.4" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "blake2" @@ -603,9 +603,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.49" +version = "4.5.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4512b90fa68d3a9932cea5184017c5d200f5921df706d45e853537dea51508f" +checksum = "0c2cfd7bf8a6017ddaa4e32ffe7403d547790db06bd171c1c53926faab501623" dependencies = [ "clap_builder", "clap_derive", @@ -613,9 +613,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.49" +version = "4.5.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0025e98baa12e766c67ba13ff4695a887a1eba19569aad00a472546795bd6730" +checksum = "0a4c05b9e80c5ccd3a7ef080ad7b6ba7d6fc00a985b8b157197075677c82c7a0" dependencies = [ "anstream", "anstyle", @@ -632,7 +632,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -839,7 +839,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -879,7 +879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -956,7 +956,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -1090,7 +1090,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -1262,7 +1262,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -1396,7 +1396,7 @@ version = "0.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1f2cbc4577536c849335878552f42086bfd25a8dcd6f54a18655cf818b20c8f" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "futures-channel", "futures-core", "futures-executor", @@ -1421,7 +1421,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -2299,9 +2299,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.11.4" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" +checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", "hashbrown 0.16.0", @@ -2368,9 +2368,9 @@ dependencies = [ [[package]] name = "is_terminal_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itertools" @@ -2828,7 +2828,7 @@ checksum = "dd297cf53f0cb3dee4d2620bb319ae47ef27c702684309f682bdb7e55a18ae9c" dependencies = [ "heck", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -3239,7 +3239,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "cfg-if", "cfg_aliases", "libc", @@ -3355,9 +3355,9 @@ dependencies = [ [[package]] name = "once_cell_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "opaque-debug" @@ -3499,7 +3499,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -3604,7 +3604,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -3627,9 +3627,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.101" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" +checksum = "8e0f6df8eaa422d97d72edcd152e1451618fed47fabbdbd5a8864167b1d4aff7" dependencies = [ "unicode-ident", ] @@ -3654,7 +3654,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -3677,7 +3677,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -3861,7 +3861,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", ] [[package]] @@ -4038,7 +4038,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys", @@ -4047,9 +4047,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.33" +version = "0.23.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751e04a496ca00bb97a5e043158d23d66b5aabf2e1d5aa2a0aaebb1aafe6f82c" +checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" dependencies = [ "aws-lc-rs", "log", @@ -4180,7 +4180,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -4240,7 +4240,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4481,9 +4481,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.106" +version = "2.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" +checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" dependencies = [ "proc-macro2", "quote", @@ -4507,7 +4507,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4516,7 +4516,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -4574,7 +4574,7 @@ checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4603,7 +4603,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4614,7 +4614,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4707,7 +4707,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4817,7 +4817,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags 2.9.4", + "bitflags 2.10.0", "bytes", "futures-util", "http", @@ -4861,7 +4861,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -4986,9 +4986,9 @@ checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" [[package]] name = "unicode-ident" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06" [[package]] name = "universal-hash" @@ -5079,9 +5079,9 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "vimputti" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5839a89185ccec572f746ccc02e37702cc6c0b62a6aa0d9bcd6e5921edba12" +checksum = "ffb370ee43e3ee4ca5329886e64dc5b27c83dc8cced5a63c2418777dac9a41a8" dependencies = [ "anyhow", "libc", @@ -5177,7 +5177,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "wasm-bindgen-shared", ] @@ -5212,7 +5212,7 @@ checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5502,7 +5502,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -5513,7 +5513,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -5959,7 +5959,7 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "synstructure", ] @@ -5980,7 +5980,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -6000,7 +6000,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", "synstructure", ] @@ -6021,7 +6021,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] [[package]] @@ -6054,5 +6054,5 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.106", + "syn 2.0.108", ] diff --git a/packages/server/Cargo.toml b/packages/server/Cargo.toml index ce6d11c6..9da600f4 100644 --- a/packages/server/Cargo.toml +++ b/packages/server/Cargo.toml @@ -22,7 +22,7 @@ rand = "0.9" rustls = { version = "0.23", features = ["ring"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -vimputti = "0.1.3" +vimputti = "0.1.4" chrono = "0.4" prost = "0.14" prost-types = "0.14" diff --git a/packages/server/src/args.rs b/packages/server/src/args.rs index bbb5c3fa..5432c96c 100644 --- a/packages/server/src/args.rs +++ b/packages/server/src/args.rs @@ -211,6 +211,14 @@ impl Args { .value_parser(value_parser!(u32).range(1..)) .default_value("192"), ) + .arg( + Arg::new("software-render") + .long("software-render") + .env("SOFTWARE_RENDER") + .help("Use software rendering for wayland") + .value_parser(BoolishValueParser::new()) + .default_value("false"), + ) .arg( Arg::new("zero-copy") .long("zero-copy") diff --git a/packages/server/src/args/app_args.rs b/packages/server/src/args/app_args.rs index 44fa0633..d313b8c9 100644 --- a/packages/server/src/args/app_args.rs +++ b/packages/server/src/args/app_args.rs @@ -15,6 +15,9 @@ pub struct AppArgs { /// vimputti socket path pub vimputti_path: Option, + /// Use software rendering for wayland display + pub software_render: bool, + /// Experimental zero-copy pipeline support /// TODO: Move to video encoding flags pub zero_copy: bool, @@ -51,6 +54,10 @@ impl AppArgs { vimputti_path: matches .get_one::("vimputti-path") .map(|s| s.clone()), + software_render: matches + .get_one::("software-render") + .unwrap_or(&false) + .clone(), zero_copy: matches .get_one::("zero-copy") .unwrap_or(&false) @@ -73,6 +80,7 @@ impl AppArgs { "> vimputti_path: '{}'", self.vimputti_path.as_ref().map_or("None", |s| s.as_str()) ); + tracing::info!("> software_render: {}", self.software_render); tracing::info!("> zero_copy: {}", self.zero_copy); } } diff --git a/packages/server/src/enc_helper.rs b/packages/server/src/enc_helper.rs index a97aca32..1888a265 100644 --- a/packages/server/src/enc_helper.rs +++ b/packages/server/src/enc_helper.rs @@ -585,7 +585,6 @@ pub fn get_best_working_encoder( encoders: &Vec, codec: &Codec, encoder_type: &EncoderType, - zero_copy: bool, ) -> Result> { let mut candidates = get_encoders_by_videocodec( encoders, @@ -601,7 +600,7 @@ pub fn get_best_working_encoder( while !candidates.is_empty() { let best = get_best_compatible_encoder(&candidates, codec, encoder_type)?; tracing::info!("Testing encoder: {}", best.name,); - if test_encoder(&best, zero_copy).is_ok() { + if test_encoder(&best).is_ok() { return Ok(best); } else { // Remove this encoder and try next best @@ -613,25 +612,10 @@ pub fn get_best_working_encoder( } /// Test if a pipeline with the given encoder can be created and set to Playing -pub fn test_encoder(encoder: &VideoEncoderInfo, zero_copy: bool) -> Result<(), Box> { - let src = gstreamer::ElementFactory::make("waylanddisplaysrc").build()?; - if let Some(gpu_info) = &encoder.gpu_info { - src.set_property_from_str("render-node", gpu_info.render_path()); - } +pub fn test_encoder(encoder: &VideoEncoderInfo) -> Result<(), Box> { + let src = gstreamer::ElementFactory::make("videotestsrc").build()?; let caps_filter = gstreamer::ElementFactory::make("capsfilter").build()?; - let caps = gstreamer::Caps::from_str(&format!( - "{},width=1280,height=720,framerate=30/1{}", - if zero_copy { - if encoder.encoder_api == EncoderAPI::NVENC { - "video/x-raw(memory:CUDAMemory)" - } else { - "video/x-raw(memory:DMABuf)" - } - } else { - "video/x-raw" - }, - if zero_copy { "" } else { ",format=RGBx" } - ))?; + let caps = gstreamer::Caps::from_str("video/x-raw,width=1280,height=720,framerate=30/1")?; caps_filter.set_property("caps", &caps); let enc = gstreamer::ElementFactory::make(&encoder.name).build()?; @@ -642,41 +626,9 @@ pub fn test_encoder(encoder: &VideoEncoderInfo, zero_copy: bool) -> Result<(), B // Create pipeline and link elements let pipeline = gstreamer::Pipeline::new(); - if zero_copy { - if encoder.encoder_api == EncoderAPI::NVENC { - // NVENC zero-copy path - pipeline.add_many(&[&src, &caps_filter, &enc, &sink])?; - gstreamer::Element::link_many(&[&src, &caps_filter, &enc, &sink])?; - } else { - // VA-API/QSV zero-copy path - let vapostproc = gstreamer::ElementFactory::make("vapostproc").build()?; - let va_caps_filter = gstreamer::ElementFactory::make("capsfilter").build()?; - let va_caps = gstreamer::Caps::from_str("video/x-raw(memory:VAMemory),format=NV12")?; - va_caps_filter.set_property("caps", &va_caps); - - pipeline.add_many(&[ - &src, - &caps_filter, - &vapostproc, - &va_caps_filter, - &enc, - &sink, - ])?; - gstreamer::Element::link_many(&[ - &src, - &caps_filter, - &vapostproc, - &va_caps_filter, - &enc, - &sink, - ])?; - } - } else { - // Non-zero-copy path for all encoders - needs videoconvert - let videoconvert = gstreamer::ElementFactory::make("videoconvert").build()?; - pipeline.add_many(&[&src, &caps_filter, &videoconvert, &enc, &sink])?; - gstreamer::Element::link_many(&[&src, &caps_filter, &videoconvert, &enc, &sink])?; - } + let videoconvert = gstreamer::ElementFactory::make("videoconvert").build()?; + pipeline.add_many(&[&src, &caps_filter, &videoconvert, &enc, &sink])?; + gstreamer::Element::link_many(&[&src, &caps_filter, &videoconvert, &enc, &sink])?; let bus = pipeline.bus().ok_or("Pipeline has no bus")?; pipeline.set_state(gstreamer::State::Playing)?; diff --git a/packages/server/src/input/controller.rs b/packages/server/src/input/controller.rs index 656c6d46..d34e8600 100644 --- a/packages/server/src/input/controller.rs +++ b/packages/server/src/input/controller.rs @@ -47,7 +47,7 @@ impl ControllerInput { pub struct ControllerManager { vimputti_client: Arc, cmd_tx: mpsc::Sender, - rumble_tx: mpsc::Sender<(u32, u16, u16, u16)>, // (slot, strong, weak, duration_ms) + rumble_tx: mpsc::Sender<(u32, u16, u16, u16, String)>, // (slot, strong, weak, duration_ms, session_id) attach_tx: mpsc::Sender, } impl ControllerManager { @@ -55,7 +55,7 @@ impl ControllerManager { vimputti_client: Arc, ) -> Result<( Self, - mpsc::Receiver<(u32, u16, u16, u16)>, + mpsc::Receiver<(u32, u16, u16, u16, String)>, mpsc::Receiver, )> { let (cmd_tx, cmd_rx) = mpsc::channel(512); @@ -88,12 +88,12 @@ impl ControllerManager { struct ControllerSlot { controller: ControllerInput, session_id: String, - last_activity: std::time::Instant, + session_slot: u32, } -// Returns first free controller slot from 0-7 +// Returns first free controller slot from 0-16 fn get_free_slot(controllers: &HashMap) -> Option { - for slot in 0..8 { + for slot in 0..17 { if !controllers.contains_key(&slot) { return Some(slot); } @@ -104,7 +104,7 @@ fn get_free_slot(controllers: &HashMap) -> Option { async fn command_loop( mut cmd_rx: mpsc::Receiver, vimputti_client: Arc, - rumble_tx: mpsc::Sender<(u32, u16, u16, u16)>, + rumble_tx: mpsc::Sender<(u32, u16, u16, u16, String)>, attach_tx: mpsc::Sender, ) { let mut controllers: HashMap = HashMap::new(); @@ -112,13 +112,15 @@ async fn command_loop( match payload { Payload::ControllerAttach(data) => { let session_id = data.session_id.clone(); + let session_slot = data.session_slot.clone(); // Check if this session already has a slot (reconnection) let existing_slot = controllers .iter() - .find(|(_, slot)| slot.session_id == session_id && !session_id.is_empty()) + .find(|(_, slot)| { + slot.session_id == session_id && slot.session_slot == session_slot as u32 + }) .map(|(slot_num, _)| *slot_num); - let slot = existing_slot.or_else(|| get_free_slot(&controllers)); if let Some(slot) = slot { @@ -131,7 +133,7 @@ async fn command_loop( controller .device_mut() .on_rumble(move |strong, weak, duration_ms| { - let _ = rumble_tx.try_send((slot, strong, weak, duration_ms)); + let _ = rumble_tx.try_send((slot, strong, weak, duration_ms, data.session_id.clone())); }) .await .map_err(|e| { @@ -146,7 +148,7 @@ async fn command_loop( // Return to attach_tx what slot was assigned let attach_info = ProtoControllerAttach { id: data.id.clone(), - slot: slot as i32, + session_slot: slot as i32, session_id: session_id.clone(), }; @@ -157,7 +159,7 @@ async fn command_loop( ControllerSlot { controller, session_id: session_id.clone(), - last_activity: std::time::Instant::now(), + session_slot: session_slot.clone() as u32, }, ); tracing::info!( @@ -185,25 +187,25 @@ async fn command_loop( } } Payload::ControllerDetach(data) => { - if controllers.remove(&(data.slot as u32)).is_some() { - tracing::info!("Controller detached from slot {}", data.slot); + if controllers.remove(&(data.session_slot as u32)).is_some() { + tracing::info!("Controller detached from slot {}", data.session_slot); } else { - tracing::warn!("No controller found in slot {} to detach", data.slot); + tracing::warn!("No controller found in slot {} to detach", data.session_slot); } } Payload::ControllerButton(data) => { - if let Some(controller) = controllers.get(&(data.slot as u32)) { + if let Some(controller) = controllers.get(&(data.session_slot as u32)) { if let Some(button) = vimputti::Button::from_ev_code(data.button as u16) { let device = controller.controller.device(); device.button(button, data.pressed); device.sync(); } } else { - tracing::warn!("Controller slot {} not found for button event", data.slot); + tracing::warn!("Controller slot {} not found for button event", data.session_slot); } } Payload::ControllerStick(data) => { - if let Some(controller) = controllers.get(&(data.slot as u32)) { + if let Some(controller) = controllers.get(&(data.session_slot as u32)) { let device = controller.controller.device(); if data.stick == 0 { // Left stick @@ -218,11 +220,11 @@ async fn command_loop( } device.sync(); } else { - tracing::warn!("Controller slot {} not found for stick event", data.slot); + tracing::warn!("Controller slot {} not found for stick event", data.session_slot); } } Payload::ControllerTrigger(data) => { - if let Some(controller) = controllers.get(&(data.slot as u32)) { + if let Some(controller) = controllers.get(&(data.session_slot as u32)) { let device = controller.controller.device(); if data.trigger == 0 { // Left trigger @@ -233,11 +235,11 @@ async fn command_loop( } device.sync(); } else { - tracing::warn!("Controller slot {} not found for trigger event", data.slot); + tracing::warn!("Controller slot {} not found for trigger event", data.session_slot); } } Payload::ControllerAxis(data) => { - if let Some(controller) = controllers.get(&(data.slot as u32)) { + if let Some(controller) = controllers.get(&(data.session_slot as u32)) { let device = controller.controller.device(); if data.axis == 0 { // dpad x diff --git a/packages/server/src/main.rs b/packages/server/src/main.rs index 3427b71c..e4a211c5 100644 --- a/packages/server/src/main.rs +++ b/packages/server/src/main.rs @@ -24,7 +24,7 @@ use tracing_subscriber::EnvFilter; use tracing_subscriber::filter::LevelFilter; // Handles gathering GPU information and selecting the most suitable GPU -fn handle_gpus(args: &args::Args) -> Result, Box> { +fn handle_gpus(args: &args::Args) -> Result, Box> { tracing::info!("Gathering GPU information.."); let mut gpus = gpu::get_gpus()?; if gpus.is_empty() { @@ -119,7 +119,6 @@ fn handle_encoder_video( &video_encoders, &args.encoding.video.codec, &args.encoding.video.encoder_type, - args.app.zero_copy, )?; } tracing::info!("Selected video encoder: '{}'", video_encoder.name); @@ -323,7 +322,9 @@ async fn main() -> Result<(), Box> { /* Video */ // Video Source Element let video_source = Arc::new(gstreamer::ElementFactory::make("waylanddisplaysrc").build()?); - if let Some(gpu_info) = &video_encoder_info.gpu_info { + if args.app.software_render { + video_source.set_property_from_str("render-node", "software"); + } else if let Some(gpu_info) = &video_encoder_info.gpu_info { video_source.set_property_from_str("render-node", gpu_info.render_path()); } @@ -428,20 +429,16 @@ async fn main() -> Result<(), Box> { webrtcsink.set_property("do-retransmission", false); /* Queues */ - let video_source_queue = gstreamer::ElementFactory::make("queue") - .property("max-size-buffers", 5u32) - .build()?; - - let audio_source_queue = gstreamer::ElementFactory::make("queue") - .property("max-size-buffers", 5u32) - .build()?; - let video_queue = gstreamer::ElementFactory::make("queue") - .property("max-size-buffers", 5u32) + .property("max-size-buffers", 2u32) + .property("max-size-time", 0u64) + .property("max-size-bytes", 0u32) .build()?; let audio_queue = gstreamer::ElementFactory::make("queue") - .property("max-size-buffers", 5u32) + .property("max-size-buffers", 2u32) + .property("max-size-time", 0u64) + .property("max-size-bytes", 0u32) .build()?; /* Clock Sync */ @@ -460,7 +457,6 @@ async fn main() -> Result<(), Box> { &caps_filter, &video_queue, &video_clocksync, - &video_source_queue, &video_source, &audio_encoder, &audio_capsfilter, @@ -468,7 +464,6 @@ async fn main() -> Result<(), Box> { &audio_clocksync, &audio_rate, &audio_converter, - &audio_source_queue, &audio_source, ])?; @@ -495,7 +490,6 @@ async fn main() -> Result<(), Box> { // Link main audio branch gstreamer::Element::link_many(&[ &audio_source, - &audio_source_queue, &audio_converter, &audio_rate, &audio_capsfilter, @@ -517,7 +511,6 @@ async fn main() -> Result<(), Box> { if let (Some(vapostproc), Some(va_caps_filter)) = (&vapostproc, &va_caps_filter) { gstreamer::Element::link_many(&[ &video_source, - &video_source_queue, &caps_filter, &video_queue, &video_clocksync, @@ -529,7 +522,6 @@ async fn main() -> Result<(), Box> { // NVENC pipeline gstreamer::Element::link_many(&[ &video_source, - &video_source_queue, &caps_filter, &video_encoder, ])?; @@ -537,7 +529,6 @@ async fn main() -> Result<(), Box> { } else { gstreamer::Element::link_many(&[ &video_source, - &video_source_queue, &caps_filter, &video_queue, &video_clocksync, diff --git a/packages/server/src/nestrisink/imp.rs b/packages/server/src/nestrisink/imp.rs index f0da654c..88937411 100644 --- a/packages/server/src/nestrisink/imp.rs +++ b/packages/server/src/nestrisink/imp.rs @@ -23,7 +23,7 @@ pub struct Signaller { wayland_src: PLRwLock>>, data_channel: PLRwLock>>, controller_manager: PLRwLock>>, - rumble_rx: Mutex>>, + rumble_rx: Mutex>>, attach_rx: Mutex>>, } impl Default for Signaller { @@ -70,11 +70,11 @@ impl Signaller { self.controller_manager.read().clone() } - pub async fn set_rumble_rx(&self, rumble_rx: mpsc::Receiver<(u32, u16, u16, u16)>) { + pub async fn set_rumble_rx(&self, rumble_rx: mpsc::Receiver<(u32, u16, u16, u16, String)>) { *self.rumble_rx.lock().await = Some(rumble_rx); } - pub async fn take_rumble_rx(&self) -> Option> { + pub async fn take_rumble_rx(&self) -> Option> { self.rumble_rx.lock().await.take() } @@ -382,7 +382,7 @@ impl ObjectImpl for Signaller { fn setup_data_channel( controller_manager: Option>, - rumble_rx: Option>, // (slot, strong, weak, duration_ms) + rumble_rx: Option>, // (slot, strong, weak, duration_ms, session_id) attach_rx: Option>, data_channel: Arc, wayland_src: &gstreamer::Element, @@ -423,10 +423,11 @@ fn setup_data_channel( if let Some(mut rumble_rx) = rumble_rx { let data_channel_clone = data_channel.clone(); tokio::spawn(async move { - while let Some((slot, strong, weak, duration_ms)) = rumble_rx.recv().await { + while let Some((slot, strong, weak, duration_ms, session_id)) = rumble_rx.recv().await { let rumble_msg = crate::proto::create_message( Payload::ControllerRumble(ProtoControllerRumble { - slot: slot as i32, + session_slot: slot as i32, + session_id: session_id, low_frequency: weak as i32, high_frequency: strong as i32, duration: duration_ms as i32, diff --git a/packages/server/src/nestrisink/mod.rs b/packages/server/src/nestrisink/mod.rs index 4de29964..a37b03d1 100644 --- a/packages/server/src/nestrisink/mod.rs +++ b/packages/server/src/nestrisink/mod.rs @@ -18,7 +18,7 @@ impl NestriSignaller { nestri_conn: NestriConnection, wayland_src: Arc, controller_manager: Option>, - rumble_rx: Option>, + rumble_rx: Option>, attach_rx: Option>, ) -> Result> { let obj: Self = glib::Object::new(); diff --git a/packages/server/src/proto/proto.rs b/packages/server/src/proto/proto.rs index a9ef7a42..635b8f3e 100644 --- a/packages/server/src/proto/proto.rs +++ b/packages/server/src/proto/proto.rs @@ -84,95 +84,113 @@ pub struct ProtoControllerAttach { /// One of the following enums: "ps", "xbox" or "switch" #[prost(string, tag="1")] pub id: ::prost::alloc::string::String, - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="2")] - pub slot: i32, - /// Session ID of the client attaching the controller + pub session_slot: i32, + /// Session ID of the client #[prost(string, tag="3")] pub session_id: ::prost::alloc::string::String, } /// ControllerDetach message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerDetach { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, } /// ControllerButton message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerButton { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, /// Button code (linux input event code) - #[prost(int32, tag="2")] + #[prost(int32, tag="3")] pub button: i32, /// true if pressed, false if released - #[prost(bool, tag="3")] + #[prost(bool, tag="4")] pub pressed: bool, } /// ControllerTriggers message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerTrigger { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, /// Trigger number (0 for left, 1 for right) - #[prost(int32, tag="2")] + #[prost(int32, tag="3")] pub trigger: i32, /// trigger value (-32768 to 32767) - #[prost(int32, tag="3")] + #[prost(int32, tag="4")] pub value: i32, } /// ControllerSticks message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerStick { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, /// Stick number (0 for left, 1 for right) - #[prost(int32, tag="2")] + #[prost(int32, tag="3")] pub stick: i32, /// X axis value (-32768 to 32767) - #[prost(int32, tag="3")] + #[prost(int32, tag="4")] pub x: i32, /// Y axis value (-32768 to 32767) - #[prost(int32, tag="4")] + #[prost(int32, tag="5")] pub y: i32, } /// ControllerAxis message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerAxis { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, /// Axis number (0 for d-pad horizontal, 1 for d-pad vertical) - #[prost(int32, tag="2")] + #[prost(int32, tag="3")] pub axis: i32, /// axis value (-1 to 1) - #[prost(int32, tag="3")] + #[prost(int32, tag="4")] pub value: i32, } /// ControllerRumble message #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ProtoControllerRumble { - /// Slot number (0-3) + /// Session specific slot number (0-3) #[prost(int32, tag="1")] - pub slot: i32, + pub session_slot: i32, + /// Session ID of the client + #[prost(string, tag="2")] + pub session_id: ::prost::alloc::string::String, /// Low frequency rumble (0-65535) - #[prost(int32, tag="2")] + #[prost(int32, tag="3")] pub low_frequency: i32, /// High frequency rumble (0-65535) - #[prost(int32, tag="3")] + #[prost(int32, tag="4")] pub high_frequency: i32, /// Duration in milliseconds - #[prost(int32, tag="4")] + #[prost(int32, tag="5")] pub duration: i32, } // WebRTC + signaling diff --git a/protobufs/types.proto b/protobufs/types.proto index 037b05e9..1f951f16 100644 --- a/protobufs/types.proto +++ b/protobufs/types.proto @@ -51,50 +51,56 @@ message ProtoKeyUp { // ControllerAttach message message ProtoControllerAttach { string id = 1; // One of the following enums: "ps", "xbox" or "switch" - int32 slot = 2; // Slot number (0-3) - string session_id = 3; // Session ID of the client attaching the controller + int32 session_slot = 2; // Session specific slot number (0-3) + string session_id = 3; // Session ID of the client } // ControllerDetach message message ProtoControllerDetach { - int32 slot = 1; // Slot number (0-3) + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client } // ControllerButton message message ProtoControllerButton { - int32 slot = 1; // Slot number (0-3) - int32 button = 2; // Button code (linux input event code) - bool pressed = 3; // true if pressed, false if released + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client + int32 button = 3; // Button code (linux input event code) + bool pressed = 4; // true if pressed, false if released } // ControllerTriggers message message ProtoControllerTrigger { - int32 slot = 1; // Slot number (0-3) - int32 trigger = 2; // Trigger number (0 for left, 1 for right) - int32 value = 3; // trigger value (-32768 to 32767) + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client + int32 trigger = 3; // Trigger number (0 for left, 1 for right) + int32 value = 4; // trigger value (-32768 to 32767) } // ControllerSticks message message ProtoControllerStick { - int32 slot = 1; // Slot number (0-3) - int32 stick = 2; // Stick number (0 for left, 1 for right) - int32 x = 3; // X axis value (-32768 to 32767) - int32 y = 4; // Y axis value (-32768 to 32767) + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client + int32 stick = 3; // Stick number (0 for left, 1 for right) + int32 x = 4; // X axis value (-32768 to 32767) + int32 y = 5; // Y axis value (-32768 to 32767) } // ControllerAxis message message ProtoControllerAxis { - int32 slot = 1; // Slot number (0-3) - int32 axis = 2; // Axis number (0 for d-pad horizontal, 1 for d-pad vertical) - int32 value = 3; // axis value (-1 to 1) + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client + int32 axis = 3; // Axis number (0 for d-pad horizontal, 1 for d-pad vertical) + int32 value = 4; // axis value (-1 to 1) } // ControllerRumble message message ProtoControllerRumble { - int32 slot = 1; // Slot number (0-3) - int32 low_frequency = 2; // Low frequency rumble (0-65535) - int32 high_frequency = 3; // High frequency rumble (0-65535) - int32 duration = 4; // Duration in milliseconds + int32 session_slot = 1; // Session specific slot number (0-3) + string session_id = 2; // Session ID of the client + int32 low_frequency = 3; // Low frequency rumble (0-65535) + int32 high_frequency = 4; // High frequency rumble (0-65535) + int32 duration = 5; // Duration in milliseconds } /* WebRTC + signaling */