feat: Fully use protobuf, fix controller issues and cleanup (#305)

## Description
### First commit
Restructured protobuf schemas to make them easier to use across
languages, switched to using them in-place of JSON for signaling as
well, so there's no 2 different message formats flying about. Few new
message types to deal with clients and nestri-servers better (not final
format, may see changes still).

General cleanup of dead/unused code along some bug squashing and package
updates.

TODO for future commits:
- [x] Fix additional controllers not doing inputs (possibly needs
vimputti changes)
- [x] ~~Restructure relay protocols code a bit, to reduce bloatiness of
the currently single file for them, more code re-use.~~
- Gonna keep this PR somewhat manageable without poking more at relay..
- [x] ~~Try to fix issue where with multiple clients, static stream
content causes video to freeze until there's some movement.~~
- Was caused by server tuned profile being `throughput-performance`,
causing CPU latency to be too high.
- [x] Ponder the orb


### Second + third commit
Redid the controller polling handling and fixed multi-controller
handling in vimputti and nestri code sides. Remove some dead relay code
as well to clean up the protocol source file, we'll revisit the meshing
functionality later.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added software rendering option and MangoHud runtime config;
controller sessions now support reconnection and batched state updates
with persistent session IDs.

* **Bug Fixes**
* Restored previously-filtered NES-like gamepads so they connect
correctly.

* **Chores**
* Modernized dependencies and protobuf tooling, migrated to
protobuf-based messaging and streaming, and removed obsolete CUDA build
steps.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
This commit is contained in:
Kristian Ollikainen
2025-11-08 13:10:28 +02:00
committed by GitHub
parent 32341574dc
commit d87a0b35dd
50 changed files with 4413 additions and 3883 deletions

View File

@@ -10,7 +10,7 @@ require (
github.com/oklog/ulid/v2 v2.1.1
github.com/pion/ice/v4 v4.0.10
github.com/pion/interceptor v0.1.41
github.com/pion/rtp v1.8.24
github.com/pion/rtp v1.8.25
github.com/pion/webrtc/v4 v4.1.6
github.com/prometheus/client_golang v1.23.2
google.golang.org/protobuf v1.36.10
@@ -30,17 +30,17 @@ require (
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/ipfs/go-cid v0.6.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/koron/go-ssdp v0.1.0 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.3.0 // indirect
github.com/libp2p/go-netroute v0.4.0 // indirect
github.com/libp2p/go-yamux/v5 v5.1.0 // indirect
github.com/libp2p/zeroconf/v2 v2.2.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
@@ -71,13 +71,13 @@ require (
github.com/pion/sdp/v3 v3.0.16 // indirect
github.com/pion/srtp/v3 v3.0.8 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/stun/v3 v3.0.1 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/transport/v3 v3.0.8 // indirect
github.com/pion/turn/v4 v4.1.1 // indirect
github.com/pion/turn/v4 v4.1.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.1 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/prometheus/common v0.67.2 // indirect
github.com/prometheus/procfs v0.19.2 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.55.0 // indirect
github.com/quic-go/webtransport-go v0.9.0 // indirect
@@ -91,12 +91,12 @@ require (
go.uber.org/zap v1.27.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/exp v0.0.0-20251017212417-90e834f514db // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/telemetry v0.0.0-20251014153721-24f779f6aaef // indirect
golang.org/x/telemetry v0.0.0-20251028164327-d7a2859f34e8 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.38.0 // indirect

View File

@@ -71,8 +71,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg=
github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk=
github.com/ipfs/go-cid v0.6.0 h1:DlOReBV1xhHBhhfy/gBNNTSyfOM6rLiIx9J7A4DGf30=
github.com/ipfs/go-cid v0.6.0/go.mod h1:NC4kS1LZjzfhK40UGmpXv5/qD2kcMzACYJNntCUiDhQ=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
@@ -82,8 +82,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/koron/go-ssdp v0.1.0 h1:ckl5x5H6qSNFmi+wCuROvvGUu2FQnMbQrU95IHCcv3Y=
@@ -113,8 +113,8 @@ github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUI
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=
github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM=
github.com/libp2p/go-netroute v0.3.0 h1:nqPCXHmeNmgTJnktosJ/sIef9hvwYCrsLxXmfNks/oc=
github.com/libp2p/go-netroute v0.3.0/go.mod h1:Nkd5ShYgSMS5MUKy/MU2T57xFoOKvvLR92Lic48LEyA=
github.com/libp2p/go-netroute v0.4.0 h1:sZZx9hyANYUx9PZyqcgE/E1GUG3iEtTZHUEvdtXT7/Q=
github.com/libp2p/go-netroute v0.4.0/go.mod h1:Nkd5ShYgSMS5MUKy/MU2T57xFoOKvvLR92Lic48LEyA=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v5 v5.1.0 h1:8Qlxj4E9JGJAQVW6+uj2o7mqkqsIVlSUGmTWhlXzoHE=
@@ -199,8 +199,8 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo=
github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo=
github.com/pion/rtp v1.8.24 h1:+ICyZXUQDv95EsHN70RrA4XKJf5MGWyC6QQc1u6/ynI=
github.com/pion/rtp v1.8.24/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM=
github.com/pion/rtp v1.8.25 h1:b8+y44GNbwOJTYWuVan7SglX/hMlicVCAtL50ztyZHw=
github.com/pion/rtp v1.8.25/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM=
github.com/pion/sctp v1.8.40 h1:bqbgWYOrUhsYItEnRObUYZuzvOMsVplS3oNgzedBlG8=
github.com/pion/sctp v1.8.40/go.mod h1:SPBBUENXE6ThkEksN5ZavfAhFYll+h+66ZiG6IZQuzo=
github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo=
@@ -209,16 +209,16 @@ github.com/pion/srtp/v3 v3.0.8 h1:RjRrjcIeQsilPzxvdaElN0CpuQZdMvcl9VZ5UY9suUM=
github.com/pion/srtp/v3 v3.0.8/go.mod h1:2Sq6YnDH7/UDCvkSoHSDNDeyBcFgWL0sAVycVbAsXFg=
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw=
github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU=
github.com/pion/stun/v3 v3.0.1 h1:jx1uUq6BdPihF0yF33Jj2mh+C9p0atY94IkdnW174kA=
github.com/pion/stun/v3 v3.0.1/go.mod h1:RHnvlKFg+qHgoKIqtQWMOJF52wsImCAf/Jh5GjX+4Tw=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q=
github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
github.com/pion/transport/v3 v3.0.8 h1:oI3myyYnTKUSTthu/NZZ8eu2I5sHbxbUNNFW62olaYc=
github.com/pion/transport/v3 v3.0.8/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ=
github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc=
github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8=
github.com/pion/turn/v4 v4.1.2 h1:Em2svpl6aBFa88dLhxypMUzaLjC79kWZWx8FIov01cc=
github.com/pion/turn/v4 v4.1.2/go.mod h1:ISYWfZYy0Z3tXzRpyYZHTL+U23yFQIspfxogdQ8pn9Y=
github.com/pion/webrtc/v4 v4.1.6 h1:srHH2HwvCGwPba25EYJgUzgLqCQoXl1VCUnrGQMSzUw=
github.com/pion/webrtc/v4 v4.1.6/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -231,11 +231,11 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.67.1 h1:OTSON1P4DNxzTg4hmKCc37o4ZAZDv0cfXLkOt0oEowI=
github.com/prometheus/common v0.67.1/go.mod h1:RpmT9v35q2Y+lsieQsdOh5sXZ6ajUGC8NjZAmr8vb0Q=
github.com/prometheus/common v0.67.2 h1:PcBAckGFTIHt2+L3I33uNRTlKTplNzFctXcWhPyAEN8=
github.com/prometheus/common v0.67.2/go.mod h1:63W3KZb1JOKgcjlIr64WW/LvFGAqKPj0atm+knVGEko=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws=
github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.55.0 h1:zccPQIqYCXDt5NmcEabyYvOnomjs8Tlwl7tISjJh9Mk=
@@ -323,8 +323,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20251017212417-90e834f514db h1:by6IehL4BH5k3e3SJmcoNbOobMey2SLpAF79iPOEBvw=
golang.org/x/exp v0.0.0-20251017212417-90e834f514db/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -396,8 +396,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/telemetry v0.0.0-20251014153721-24f779f6aaef h1:5xFtU4tmJMJSxSeDlr1dgBff2tDXrq0laLdS1EA3LYw=
golang.org/x/telemetry v0.0.0-20251014153721-24f779f6aaef/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE=
golang.org/x/telemetry v0.0.0-20251028164327-d7a2859f34e8 h1:DwMAzqwLj2rVin75cRFh1kfhwQY3hyHrU1oCEDZXPmQ=
golang.org/x/telemetry v0.0.0-20251028164327-d7a2859f34e8/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

View File

@@ -26,7 +26,7 @@ func InitWebRTCAPI() error {
mediaEngine := &webrtc.MediaEngine{}
// Register our extensions
if err := RegisterExtensions(mediaEngine); err != nil {
if err = RegisterExtensions(mediaEngine); err != nil {
return fmt.Errorf("failed to register extensions: %w", err)
}

View File

@@ -0,0 +1,53 @@
package common
import (
"log/slog"
"github.com/pion/webrtc/v4"
)
// ICEHelper holds webrtc.ICECandidateInit(s) until remote candidate is set for given webrtc.PeerConnection
// Held candidates should be flushed at the end of negotiation to ensure all are available for connection
type ICEHelper struct {
candidates []webrtc.ICECandidateInit
pc *webrtc.PeerConnection
}
func NewICEHelper(pc *webrtc.PeerConnection) *ICEHelper {
return &ICEHelper{
pc: pc,
candidates: make([]webrtc.ICECandidateInit, 0),
}
}
func (ice *ICEHelper) SetPeerConnection(pc *webrtc.PeerConnection) {
ice.pc = pc
}
func (ice *ICEHelper) AddCandidate(c webrtc.ICECandidateInit) {
if ice.pc != nil {
if ice.pc.RemoteDescription() != nil {
// Add immediately if remote is set
if err := ice.pc.AddICECandidate(c); err != nil {
slog.Error("Failed to add ICE candidate", "err", err)
}
// Also flush held candidates automatically
ice.FlushHeldCandidates()
} else {
// Hold in slice until remote is set
ice.candidates = append(ice.candidates, c)
}
}
}
func (ice *ICEHelper) FlushHeldCandidates() {
if ice.pc != nil && len(ice.candidates) > 0 {
for _, heldCandidate := range ice.candidates {
if err := ice.pc.AddICECandidate(heldCandidate); err != nil {
slog.Error("Failed to add held ICE candidate", "err", err)
}
}
// Clear the held candidates
ice.candidates = make([]webrtc.ICECandidateInit, 0)
}
}

View File

@@ -3,16 +3,28 @@ package common
import (
"bufio"
"encoding/binary"
"encoding/json"
"errors"
"io"
gen "relay/internal/proto"
"sync"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/timestamppb"
)
// MaxSize is the maximum allowed data size (1MB)
const MaxSize = 1024 * 1024
// readUvarint reads an unsigned varint from the reader
func readUvarint(r io.ByteReader) (uint64, error) {
return binary.ReadUvarint(r)
}
// writeUvarint writes an unsigned varint to the writer
func writeUvarint(w io.Writer, x uint64) error {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, x)
_, err := w.Write(buf[:n])
return err
}
// SafeBufioRW wraps a bufio.ReadWriter for sending and receiving JSON and protobufs safely
type SafeBufioRW struct {
@@ -24,83 +36,6 @@ func NewSafeBufioRW(brw *bufio.ReadWriter) *SafeBufioRW {
return &SafeBufioRW{brw: brw}
}
// SendJSON serializes the given data as JSON and sends it with a 4-byte length prefix
func (bu *SafeBufioRW) SendJSON(data interface{}) error {
bu.mutex.Lock()
defer bu.mutex.Unlock()
jsonData, err := json.Marshal(data)
if err != nil {
return err
}
if len(jsonData) > MaxSize {
return errors.New("JSON data exceeds maximum size")
}
// Write the 4-byte length prefix
if err = binary.Write(bu.brw, binary.BigEndian, uint32(len(jsonData))); err != nil {
return err
}
// Write the JSON data
if _, err = bu.brw.Write(jsonData); err != nil {
return err
}
// Flush the writer to ensure data is sent
return bu.brw.Flush()
}
// ReceiveJSON reads a 4-byte length prefix, then reads and unmarshals the JSON
func (bu *SafeBufioRW) ReceiveJSON(dest interface{}) error {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
return err
}
if length > MaxSize {
return errors.New("received JSON data exceeds maximum size")
}
// Read the JSON data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
return err
}
return json.Unmarshal(data, dest)
}
// Receive reads a 4-byte length prefix, then reads the raw data
func (bu *SafeBufioRW) Receive() ([]byte, error) {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
return nil, err
}
if length > MaxSize {
return nil, errors.New("received data exceeds maximum size")
}
// Read the raw data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
return nil, err
}
return data, nil
}
// SendProto serializes the given protobuf message and sends it with a 4-byte length prefix
func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
bu.mutex.Lock()
defer bu.mutex.Unlock()
@@ -110,12 +45,8 @@ func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
return err
}
if len(protoData) > MaxSize {
return errors.New("protobuf data exceeds maximum size")
}
// Write the 4-byte length prefix
if err = binary.Write(bu.brw, binary.BigEndian, uint32(len(protoData))); err != nil {
// Write varint length prefix
if err := writeUvarint(bu.brw, uint64(len(protoData))); err != nil {
return err
}
@@ -124,25 +55,19 @@ func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
return err
}
// Flush the writer to ensure data is sent
return bu.brw.Flush()
}
// ReceiveProto reads a 4-byte length prefix, then reads and unmarshals the protobuf
func (bu *SafeBufioRW) ReceiveProto(msg proto.Message) error {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
// Read varint length prefix
length, err := readUvarint(bu.brw)
if err != nil {
return err
}
if length > MaxSize {
return errors.New("received Protobuf data exceeds maximum size")
}
// Read the Protobuf data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
@@ -152,24 +77,51 @@ func (bu *SafeBufioRW) ReceiveProto(msg proto.Message) error {
return proto.Unmarshal(data, msg)
}
// Write writes raw data to the underlying buffer
func (bu *SafeBufioRW) Write(data []byte) (int, error) {
bu.mutex.Lock()
defer bu.mutex.Unlock()
if len(data) > MaxSize {
return 0, errors.New("data exceeds maximum size")
}
n, err := bu.brw.Write(data)
if err != nil {
return n, err
}
// Flush the writer to ensure data is sent
if err = bu.brw.Flush(); err != nil {
return n, err
}
return n, nil
type CreateMessageOptions struct {
SequenceID string
Latency *gen.ProtoLatencyTracker
}
func CreateMessage(payload proto.Message, payloadType string, opts *CreateMessageOptions) (*gen.ProtoMessage, error) {
msg := &gen.ProtoMessage{
MessageBase: &gen.ProtoMessageBase{
PayloadType: payloadType,
},
}
if opts != nil {
if opts.Latency != nil {
msg.MessageBase.Latency = opts.Latency
} else if opts.SequenceID != "" {
msg.MessageBase.Latency = &gen.ProtoLatencyTracker{
SequenceId: opts.SequenceID,
Timestamps: []*gen.ProtoTimestampEntry{
{
Stage: "created",
Time: timestamppb.Now(),
},
},
}
}
}
// Use reflection to set the oneof field automatically
msgReflect := msg.ProtoReflect()
payloadReflect := payload.ProtoReflect()
oneofDesc := msgReflect.Descriptor().Oneofs().ByName("payload")
if oneofDesc == nil {
return nil, errors.New("payload oneof not found")
}
fields := oneofDesc.Fields()
for i := 0; i < fields.Len(); i++ {
field := fields.Get(i)
if field.Message() != nil && field.Message().FullName() == payloadReflect.Descriptor().FullName() {
msgReflect.Set(field, protoreflect.ValueOfMessage(payloadReflect))
return msg, nil
}
}
return nil, errors.New("payload type not found in oneof")
}

View File

@@ -31,16 +31,18 @@ func NewNestriDataChannel(dc *webrtc.DataChannel) *NestriDataChannel {
}
// Decode message
var base gen.ProtoMessageInput
var base gen.ProtoMessage
if err := proto.Unmarshal(msg.Data, &base); err != nil {
slog.Error("failed to decode binary DataChannel message", "err", err)
return
}
// Handle message type callback
if callback, ok := ndc.callbacks["input"]; ok {
go callback(msg.Data)
} // We don't care about unhandled messages
// Route based on PayloadType
if base.MessageBase != nil && len(base.MessageBase.PayloadType) > 0 {
if callback, ok := ndc.callbacks[base.MessageBase.PayloadType]; ok {
go callback(msg.Data)
}
}
})
return ndc

View File

@@ -1,94 +0,0 @@
package connections
import (
"encoding/json"
"relay/internal/common"
"github.com/pion/webrtc/v4"
)
// MessageBase is the base type for any JSON message
type MessageBase struct {
Type string `json:"payload_type"`
Latency *common.LatencyTracker `json:"latency,omitempty"`
}
type MessageRaw struct {
MessageBase
Data json.RawMessage `json:"data"`
}
func NewMessageRaw(t string, data json.RawMessage) *MessageRaw {
return &MessageRaw{
MessageBase: MessageBase{
Type: t,
},
Data: data,
}
}
type MessageLog struct {
MessageBase
Level string `json:"level"`
Message string `json:"message"`
Time string `json:"time"`
}
func NewMessageLog(t string, level, message, time string) *MessageLog {
return &MessageLog{
MessageBase: MessageBase{
Type: t,
},
Level: level,
Message: message,
Time: time,
}
}
type MessageMetrics struct {
MessageBase
UsageCPU float64 `json:"usage_cpu"`
UsageMemory float64 `json:"usage_memory"`
Uptime uint64 `json:"uptime"`
PipelineLatency float64 `json:"pipeline_latency"`
}
func NewMessageMetrics(t string, usageCPU, usageMemory float64, uptime uint64, pipelineLatency float64) *MessageMetrics {
return &MessageMetrics{
MessageBase: MessageBase{
Type: t,
},
UsageCPU: usageCPU,
UsageMemory: usageMemory,
Uptime: uptime,
PipelineLatency: pipelineLatency,
}
}
type MessageICE struct {
MessageBase
Candidate webrtc.ICECandidateInit `json:"candidate"`
}
func NewMessageICE(t string, candidate webrtc.ICECandidateInit) *MessageICE {
return &MessageICE{
MessageBase: MessageBase{
Type: t,
},
Candidate: candidate,
}
}
type MessageSDP struct {
MessageBase
SDP webrtc.SessionDescription `json:"sdp"`
}
func NewMessageSDP(t string, sdp webrtc.SessionDescription) *MessageSDP {
return &MessageSDP{
MessageBase: MessageBase{
Type: t,
},
SDP: sdp,
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -45,7 +45,7 @@ func (r *Relay) DeleteRoomIfEmpty(room *shared.Room) {
if room == nil {
return
}
if room.Participants.Len() == 0 && r.LocalRooms.Has(room.ID) {
if len(room.Participants) <= 0 && r.LocalRooms.Has(room.ID) {
slog.Debug("Deleting empty room without participants", "room", room.Name)
r.LocalRooms.Delete(room.ID)
err := room.PeerConnection.Close()

View File

@@ -129,12 +129,11 @@ func (r *Relay) onPeerConnected(peerID peer.ID) {
// onPeerDisconnected marks a peer as disconnected in our status view and removes latency info
func (r *Relay) onPeerDisconnected(peerID peer.ID) {
// Relay peer disconnect handling
slog.Info("Mesh peer disconnected, deleting from local peer map", "peer", peerID)
// Remove peer from local mesh peers
if r.Peers.Has(peerID) {
r.Peers.Delete(peerID)
}
// Remove any rooms associated with this peer
if r.Rooms.Has(peerID.String()) {
r.Rooms.Delete(peerID.String())
}
@@ -151,18 +150,18 @@ func (r *Relay) updateMeshRoomStates(peerID peer.ID, states []shared.RoomInfo) {
}
// If previously did not exist, but does now, request a connection if participants exist for our room
existed := r.Rooms.Has(state.ID.String())
/*existed := r.Rooms.Has(state.ID.String())
if !existed {
// Request connection to this peer if we have participants in our local room
if room, ok := r.LocalRooms.Get(state.ID); ok {
if room.Participants.Len() > 0 {
if len(room.Participants) > 0 {
slog.Debug("Got new remote room state, we locally have participants for, requesting stream", "room_name", room.Name, "peer", peerID)
if err := r.StreamProtocol.RequestStream(context.Background(), room, peerID); err != nil {
slog.Error("Failed to request stream for new remote room state", "room_name", room.Name, "peer", peerID, "err", err)
}
}
}
}
}*/
r.Rooms.Set(state.ID.String(), state)
}

View File

@@ -73,28 +73,47 @@ func (x *ProtoMessageBase) GetLatency() *ProtoLatencyTracker {
return nil
}
type ProtoMessageInput struct {
state protoimpl.MessageState `protogen:"open.v1"`
MessageBase *ProtoMessageBase `protobuf:"bytes,1,opt,name=message_base,json=messageBase,proto3" json:"message_base,omitempty"`
Data *ProtoInput `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
type ProtoMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
MessageBase *ProtoMessageBase `protobuf:"bytes,1,opt,name=message_base,json=messageBase,proto3" json:"message_base,omitempty"`
// Types that are valid to be assigned to Payload:
//
// *ProtoMessage_MouseMove
// *ProtoMessage_MouseMoveAbs
// *ProtoMessage_MouseWheel
// *ProtoMessage_MouseKeyDown
// *ProtoMessage_MouseKeyUp
// *ProtoMessage_KeyDown
// *ProtoMessage_KeyUp
// *ProtoMessage_ControllerAttach
// *ProtoMessage_ControllerDetach
// *ProtoMessage_ControllerRumble
// *ProtoMessage_ControllerStateBatch
// *ProtoMessage_Ice
// *ProtoMessage_Sdp
// *ProtoMessage_Raw
// *ProtoMessage_ClientRequestRoomStream
// *ProtoMessage_ClientDisconnected
// *ProtoMessage_ServerPushStream
Payload isProtoMessage_Payload `protobuf_oneof:"payload"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ProtoMessageInput) Reset() {
*x = ProtoMessageInput{}
func (x *ProtoMessage) Reset() {
*x = ProtoMessage{}
mi := &file_messages_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ProtoMessageInput) String() string {
func (x *ProtoMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ProtoMessageInput) ProtoMessage() {}
func (*ProtoMessage) ProtoMessage() {}
func (x *ProtoMessageInput) ProtoReflect() protoreflect.Message {
func (x *ProtoMessage) ProtoReflect() protoreflect.Message {
mi := &file_messages_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -106,25 +125,287 @@ func (x *ProtoMessageInput) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use ProtoMessageInput.ProtoReflect.Descriptor instead.
func (*ProtoMessageInput) Descriptor() ([]byte, []int) {
// Deprecated: Use ProtoMessage.ProtoReflect.Descriptor instead.
func (*ProtoMessage) Descriptor() ([]byte, []int) {
return file_messages_proto_rawDescGZIP(), []int{1}
}
func (x *ProtoMessageInput) GetMessageBase() *ProtoMessageBase {
func (x *ProtoMessage) GetMessageBase() *ProtoMessageBase {
if x != nil {
return x.MessageBase
}
return nil
}
func (x *ProtoMessageInput) GetData() *ProtoInput {
func (x *ProtoMessage) GetPayload() isProtoMessage_Payload {
if x != nil {
return x.Data
return x.Payload
}
return nil
}
func (x *ProtoMessage) GetMouseMove() *ProtoMouseMove {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseMove); ok {
return x.MouseMove
}
}
return nil
}
func (x *ProtoMessage) GetMouseMoveAbs() *ProtoMouseMoveAbs {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseMoveAbs); ok {
return x.MouseMoveAbs
}
}
return nil
}
func (x *ProtoMessage) GetMouseWheel() *ProtoMouseWheel {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseWheel); ok {
return x.MouseWheel
}
}
return nil
}
func (x *ProtoMessage) GetMouseKeyDown() *ProtoMouseKeyDown {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseKeyDown); ok {
return x.MouseKeyDown
}
}
return nil
}
func (x *ProtoMessage) GetMouseKeyUp() *ProtoMouseKeyUp {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseKeyUp); ok {
return x.MouseKeyUp
}
}
return nil
}
func (x *ProtoMessage) GetKeyDown() *ProtoKeyDown {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_KeyDown); ok {
return x.KeyDown
}
}
return nil
}
func (x *ProtoMessage) GetKeyUp() *ProtoKeyUp {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_KeyUp); ok {
return x.KeyUp
}
}
return nil
}
func (x *ProtoMessage) GetControllerAttach() *ProtoControllerAttach {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerAttach); ok {
return x.ControllerAttach
}
}
return nil
}
func (x *ProtoMessage) GetControllerDetach() *ProtoControllerDetach {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerDetach); ok {
return x.ControllerDetach
}
}
return nil
}
func (x *ProtoMessage) GetControllerRumble() *ProtoControllerRumble {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerRumble); ok {
return x.ControllerRumble
}
}
return nil
}
func (x *ProtoMessage) GetControllerStateBatch() *ProtoControllerStateBatch {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerStateBatch); ok {
return x.ControllerStateBatch
}
}
return nil
}
func (x *ProtoMessage) GetIce() *ProtoICE {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Ice); ok {
return x.Ice
}
}
return nil
}
func (x *ProtoMessage) GetSdp() *ProtoSDP {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Sdp); ok {
return x.Sdp
}
}
return nil
}
func (x *ProtoMessage) GetRaw() *ProtoRaw {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Raw); ok {
return x.Raw
}
}
return nil
}
func (x *ProtoMessage) GetClientRequestRoomStream() *ProtoClientRequestRoomStream {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ClientRequestRoomStream); ok {
return x.ClientRequestRoomStream
}
}
return nil
}
func (x *ProtoMessage) GetClientDisconnected() *ProtoClientDisconnected {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ClientDisconnected); ok {
return x.ClientDisconnected
}
}
return nil
}
func (x *ProtoMessage) GetServerPushStream() *ProtoServerPushStream {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ServerPushStream); ok {
return x.ServerPushStream
}
}
return nil
}
type isProtoMessage_Payload interface {
isProtoMessage_Payload()
}
type ProtoMessage_MouseMove struct {
// Input types
MouseMove *ProtoMouseMove `protobuf:"bytes,2,opt,name=mouse_move,json=mouseMove,proto3,oneof"`
}
type ProtoMessage_MouseMoveAbs struct {
MouseMoveAbs *ProtoMouseMoveAbs `protobuf:"bytes,3,opt,name=mouse_move_abs,json=mouseMoveAbs,proto3,oneof"`
}
type ProtoMessage_MouseWheel struct {
MouseWheel *ProtoMouseWheel `protobuf:"bytes,4,opt,name=mouse_wheel,json=mouseWheel,proto3,oneof"`
}
type ProtoMessage_MouseKeyDown struct {
MouseKeyDown *ProtoMouseKeyDown `protobuf:"bytes,5,opt,name=mouse_key_down,json=mouseKeyDown,proto3,oneof"`
}
type ProtoMessage_MouseKeyUp struct {
MouseKeyUp *ProtoMouseKeyUp `protobuf:"bytes,6,opt,name=mouse_key_up,json=mouseKeyUp,proto3,oneof"`
}
type ProtoMessage_KeyDown struct {
KeyDown *ProtoKeyDown `protobuf:"bytes,7,opt,name=key_down,json=keyDown,proto3,oneof"`
}
type ProtoMessage_KeyUp struct {
KeyUp *ProtoKeyUp `protobuf:"bytes,8,opt,name=key_up,json=keyUp,proto3,oneof"`
}
type ProtoMessage_ControllerAttach struct {
// Controller input types
ControllerAttach *ProtoControllerAttach `protobuf:"bytes,9,opt,name=controller_attach,json=controllerAttach,proto3,oneof"`
}
type ProtoMessage_ControllerDetach struct {
ControllerDetach *ProtoControllerDetach `protobuf:"bytes,10,opt,name=controller_detach,json=controllerDetach,proto3,oneof"`
}
type ProtoMessage_ControllerRumble struct {
ControllerRumble *ProtoControllerRumble `protobuf:"bytes,11,opt,name=controller_rumble,json=controllerRumble,proto3,oneof"`
}
type ProtoMessage_ControllerStateBatch struct {
ControllerStateBatch *ProtoControllerStateBatch `protobuf:"bytes,12,opt,name=controller_state_batch,json=controllerStateBatch,proto3,oneof"`
}
type ProtoMessage_Ice struct {
// Signaling types
Ice *ProtoICE `protobuf:"bytes,20,opt,name=ice,proto3,oneof"`
}
type ProtoMessage_Sdp struct {
Sdp *ProtoSDP `protobuf:"bytes,21,opt,name=sdp,proto3,oneof"`
}
type ProtoMessage_Raw struct {
Raw *ProtoRaw `protobuf:"bytes,22,opt,name=raw,proto3,oneof"`
}
type ProtoMessage_ClientRequestRoomStream struct {
ClientRequestRoomStream *ProtoClientRequestRoomStream `protobuf:"bytes,23,opt,name=client_request_room_stream,json=clientRequestRoomStream,proto3,oneof"`
}
type ProtoMessage_ClientDisconnected struct {
ClientDisconnected *ProtoClientDisconnected `protobuf:"bytes,24,opt,name=client_disconnected,json=clientDisconnected,proto3,oneof"`
}
type ProtoMessage_ServerPushStream struct {
ServerPushStream *ProtoServerPushStream `protobuf:"bytes,25,opt,name=server_push_stream,json=serverPushStream,proto3,oneof"`
}
func (*ProtoMessage_MouseMove) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseMoveAbs) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseWheel) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseKeyDown) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseKeyUp) isProtoMessage_Payload() {}
func (*ProtoMessage_KeyDown) isProtoMessage_Payload() {}
func (*ProtoMessage_KeyUp) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerAttach) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerDetach) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerRumble) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerStateBatch) isProtoMessage_Payload() {}
func (*ProtoMessage_Ice) isProtoMessage_Payload() {}
func (*ProtoMessage_Sdp) isProtoMessage_Payload() {}
func (*ProtoMessage_Raw) isProtoMessage_Payload() {}
func (*ProtoMessage_ClientRequestRoomStream) isProtoMessage_Payload() {}
func (*ProtoMessage_ClientDisconnected) isProtoMessage_Payload() {}
func (*ProtoMessage_ServerPushStream) isProtoMessage_Payload() {}
var File_messages_proto protoreflect.FileDescriptor
const file_messages_proto_rawDesc = "" +
@@ -132,10 +413,31 @@ const file_messages_proto_rawDesc = "" +
"\x0emessages.proto\x12\x05proto\x1a\vtypes.proto\x1a\x15latency_tracker.proto\"k\n" +
"\x10ProtoMessageBase\x12!\n" +
"\fpayload_type\x18\x01 \x01(\tR\vpayloadType\x124\n" +
"\alatency\x18\x02 \x01(\v2\x1a.proto.ProtoLatencyTrackerR\alatency\"v\n" +
"\x11ProtoMessageInput\x12:\n" +
"\fmessage_base\x18\x01 \x01(\v2\x17.proto.ProtoMessageBaseR\vmessageBase\x12%\n" +
"\x04data\x18\x02 \x01(\v2\x11.proto.ProtoInputR\x04dataB\x16Z\x14relay/internal/protob\x06proto3"
"\alatency\x18\x02 \x01(\v2\x1a.proto.ProtoLatencyTrackerR\alatency\"\x9b\t\n" +
"\fProtoMessage\x12:\n" +
"\fmessage_base\x18\x01 \x01(\v2\x17.proto.ProtoMessageBaseR\vmessageBase\x126\n" +
"\n" +
"mouse_move\x18\x02 \x01(\v2\x15.proto.ProtoMouseMoveH\x00R\tmouseMove\x12@\n" +
"\x0emouse_move_abs\x18\x03 \x01(\v2\x18.proto.ProtoMouseMoveAbsH\x00R\fmouseMoveAbs\x129\n" +
"\vmouse_wheel\x18\x04 \x01(\v2\x16.proto.ProtoMouseWheelH\x00R\n" +
"mouseWheel\x12@\n" +
"\x0emouse_key_down\x18\x05 \x01(\v2\x18.proto.ProtoMouseKeyDownH\x00R\fmouseKeyDown\x12:\n" +
"\fmouse_key_up\x18\x06 \x01(\v2\x16.proto.ProtoMouseKeyUpH\x00R\n" +
"mouseKeyUp\x120\n" +
"\bkey_down\x18\a \x01(\v2\x13.proto.ProtoKeyDownH\x00R\akeyDown\x12*\n" +
"\x06key_up\x18\b \x01(\v2\x11.proto.ProtoKeyUpH\x00R\x05keyUp\x12K\n" +
"\x11controller_attach\x18\t \x01(\v2\x1c.proto.ProtoControllerAttachH\x00R\x10controllerAttach\x12K\n" +
"\x11controller_detach\x18\n" +
" \x01(\v2\x1c.proto.ProtoControllerDetachH\x00R\x10controllerDetach\x12K\n" +
"\x11controller_rumble\x18\v \x01(\v2\x1c.proto.ProtoControllerRumbleH\x00R\x10controllerRumble\x12X\n" +
"\x16controller_state_batch\x18\f \x01(\v2 .proto.ProtoControllerStateBatchH\x00R\x14controllerStateBatch\x12#\n" +
"\x03ice\x18\x14 \x01(\v2\x0f.proto.ProtoICEH\x00R\x03ice\x12#\n" +
"\x03sdp\x18\x15 \x01(\v2\x0f.proto.ProtoSDPH\x00R\x03sdp\x12#\n" +
"\x03raw\x18\x16 \x01(\v2\x0f.proto.ProtoRawH\x00R\x03raw\x12b\n" +
"\x1aclient_request_room_stream\x18\x17 \x01(\v2#.proto.ProtoClientRequestRoomStreamH\x00R\x17clientRequestRoomStream\x12Q\n" +
"\x13client_disconnected\x18\x18 \x01(\v2\x1e.proto.ProtoClientDisconnectedH\x00R\x12clientDisconnected\x12L\n" +
"\x12server_push_stream\x18\x19 \x01(\v2\x1c.proto.ProtoServerPushStreamH\x00R\x10serverPushStreamB\t\n" +
"\apayloadB\x16Z\x14relay/internal/protob\x06proto3"
var (
file_messages_proto_rawDescOnce sync.Once
@@ -151,20 +453,52 @@ func file_messages_proto_rawDescGZIP() []byte {
var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_messages_proto_goTypes = []any{
(*ProtoMessageBase)(nil), // 0: proto.ProtoMessageBase
(*ProtoMessageInput)(nil), // 1: proto.ProtoMessageInput
(*ProtoLatencyTracker)(nil), // 2: proto.ProtoLatencyTracker
(*ProtoInput)(nil), // 3: proto.ProtoInput
(*ProtoMessageBase)(nil), // 0: proto.ProtoMessageBase
(*ProtoMessage)(nil), // 1: proto.ProtoMessage
(*ProtoLatencyTracker)(nil), // 2: proto.ProtoLatencyTracker
(*ProtoMouseMove)(nil), // 3: proto.ProtoMouseMove
(*ProtoMouseMoveAbs)(nil), // 4: proto.ProtoMouseMoveAbs
(*ProtoMouseWheel)(nil), // 5: proto.ProtoMouseWheel
(*ProtoMouseKeyDown)(nil), // 6: proto.ProtoMouseKeyDown
(*ProtoMouseKeyUp)(nil), // 7: proto.ProtoMouseKeyUp
(*ProtoKeyDown)(nil), // 8: proto.ProtoKeyDown
(*ProtoKeyUp)(nil), // 9: proto.ProtoKeyUp
(*ProtoControllerAttach)(nil), // 10: proto.ProtoControllerAttach
(*ProtoControllerDetach)(nil), // 11: proto.ProtoControllerDetach
(*ProtoControllerRumble)(nil), // 12: proto.ProtoControllerRumble
(*ProtoControllerStateBatch)(nil), // 13: proto.ProtoControllerStateBatch
(*ProtoICE)(nil), // 14: proto.ProtoICE
(*ProtoSDP)(nil), // 15: proto.ProtoSDP
(*ProtoRaw)(nil), // 16: proto.ProtoRaw
(*ProtoClientRequestRoomStream)(nil), // 17: proto.ProtoClientRequestRoomStream
(*ProtoClientDisconnected)(nil), // 18: proto.ProtoClientDisconnected
(*ProtoServerPushStream)(nil), // 19: proto.ProtoServerPushStream
}
var file_messages_proto_depIdxs = []int32{
2, // 0: proto.ProtoMessageBase.latency:type_name -> proto.ProtoLatencyTracker
0, // 1: proto.ProtoMessageInput.message_base:type_name -> proto.ProtoMessageBase
3, // 2: proto.ProtoMessageInput.data:type_name -> proto.ProtoInput
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
2, // 0: proto.ProtoMessageBase.latency:type_name -> proto.ProtoLatencyTracker
0, // 1: proto.ProtoMessage.message_base:type_name -> proto.ProtoMessageBase
3, // 2: proto.ProtoMessage.mouse_move:type_name -> proto.ProtoMouseMove
4, // 3: proto.ProtoMessage.mouse_move_abs:type_name -> proto.ProtoMouseMoveAbs
5, // 4: proto.ProtoMessage.mouse_wheel:type_name -> proto.ProtoMouseWheel
6, // 5: proto.ProtoMessage.mouse_key_down:type_name -> proto.ProtoMouseKeyDown
7, // 6: proto.ProtoMessage.mouse_key_up:type_name -> proto.ProtoMouseKeyUp
8, // 7: proto.ProtoMessage.key_down:type_name -> proto.ProtoKeyDown
9, // 8: proto.ProtoMessage.key_up:type_name -> proto.ProtoKeyUp
10, // 9: proto.ProtoMessage.controller_attach:type_name -> proto.ProtoControllerAttach
11, // 10: proto.ProtoMessage.controller_detach:type_name -> proto.ProtoControllerDetach
12, // 11: proto.ProtoMessage.controller_rumble:type_name -> proto.ProtoControllerRumble
13, // 12: proto.ProtoMessage.controller_state_batch:type_name -> proto.ProtoControllerStateBatch
14, // 13: proto.ProtoMessage.ice:type_name -> proto.ProtoICE
15, // 14: proto.ProtoMessage.sdp:type_name -> proto.ProtoSDP
16, // 15: proto.ProtoMessage.raw:type_name -> proto.ProtoRaw
17, // 16: proto.ProtoMessage.client_request_room_stream:type_name -> proto.ProtoClientRequestRoomStream
18, // 17: proto.ProtoMessage.client_disconnected:type_name -> proto.ProtoClientDisconnected
19, // 18: proto.ProtoMessage.server_push_stream:type_name -> proto.ProtoServerPushStream
19, // [19:19] is the sub-list for method output_type
19, // [19:19] is the sub-list for method input_type
19, // [19:19] is the sub-list for extension type_name
19, // [19:19] is the sub-list for extension extendee
0, // [0:19] is the sub-list for field type_name
}
func init() { file_messages_proto_init() }
@@ -174,6 +508,25 @@ func file_messages_proto_init() {
}
file_types_proto_init()
file_latency_tracker_proto_init()
file_messages_proto_msgTypes[1].OneofWrappers = []any{
(*ProtoMessage_MouseMove)(nil),
(*ProtoMessage_MouseMoveAbs)(nil),
(*ProtoMessage_MouseWheel)(nil),
(*ProtoMessage_MouseKeyDown)(nil),
(*ProtoMessage_MouseKeyUp)(nil),
(*ProtoMessage_KeyDown)(nil),
(*ProtoMessage_KeyUp)(nil),
(*ProtoMessage_ControllerAttach)(nil),
(*ProtoMessage_ControllerDetach)(nil),
(*ProtoMessage_ControllerRumble)(nil),
(*ProtoMessage_ControllerStateBatch)(nil),
(*ProtoMessage_Ice)(nil),
(*ProtoMessage_Sdp)(nil),
(*ProtoMessage_Raw)(nil),
(*ProtoMessage_ClientRequestRoomStream)(nil),
(*ProtoMessage_ClientDisconnected)(nil),
(*ProtoMessage_ServerPushStream)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{

File diff suppressed because it is too large Load Diff

View File

@@ -1,44 +1,139 @@
package shared
import (
"errors"
"fmt"
"io"
"log/slog"
"relay/internal/common"
"relay/internal/connections"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/oklog/ulid/v2"
"github.com/pion/webrtc/v4"
)
type Participant struct {
ID ulid.ULID
SessionID string // Track session for reconnection
PeerID peer.ID // libp2p peer ID
PeerConnection *webrtc.PeerConnection
DataChannel *connections.NestriDataChannel
// Per-viewer tracks and channels
VideoTrack *webrtc.TrackLocalStaticRTP
AudioTrack *webrtc.TrackLocalStaticRTP
// Per-viewer RTP state for retiming
VideoSequenceNumber uint16
VideoTimestamp uint32
AudioSequenceNumber uint16
AudioTimestamp uint32
packetQueue chan *participantPacket
closeOnce sync.Once
}
func NewParticipant() (*Participant, error) {
func NewParticipant(sessionID string, peerID peer.ID) (*Participant, error) {
id, err := common.NewULID()
if err != nil {
return nil, fmt.Errorf("failed to create ULID for Participant: %w", err)
}
return &Participant{
ID: id,
}, nil
}
func (p *Participant) addTrack(trackLocal *webrtc.TrackLocalStaticRTP) error {
rtpSender, err := p.PeerConnection.AddTrack(trackLocal)
if err != nil {
return err
p := &Participant{
ID: id,
SessionID: sessionID,
PeerID: peerID,
VideoSequenceNumber: 0,
VideoTimestamp: 0,
AudioSequenceNumber: 0,
AudioTimestamp: 0,
packetQueue: make(chan *participantPacket, 1000),
}
go func() {
rtcpBuffer := make([]byte, 1400)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuffer); rtcpErr != nil {
break
go p.packetWriter()
return p, nil
}
// SetTrack sets audio/video track for Participant
func (p *Participant) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalStaticRTP) {
switch trackType {
case webrtc.RTPCodecTypeAudio:
p.AudioTrack = track
_, err := p.PeerConnection.AddTrack(track)
if err != nil {
slog.Error("Failed to add audio track", "participant", p.ID, "err", err)
}
case webrtc.RTPCodecTypeVideo:
p.VideoTrack = track
_, err := p.PeerConnection.AddTrack(track)
if err != nil {
slog.Error("Failed to add video track", "participant", p.ID, "err", err)
}
default:
slog.Warn("Unknown track type", "participant", p.ID, "trackType", trackType)
}
}
// Close cleans up participant resources
func (p *Participant) Close() {
p.closeOnce.Do(func() {
close(p.packetQueue)
})
if p.DataChannel != nil {
err := p.DataChannel.Close()
if err != nil {
slog.Error("Failed to close DataChannel", "participant", p.ID, "err", err)
}
p.DataChannel = nil
}
if p.PeerConnection != nil {
err := p.PeerConnection.Close()
if err != nil {
slog.Error("Failed to close PeerConnection", "participant", p.ID, "err", err)
}
p.PeerConnection = nil
}
if p.VideoTrack != nil {
p.VideoTrack = nil
}
if p.AudioTrack != nil {
p.AudioTrack = nil
}
}
func (p *Participant) packetWriter() {
for pkt := range p.packetQueue {
var track *webrtc.TrackLocalStaticRTP
var sequenceNumber uint16
var timestamp uint32
// No mutex needed - only this goroutine modifies these
if pkt.kind == webrtc.RTPCodecTypeAudio {
track = p.AudioTrack
p.AudioSequenceNumber = uint16(int(p.AudioSequenceNumber) + pkt.sequenceDiff)
p.AudioTimestamp = uint32(int64(p.AudioTimestamp) + pkt.timeDiff)
sequenceNumber = p.AudioSequenceNumber
timestamp = p.AudioTimestamp
} else {
track = p.VideoTrack
p.VideoSequenceNumber = uint16(int(p.VideoSequenceNumber) + pkt.sequenceDiff)
p.VideoTimestamp = uint32(int64(p.VideoTimestamp) + pkt.timeDiff)
sequenceNumber = p.VideoSequenceNumber
timestamp = p.VideoTimestamp
}
if track != nil {
pkt.packet.SequenceNumber = sequenceNumber
pkt.packet.Timestamp = timestamp
if err := track.WriteRTP(pkt.packet); err != nil && !errors.Is(err, io.ErrClosedPipe) {
slog.Error("WriteRTP failed", "participant", p.ID, "kind", pkt.kind, "err", err)
}
}
}()
return nil
// Return packet struct to pool
participantPacketPool.Put(pkt)
}
}

View File

@@ -2,14 +2,29 @@ package shared
import (
"log/slog"
"relay/internal/common"
"relay/internal/connections"
"sync"
"sync/atomic"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/oklog/ulid/v2"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
var participantPacketPool = sync.Pool{
New: func() interface{} {
return &participantPacket{}
},
}
type participantPacket struct {
kind webrtc.RTPCodecType
packet *rtp.Packet
timeDiff int64
sequenceDiff int
}
type RoomInfo struct {
ID ulid.ULID `json:"id"`
Name string `json:"name"`
@@ -18,49 +33,139 @@ type RoomInfo struct {
type Room struct {
RoomInfo
AudioCodec webrtc.RTPCodecCapability
VideoCodec webrtc.RTPCodecCapability
PeerConnection *webrtc.PeerConnection
AudioTrack *webrtc.TrackLocalStaticRTP
VideoTrack *webrtc.TrackLocalStaticRTP
DataChannel *connections.NestriDataChannel
Participants *common.SafeMap[ulid.ULID, *Participant]
// Atomic pointer to slice of participant channels
participantChannels atomic.Pointer[[]chan<- *participantPacket]
participantsMtx sync.Mutex // Use only for add/remove
Participants map[ulid.ULID]*Participant // Keep general track of Participant(s)
// Track last seen values to calculate diffs
LastVideoTimestamp uint32
LastVideoSequenceNumber uint16
LastAudioTimestamp uint32
LastAudioSequenceNumber uint16
VideoTimestampSet bool
VideoSequenceSet bool
AudioTimestampSet bool
AudioSequenceSet bool
}
func NewRoom(name string, roomID ulid.ULID, ownerID peer.ID) *Room {
return &Room{
r := &Room{
RoomInfo: RoomInfo{
ID: roomID,
Name: name,
OwnerID: ownerID,
},
Participants: common.NewSafeMap[ulid.ULID, *Participant](),
PeerConnection: nil,
DataChannel: nil,
Participants: make(map[ulid.ULID]*Participant),
}
emptyChannels := make([]chan<- *participantPacket, 0)
r.participantChannels.Store(&emptyChannels)
return r
}
// Close closes up Room (stream ended)
func (r *Room) Close() {
if r.DataChannel != nil {
err := r.DataChannel.Close()
if err != nil {
slog.Error("Failed to close Room DataChannel", err)
}
r.DataChannel = nil
}
if r.PeerConnection != nil {
err := r.PeerConnection.Close()
if err != nil {
slog.Error("Failed to close Room PeerConnection", err)
}
r.PeerConnection = nil
}
}
// AddParticipant adds a Participant to a Room
func (r *Room) AddParticipant(participant *Participant) {
slog.Debug("Adding participant to room", "participant", participant.ID, "room", r.Name)
r.Participants.Set(participant.ID, participant)
r.participantsMtx.Lock()
defer r.participantsMtx.Unlock()
r.Participants[participant.ID] = participant
// Update channel slice atomically
current := r.participantChannels.Load()
newChannels := make([]chan<- *participantPacket, len(*current)+1)
copy(newChannels, *current)
newChannels[len(*current)] = participant.packetQueue
r.participantChannels.Store(&newChannels)
slog.Debug("Added participant", "participant", participant.ID, "room", r.Name)
}
// Removes a Participant from a Room by participant's ID
func (r *Room) removeParticipantByID(pID ulid.ULID) {
if _, ok := r.Participants.Get(pID); ok {
r.Participants.Delete(pID)
// RemoveParticipantByID removes a Participant from a Room by participant's ID
func (r *Room) RemoveParticipantByID(pID ulid.ULID) {
r.participantsMtx.Lock()
defer r.participantsMtx.Unlock()
participant, ok := r.Participants[pID]
if !ok {
return
}
delete(r.Participants, pID)
// Update channel slice
current := r.participantChannels.Load()
newChannels := make([]chan<- *participantPacket, 0, len(*current)-1)
for _, ch := range *current {
if ch != participant.packetQueue {
newChannels = append(newChannels, ch)
}
}
r.participantChannels.Store(&newChannels)
slog.Debug("Removed participant", "participant", pID, "room", r.Name)
}
// IsOnline checks if the room is online (has both audio and video tracks)
// IsOnline checks if the room is online
func (r *Room) IsOnline() bool {
return r.AudioTrack != nil && r.VideoTrack != nil
return r.PeerConnection != nil
}
func (r *Room) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalStaticRTP) {
switch trackType {
case webrtc.RTPCodecTypeAudio:
r.AudioTrack = track
case webrtc.RTPCodecTypeVideo:
r.VideoTrack = track
default:
slog.Warn("Unknown track type", "room", r.Name, "trackType", trackType)
func (r *Room) BroadcastPacketRetimed(kind webrtc.RTPCodecType, pkt *rtp.Packet, timeDiff int64, sequenceDiff int) {
// Lock-free load of channel slice
channels := r.participantChannels.Load()
// no participants..
if len(*channels) == 0 {
return
}
// Send to each participant channel (non-blocking)
for i, ch := range *channels {
// Get packet struct from pool
pp := participantPacketPool.Get().(*participantPacket)
pp.kind = kind
pp.packet = pkt.Clone()
pp.timeDiff = timeDiff
pp.sequenceDiff = sequenceDiff
select {
case ch <- pp:
// Sent successfully
default:
// Channel full, drop packet, log?
slog.Warn("Channel full, dropping packet", "channel_index", i)
participantPacketPool.Put(pp)
}
}
}