More multi-controller fixes, better controller polling logic, clean up dead relay code

This commit is contained in:
DatCaptainHorse
2025-11-01 00:53:15 +02:00
parent a54cf759fa
commit 1d88a03b93
18 changed files with 1149 additions and 1289 deletions

View File

@@ -10,7 +10,6 @@ import (
"os"
"relay/internal/common"
"relay/internal/shared"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -38,16 +37,6 @@ var globalRelay *Relay
// -- Structs --
// ClientSession tracks browser client connections
type ClientSession struct {
PeerID peer.ID
SessionID string
RoomName string
ConnectedAt time.Time
LastActivity time.Time
ControllerSlots []int32 // Track which controller slots this client owns
}
// Relay structure enhanced with metrics and state
type Relay struct {
*PeerInfo
@@ -59,7 +48,6 @@ type Relay struct {
// Local
LocalRooms *common.SafeMap[ulid.ULID, *shared.Room] // room ID -> local Room struct (hosted by this relay)
LocalMeshConnections *common.SafeMap[peer.ID, *webrtc.PeerConnection] // peer ID -> PeerConnection (connected to this relay)
ClientSessions *common.SafeMap[peer.ID, *ClientSession] // peer ID -> ClientSession
// Protocols
ProtocolRegistry
@@ -156,7 +144,6 @@ func NewRelay(ctx context.Context, port int, identityKey crypto.PrivKey) (*Relay
PingService: pingSvc,
LocalRooms: common.NewSafeMap[ulid.ULID, *shared.Room](),
LocalMeshConnections: common.NewSafeMap[peer.ID, *webrtc.PeerConnection](),
ClientSessions: common.NewSafeMap[peer.ID, *ClientSession](),
}
// Add network notifier after relay is initialized

View File

@@ -11,7 +11,6 @@ import (
"relay/internal/common"
"relay/internal/connections"
"relay/internal/shared"
"time"
gen "relay/internal/proto"
@@ -111,16 +110,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
sessionID = ulid.String()
}
session := &ClientSession{
PeerID: stream.Conn().RemotePeer(),
SessionID: sessionID,
RoomName: reqMsg.RoomName,
ConnectedAt: time.Now(),
LastActivity: time.Now(),
}
sp.relay.ClientSessions.Set(stream.Conn().RemotePeer(), session)
slog.Info("Client session established", "peer", session.PeerID, "session", sessionID, "room", reqMsg.RoomName)
slog.Info("Client session requested room stream", "session", sessionID, "room", reqMsg.RoomName)
// Send session ID back to client
sesMsg, err := common.CreateMessage(
@@ -177,7 +167,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
// Create participant for this viewer
participant, err := shared.NewParticipant(
"",
sessionID,
stream.Conn().RemotePeer(),
)
if err != nil {
@@ -185,11 +175,6 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
continue
}
// If this is a client session, link it
if session, ok := sp.relay.ClientSessions.Get(stream.Conn().RemotePeer()); ok {
participant.SessionID = session.SessionID
}
// Assign peer connection
participant.PeerConnection = pc
@@ -265,57 +250,9 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
// Track controller input separately
ndc.RegisterMessageCallback("controllerInput", func(data []byte) {
// Parse the message to track controller slots for client sessions
var msgWrapper gen.ProtoMessage
if err = proto.Unmarshal(data, &msgWrapper); err != nil {
var controllerMsgWrapper gen.ProtoMessage
if err = proto.Unmarshal(data, &controllerMsgWrapper); err != nil {
slog.Error("Failed to unmarshal controller input", "err", err)
} else if msgWrapper.Payload != nil {
// Get the peer ID for this connection
peerID := stream.Conn().RemotePeer()
// Check if it's a controller attach with assigned slot
if attach := msgWrapper.GetControllerAttach(); attach != nil && attach.SessionSlot >= 0 {
if session, ok := sp.relay.ClientSessions.Get(peerID); ok {
// Check if slot already tracked
hasSlot := false
for _, slot := range session.ControllerSlots {
if slot == attach.SessionSlot {
hasSlot = true
break
}
}
if !hasSlot {
session.ControllerSlots = append(session.ControllerSlots, attach.SessionSlot)
session.LastActivity = time.Now()
slog.Info("Controller slot assigned to client session",
"session", session.SessionID,
"slot", attach.SessionSlot,
"total_slots", len(session.ControllerSlots))
}
}
}
// Check if it's a controller detach
if detach := msgWrapper.GetControllerDetach(); detach != nil && detach.SessionSlot >= 0 {
if session, ok := sp.relay.ClientSessions.Get(peerID); ok {
newSlots := make([]int32, 0, len(session.ControllerSlots))
for _, slot := range session.ControllerSlots {
if slot != detach.SessionSlot {
newSlots = append(newSlots, slot)
}
}
session.ControllerSlots = newSlots
session.LastActivity = time.Now()
slog.Info("Controller slot removed from client session",
"session", session.SessionID,
"slot", detach.SessionSlot,
"remaining_slots", len(session.ControllerSlots))
}
}
// Update last activity on any controller input
if session, ok := sp.relay.ClientSessions.Get(peerID); ok {
session.LastActivity = time.Now()
}
}
// Forward to upstream room
@@ -609,7 +546,12 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
roomMap.Range(func(peerID peer.ID, conn *StreamConnection) bool {
if conn.ndc != nil {
if err = conn.ndc.SendBinary(data); err != nil {
slog.Error("Failed to forward controller input from pushed stream to viewer", "room", room.Name, "peer", peerID, "err", err)
if errors.Is(err, io.ErrClosedPipe) {
slog.Warn("Failed to forward controller input to viewer, treating as disconnected", "err", err)
sp.relay.onPeerDisconnected(peerID)
} else {
slog.Error("Failed to forward controller input from pushed stream to viewer", "room", room.Name, "peer", peerID, "err", err)
}
}
}
return true

View File

@@ -5,14 +5,9 @@ import (
"encoding/json"
"errors"
"log/slog"
"relay/internal/common"
"relay/internal/shared"
"time"
gen "relay/internal/proto"
"google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -134,46 +129,6 @@ func (r *Relay) onPeerConnected(peerID peer.ID) {
// onPeerDisconnected marks a peer as disconnected in our status view and removes latency info
func (r *Relay) onPeerDisconnected(peerID peer.ID) {
// Check if this was a client session disconnect
if session, ok := r.ClientSessions.Get(peerID); ok {
slog.Info("Client session disconnected",
"peer", peerID,
"session", session.SessionID,
"room", session.RoomName,
"controller_slots", session.ControllerSlots)
// Send cleanup message to nestri-server if client had active controllers
if len(session.ControllerSlots) > 0 {
room := r.GetRoomByName(session.RoomName)
if room != nil && room.DataChannel != nil {
// Create disconnect notification
disconnectMsg, err := common.CreateMessage(&gen.ProtoClientDisconnected{
SessionId: session.SessionID,
ControllerSlots: session.ControllerSlots,
}, "client-disconnected", nil)
if err != nil {
slog.Error("Failed to create client disconnect message", "err", err)
}
disMarshal, err := proto.Marshal(disconnectMsg)
if err != nil {
slog.Error("Failed to marshal client disconnect message", "err", err)
} else {
if err = room.DataChannel.SendBinary(disMarshal); err != nil {
slog.Error("Failed to send client disconnect notification", "err", err)
} else {
slog.Info("Sent controller cleanup notification to nestri-server",
"session", session.SessionID,
"slots", session.ControllerSlots)
}
}
}
}
r.ClientSessions.Delete(peerID)
return
}
// Relay peer disconnect handling
slog.Info("Mesh peer disconnected, deleting from local peer map", "peer", peerID)
if r.Peers.Has(peerID) {