mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-13 01:05:37 +02:00
## Description Whew, some stuff is still not re-implemented, but it's working! Rabbit's gonna explode with the amount of changes I reckon 😅 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced a peer-to-peer relay system using libp2p with enhanced stream forwarding, room state synchronization, and mDNS peer discovery. - Added decentralized room and participant management, metrics publishing, and safe, size-limited, concurrent message streaming with robust framing and callback dispatching. - Implemented asynchronous, callback-driven message handling over custom libp2p streams replacing WebSocket signaling. - **Improvements** - Migrated signaling and stream protocols from WebSocket to libp2p, improving reliability and scalability. - Simplified configuration and environment variables, removing deprecated flags and adding persistent data support. - Enhanced logging, error handling, and connection management for better observability and robustness. - Refined RTP header extension registration and NAT IP handling for improved WebRTC performance. - **Bug Fixes** - Improved ICE candidate buffering and SDP negotiation in WebRTC connections. - Fixed NAT IP and UDP port range configuration issues. - **Refactor** - Modularized codebase, reorganized relay and server logic, and removed deprecated WebSocket-based components. - Streamlined message structures, removed obsolete enums and message types, and simplified SafeMap concurrency. - Replaced WebSocket signaling with libp2p stream protocols in server and relay components. - **Chores** - Updated and cleaned dependencies across Go, Rust, and JavaScript packages. - Added `.gitignore` for persistent data directory in relay package. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com> Co-authored-by: Philipp Neumann <3daquawolf@gmail.com>
145 lines
4.1 KiB
Go
145 lines
4.1 KiB
Go
package shared
|
|
|
|
import (
|
|
"log/slog"
|
|
"relay/internal/common"
|
|
"relay/internal/connections"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/oklog/ulid/v2"
|
|
"github.com/pion/webrtc/v4"
|
|
)
|
|
|
|
type RoomInfo struct {
|
|
ID ulid.ULID `json:"id"`
|
|
Name string `json:"name"`
|
|
OwnerID peer.ID `json:"owner_id"`
|
|
}
|
|
|
|
type Room struct {
|
|
RoomInfo
|
|
PeerConnection *webrtc.PeerConnection
|
|
AudioTrack *webrtc.TrackLocalStaticRTP
|
|
VideoTrack *webrtc.TrackLocalStaticRTP
|
|
DataChannel *connections.NestriDataChannel
|
|
Participants *common.SafeMap[ulid.ULID, *Participant]
|
|
}
|
|
|
|
func NewRoom(name string, roomID ulid.ULID, ownerID peer.ID) *Room {
|
|
return &Room{
|
|
RoomInfo: RoomInfo{
|
|
ID: roomID,
|
|
Name: name,
|
|
OwnerID: ownerID,
|
|
},
|
|
Participants: common.NewSafeMap[ulid.ULID, *Participant](),
|
|
}
|
|
}
|
|
|
|
// AddParticipant adds a Participant to a Room
|
|
func (r *Room) AddParticipant(participant *Participant) {
|
|
slog.Debug("Adding participant to room", "participant", participant.ID, "room", r.Name)
|
|
r.Participants.Set(participant.ID, participant)
|
|
}
|
|
|
|
// Removes a Participant from a Room by participant's ID
|
|
func (r *Room) removeParticipantByID(pID ulid.ULID) {
|
|
if _, ok := r.Participants.Get(pID); ok {
|
|
r.Participants.Delete(pID)
|
|
}
|
|
}
|
|
|
|
// Removes all participants from a Room
|
|
/*func (r *Room) removeAllParticipants() {
|
|
for id, participant := range r.Participants.Copy() {
|
|
if err := r.signalParticipantOffline(participant); err != nil {
|
|
slog.Error("Failed to signal participant offline", "participant", participant.ID, "room", r.Name, "err", err)
|
|
}
|
|
r.Participants.Delete(id)
|
|
slog.Debug("Removed participant from room", "participant", id, "room", r.Name)
|
|
}
|
|
}*/
|
|
|
|
// IsOnline checks if the room is online (has both audio and video tracks)
|
|
func (r *Room) IsOnline() bool {
|
|
return r.AudioTrack != nil && r.VideoTrack != nil
|
|
}
|
|
|
|
func (r *Room) SetTrack(trackType webrtc.RTPCodecType, track *webrtc.TrackLocalStaticRTP) {
|
|
//oldOnline := r.IsOnline()
|
|
|
|
switch trackType {
|
|
case webrtc.RTPCodecTypeAudio:
|
|
r.AudioTrack = track
|
|
case webrtc.RTPCodecTypeVideo:
|
|
r.VideoTrack = track
|
|
default:
|
|
slog.Warn("Unknown track type", "room", r.Name, "trackType", trackType)
|
|
}
|
|
|
|
/*newOnline := r.IsOnline()
|
|
if oldOnline != newOnline {
|
|
if newOnline {
|
|
slog.Debug("Room online, participants will be signaled", "room", r.Name)
|
|
r.signalParticipantsWithTracks()
|
|
} else {
|
|
slog.Debug("Room offline, signaling participants", "room", r.Name)
|
|
r.signalParticipantsOffline()
|
|
}
|
|
|
|
// TODO: Publish updated state to mesh
|
|
go func() {
|
|
if err := r.Relay.publishRoomStates(context.Background()); err != nil {
|
|
slog.Error("Failed to publish room states on change", "room", r.Name, "err", err)
|
|
}
|
|
}()
|
|
}*/
|
|
}
|
|
|
|
/* TODO: libp2p'ify
|
|
func (r *Room) signalParticipantsWithTracks() {
|
|
for _, participant := range r.Participants.Copy() {
|
|
if err := r.signalParticipantWithTracks(participant); err != nil {
|
|
slog.Error("Failed to signal participant with tracks", "participant", participant.ID, "room", r.Name, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Room) signalParticipantWithTracks(participant *Participant) error {
|
|
if r.AudioTrack != nil {
|
|
if err := participant.addTrack(r.AudioTrack); err != nil {
|
|
return fmt.Errorf("failed to add audio track: %w", err)
|
|
}
|
|
}
|
|
if r.VideoTrack != nil {
|
|
if err := participant.addTrack(r.VideoTrack); err != nil {
|
|
return fmt.Errorf("failed to add video track: %w", err)
|
|
}
|
|
}
|
|
if err := participant.signalOffer(); err != nil {
|
|
return fmt.Errorf("failed to signal offer: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Room) signalParticipantsOffline() {
|
|
for _, participant := range r.Participants.Copy() {
|
|
if err := r.signalParticipantOffline(participant); err != nil {
|
|
slog.Error("Failed to signal participant offline", "participant", participant.ID, "room", r.Name, "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// signalParticipantOffline signals a single participant offline
|
|
func (r *Room) signalParticipantOffline(participant *Participant) error {
|
|
// Skip if websocket is nil or closed
|
|
if participant.WebSocket == nil || participant.WebSocket.IsClosed() {
|
|
return nil
|
|
}
|
|
if err := participant.WebSocket.SendAnswerMessageWS(connections.AnswerOffline); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
*/
|