Files
netris-nestri/packages/relay/internal/core/p2p.go
Kristian Ollikainen 6e82eff9e2 feat: Migrate from WebSocket to libp2p for peer-to-peer connectivity (#286)
## Description
Whew, some stuff is still not re-implemented, but it's working!

Rabbit's gonna explode with the amount of changes I reckon 😅



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a peer-to-peer relay system using libp2p with enhanced
stream forwarding, room state synchronization, and mDNS peer discovery.
- Added decentralized room and participant management, metrics
publishing, and safe, size-limited, concurrent message streaming with
robust framing and callback dispatching.
- Implemented asynchronous, callback-driven message handling over custom
libp2p streams replacing WebSocket signaling.
- **Improvements**
- Migrated signaling and stream protocols from WebSocket to libp2p,
improving reliability and scalability.
- Simplified configuration and environment variables, removing
deprecated flags and adding persistent data support.
- Enhanced logging, error handling, and connection management for better
observability and robustness.
- Refined RTP header extension registration and NAT IP handling for
improved WebRTC performance.
- **Bug Fixes**
- Improved ICE candidate buffering and SDP negotiation in WebRTC
connections.
  - Fixed NAT IP and UDP port range configuration issues.
- **Refactor**
- Modularized codebase, reorganized relay and server logic, and removed
deprecated WebSocket-based components.
- Streamlined message structures, removed obsolete enums and message
types, and simplified SafeMap concurrency.
- Replaced WebSocket signaling with libp2p stream protocols in server
and relay components.
- **Chores**
- Updated and cleaned dependencies across Go, Rust, and JavaScript
packages.
  - Added `.gitignore` for persistent data directory in relay package.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Philipp Neumann <3daquawolf@gmail.com>
2025-06-06 16:48:49 +03:00

129 lines
3.8 KiB
Go

package core
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
// --- Structs ---
// networkNotifier logs connection events and updates relay state
type networkNotifier struct {
relay *Relay
}
// Connected is called when a connection is established
func (n *networkNotifier) Connected(net network.Network, conn network.Conn) {
if n.relay == nil {
n.relay.onPeerConnected(conn.RemotePeer())
}
}
// Disconnected is called when a connection is terminated
func (n *networkNotifier) Disconnected(net network.Network, conn network.Conn) {
// Update the status of the disconnected peer
if n.relay != nil {
n.relay.onPeerDisconnected(conn.RemotePeer())
}
}
// Listen is called when the node starts listening on an address
func (n *networkNotifier) Listen(net network.Network, addr multiaddr.Multiaddr) {}
// ListenClose is called when the node stops listening on an address
func (n *networkNotifier) ListenClose(net network.Network, addr multiaddr.Multiaddr) {}
// --- PubSub Setup ---
// setupPubSub initializes PubSub topics and subscriptions.
func (r *Relay) setupPubSub(ctx context.Context) error {
var err error
// Room State Topic
r.pubTopicState, err = r.PubSub.Join(roomStateTopicName)
if err != nil {
return fmt.Errorf("failed to join room state topic '%s': %w", roomStateTopicName, err)
}
stateSub, err := r.pubTopicState.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe to room state topic '%s': %w", roomStateTopicName, err)
}
go r.handleRoomStateMessages(ctx, stateSub) // Handler in relay_state.go
// Relay Metrics Topic
r.pubTopicRelayMetrics, err = r.PubSub.Join(relayMetricsTopicName)
if err != nil {
return fmt.Errorf("failed to join relay metrics topic '%s': %w", relayMetricsTopicName, err)
}
metricsSub, err := r.pubTopicRelayMetrics.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe to relay metrics topic '%s': %w", relayMetricsTopicName, err)
}
go r.handleRelayMetricsMessages(ctx, metricsSub) // Handler in relay_state.go
slog.Info("PubSub topics joined and subscriptions started")
return nil
}
// --- Connection Management ---
// connectToRelay is internal method to connect to a relay peer using multiaddresses
func (r *Relay) connectToRelay(ctx context.Context, peerInfo *peer.AddrInfo) error {
if peerInfo.ID == r.ID {
return errors.New("cannot connect to self")
}
// Use a timeout for the connection attempt
connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // 15s timeout
defer cancel()
slog.Info("Attempting to connect to peer", "peer", peerInfo.ID, "addrs", peerInfo.Addrs)
if err := r.Host.Connect(connectCtx, *peerInfo); err != nil {
return fmt.Errorf("failed to connect to %s: %w", peerInfo.ID, err)
}
slog.Info("Successfully connected to peer", "peer", peerInfo.ID, "addrs", peerInfo.Addrs)
return nil
}
// ConnectToRelay connects to another relay by its multiaddress.
func (r *Relay) ConnectToRelay(ctx context.Context, addr string) error {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("invalid multiaddress: %w", err)
}
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
return fmt.Errorf("failed to extract peer info: %w", err)
}
return r.connectToRelay(ctx, peerInfo)
}
// printConnectInstructions logs the multiaddresses for connecting to this relay.
func printConnectInstructions(p2pHost host.Host) {
peerInfo := peer.AddrInfo{
ID: p2pHost.ID(),
Addrs: p2pHost.Addrs(),
}
addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
if err != nil {
slog.Error("Failed to convert peer info to addresses", "err", err)
return
}
slog.Info("Mesh connection addresses:")
for _, addr := range addrs {
slog.Info(fmt.Sprintf("> %s", addr.String()))
}
}