mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-12 08:45:38 +02:00
296 lines
7.4 KiB
Plaintext
296 lines
7.4 KiB
Plaintext
package relay
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
// "github.com/gorilla/mux"
|
|
"github.com/hashicorp/memberlist"
|
|
"github.com/pion/webrtc/v4"
|
|
)
|
|
|
|
// PeerInfo represents information about an SFU peer
|
|
type PeerInfo struct {
|
|
NodeID string `json:"nodeId"`
|
|
Zone string `json:"zone"`
|
|
PublicIP string `json:"publicIp"`
|
|
PrivateIP string `json:"privateIp,omitempty"`
|
|
Streams map[string]bool `json:"streams"` // streamID -> isOrigin
|
|
}
|
|
|
|
// StreamInfo tracks a stream's origin and local subscribers
|
|
type StreamInfo struct {
|
|
ID string
|
|
OriginPeerID string
|
|
IsLocal bool
|
|
Publisher *webrtc.PeerConnection
|
|
Subscribers map[string]*webrtc.PeerConnection
|
|
InterPeerConn map[string]*webrtc.PeerConnection // connections to other SFU peers
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// DistributedSFU manages streams and peer communication
|
|
type DistributedSFU struct {
|
|
nodeID string
|
|
zone string
|
|
publicIP string
|
|
privateIP string
|
|
streams map[string]*StreamInfo
|
|
peers map[string]*PeerInfo
|
|
memberlist *memberlist.Memberlist
|
|
mu sync.RWMutex
|
|
config webrtc.Configuration
|
|
}
|
|
|
|
// NewDistributedSFU creates a new distributed SFU instance
|
|
func NewDistributedSFU(nodeID, zone, publicIP, privateIP string, seeds []string) (*DistributedSFU, error) {
|
|
sfu := &DistributedSFU{
|
|
nodeID: nodeID,
|
|
zone: zone,
|
|
publicIP: publicIP,
|
|
privateIP: privateIP,
|
|
streams: make(map[string]*StreamInfo),
|
|
peers: make(map[string]*PeerInfo),
|
|
config: webrtc.Configuration{
|
|
ICEServers: []webrtc.ICEServer{
|
|
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Configure memberlist for peer discovery
|
|
config := memberlist.DefaultLANConfig()
|
|
config.Name = nodeID
|
|
config.BindAddr = privateIP
|
|
config.AdvertiseAddr = publicIP
|
|
|
|
// Add delegate for handling peer updates
|
|
config.Delegate = &peerDelegate{sfu: sfu}
|
|
|
|
// Initialize memberlist
|
|
list, err := memberlist.Create(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Join the cluster if seeds are provided
|
|
if len(seeds) > 0 {
|
|
_, err = list.Join(seeds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
sfu.memberlist = list
|
|
return sfu, nil
|
|
}
|
|
|
|
// peerDelegate implements memberlist.Delegate
|
|
type peerDelegate struct {
|
|
sfu *DistributedSFU
|
|
}
|
|
|
|
// NodeMeta returns metadata about the current node
|
|
func (d *peerDelegate) NodeMeta(limit int) []byte {
|
|
meta := PeerInfo{
|
|
NodeID: d.sfu.nodeID,
|
|
Zone: d.sfu.zone,
|
|
PublicIP: d.sfu.publicIP,
|
|
PrivateIP: d.sfu.privateIP,
|
|
Streams: make(map[string]bool),
|
|
}
|
|
|
|
d.sfu.mu.RLock()
|
|
for id, info := range d.sfu.streams {
|
|
meta.Streams[id] = info.IsLocal
|
|
}
|
|
d.sfu.mu.RUnlock()
|
|
|
|
data, _ := json.Marshal(meta)
|
|
return data
|
|
}
|
|
|
|
// NotifyMsg handles peer updates
|
|
func (d *peerDelegate) NotifyMsg(msg []byte) {
|
|
var peer PeerInfo
|
|
if err := json.Unmarshal(msg, &peer); err != nil {
|
|
return
|
|
}
|
|
|
|
d.sfu.mu.Lock()
|
|
d.sfu.peers[peer.NodeID] = &peer
|
|
|
|
// Check for new streams we don't have locally
|
|
for streamID, isOrigin := range peer.Streams {
|
|
if isOrigin {
|
|
if _, exists := d.sfu.streams[streamID]; !exists {
|
|
// Initialize inter-peer connection for this stream
|
|
d.sfu.initInterPeerStream(streamID, peer.NodeID)
|
|
}
|
|
}
|
|
}
|
|
d.sfu.mu.Unlock()
|
|
}
|
|
|
|
// initInterPeerStream sets up connection to another SFU for a stream
|
|
func (sfu *DistributedSFU) initInterPeerStream(streamID, peerID string) {
|
|
stream := &StreamInfo{
|
|
ID: streamID,
|
|
OriginPeerID: peerID,
|
|
IsLocal: false,
|
|
Subscribers: make(map[string]*webrtc.PeerConnection),
|
|
InterPeerConn: make(map[string]*webrtc.PeerConnection),
|
|
}
|
|
|
|
// Create peer connection to the origin SFU
|
|
pc, err := webrtc.NewPeerConnection(sfu.config)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
stream.InterPeerConn[peerID] = pc
|
|
sfu.streams[streamID] = stream
|
|
|
|
// Setup inter-peer WebRTC connection
|
|
go sfu.establishInterPeerConnection(streamID, peerID, pc)
|
|
}
|
|
|
|
// establishInterPeerConnection handles WebRTC signaling between SFU peers
|
|
func (sfu *DistributedSFU) establishInterPeerConnection(streamID, peerID string, pc *webrtc.PeerConnection) {
|
|
// This would typically involve making an HTTP request to the peer's control endpoint
|
|
// to exchange SDP offers/answers and ICE candidates
|
|
peerInfo := sfu.peers[peerID]
|
|
|
|
// Example endpoint URL construction
|
|
peerURL := fmt.Sprintf("http://%s:8080/peer/%s/stream/%s",
|
|
peerInfo.PublicIP, sfu.nodeID, streamID)
|
|
|
|
// Handle incoming tracks from peer
|
|
pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
|
sfu.mu.RLock()
|
|
stream := sfu.streams[streamID]
|
|
sfu.mu.RUnlock()
|
|
|
|
// Forward the track to local subscribers
|
|
stream.mu.RLock()
|
|
for _, subscriber := range stream.Subscribers {
|
|
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
|
remoteTrack.Codec().RTPCodecCapability,
|
|
remoteTrack.ID(),
|
|
remoteTrack.StreamID(),
|
|
)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if _, err := subscriber.AddTrack(localTrack); err != nil {
|
|
continue
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
packet, _, err := remoteTrack.ReadRTP()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if err := localTrack.WriteRTP(packet); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
stream.mu.RUnlock()
|
|
})
|
|
|
|
// Implement SDP exchange with peer
|
|
// ... (signaling implementation)
|
|
}
|
|
|
|
// HandleWHIPPublish now includes peer notification
|
|
func (sfu *DistributedSFU) HandleWHIPPublish(w http.ResponseWriter, r *http.Request) {
|
|
streamID := mux.Vars(r)["streamID"]
|
|
|
|
// Create stream info
|
|
stream := &StreamInfo{
|
|
ID: streamID,
|
|
IsLocal: true,
|
|
Subscribers: make(map[string]*webrtc.PeerConnection),
|
|
InterPeerConn: make(map[string]*webrtc.PeerConnection),
|
|
}
|
|
|
|
// ... (rest of WHIP publish logic)
|
|
|
|
// Notify other peers about the new stream
|
|
sfu.broadcastStreamUpdate(streamID, true)
|
|
}
|
|
|
|
// HandleWHEPSubscribe now checks both local and remote streams
|
|
func (sfu *DistributedSFU) HandleWHEPSubscribe(w http.ResponseWriter, r *http.Request) {
|
|
streamID := mux.Vars(r)["streamID"]
|
|
|
|
sfu.mu.RLock()
|
|
stream, exists := sfu.streams[streamID]
|
|
sfu.mu.RUnlock()
|
|
|
|
if !exists {
|
|
// Check if any peer has this stream
|
|
if peer := sfu.findStreamPeer(streamID); peer != nil {
|
|
// Initialize inter-peer connection if needed
|
|
sfu.initInterPeerStream(streamID, peer.NodeID)
|
|
} else {
|
|
http.Error(w, "Stream not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
}
|
|
|
|
// ... (rest of WHEP subscribe logic)
|
|
}
|
|
|
|
// findStreamPeer finds the peer that has the origin of a stream
|
|
func (sfu *DistributedSFU) findStreamPeer(streamID string) *PeerInfo {
|
|
sfu.mu.RLock()
|
|
defer sfu.mu.RUnlock()
|
|
|
|
for _, peer := range sfu.peers {
|
|
if isOrigin, exists := peer.Streams[streamID]; exists && isOrigin {
|
|
return peer
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func main() {
|
|
// Initialize the distributed SFU
|
|
sfu, err := NewDistributedSFU(
|
|
"sfu-1",
|
|
"us-east",
|
|
"203.0.113.1",
|
|
"10.0.0.1",
|
|
[]string{"203.0.113.2:7946", "203.0.113.3:7946"},
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
router := mux.NewRouter()
|
|
|
|
// Regular WHIP/WHEP endpoints
|
|
router.HandleFunc("/whip/{streamID}", sfu.HandleWHIPPublish).Methods("POST")
|
|
router.HandleFunc("/whep/{streamID}/{subscriberID}", sfu.HandleWHEPSubscribe).Methods("POST")
|
|
|
|
// Inter-peer communication endpoint
|
|
router.HandleFunc("/peer/{peerID}/stream/{streamID}", sfu.HandlePeerSignaling).Methods("POST")
|
|
|
|
server := &http.Server{
|
|
Addr: ":8080",
|
|
Handler: router,
|
|
ReadTimeout: 10 * time.Second,
|
|
WriteTimeout: 10 * time.Second,
|
|
}
|
|
|
|
server.ListenAndServe()
|
|
}
|