mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-12 08:45:38 +02:00
⭐ feat(www): Finish up on the UI components (#158)
This commit is contained in:
295
packages/relay/internal/peer.go.txt
Normal file
295
packages/relay/internal/peer.go.txt
Normal file
@@ -0,0 +1,295 @@
|
||||
package relay
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
// "github.com/gorilla/mux"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// PeerInfo represents information about an SFU peer
|
||||
type PeerInfo struct {
|
||||
NodeID string `json:"nodeId"`
|
||||
Zone string `json:"zone"`
|
||||
PublicIP string `json:"publicIp"`
|
||||
PrivateIP string `json:"privateIp,omitempty"`
|
||||
Streams map[string]bool `json:"streams"` // streamID -> isOrigin
|
||||
}
|
||||
|
||||
// StreamInfo tracks a stream's origin and local subscribers
|
||||
type StreamInfo struct {
|
||||
ID string
|
||||
OriginPeerID string
|
||||
IsLocal bool
|
||||
Publisher *webrtc.PeerConnection
|
||||
Subscribers map[string]*webrtc.PeerConnection
|
||||
InterPeerConn map[string]*webrtc.PeerConnection // connections to other SFU peers
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// DistributedSFU manages streams and peer communication
|
||||
type DistributedSFU struct {
|
||||
nodeID string
|
||||
zone string
|
||||
publicIP string
|
||||
privateIP string
|
||||
streams map[string]*StreamInfo
|
||||
peers map[string]*PeerInfo
|
||||
memberlist *memberlist.Memberlist
|
||||
mu sync.RWMutex
|
||||
config webrtc.Configuration
|
||||
}
|
||||
|
||||
// NewDistributedSFU creates a new distributed SFU instance
|
||||
func NewDistributedSFU(nodeID, zone, publicIP, privateIP string, seeds []string) (*DistributedSFU, error) {
|
||||
sfu := &DistributedSFU{
|
||||
nodeID: nodeID,
|
||||
zone: zone,
|
||||
publicIP: publicIP,
|
||||
privateIP: privateIP,
|
||||
streams: make(map[string]*StreamInfo),
|
||||
peers: make(map[string]*PeerInfo),
|
||||
config: webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{
|
||||
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Configure memberlist for peer discovery
|
||||
config := memberlist.DefaultLANConfig()
|
||||
config.Name = nodeID
|
||||
config.BindAddr = privateIP
|
||||
config.AdvertiseAddr = publicIP
|
||||
|
||||
// Add delegate for handling peer updates
|
||||
config.Delegate = &peerDelegate{sfu: sfu}
|
||||
|
||||
// Initialize memberlist
|
||||
list, err := memberlist.Create(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Join the cluster if seeds are provided
|
||||
if len(seeds) > 0 {
|
||||
_, err = list.Join(seeds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
sfu.memberlist = list
|
||||
return sfu, nil
|
||||
}
|
||||
|
||||
// peerDelegate implements memberlist.Delegate
|
||||
type peerDelegate struct {
|
||||
sfu *DistributedSFU
|
||||
}
|
||||
|
||||
// NodeMeta returns metadata about the current node
|
||||
func (d *peerDelegate) NodeMeta(limit int) []byte {
|
||||
meta := PeerInfo{
|
||||
NodeID: d.sfu.nodeID,
|
||||
Zone: d.sfu.zone,
|
||||
PublicIP: d.sfu.publicIP,
|
||||
PrivateIP: d.sfu.privateIP,
|
||||
Streams: make(map[string]bool),
|
||||
}
|
||||
|
||||
d.sfu.mu.RLock()
|
||||
for id, info := range d.sfu.streams {
|
||||
meta.Streams[id] = info.IsLocal
|
||||
}
|
||||
d.sfu.mu.RUnlock()
|
||||
|
||||
data, _ := json.Marshal(meta)
|
||||
return data
|
||||
}
|
||||
|
||||
// NotifyMsg handles peer updates
|
||||
func (d *peerDelegate) NotifyMsg(msg []byte) {
|
||||
var peer PeerInfo
|
||||
if err := json.Unmarshal(msg, &peer); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
d.sfu.mu.Lock()
|
||||
d.sfu.peers[peer.NodeID] = &peer
|
||||
|
||||
// Check for new streams we don't have locally
|
||||
for streamID, isOrigin := range peer.Streams {
|
||||
if isOrigin {
|
||||
if _, exists := d.sfu.streams[streamID]; !exists {
|
||||
// Initialize inter-peer connection for this stream
|
||||
d.sfu.initInterPeerStream(streamID, peer.NodeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
d.sfu.mu.Unlock()
|
||||
}
|
||||
|
||||
// initInterPeerStream sets up connection to another SFU for a stream
|
||||
func (sfu *DistributedSFU) initInterPeerStream(streamID, peerID string) {
|
||||
stream := &StreamInfo{
|
||||
ID: streamID,
|
||||
OriginPeerID: peerID,
|
||||
IsLocal: false,
|
||||
Subscribers: make(map[string]*webrtc.PeerConnection),
|
||||
InterPeerConn: make(map[string]*webrtc.PeerConnection),
|
||||
}
|
||||
|
||||
// Create peer connection to the origin SFU
|
||||
pc, err := webrtc.NewPeerConnection(sfu.config)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
stream.InterPeerConn[peerID] = pc
|
||||
sfu.streams[streamID] = stream
|
||||
|
||||
// Setup inter-peer WebRTC connection
|
||||
go sfu.establishInterPeerConnection(streamID, peerID, pc)
|
||||
}
|
||||
|
||||
// establishInterPeerConnection handles WebRTC signaling between SFU peers
|
||||
func (sfu *DistributedSFU) establishInterPeerConnection(streamID, peerID string, pc *webrtc.PeerConnection) {
|
||||
// This would typically involve making an HTTP request to the peer's control endpoint
|
||||
// to exchange SDP offers/answers and ICE candidates
|
||||
peerInfo := sfu.peers[peerID]
|
||||
|
||||
// Example endpoint URL construction
|
||||
peerURL := fmt.Sprintf("http://%s:8080/peer/%s/stream/%s",
|
||||
peerInfo.PublicIP, sfu.nodeID, streamID)
|
||||
|
||||
// Handle incoming tracks from peer
|
||||
pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
sfu.mu.RLock()
|
||||
stream := sfu.streams[streamID]
|
||||
sfu.mu.RUnlock()
|
||||
|
||||
// Forward the track to local subscribers
|
||||
stream.mu.RLock()
|
||||
for _, subscriber := range stream.Subscribers {
|
||||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
remoteTrack.Codec().RTPCodecCapability,
|
||||
remoteTrack.ID(),
|
||||
remoteTrack.StreamID(),
|
||||
)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := subscriber.AddTrack(localTrack); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
packet, _, err := remoteTrack.ReadRTP()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err := localTrack.WriteRTP(packet); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
stream.mu.RUnlock()
|
||||
})
|
||||
|
||||
// Implement SDP exchange with peer
|
||||
// ... (signaling implementation)
|
||||
}
|
||||
|
||||
// HandleWHIPPublish now includes peer notification
|
||||
func (sfu *DistributedSFU) HandleWHIPPublish(w http.ResponseWriter, r *http.Request) {
|
||||
streamID := mux.Vars(r)["streamID"]
|
||||
|
||||
// Create stream info
|
||||
stream := &StreamInfo{
|
||||
ID: streamID,
|
||||
IsLocal: true,
|
||||
Subscribers: make(map[string]*webrtc.PeerConnection),
|
||||
InterPeerConn: make(map[string]*webrtc.PeerConnection),
|
||||
}
|
||||
|
||||
// ... (rest of WHIP publish logic)
|
||||
|
||||
// Notify other peers about the new stream
|
||||
sfu.broadcastStreamUpdate(streamID, true)
|
||||
}
|
||||
|
||||
// HandleWHEPSubscribe now checks both local and remote streams
|
||||
func (sfu *DistributedSFU) HandleWHEPSubscribe(w http.ResponseWriter, r *http.Request) {
|
||||
streamID := mux.Vars(r)["streamID"]
|
||||
|
||||
sfu.mu.RLock()
|
||||
stream, exists := sfu.streams[streamID]
|
||||
sfu.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
// Check if any peer has this stream
|
||||
if peer := sfu.findStreamPeer(streamID); peer != nil {
|
||||
// Initialize inter-peer connection if needed
|
||||
sfu.initInterPeerStream(streamID, peer.NodeID)
|
||||
} else {
|
||||
http.Error(w, "Stream not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ... (rest of WHEP subscribe logic)
|
||||
}
|
||||
|
||||
// findStreamPeer finds the peer that has the origin of a stream
|
||||
func (sfu *DistributedSFU) findStreamPeer(streamID string) *PeerInfo {
|
||||
sfu.mu.RLock()
|
||||
defer sfu.mu.RUnlock()
|
||||
|
||||
for _, peer := range sfu.peers {
|
||||
if isOrigin, exists := peer.Streams[streamID]; exists && isOrigin {
|
||||
return peer
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Initialize the distributed SFU
|
||||
sfu, err := NewDistributedSFU(
|
||||
"sfu-1",
|
||||
"us-east",
|
||||
"203.0.113.1",
|
||||
"10.0.0.1",
|
||||
[]string{"203.0.113.2:7946", "203.0.113.3:7946"},
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
router := mux.NewRouter()
|
||||
|
||||
// Regular WHIP/WHEP endpoints
|
||||
router.HandleFunc("/whip/{streamID}", sfu.HandleWHIPPublish).Methods("POST")
|
||||
router.HandleFunc("/whep/{streamID}/{subscriberID}", sfu.HandleWHEPSubscribe).Methods("POST")
|
||||
|
||||
// Inter-peer communication endpoint
|
||||
router.HandleFunc("/peer/{peerID}/stream/{streamID}", sfu.HandlePeerSignaling).Methods("POST")
|
||||
|
||||
server := &http.Server{
|
||||
Addr: ":8080",
|
||||
Handler: router,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
server.ListenAndServe()
|
||||
}
|
||||
Reference in New Issue
Block a user