Files
netris-nestri/packages/relay/internal/core/state.go
Kristian Ollikainen c62a22b552 feat: Controller support, performance enhancements, multi-stage images, fixes (#304)
## Description
Oops.. another massive PR 🥲 

This PR contains multiple improvements and changes.

Firstly, thanks to gst-wayland-display's PR
[here](https://github.com/games-on-whales/gst-wayland-display/pull/20),
the NVIDIA path is now way more efficient than before.

Secondly, adding controller support was a massive hurdle, requiring me
to start another project
[vimputti](https://github.com/DatCaptainHorse/vimputti) - which allows
simple virtual controller inputs in isolated containers. Well, it's not
simple, it includes LD_PRELOAD shims and other craziness, but the
library API is simple to use..

Thirdly, split runner image into 3 separate stages, base + build +
runtime, should help keep things in check in future, also added GitHub
Actions CI builds for v2 to v4 builds (hopefully they pass..).

Fourth, replaced the runner's runtime Steam patching with a better and
simpler bubblewrap patch, massive thanks to `games-on-whales` for
figuring it out better!

Fifth, relay for once needed some changes, the new changes are still
mostly WIP, but I'll deal with them next time I have energy.. I'm spent
now. Needed to include these changes as relay needed a minor change to
allow rumble events to flow back to client peer.

Sixth.. tons of package updates, minor code improvements and the usual. 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* End-to-end gamepad/controller support (attach/detach, buttons, sticks,
triggers, rumble) with client/server integration and virtual controller
plumbing.
  * Optional Prometheus metrics endpoint and WebTransport support.
  * Background vimputti manager process added for controller handling.

* **Improvements**
  * Multi-variant container image builds and streamlined runtime images.
  * Zero-copy video pipeline and encoder improvements for lower latency.
  * Updated Steam compat mapping and dependency/toolchain refreshes.

* **Bug Fixes**
* More robust GPU detection, input/fullscreen lifecycle,
startup/entrypoint, and container runtime fixes.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
2025-10-20 11:20:05 +03:00

170 lines
5.2 KiB
Go

package core
import (
"context"
"encoding/json"
"errors"
"log/slog"
"relay/internal/shared"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)
// --- PubSub Message Handlers ---
// handleRoomStateMessages reads room state updates from sub until ctx is
// cancelled or the subscription ends.
//
// Messages published by this relay itself are skipped. Every other payload is
// decoded as a JSON array of shared.RoomInfo and handed to
// updateMeshRoomStates together with the sender's peer ID. Payloads that fail
// to decode are logged and dropped.
//
// Unexpected receive errors are retried after a one second back-off. The
// back-off also watches ctx so that shutdown is not delayed by a pending retry.
func (r *Relay) handleRoomStateMessages(ctx context.Context, sub *pubsub.Subscription) {
	slog.Debug("Starting room state message handler...")
	for {
		// Stop before blocking on the subscription if cancellation already happened.
		if ctx.Err() != nil {
			slog.Info("Stopping room state message handler")
			return
		}

		msg, err := sub.Next(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, pubsub.ErrSubscriptionCancelled) || errors.Is(err, context.DeadlineExceeded) {
				slog.Info("Room state subscription ended", "err", err)
				return
			}
			slog.Error("Error receiving room state message", "err", err)

			// Wait before retrying, but wake up immediately on cancellation
			// instead of sleeping through it.
			select {
			case <-ctx.Done():
				slog.Info("Stopping room state message handler")
				return
			case <-time.After(1 * time.Second):
			}
			continue
		}

		// Ignore our own publications.
		if msg.GetFrom() == r.Host.ID() {
			continue
		}

		var states []shared.RoomInfo
		if err := json.Unmarshal(msg.Data, &states); err != nil {
			slog.Error("Failed to unmarshal room states", "from", msg.GetFrom(), "data_len", len(msg.Data), "err", err)
			continue
		}

		r.updateMeshRoomStates(msg.GetFrom(), states)
	}
}
// handleRelayMetricsMessages reads relay status updates from sub until ctx is
// cancelled or the subscription ends.
//
// Messages published by this relay itself are skipped. Every other payload is
// decoded as a JSON PeerInfo. Payloads that fail to decode, or whose embedded
// ID differs from the message sender, are logged and dropped; the rest are
// passed to onPeerStatus.
//
// Unexpected receive errors are retried after a one second back-off. The
// back-off also watches ctx so that shutdown is not delayed by a pending retry.
func (r *Relay) handleRelayMetricsMessages(ctx context.Context, sub *pubsub.Subscription) {
	slog.Debug("Starting relay metrics message handler...")
	for {
		// Stop before blocking on the subscription if cancellation already happened.
		if ctx.Err() != nil {
			slog.Info("Stopping relay metrics message handler")
			return
		}

		msg, err := sub.Next(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, pubsub.ErrSubscriptionCancelled) || errors.Is(err, context.DeadlineExceeded) {
				slog.Info("Relay metrics subscription ended", "err", err)
				return
			}
			slog.Error("Error receiving relay metrics message", "err", err)

			// Wait before retrying, but wake up immediately on cancellation
			// instead of sleeping through it.
			select {
			case <-ctx.Done():
				slog.Info("Stopping relay metrics message handler")
				return
			case <-time.After(1 * time.Second):
			}
			continue
		}

		// Ignore our own publications.
		if msg.GetFrom() == r.Host.ID() {
			continue
		}

		var info PeerInfo
		if err = json.Unmarshal(msg.Data, &info); err != nil {
			slog.Error("Failed to unmarshal relay status", "from", msg.GetFrom(), "data_len", len(msg.Data), "err", err)
			continue
		}

		// The ID inside the payload must match the peer that sent the message.
		if info.ID != msg.GetFrom() {
			slog.Error("Peer ID mismatch in relay status", "expected", info.ID, "actual", msg.GetFrom())
			continue
		}

		r.onPeerStatus(info)
	}
}
// --- State Check Functions ---
// hasConnectedPeer reports whether peerID is present in r.Peers and the host's
// network currently reports it as connected.
func (r *Relay) hasConnectedPeer(peerID peer.ID) bool {
	_, known := r.Peers.Get(peerID)
	if !known {
		return false
	}

	connected := r.Host.Network().Connectedness(peerID) == network.Connected
	if !connected {
		slog.Debug("Peer not connected", "peer", peerID)
	}
	return connected
}
// --- State Change Functions ---
// onPeerStatus stores the received peer info in r.Peers under the peer's ID,
// replacing whatever entry was there before.
func (r *Relay) onPeerStatus(recvInfo PeerInfo) {
	info := &recvInfo
	r.Peers.Set(info.ID, info)
}
// onPeerConnected registers a newly connected peer in r.Peers and then, in a
// separate goroutine, publishes this relay's metrics followed by its room
// states. Room states are only published when the metrics publish succeeded.
func (r *Relay) onPeerConnected(peerID peer.ID) {
	// Record the peer with an otherwise empty PeerInfo.
	r.Peers.Set(peerID, &PeerInfo{ID: peerID})
	slog.Info("Peer connected", "peer", peerID)

	// Kick off an immediate state exchange without blocking the caller.
	go func() {
		ctx := context.Background()

		if err := r.publishRelayMetrics(ctx); err != nil {
			slog.Error("Failed to publish relay metrics on connect", "err", err)
			return
		}
		if err := r.publishRoomStates(ctx); err != nil {
			slog.Error("Failed to publish room states on connect", "err", err)
		}
	}()
}
// onPeerDisconnected removes a disconnected peer from r.Peers and deletes the
// r.Rooms entry stored under the peer's ID string, if there is one.
//
// NOTE(review): updateMeshRoomStates writes r.Rooms under state.ID.String();
// if that is a room ID rather than the owner's peer ID, the room lookup below
// will not match anything — confirm the intended key.
func (r *Relay) onPeerDisconnected(peerID peer.ID) {
	slog.Info("Mesh peer disconnected, deleting from local peer map", "peer", peerID)

	// Drop the peer from the local mesh peer map.
	if r.Peers.Has(peerID) {
		r.Peers.Delete(peerID)
	}

	// Drop the room entry keyed by this peer's ID string.
	roomKey := peerID.String()
	if r.Rooms.Has(roomKey) {
		r.Rooms.Delete(roomKey)
	}

	// TODO: If any rooms were routed through this peer, handle that case
}
// updateMeshRoomStates stores each received room state in r.Rooms, keyed by
// the state's ID string. States whose OwnerID equals r.ID are skipped.
//
// When a state's key was not yet in r.Rooms and r.LocalRooms holds a room with
// the same ID that has at least one participant, a stream for that room is
// requested from peerID before the state is stored. A failed request is logged
// and the state is stored anyway.
//
// TODO: Wrap in another type with timestamp or another mechanism to avoid conflicts
func (r *Relay) updateMeshRoomStates(peerID peer.ID, states []shared.RoomInfo) {
	for _, incoming := range states {
		// Never overwrite anything with states this relay owns.
		if incoming.OwnerID == r.ID {
			continue
		}

		key := incoming.ID.String()

		// Only a room appearing for the first time can trigger a stream request.
		if !r.Rooms.Has(key) {
			if room, ok := r.LocalRooms.Get(incoming.ID); ok && room.Participants.Len() > 0 {
				slog.Debug("Got new remote room state, we locally have participants for, requesting stream", "room_name", room.Name, "peer", peerID)
				err := r.StreamProtocol.RequestStream(context.Background(), room, peerID)
				if err != nil {
					slog.Error("Failed to request stream for new remote room state", "room_name", room.Name, "peer", peerID, "err", err)
				}
			}
		}

		r.Rooms.Set(key, incoming)
	}
}