package relay import ( "encoding/json" "fmt" "net/http" "sync" "time" // "github.com/gorilla/mux" "github.com/hashicorp/memberlist" "github.com/pion/webrtc/v4" ) // PeerInfo represents information about an SFU peer type PeerInfo struct { NodeID string `json:"nodeId"` Zone string `json:"zone"` PublicIP string `json:"publicIp"` PrivateIP string `json:"privateIp,omitempty"` Streams map[string]bool `json:"streams"` // streamID -> isOrigin } // StreamInfo tracks a stream's origin and local subscribers type StreamInfo struct { ID string OriginPeerID string IsLocal bool Publisher *webrtc.PeerConnection Subscribers map[string]*webrtc.PeerConnection InterPeerConn map[string]*webrtc.PeerConnection // connections to other SFU peers mu sync.RWMutex } // DistributedSFU manages streams and peer communication type DistributedSFU struct { nodeID string zone string publicIP string privateIP string streams map[string]*StreamInfo peers map[string]*PeerInfo memberlist *memberlist.Memberlist mu sync.RWMutex config webrtc.Configuration } // NewDistributedSFU creates a new distributed SFU instance func NewDistributedSFU(nodeID, zone, publicIP, privateIP string, seeds []string) (*DistributedSFU, error) { sfu := &DistributedSFU{ nodeID: nodeID, zone: zone, publicIP: publicIP, privateIP: privateIP, streams: make(map[string]*StreamInfo), peers: make(map[string]*PeerInfo), config: webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ {URLs: []string{"stun:stun.l.google.com:19302"}}, }, }, } // Configure memberlist for peer discovery config := memberlist.DefaultLANConfig() config.Name = nodeID config.BindAddr = privateIP config.AdvertiseAddr = publicIP // Add delegate for handling peer updates config.Delegate = &peerDelegate{sfu: sfu} // Initialize memberlist list, err := memberlist.Create(config) if err != nil { return nil, err } // Join the cluster if seeds are provided if len(seeds) > 0 { _, err = list.Join(seeds) if err != nil { return nil, err } } sfu.memberlist = list return sfu, nil } // peerDelegate implements memberlist.Delegate type peerDelegate struct { sfu *DistributedSFU } // NodeMeta returns metadata about the current node func (d *peerDelegate) NodeMeta(limit int) []byte { meta := PeerInfo{ NodeID: d.sfu.nodeID, Zone: d.sfu.zone, PublicIP: d.sfu.publicIP, PrivateIP: d.sfu.privateIP, Streams: make(map[string]bool), } d.sfu.mu.RLock() for id, info := range d.sfu.streams { meta.Streams[id] = info.IsLocal } d.sfu.mu.RUnlock() data, _ := json.Marshal(meta) return data } // NotifyMsg handles peer updates func (d *peerDelegate) NotifyMsg(msg []byte) { var peer PeerInfo if err := json.Unmarshal(msg, &peer); err != nil { return } d.sfu.mu.Lock() d.sfu.peers[peer.NodeID] = &peer // Check for new streams we don't have locally for streamID, isOrigin := range peer.Streams { if isOrigin { if _, exists := d.sfu.streams[streamID]; !exists { // Initialize inter-peer connection for this stream d.sfu.initInterPeerStream(streamID, peer.NodeID) } } } d.sfu.mu.Unlock() } // initInterPeerStream sets up connection to another SFU for a stream func (sfu *DistributedSFU) initInterPeerStream(streamID, peerID string) { stream := &StreamInfo{ ID: streamID, OriginPeerID: peerID, IsLocal: false, Subscribers: make(map[string]*webrtc.PeerConnection), InterPeerConn: make(map[string]*webrtc.PeerConnection), } // Create peer connection to the origin SFU pc, err := webrtc.NewPeerConnection(sfu.config) if err != nil { return } stream.InterPeerConn[peerID] = pc sfu.streams[streamID] = stream // Setup inter-peer WebRTC connection go sfu.establishInterPeerConnection(streamID, peerID, pc) } // establishInterPeerConnection handles WebRTC signaling between SFU peers func (sfu *DistributedSFU) establishInterPeerConnection(streamID, peerID string, pc *webrtc.PeerConnection) { // This would typically involve making an HTTP request to the peer's control endpoint // to exchange SDP offers/answers and ICE candidates peerInfo := sfu.peers[peerID] // Example endpoint URL construction peerURL := fmt.Sprintf("http://%s:8080/peer/%s/stream/%s", peerInfo.PublicIP, sfu.nodeID, streamID) // Handle incoming tracks from peer pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { sfu.mu.RLock() stream := sfu.streams[streamID] sfu.mu.RUnlock() // Forward the track to local subscribers stream.mu.RLock() for _, subscriber := range stream.Subscribers { localTrack, err := webrtc.NewTrackLocalStaticRTP( remoteTrack.Codec().RTPCodecCapability, remoteTrack.ID(), remoteTrack.StreamID(), ) if err != nil { continue } if _, err := subscriber.AddTrack(localTrack); err != nil { continue } go func() { for { packet, _, err := remoteTrack.ReadRTP() if err != nil { return } if err := localTrack.WriteRTP(packet); err != nil { return } } }() } stream.mu.RUnlock() }) // Implement SDP exchange with peer // ... (signaling implementation) } // HandleWHIPPublish now includes peer notification func (sfu *DistributedSFU) HandleWHIPPublish(w http.ResponseWriter, r *http.Request) { streamID := mux.Vars(r)["streamID"] // Create stream info stream := &StreamInfo{ ID: streamID, IsLocal: true, Subscribers: make(map[string]*webrtc.PeerConnection), InterPeerConn: make(map[string]*webrtc.PeerConnection), } // ... (rest of WHIP publish logic) // Notify other peers about the new stream sfu.broadcastStreamUpdate(streamID, true) } // HandleWHEPSubscribe now checks both local and remote streams func (sfu *DistributedSFU) HandleWHEPSubscribe(w http.ResponseWriter, r *http.Request) { streamID := mux.Vars(r)["streamID"] sfu.mu.RLock() stream, exists := sfu.streams[streamID] sfu.mu.RUnlock() if !exists { // Check if any peer has this stream if peer := sfu.findStreamPeer(streamID); peer != nil { // Initialize inter-peer connection if needed sfu.initInterPeerStream(streamID, peer.NodeID) } else { http.Error(w, "Stream not found", http.StatusNotFound) return } } // ... (rest of WHEP subscribe logic) } // findStreamPeer finds the peer that has the origin of a stream func (sfu *DistributedSFU) findStreamPeer(streamID string) *PeerInfo { sfu.mu.RLock() defer sfu.mu.RUnlock() for _, peer := range sfu.peers { if isOrigin, exists := peer.Streams[streamID]; exists && isOrigin { return peer } } return nil } func main() { // Initialize the distributed SFU sfu, err := NewDistributedSFU( "sfu-1", "us-east", "203.0.113.1", "10.0.0.1", []string{"203.0.113.2:7946", "203.0.113.3:7946"}, ) if err != nil { panic(err) } router := mux.NewRouter() // Regular WHIP/WHEP endpoints router.HandleFunc("/whip/{streamID}", sfu.HandleWHIPPublish).Methods("POST") router.HandleFunc("/whep/{streamID}/{subscriberID}", sfu.HandleWHEPSubscribe).Methods("POST") // Inter-peer communication endpoint router.HandleFunc("/peer/{peerID}/stream/{streamID}", sfu.HandlePeerSignaling).Methods("POST") server := &http.Server{ Addr: ":8080", Handler: router, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } server.ListenAndServe() }