mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-13 01:05:37 +02:00
## Description Whew, some stuff is still not re-implemented, but it's working! Rabbit's gonna explode with the amount of changes I reckon 😅 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced a peer-to-peer relay system using libp2p with enhanced stream forwarding, room state synchronization, and mDNS peer discovery. - Added decentralized room and participant management, metrics publishing, and safe, size-limited, concurrent message streaming with robust framing and callback dispatching. - Implemented asynchronous, callback-driven message handling over custom libp2p streams replacing WebSocket signaling. - **Improvements** - Migrated signaling and stream protocols from WebSocket to libp2p, improving reliability and scalability. - Simplified configuration and environment variables, removing deprecated flags and adding persistent data support. - Enhanced logging, error handling, and connection management for better observability and robustness. - Refined RTP header extension registration and NAT IP handling for improved WebRTC performance. - **Bug Fixes** - Improved ICE candidate buffering and SDP negotiation in WebRTC connections. - Fixed NAT IP and UDP port range configuration issues. - **Refactor** - Modularized codebase, reorganized relay and server logic, and removed deprecated WebSocket-based components. - Streamlined message structures, removed obsolete enums and message types, and simplified SafeMap concurrency. - Replaced WebSocket signaling with libp2p stream protocols in server and relay components. - **Chores** - Updated and cleaned dependencies across Go, Rust, and JavaScript packages. - Added `.gitignore` for persistent data directory in relay package. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com> Co-authored-by: Philipp Neumann <3daquawolf@gmail.com>
129 lines
3.8 KiB
Go
129 lines
3.8 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
)
|
|
|
|
// --- Structs ---
|
|
|
|
// networkNotifier logs connection events and updates relay state
|
|
type networkNotifier struct {
|
|
relay *Relay
|
|
}
|
|
|
|
// Connected is called when a connection is established
|
|
func (n *networkNotifier) Connected(net network.Network, conn network.Conn) {
|
|
if n.relay == nil {
|
|
n.relay.onPeerConnected(conn.RemotePeer())
|
|
}
|
|
}
|
|
|
|
// Disconnected is called when a connection is terminated
|
|
func (n *networkNotifier) Disconnected(net network.Network, conn network.Conn) {
|
|
// Update the status of the disconnected peer
|
|
if n.relay != nil {
|
|
n.relay.onPeerDisconnected(conn.RemotePeer())
|
|
}
|
|
}
|
|
|
|
// Listen is called when the node starts listening on an address
|
|
func (n *networkNotifier) Listen(net network.Network, addr multiaddr.Multiaddr) {}
|
|
|
|
// ListenClose is called when the node stops listening on an address
|
|
func (n *networkNotifier) ListenClose(net network.Network, addr multiaddr.Multiaddr) {}
|
|
|
|
// --- PubSub Setup ---
|
|
|
|
// setupPubSub initializes PubSub topics and subscriptions.
|
|
func (r *Relay) setupPubSub(ctx context.Context) error {
|
|
var err error
|
|
|
|
// Room State Topic
|
|
r.pubTopicState, err = r.PubSub.Join(roomStateTopicName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to join room state topic '%s': %w", roomStateTopicName, err)
|
|
}
|
|
stateSub, err := r.pubTopicState.Subscribe()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to room state topic '%s': %w", roomStateTopicName, err)
|
|
}
|
|
go r.handleRoomStateMessages(ctx, stateSub) // Handler in relay_state.go
|
|
|
|
// Relay Metrics Topic
|
|
r.pubTopicRelayMetrics, err = r.PubSub.Join(relayMetricsTopicName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to join relay metrics topic '%s': %w", relayMetricsTopicName, err)
|
|
}
|
|
metricsSub, err := r.pubTopicRelayMetrics.Subscribe()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to relay metrics topic '%s': %w", relayMetricsTopicName, err)
|
|
}
|
|
go r.handleRelayMetricsMessages(ctx, metricsSub) // Handler in relay_state.go
|
|
|
|
slog.Info("PubSub topics joined and subscriptions started")
|
|
return nil
|
|
}
|
|
|
|
// --- Connection Management ---
|
|
|
|
// connectToRelay is internal method to connect to a relay peer using multiaddresses
|
|
func (r *Relay) connectToRelay(ctx context.Context, peerInfo *peer.AddrInfo) error {
|
|
if peerInfo.ID == r.ID {
|
|
return errors.New("cannot connect to self")
|
|
}
|
|
|
|
// Use a timeout for the connection attempt
|
|
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // 15s timeout
|
|
defer cancel()
|
|
|
|
slog.Info("Attempting to connect to peer", "peer", peerInfo.ID, "addrs", peerInfo.Addrs)
|
|
if err := r.Host.Connect(connectCtx, *peerInfo); err != nil {
|
|
return fmt.Errorf("failed to connect to %s: %w", peerInfo.ID, err)
|
|
}
|
|
|
|
slog.Info("Successfully connected to peer", "peer", peerInfo.ID, "addrs", peerInfo.Addrs)
|
|
return nil
|
|
}
|
|
|
|
// ConnectToRelay connects to another relay by its multiaddress.
|
|
func (r *Relay) ConnectToRelay(ctx context.Context, addr string) error {
|
|
ma, err := multiaddr.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid multiaddress: %w", err)
|
|
}
|
|
|
|
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to extract peer info: %w", err)
|
|
}
|
|
|
|
return r.connectToRelay(ctx, peerInfo)
|
|
}
|
|
|
|
// printConnectInstructions logs the multiaddresses for connecting to this relay.
|
|
func printConnectInstructions(p2pHost host.Host) {
|
|
peerInfo := peer.AddrInfo{
|
|
ID: p2pHost.ID(),
|
|
Addrs: p2pHost.Addrs(),
|
|
}
|
|
addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
|
|
if err != nil {
|
|
slog.Error("Failed to convert peer info to addresses", "err", err)
|
|
return
|
|
}
|
|
|
|
slog.Info("Mesh connection addresses:")
|
|
for _, addr := range addrs {
|
|
slog.Info(fmt.Sprintf("> %s", addr.String()))
|
|
}
|
|
}
|