Restructure protobufs and use them everywhere

This commit is contained in:
DatCaptainHorse
2025-10-21 18:41:45 +03:00
parent 32341574dc
commit 67f9a7d0a0
37 changed files with 3455 additions and 3074 deletions

View File

@@ -3,16 +3,28 @@ package common
import (
"bufio"
"encoding/binary"
"encoding/json"
"errors"
"io"
gen "relay/internal/proto"
"sync"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/timestamppb"
)
// MaxSize is the maximum allowed data size (1MB)
const MaxSize = 1024 * 1024
// readUvarint reads an unsigned varint from the reader
func readUvarint(r io.ByteReader) (uint64, error) {
return binary.ReadUvarint(r)
}
// writeUvarint writes an unsigned varint to the writer
func writeUvarint(w io.Writer, x uint64) error {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, x)
_, err := w.Write(buf[:n])
return err
}
// SafeBufioRW wraps a bufio.ReadWriter for sending and receiving JSON and protobufs safely
type SafeBufioRW struct {
@@ -24,83 +36,6 @@ func NewSafeBufioRW(brw *bufio.ReadWriter) *SafeBufioRW {
return &SafeBufioRW{brw: brw}
}
// SendJSON serializes the given data as JSON and sends it with a 4-byte length prefix
func (bu *SafeBufioRW) SendJSON(data interface{}) error {
bu.mutex.Lock()
defer bu.mutex.Unlock()
jsonData, err := json.Marshal(data)
if err != nil {
return err
}
if len(jsonData) > MaxSize {
return errors.New("JSON data exceeds maximum size")
}
// Write the 4-byte length prefix
if err = binary.Write(bu.brw, binary.BigEndian, uint32(len(jsonData))); err != nil {
return err
}
// Write the JSON data
if _, err = bu.brw.Write(jsonData); err != nil {
return err
}
// Flush the writer to ensure data is sent
return bu.brw.Flush()
}
// ReceiveJSON reads a 4-byte length prefix, then reads and unmarshals the JSON
func (bu *SafeBufioRW) ReceiveJSON(dest interface{}) error {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
return err
}
if length > MaxSize {
return errors.New("received JSON data exceeds maximum size")
}
// Read the JSON data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
return err
}
return json.Unmarshal(data, dest)
}
// Receive reads a 4-byte length prefix, then reads the raw data
func (bu *SafeBufioRW) Receive() ([]byte, error) {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
return nil, err
}
if length > MaxSize {
return nil, errors.New("received data exceeds maximum size")
}
// Read the raw data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
return nil, err
}
return data, nil
}
// SendProto serializes the given protobuf message and sends it with a 4-byte length prefix
func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
bu.mutex.Lock()
defer bu.mutex.Unlock()
@@ -110,12 +45,8 @@ func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
return err
}
if len(protoData) > MaxSize {
return errors.New("protobuf data exceeds maximum size")
}
// Write the 4-byte length prefix
if err = binary.Write(bu.brw, binary.BigEndian, uint32(len(protoData))); err != nil {
// Write varint length prefix
if err := writeUvarint(bu.brw, uint64(len(protoData))); err != nil {
return err
}
@@ -124,25 +55,19 @@ func (bu *SafeBufioRW) SendProto(msg proto.Message) error {
return err
}
// Flush the writer to ensure data is sent
return bu.brw.Flush()
}
// ReceiveProto reads a 4-byte length prefix, then reads and unmarshals the protobuf
func (bu *SafeBufioRW) ReceiveProto(msg proto.Message) error {
bu.mutex.RLock()
defer bu.mutex.RUnlock()
// Read the 4-byte length prefix
var length uint32
if err := binary.Read(bu.brw, binary.BigEndian, &length); err != nil {
// Read varint length prefix
length, err := readUvarint(bu.brw)
if err != nil {
return err
}
if length > MaxSize {
return errors.New("received Protobuf data exceeds maximum size")
}
// Read the Protobuf data
data := make([]byte, length)
if _, err := io.ReadFull(bu.brw, data); err != nil {
@@ -152,24 +77,51 @@ func (bu *SafeBufioRW) ReceiveProto(msg proto.Message) error {
return proto.Unmarshal(data, msg)
}
// Write writes raw data to the underlying buffer
func (bu *SafeBufioRW) Write(data []byte) (int, error) {
bu.mutex.Lock()
defer bu.mutex.Unlock()
if len(data) > MaxSize {
return 0, errors.New("data exceeds maximum size")
}
n, err := bu.brw.Write(data)
if err != nil {
return n, err
}
// Flush the writer to ensure data is sent
if err = bu.brw.Flush(); err != nil {
return n, err
}
return n, nil
type CreateMessageOptions struct {
SequenceID string
Latency *gen.ProtoLatencyTracker
}
func CreateMessage(payload proto.Message, payloadType string, opts *CreateMessageOptions) (*gen.ProtoMessage, error) {
msg := &gen.ProtoMessage{
MessageBase: &gen.ProtoMessageBase{
PayloadType: payloadType,
},
}
if opts != nil {
if opts.Latency != nil {
msg.MessageBase.Latency = opts.Latency
} else if opts.SequenceID != "" {
msg.MessageBase.Latency = &gen.ProtoLatencyTracker{
SequenceId: opts.SequenceID,
Timestamps: []*gen.ProtoTimestampEntry{
{
Stage: "created",
Time: timestamppb.Now(),
},
},
}
}
}
// Use reflection to set the oneof field automatically
msgReflect := msg.ProtoReflect()
payloadReflect := payload.ProtoReflect()
oneofDesc := msgReflect.Descriptor().Oneofs().ByName("payload")
if oneofDesc == nil {
return nil, errors.New("payload oneof not found")
}
fields := oneofDesc.Fields()
for i := 0; i < fields.Len(); i++ {
field := fields.Get(i)
if field.Message() != nil && field.Message().FullName() == payloadReflect.Descriptor().FullName() {
msgReflect.Set(field, protoreflect.ValueOfMessage(payloadReflect))
return msg, nil
}
}
return nil, errors.New("payload type not found in oneof")
}

View File

@@ -31,16 +31,18 @@ func NewNestriDataChannel(dc *webrtc.DataChannel) *NestriDataChannel {
}
// Decode message
var base gen.ProtoMessageInput
var base gen.ProtoMessage
if err := proto.Unmarshal(msg.Data, &base); err != nil {
slog.Error("failed to decode binary DataChannel message", "err", err)
return
}
// Handle message type callback
if callback, ok := ndc.callbacks["input"]; ok {
go callback(msg.Data)
} // We don't care about unhandled messages
// Route based on PayloadType
if base.MessageBase != nil && len(base.MessageBase.PayloadType) > 0 {
if callback, ok := ndc.callbacks[base.MessageBase.PayloadType]; ok {
go callback(msg.Data)
}
}
})
return ndc

View File

@@ -1,94 +0,0 @@
package connections
import (
"encoding/json"
"relay/internal/common"
"github.com/pion/webrtc/v4"
)
// MessageBase is the base type for any JSON message
type MessageBase struct {
Type string `json:"payload_type"`
Latency *common.LatencyTracker `json:"latency,omitempty"`
}
type MessageRaw struct {
MessageBase
Data json.RawMessage `json:"data"`
}
func NewMessageRaw(t string, data json.RawMessage) *MessageRaw {
return &MessageRaw{
MessageBase: MessageBase{
Type: t,
},
Data: data,
}
}
type MessageLog struct {
MessageBase
Level string `json:"level"`
Message string `json:"message"`
Time string `json:"time"`
}
func NewMessageLog(t string, level, message, time string) *MessageLog {
return &MessageLog{
MessageBase: MessageBase{
Type: t,
},
Level: level,
Message: message,
Time: time,
}
}
type MessageMetrics struct {
MessageBase
UsageCPU float64 `json:"usage_cpu"`
UsageMemory float64 `json:"usage_memory"`
Uptime uint64 `json:"uptime"`
PipelineLatency float64 `json:"pipeline_latency"`
}
func NewMessageMetrics(t string, usageCPU, usageMemory float64, uptime uint64, pipelineLatency float64) *MessageMetrics {
return &MessageMetrics{
MessageBase: MessageBase{
Type: t,
},
UsageCPU: usageCPU,
UsageMemory: usageMemory,
Uptime: uptime,
PipelineLatency: pipelineLatency,
}
}
type MessageICE struct {
MessageBase
Candidate webrtc.ICECandidateInit `json:"candidate"`
}
func NewMessageICE(t string, candidate webrtc.ICECandidateInit) *MessageICE {
return &MessageICE{
MessageBase: MessageBase{
Type: t,
},
Candidate: candidate,
}
}
type MessageSDP struct {
MessageBase
SDP webrtc.SessionDescription `json:"sdp"`
}
func NewMessageSDP(t string, sdp webrtc.SessionDescription) *MessageSDP {
return &MessageSDP{
MessageBase: MessageBase{
Type: t,
},
SDP: sdp,
}
}

View File

@@ -10,6 +10,7 @@ import (
"os"
"relay/internal/common"
"relay/internal/shared"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -37,6 +38,16 @@ var globalRelay *Relay
// -- Structs --
// ClientSession tracks browser client connections
type ClientSession struct {
PeerID peer.ID
SessionID string
RoomName string
ConnectedAt time.Time
LastActivity time.Time
ControllerSlots []int32 // Track which controller slots this client owns
}
// Relay structure enhanced with metrics and state
type Relay struct {
*PeerInfo
@@ -48,6 +59,7 @@ type Relay struct {
// Local
LocalRooms *common.SafeMap[ulid.ULID, *shared.Room] // room ID -> local Room struct (hosted by this relay)
LocalMeshConnections *common.SafeMap[peer.ID, *webrtc.PeerConnection] // peer ID -> PeerConnection (connected to this relay)
ClientSessions *common.SafeMap[peer.ID, *ClientSession] // peer ID -> ClientSession
// Protocols
ProtocolRegistry
@@ -144,6 +156,7 @@ func NewRelay(ctx context.Context, port int, identityKey crypto.PrivKey) (*Relay
PingService: pingSvc,
LocalRooms: common.NewSafeMap[ulid.ULID, *shared.Room](),
LocalMeshConnections: common.NewSafeMap[peer.ID, *webrtc.PeerConnection](),
ClientSessions: common.NewSafeMap[peer.ID, *ClientSession](),
}
// Add network notifier after relay is initialized

File diff suppressed because it is too large Load Diff

View File

@@ -5,9 +5,14 @@ import (
"encoding/json"
"errors"
"log/slog"
"relay/internal/common"
"relay/internal/shared"
"time"
gen "relay/internal/proto"
"google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -129,12 +134,51 @@ func (r *Relay) onPeerConnected(peerID peer.ID) {
// onPeerDisconnected marks a peer as disconnected in our status view and removes latency info
func (r *Relay) onPeerDisconnected(peerID peer.ID) {
// Check if this was a client session disconnect
if session, ok := r.ClientSessions.Get(peerID); ok {
slog.Info("Client session disconnected",
"peer", peerID,
"session", session.SessionID,
"room", session.RoomName,
"controller_slots", session.ControllerSlots)
// Send cleanup message to nestri-server if client had active controllers
if len(session.ControllerSlots) > 0 {
room := r.GetRoomByName(session.RoomName)
if room != nil && room.DataChannel != nil {
// Create disconnect notification
disconnectMsg, err := common.CreateMessage(&gen.ProtoClientDisconnected{
SessionId: session.SessionID,
ControllerSlots: session.ControllerSlots,
}, "client-disconnected", nil)
if err != nil {
slog.Error("Failed to create client disconnect message", "err", err)
}
disMarshal, err := proto.Marshal(disconnectMsg)
if err != nil {
slog.Error("Failed to marshal client disconnect message", "err", err)
} else {
if err = room.DataChannel.SendBinary(disMarshal); err != nil {
slog.Error("Failed to send client disconnect notification", "err", err)
} else {
slog.Info("Sent controller cleanup notification to nestri-server",
"session", session.SessionID,
"slots", session.ControllerSlots)
}
}
}
}
r.ClientSessions.Delete(peerID)
return
}
// Relay peer disconnect handling
slog.Info("Mesh peer disconnected, deleting from local peer map", "peer", peerID)
// Remove peer from local mesh peers
if r.Peers.Has(peerID) {
r.Peers.Delete(peerID)
}
// Remove any rooms associated with this peer
if r.Rooms.Has(peerID.String()) {
r.Rooms.Delete(peerID.String())
}

View File

@@ -73,28 +73,50 @@ func (x *ProtoMessageBase) GetLatency() *ProtoLatencyTracker {
return nil
}
type ProtoMessageInput struct {
state protoimpl.MessageState `protogen:"open.v1"`
MessageBase *ProtoMessageBase `protobuf:"bytes,1,opt,name=message_base,json=messageBase,proto3" json:"message_base,omitempty"`
Data *ProtoInput `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
type ProtoMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
MessageBase *ProtoMessageBase `protobuf:"bytes,1,opt,name=message_base,json=messageBase,proto3" json:"message_base,omitempty"`
// Types that are valid to be assigned to Payload:
//
// *ProtoMessage_MouseMove
// *ProtoMessage_MouseMoveAbs
// *ProtoMessage_MouseWheel
// *ProtoMessage_MouseKeyDown
// *ProtoMessage_MouseKeyUp
// *ProtoMessage_KeyDown
// *ProtoMessage_KeyUp
// *ProtoMessage_ControllerAttach
// *ProtoMessage_ControllerDetach
// *ProtoMessage_ControllerButton
// *ProtoMessage_ControllerTrigger
// *ProtoMessage_ControllerStick
// *ProtoMessage_ControllerAxis
// *ProtoMessage_ControllerRumble
// *ProtoMessage_Ice
// *ProtoMessage_Sdp
// *ProtoMessage_Raw
// *ProtoMessage_ClientRequestRoomStream
// *ProtoMessage_ClientDisconnected
// *ProtoMessage_ServerPushStream
Payload isProtoMessage_Payload `protobuf_oneof:"payload"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ProtoMessageInput) Reset() {
*x = ProtoMessageInput{}
func (x *ProtoMessage) Reset() {
*x = ProtoMessage{}
mi := &file_messages_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ProtoMessageInput) String() string {
func (x *ProtoMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ProtoMessageInput) ProtoMessage() {}
func (*ProtoMessage) ProtoMessage() {}
func (x *ProtoMessageInput) ProtoReflect() protoreflect.Message {
func (x *ProtoMessage) ProtoReflect() protoreflect.Message {
mi := &file_messages_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -106,25 +128,331 @@ func (x *ProtoMessageInput) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use ProtoMessageInput.ProtoReflect.Descriptor instead.
func (*ProtoMessageInput) Descriptor() ([]byte, []int) {
// Deprecated: Use ProtoMessage.ProtoReflect.Descriptor instead.
func (*ProtoMessage) Descriptor() ([]byte, []int) {
return file_messages_proto_rawDescGZIP(), []int{1}
}
func (x *ProtoMessageInput) GetMessageBase() *ProtoMessageBase {
func (x *ProtoMessage) GetMessageBase() *ProtoMessageBase {
if x != nil {
return x.MessageBase
}
return nil
}
func (x *ProtoMessageInput) GetData() *ProtoInput {
func (x *ProtoMessage) GetPayload() isProtoMessage_Payload {
if x != nil {
return x.Data
return x.Payload
}
return nil
}
func (x *ProtoMessage) GetMouseMove() *ProtoMouseMove {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseMove); ok {
return x.MouseMove
}
}
return nil
}
func (x *ProtoMessage) GetMouseMoveAbs() *ProtoMouseMoveAbs {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseMoveAbs); ok {
return x.MouseMoveAbs
}
}
return nil
}
func (x *ProtoMessage) GetMouseWheel() *ProtoMouseWheel {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseWheel); ok {
return x.MouseWheel
}
}
return nil
}
func (x *ProtoMessage) GetMouseKeyDown() *ProtoMouseKeyDown {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseKeyDown); ok {
return x.MouseKeyDown
}
}
return nil
}
func (x *ProtoMessage) GetMouseKeyUp() *ProtoMouseKeyUp {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_MouseKeyUp); ok {
return x.MouseKeyUp
}
}
return nil
}
func (x *ProtoMessage) GetKeyDown() *ProtoKeyDown {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_KeyDown); ok {
return x.KeyDown
}
}
return nil
}
func (x *ProtoMessage) GetKeyUp() *ProtoKeyUp {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_KeyUp); ok {
return x.KeyUp
}
}
return nil
}
func (x *ProtoMessage) GetControllerAttach() *ProtoControllerAttach {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerAttach); ok {
return x.ControllerAttach
}
}
return nil
}
func (x *ProtoMessage) GetControllerDetach() *ProtoControllerDetach {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerDetach); ok {
return x.ControllerDetach
}
}
return nil
}
func (x *ProtoMessage) GetControllerButton() *ProtoControllerButton {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerButton); ok {
return x.ControllerButton
}
}
return nil
}
func (x *ProtoMessage) GetControllerTrigger() *ProtoControllerTrigger {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerTrigger); ok {
return x.ControllerTrigger
}
}
return nil
}
func (x *ProtoMessage) GetControllerStick() *ProtoControllerStick {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerStick); ok {
return x.ControllerStick
}
}
return nil
}
func (x *ProtoMessage) GetControllerAxis() *ProtoControllerAxis {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerAxis); ok {
return x.ControllerAxis
}
}
return nil
}
func (x *ProtoMessage) GetControllerRumble() *ProtoControllerRumble {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ControllerRumble); ok {
return x.ControllerRumble
}
}
return nil
}
func (x *ProtoMessage) GetIce() *ProtoICE {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Ice); ok {
return x.Ice
}
}
return nil
}
func (x *ProtoMessage) GetSdp() *ProtoSDP {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Sdp); ok {
return x.Sdp
}
}
return nil
}
func (x *ProtoMessage) GetRaw() *ProtoRaw {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_Raw); ok {
return x.Raw
}
}
return nil
}
func (x *ProtoMessage) GetClientRequestRoomStream() *ProtoClientRequestRoomStream {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ClientRequestRoomStream); ok {
return x.ClientRequestRoomStream
}
}
return nil
}
func (x *ProtoMessage) GetClientDisconnected() *ProtoClientDisconnected {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ClientDisconnected); ok {
return x.ClientDisconnected
}
}
return nil
}
func (x *ProtoMessage) GetServerPushStream() *ProtoServerPushStream {
if x != nil {
if x, ok := x.Payload.(*ProtoMessage_ServerPushStream); ok {
return x.ServerPushStream
}
}
return nil
}
type isProtoMessage_Payload interface {
isProtoMessage_Payload()
}
type ProtoMessage_MouseMove struct {
// Input types
MouseMove *ProtoMouseMove `protobuf:"bytes,2,opt,name=mouse_move,json=mouseMove,proto3,oneof"`
}
type ProtoMessage_MouseMoveAbs struct {
MouseMoveAbs *ProtoMouseMoveAbs `protobuf:"bytes,3,opt,name=mouse_move_abs,json=mouseMoveAbs,proto3,oneof"`
}
type ProtoMessage_MouseWheel struct {
MouseWheel *ProtoMouseWheel `protobuf:"bytes,4,opt,name=mouse_wheel,json=mouseWheel,proto3,oneof"`
}
type ProtoMessage_MouseKeyDown struct {
MouseKeyDown *ProtoMouseKeyDown `protobuf:"bytes,5,opt,name=mouse_key_down,json=mouseKeyDown,proto3,oneof"`
}
type ProtoMessage_MouseKeyUp struct {
MouseKeyUp *ProtoMouseKeyUp `protobuf:"bytes,6,opt,name=mouse_key_up,json=mouseKeyUp,proto3,oneof"`
}
type ProtoMessage_KeyDown struct {
KeyDown *ProtoKeyDown `protobuf:"bytes,7,opt,name=key_down,json=keyDown,proto3,oneof"`
}
type ProtoMessage_KeyUp struct {
KeyUp *ProtoKeyUp `protobuf:"bytes,8,opt,name=key_up,json=keyUp,proto3,oneof"`
}
type ProtoMessage_ControllerAttach struct {
ControllerAttach *ProtoControllerAttach `protobuf:"bytes,9,opt,name=controller_attach,json=controllerAttach,proto3,oneof"`
}
type ProtoMessage_ControllerDetach struct {
ControllerDetach *ProtoControllerDetach `protobuf:"bytes,10,opt,name=controller_detach,json=controllerDetach,proto3,oneof"`
}
type ProtoMessage_ControllerButton struct {
ControllerButton *ProtoControllerButton `protobuf:"bytes,11,opt,name=controller_button,json=controllerButton,proto3,oneof"`
}
type ProtoMessage_ControllerTrigger struct {
ControllerTrigger *ProtoControllerTrigger `protobuf:"bytes,12,opt,name=controller_trigger,json=controllerTrigger,proto3,oneof"`
}
type ProtoMessage_ControllerStick struct {
ControllerStick *ProtoControllerStick `protobuf:"bytes,13,opt,name=controller_stick,json=controllerStick,proto3,oneof"`
}
type ProtoMessage_ControllerAxis struct {
ControllerAxis *ProtoControllerAxis `protobuf:"bytes,14,opt,name=controller_axis,json=controllerAxis,proto3,oneof"`
}
type ProtoMessage_ControllerRumble struct {
ControllerRumble *ProtoControllerRumble `protobuf:"bytes,15,opt,name=controller_rumble,json=controllerRumble,proto3,oneof"`
}
type ProtoMessage_Ice struct {
// Signaling types
Ice *ProtoICE `protobuf:"bytes,20,opt,name=ice,proto3,oneof"`
}
type ProtoMessage_Sdp struct {
Sdp *ProtoSDP `protobuf:"bytes,21,opt,name=sdp,proto3,oneof"`
}
type ProtoMessage_Raw struct {
Raw *ProtoRaw `protobuf:"bytes,22,opt,name=raw,proto3,oneof"`
}
type ProtoMessage_ClientRequestRoomStream struct {
ClientRequestRoomStream *ProtoClientRequestRoomStream `protobuf:"bytes,23,opt,name=client_request_room_stream,json=clientRequestRoomStream,proto3,oneof"`
}
type ProtoMessage_ClientDisconnected struct {
ClientDisconnected *ProtoClientDisconnected `protobuf:"bytes,24,opt,name=client_disconnected,json=clientDisconnected,proto3,oneof"`
}
type ProtoMessage_ServerPushStream struct {
ServerPushStream *ProtoServerPushStream `protobuf:"bytes,25,opt,name=server_push_stream,json=serverPushStream,proto3,oneof"`
}
func (*ProtoMessage_MouseMove) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseMoveAbs) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseWheel) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseKeyDown) isProtoMessage_Payload() {}
func (*ProtoMessage_MouseKeyUp) isProtoMessage_Payload() {}
func (*ProtoMessage_KeyDown) isProtoMessage_Payload() {}
func (*ProtoMessage_KeyUp) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerAttach) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerDetach) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerButton) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerTrigger) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerStick) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerAxis) isProtoMessage_Payload() {}
func (*ProtoMessage_ControllerRumble) isProtoMessage_Payload() {}
func (*ProtoMessage_Ice) isProtoMessage_Payload() {}
func (*ProtoMessage_Sdp) isProtoMessage_Payload() {}
func (*ProtoMessage_Raw) isProtoMessage_Payload() {}
func (*ProtoMessage_ClientRequestRoomStream) isProtoMessage_Payload() {}
func (*ProtoMessage_ClientDisconnected) isProtoMessage_Payload() {}
func (*ProtoMessage_ServerPushStream) isProtoMessage_Payload() {}
var File_messages_proto protoreflect.FileDescriptor
const file_messages_proto_rawDesc = "" +
@@ -132,10 +460,35 @@ const file_messages_proto_rawDesc = "" +
"\x0emessages.proto\x12\x05proto\x1a\vtypes.proto\x1a\x15latency_tracker.proto\"k\n" +
"\x10ProtoMessageBase\x12!\n" +
"\fpayload_type\x18\x01 \x01(\tR\vpayloadType\x124\n" +
"\alatency\x18\x02 \x01(\v2\x1a.proto.ProtoLatencyTrackerR\alatency\"v\n" +
"\x11ProtoMessageInput\x12:\n" +
"\fmessage_base\x18\x01 \x01(\v2\x17.proto.ProtoMessageBaseR\vmessageBase\x12%\n" +
"\x04data\x18\x02 \x01(\v2\x11.proto.ProtoInputR\x04dataB\x16Z\x14relay/internal/protob\x06proto3"
"\alatency\x18\x02 \x01(\v2\x1a.proto.ProtoLatencyTrackerR\alatency\"\xef\n" +
"\n" +
"\fProtoMessage\x12:\n" +
"\fmessage_base\x18\x01 \x01(\v2\x17.proto.ProtoMessageBaseR\vmessageBase\x126\n" +
"\n" +
"mouse_move\x18\x02 \x01(\v2\x15.proto.ProtoMouseMoveH\x00R\tmouseMove\x12@\n" +
"\x0emouse_move_abs\x18\x03 \x01(\v2\x18.proto.ProtoMouseMoveAbsH\x00R\fmouseMoveAbs\x129\n" +
"\vmouse_wheel\x18\x04 \x01(\v2\x16.proto.ProtoMouseWheelH\x00R\n" +
"mouseWheel\x12@\n" +
"\x0emouse_key_down\x18\x05 \x01(\v2\x18.proto.ProtoMouseKeyDownH\x00R\fmouseKeyDown\x12:\n" +
"\fmouse_key_up\x18\x06 \x01(\v2\x16.proto.ProtoMouseKeyUpH\x00R\n" +
"mouseKeyUp\x120\n" +
"\bkey_down\x18\a \x01(\v2\x13.proto.ProtoKeyDownH\x00R\akeyDown\x12*\n" +
"\x06key_up\x18\b \x01(\v2\x11.proto.ProtoKeyUpH\x00R\x05keyUp\x12K\n" +
"\x11controller_attach\x18\t \x01(\v2\x1c.proto.ProtoControllerAttachH\x00R\x10controllerAttach\x12K\n" +
"\x11controller_detach\x18\n" +
" \x01(\v2\x1c.proto.ProtoControllerDetachH\x00R\x10controllerDetach\x12K\n" +
"\x11controller_button\x18\v \x01(\v2\x1c.proto.ProtoControllerButtonH\x00R\x10controllerButton\x12N\n" +
"\x12controller_trigger\x18\f \x01(\v2\x1d.proto.ProtoControllerTriggerH\x00R\x11controllerTrigger\x12H\n" +
"\x10controller_stick\x18\r \x01(\v2\x1b.proto.ProtoControllerStickH\x00R\x0fcontrollerStick\x12E\n" +
"\x0fcontroller_axis\x18\x0e \x01(\v2\x1a.proto.ProtoControllerAxisH\x00R\x0econtrollerAxis\x12K\n" +
"\x11controller_rumble\x18\x0f \x01(\v2\x1c.proto.ProtoControllerRumbleH\x00R\x10controllerRumble\x12#\n" +
"\x03ice\x18\x14 \x01(\v2\x0f.proto.ProtoICEH\x00R\x03ice\x12#\n" +
"\x03sdp\x18\x15 \x01(\v2\x0f.proto.ProtoSDPH\x00R\x03sdp\x12#\n" +
"\x03raw\x18\x16 \x01(\v2\x0f.proto.ProtoRawH\x00R\x03raw\x12b\n" +
"\x1aclient_request_room_stream\x18\x17 \x01(\v2#.proto.ProtoClientRequestRoomStreamH\x00R\x17clientRequestRoomStream\x12Q\n" +
"\x13client_disconnected\x18\x18 \x01(\v2\x1e.proto.ProtoClientDisconnectedH\x00R\x12clientDisconnected\x12L\n" +
"\x12server_push_stream\x18\x19 \x01(\v2\x1c.proto.ProtoServerPushStreamH\x00R\x10serverPushStreamB\t\n" +
"\apayloadB\x16Z\x14relay/internal/protob\x06proto3"
var (
file_messages_proto_rawDescOnce sync.Once
@@ -151,20 +504,58 @@ func file_messages_proto_rawDescGZIP() []byte {
var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_messages_proto_goTypes = []any{
(*ProtoMessageBase)(nil), // 0: proto.ProtoMessageBase
(*ProtoMessageInput)(nil), // 1: proto.ProtoMessageInput
(*ProtoLatencyTracker)(nil), // 2: proto.ProtoLatencyTracker
(*ProtoInput)(nil), // 3: proto.ProtoInput
(*ProtoMessageBase)(nil), // 0: proto.ProtoMessageBase
(*ProtoMessage)(nil), // 1: proto.ProtoMessage
(*ProtoLatencyTracker)(nil), // 2: proto.ProtoLatencyTracker
(*ProtoMouseMove)(nil), // 3: proto.ProtoMouseMove
(*ProtoMouseMoveAbs)(nil), // 4: proto.ProtoMouseMoveAbs
(*ProtoMouseWheel)(nil), // 5: proto.ProtoMouseWheel
(*ProtoMouseKeyDown)(nil), // 6: proto.ProtoMouseKeyDown
(*ProtoMouseKeyUp)(nil), // 7: proto.ProtoMouseKeyUp
(*ProtoKeyDown)(nil), // 8: proto.ProtoKeyDown
(*ProtoKeyUp)(nil), // 9: proto.ProtoKeyUp
(*ProtoControllerAttach)(nil), // 10: proto.ProtoControllerAttach
(*ProtoControllerDetach)(nil), // 11: proto.ProtoControllerDetach
(*ProtoControllerButton)(nil), // 12: proto.ProtoControllerButton
(*ProtoControllerTrigger)(nil), // 13: proto.ProtoControllerTrigger
(*ProtoControllerStick)(nil), // 14: proto.ProtoControllerStick
(*ProtoControllerAxis)(nil), // 15: proto.ProtoControllerAxis
(*ProtoControllerRumble)(nil), // 16: proto.ProtoControllerRumble
(*ProtoICE)(nil), // 17: proto.ProtoICE
(*ProtoSDP)(nil), // 18: proto.ProtoSDP
(*ProtoRaw)(nil), // 19: proto.ProtoRaw
(*ProtoClientRequestRoomStream)(nil), // 20: proto.ProtoClientRequestRoomStream
(*ProtoClientDisconnected)(nil), // 21: proto.ProtoClientDisconnected
(*ProtoServerPushStream)(nil), // 22: proto.ProtoServerPushStream
}
var file_messages_proto_depIdxs = []int32{
2, // 0: proto.ProtoMessageBase.latency:type_name -> proto.ProtoLatencyTracker
0, // 1: proto.ProtoMessageInput.message_base:type_name -> proto.ProtoMessageBase
3, // 2: proto.ProtoMessageInput.data:type_name -> proto.ProtoInput
3, // [3:3] is the sub-list for method output_type
3, // [3:3] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
2, // 0: proto.ProtoMessageBase.latency:type_name -> proto.ProtoLatencyTracker
0, // 1: proto.ProtoMessage.message_base:type_name -> proto.ProtoMessageBase
3, // 2: proto.ProtoMessage.mouse_move:type_name -> proto.ProtoMouseMove
4, // 3: proto.ProtoMessage.mouse_move_abs:type_name -> proto.ProtoMouseMoveAbs
5, // 4: proto.ProtoMessage.mouse_wheel:type_name -> proto.ProtoMouseWheel
6, // 5: proto.ProtoMessage.mouse_key_down:type_name -> proto.ProtoMouseKeyDown
7, // 6: proto.ProtoMessage.mouse_key_up:type_name -> proto.ProtoMouseKeyUp
8, // 7: proto.ProtoMessage.key_down:type_name -> proto.ProtoKeyDown
9, // 8: proto.ProtoMessage.key_up:type_name -> proto.ProtoKeyUp
10, // 9: proto.ProtoMessage.controller_attach:type_name -> proto.ProtoControllerAttach
11, // 10: proto.ProtoMessage.controller_detach:type_name -> proto.ProtoControllerDetach
12, // 11: proto.ProtoMessage.controller_button:type_name -> proto.ProtoControllerButton
13, // 12: proto.ProtoMessage.controller_trigger:type_name -> proto.ProtoControllerTrigger
14, // 13: proto.ProtoMessage.controller_stick:type_name -> proto.ProtoControllerStick
15, // 14: proto.ProtoMessage.controller_axis:type_name -> proto.ProtoControllerAxis
16, // 15: proto.ProtoMessage.controller_rumble:type_name -> proto.ProtoControllerRumble
17, // 16: proto.ProtoMessage.ice:type_name -> proto.ProtoICE
18, // 17: proto.ProtoMessage.sdp:type_name -> proto.ProtoSDP
19, // 18: proto.ProtoMessage.raw:type_name -> proto.ProtoRaw
20, // 19: proto.ProtoMessage.client_request_room_stream:type_name -> proto.ProtoClientRequestRoomStream
21, // 20: proto.ProtoMessage.client_disconnected:type_name -> proto.ProtoClientDisconnected
22, // 21: proto.ProtoMessage.server_push_stream:type_name -> proto.ProtoServerPushStream
22, // [22:22] is the sub-list for method output_type
22, // [22:22] is the sub-list for method input_type
22, // [22:22] is the sub-list for extension type_name
22, // [22:22] is the sub-list for extension extendee
0, // [0:22] is the sub-list for field type_name
}
func init() { file_messages_proto_init() }
@@ -174,6 +565,28 @@ func file_messages_proto_init() {
}
file_types_proto_init()
file_latency_tracker_proto_init()
file_messages_proto_msgTypes[1].OneofWrappers = []any{
(*ProtoMessage_MouseMove)(nil),
(*ProtoMessage_MouseMoveAbs)(nil),
(*ProtoMessage_MouseWheel)(nil),
(*ProtoMessage_MouseKeyDown)(nil),
(*ProtoMessage_MouseKeyUp)(nil),
(*ProtoMessage_KeyDown)(nil),
(*ProtoMessage_KeyUp)(nil),
(*ProtoMessage_ControllerAttach)(nil),
(*ProtoMessage_ControllerDetach)(nil),
(*ProtoMessage_ControllerButton)(nil),
(*ProtoMessage_ControllerTrigger)(nil),
(*ProtoMessage_ControllerStick)(nil),
(*ProtoMessage_ControllerAxis)(nil),
(*ProtoMessage_ControllerRumble)(nil),
(*ProtoMessage_Ice)(nil),
(*ProtoMessage_Sdp)(nil),
(*ProtoMessage_Raw)(nil),
(*ProtoMessage_ClientRequestRoomStream)(nil),
(*ProtoMessage_ClientDisconnected)(nil),
(*ProtoMessage_ServerPushStream)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{

File diff suppressed because it is too large Load Diff

View File

@@ -2,43 +2,59 @@ package shared
import (
"fmt"
"log/slog"
"relay/internal/common"
"relay/internal/connections"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/oklog/ulid/v2"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
type Participant struct {
ID ulid.ULID
SessionID string // Track session for reconnection
PeerID peer.ID // libp2p peer ID
PeerConnection *webrtc.PeerConnection
DataChannel *connections.NestriDataChannel
// Per-viewer tracks and channels
VideoTrack *webrtc.TrackLocalStaticRTP
AudioTrack *webrtc.TrackLocalStaticRTP
VideoChan chan *rtp.Packet
AudioChan chan *rtp.Packet
}
func NewParticipant() (*Participant, error) {
func NewParticipant(sessionID string, peerID peer.ID) (*Participant, error) {
id, err := common.NewULID()
if err != nil {
return nil, fmt.Errorf("failed to create ULID for Participant: %w", err)
}
return &Participant{
ID: id,
ID: id,
SessionID: sessionID,
PeerID: peerID,
VideoChan: make(chan *rtp.Packet, 500),
AudioChan: make(chan *rtp.Packet, 100),
}, nil
}
func (p *Participant) addTrack(trackLocal *webrtc.TrackLocalStaticRTP) error {
rtpSender, err := p.PeerConnection.AddTrack(trackLocal)
if err != nil {
return err
// Close cleans up participant resources
func (p *Participant) Close() {
if p.VideoChan != nil {
close(p.VideoChan)
p.VideoChan = nil
}
go func() {
rtcpBuffer := make([]byte, 1400)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuffer); rtcpErr != nil {
break
}
if p.AudioChan != nil {
close(p.AudioChan)
p.AudioChan = nil
}
if p.PeerConnection != nil {
err := p.PeerConnection.Close()
if err != nil {
slog.Error("Failed to close Participant PeerConnection", err)
}
}()
return nil
p.PeerConnection = nil
}
}

View File

@@ -4,9 +4,11 @@ import (
"log/slog"
"relay/internal/common"
"relay/internal/connections"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/oklog/ulid/v2"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
@@ -23,17 +25,31 @@ type Room struct {
VideoTrack *webrtc.TrackLocalStaticRTP
DataChannel *connections.NestriDataChannel
Participants *common.SafeMap[ulid.ULID, *Participant]
// Broadcast queues (unbuffered, fan-out happens async)
videoBroadcastChan chan *rtp.Packet
audioBroadcastChan chan *rtp.Packet
broadcastStop chan struct{}
}
func NewRoom(name string, roomID ulid.ULID, ownerID peer.ID) *Room {
return &Room{
r := &Room{
RoomInfo: RoomInfo{
ID: roomID,
Name: name,
OwnerID: ownerID,
},
Participants: common.NewSafeMap[ulid.ULID, *Participant](),
Participants: common.NewSafeMap[ulid.ULID, *Participant](),
videoBroadcastChan: make(chan *rtp.Packet, 1000), // Large buffer for incoming packets
audioBroadcastChan: make(chan *rtp.Packet, 500),
broadcastStop: make(chan struct{}),
}
// Start async broadcasters
go r.videoBroadcaster()
go r.audioBroadcaster()
return r
}
// AddParticipant adds a Participant to a Room
@@ -42,8 +58,8 @@ func (r *Room) AddParticipant(participant *Participant) {
r.Participants.Set(participant.ID, participant)
}
// Removes a Participant from a Room by participant's ID
func (r *Room) removeParticipantByID(pID ulid.ULID) {
// RemoveParticipantByID removes a Participant from a Room by participant's ID
func (r *Room) RemoveParticipantByID(pID ulid.ULID) {
if _, ok := r.Participants.Get(pID); ok {
r.Participants.Delete(pID)
}
@@ -64,3 +80,92 @@ func (r *Room) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalS
slog.Warn("Unknown track type", "room", r.Name, "trackType", trackType)
}
}
// BroadcastPacket enqueues packet for async broadcast (non-blocking)
func (r *Room) BroadcastPacket(kind webrtc.RTPCodecType, pkt *rtp.Packet) {
start := time.Now()
if kind == webrtc.RTPCodecTypeVideo {
select {
case r.videoBroadcastChan <- pkt:
duration := time.Since(start)
if duration > 10*time.Millisecond {
slog.Warn("Slow video broadcast enqueue", "duration", duration, "room", r.Name)
}
default:
// Broadcast queue full - system overload, drop packet globally
slog.Warn("Video broadcast queue full, dropping packet", "room", r.Name)
}
} else {
select {
case r.audioBroadcastChan <- pkt:
duration := time.Since(start)
if duration > 10*time.Millisecond {
slog.Warn("Slow audio broadcast enqueue", "duration", duration, "room", r.Name)
}
default:
slog.Warn("Audio broadcast queue full, dropping packet", "room", r.Name)
}
}
}
// Close stops the broadcasters
func (r *Room) Close() {
close(r.broadcastStop)
close(r.videoBroadcastChan)
close(r.audioBroadcastChan)
}
// videoBroadcaster runs async fan-out for video packets
func (r *Room) videoBroadcaster() {
for {
select {
case pkt := <-r.videoBroadcastChan:
// Fan out to all participants without blocking
r.Participants.Range(func(_ ulid.ULID, participant *Participant) bool {
if participant.VideoChan != nil {
// Clone packet for each participant to avoid shared pointer issues
clonedPkt := pkt.Clone()
select {
case participant.VideoChan <- clonedPkt:
// Sent
default:
// Participant slow, drop packet
slog.Debug("Dropped video packet for slow participant",
"room", r.Name,
"participant", participant.ID)
}
}
return true
})
case <-r.broadcastStop:
return
}
}
}
// audioBroadcaster runs async fan-out for audio packets
func (r *Room) audioBroadcaster() {
for {
select {
case pkt := <-r.audioBroadcastChan:
r.Participants.Range(func(_ ulid.ULID, participant *Participant) bool {
if participant.AudioChan != nil {
// Clone packet for each participant to avoid shared pointer issues
clonedPkt := pkt.Clone()
select {
case participant.AudioChan <- clonedPkt:
// Sent
default:
// Participant slow, drop packet
slog.Debug("Dropped audio packet for slow participant",
"room", r.Name,
"participant", participant.ID)
}
}
return true
})
case <-r.broadcastStop:
return
}
}
}