mirror of
https://github.com/nestriness/nestri.git
synced 2026-03-17 03:43:07 +02:00
feat: WIP s6-overlay and friends
This commit is contained in:
@@ -131,6 +131,13 @@ func InitWebRTCAPI() error {
|
||||
// Interceptor registry
|
||||
interceptorRegistry := &interceptor.Registry{}
|
||||
|
||||
// FlexFEC
|
||||
if flags.FlexFEC {
|
||||
if err = webrtc.ConfigureFlexFEC03(118, mediaEngine, interceptorRegistry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Register our interceptors..
|
||||
nackGenFactory, err := nack.NewGeneratorInterceptor()
|
||||
if err != nil {
|
||||
@@ -153,11 +160,11 @@ func InitWebRTCAPI() error {
|
||||
// New in v4, reduces CPU usage and latency when enabled
|
||||
settingEngine.EnableSCTPZeroChecksum(true)
|
||||
|
||||
nat11IP := GetFlags().NAT11IP
|
||||
/*nat11IP := GetFlags().NAT11IP
|
||||
if len(nat11IP) > 0 {
|
||||
settingEngine.SetNAT1To1IPs([]string{nat11IP}, webrtc.ICECandidateTypeHost)
|
||||
slog.Info("Using NAT 1:1 IP for WebRTC", "nat11_ip", nat11IP)
|
||||
}
|
||||
}*/
|
||||
|
||||
muxPort := GetFlags().UDPMuxPort
|
||||
if muxPort > 0 {
|
||||
@@ -186,6 +193,11 @@ func InitWebRTCAPI() error {
|
||||
// Improves speed when sending offers to browsers (https://github.com/pion/webrtc/issues/3174)
|
||||
settingEngine.SetIncludeLoopbackCandidate(true)
|
||||
|
||||
// Enable ICE Renomination for network recovery
|
||||
if err = settingEngine.SetICERenomination(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new API object with our customized settings
|
||||
globalWebRTCAPI = webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine), webrtc.WithSettingEngine(settingEngine), webrtc.WithInterceptorRegistry(interceptorRegistry))
|
||||
|
||||
|
||||
@@ -13,19 +13,22 @@ import (
|
||||
var globalFlags *Flags
|
||||
|
||||
type Flags struct {
|
||||
RegenIdentity bool // Remove old identity on startup and regenerate it
|
||||
Verbose bool // Log everything to console
|
||||
Debug bool // Enable debug mode, implies Verbose
|
||||
EndpointPort int // Port for HTTP/S and WS/S endpoint (TCP)
|
||||
WebRTCUDPStart int // WebRTC UDP port range start - ignored if UDPMuxPort is set
|
||||
WebRTCUDPEnd int // WebRTC UDP port range end - ignored if UDPMuxPort is set
|
||||
STUNServer string // WebRTC STUN server
|
||||
UDPMuxPort int // WebRTC UDP mux port - if set, overrides UDP port range
|
||||
AutoAddLocalIP bool // Automatically add local IP to NAT 1 to 1 IPs
|
||||
NAT11IP string // WebRTC NAT 1 to 1 IP - allows specifying IP of relay if behind NAT
|
||||
PersistDir string // Directory to save persistent data to
|
||||
Metrics bool // Enable metrics endpoint
|
||||
MetricsPort int // Port for metrics endpoint
|
||||
RegenIdentity bool // Remove old identity on startup and regenerate it
|
||||
Verbose bool // Log everything to console
|
||||
Debug bool // Enable debug mode, implies Verbose
|
||||
EndpointPort int // Port for HTTP/S and WS/S endpoint (TCP)
|
||||
WebRTCUDPStart int // WebRTC UDP port range start - ignored if UDPMuxPort is set
|
||||
WebRTCUDPEnd int // WebRTC UDP port range end - ignored if UDPMuxPort is set
|
||||
STUNServer string // WebRTC STUN server
|
||||
UDPMuxPort int // WebRTC UDP mux port - if set, overrides UDP port range
|
||||
AutoAddLocalIP bool // Automatically add local IP to NAT 1 to 1 IPs
|
||||
NAT11IP string // WebRTC NAT 1 to 1 IP - allows specifying IP of relay if behind NAT
|
||||
PersistDir string // Directory to save persistent data to
|
||||
Metrics bool // Enable metrics endpoint
|
||||
MetricsPort int // Port for metrics endpoint
|
||||
PlayoutDelayMin int // UNSTABLE: Minimum playout latency
|
||||
PlayoutDelayMax int // UNSTABLE: Maximum playout latency
|
||||
FlexFEC bool // UNSTABLE: Enable/disable FlexFEC for video streams
|
||||
}
|
||||
|
||||
func (flags *Flags) DebugLog() {
|
||||
@@ -43,6 +46,9 @@ func (flags *Flags) DebugLog() {
|
||||
"persistDir", flags.PersistDir,
|
||||
"metrics", flags.Metrics,
|
||||
"metricsPort", flags.MetricsPort,
|
||||
"playoutDelayMin", flags.PlayoutDelayMin,
|
||||
"playoutDelayMax", flags.PlayoutDelayMax,
|
||||
"flexFEC", flags.FlexFEC,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -91,6 +97,9 @@ func InitFlags() {
|
||||
flag.StringVar(&globalFlags.PersistDir, "persistDir", getEnvAsString("PERSIST_DIR", "./persist-data"), "Directory to save persistent data to")
|
||||
flag.BoolVar(&globalFlags.Metrics, "metrics", getEnvAsBool("METRICS", false), "Enable metrics endpoint")
|
||||
flag.IntVar(&globalFlags.MetricsPort, "metricsPort", getEnvAsInt("METRICS_PORT", 3030), "Port for metrics endpoint")
|
||||
flag.IntVar(&globalFlags.PlayoutDelayMin, "playoutDelayMin", getEnvAsInt("PLAYOUTDELAY_MIN", 0), "Minimum playout delay")
|
||||
flag.IntVar(&globalFlags.PlayoutDelayMin, "playoutDelayMax", getEnvAsInt("PLAYOUTDELAY_MAX", 0), "Maximum playout delay")
|
||||
flag.BoolVar(&globalFlags.FlexFEC, "flexFEC", getEnvAsBool("FLEXFEC", true), "Enable FlexFEC for video streams")
|
||||
// Parse flags
|
||||
flag.Parse()
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -181,7 +180,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
// Add audio/video tracks
|
||||
{
|
||||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
room.AudioCodec,
|
||||
room.GetAudioCodec(),
|
||||
"participant-"+participant.ID.String(),
|
||||
"participant-"+participant.ID.String()+"-audio",
|
||||
)
|
||||
@@ -194,7 +193,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
}
|
||||
{
|
||||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
room.VideoCodec,
|
||||
room.GetVideoCodec(),
|
||||
"participant-"+participant.ID.String(),
|
||||
"participant-"+participant.ID.String()+"-video",
|
||||
)
|
||||
@@ -296,7 +295,7 @@ func (sp *StreamProtocol) handleStreamRequest(stream network.Stream) {
|
||||
})
|
||||
|
||||
// Create offer
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
offer, err := pc.CreateOffer(&webrtc.OfferOptions{OfferAnswerOptions: webrtc.OfferAnswerOptions{ICETricklingSupported: true}})
|
||||
if err != nil {
|
||||
slog.Error("Failed to create offer for requested stream", "room", reqMsg.RoomName, "err", err)
|
||||
continue
|
||||
@@ -571,21 +570,10 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
})
|
||||
|
||||
pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
// Prepare PlayoutDelayExtension so we don't need to recreate it for each packet
|
||||
playoutExt := &rtp.PlayoutDelayExtension{
|
||||
MinDelay: 0,
|
||||
MaxDelay: 0,
|
||||
}
|
||||
playoutPayload, err := playoutExt.Marshal()
|
||||
if err != nil {
|
||||
slog.Error("Failed to marshal PlayoutDelayExtension for room", "room", room.Name, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if remoteTrack.Kind() == webrtc.RTPCodecTypeAudio {
|
||||
room.AudioCodec = remoteTrack.Codec().RTPCodecCapability
|
||||
room.SetAudioCodec(remoteTrack.Codec().RTPCodecCapability)
|
||||
} else if remoteTrack.Kind() == webrtc.RTPCodecTypeVideo {
|
||||
room.VideoCodec = remoteTrack.Codec().RTPCodecCapability
|
||||
room.SetVideoCodec(remoteTrack.Codec().RTPCodecCapability)
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -597,14 +585,6 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
break
|
||||
}
|
||||
|
||||
// Use PlayoutDelayExtension for low latency, if set for this track kind
|
||||
if extID, ok := common.GetExtension(remoteTrack.Kind(), common.ExtensionPlayoutDelay); ok {
|
||||
if err = rtpPacket.SetExtension(extID, playoutPayload); err != nil {
|
||||
slog.Error("Failed to set PlayoutDelayExtension for room", "room", room.Name, "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast
|
||||
room.BroadcastPacket(remoteTrack.Kind(), rtpPacket)
|
||||
}
|
||||
@@ -622,7 +602,7 @@ func (sp *StreamProtocol) handleStreamPush(stream network.Stream) {
|
||||
iceHelper.FlushHeldCandidates()
|
||||
|
||||
// Create an answer
|
||||
answer, err := pc.CreateAnswer(nil)
|
||||
answer, err := pc.CreateAnswer(&webrtc.AnswerOptions{OfferAnswerOptions: webrtc.OfferAnswerOptions{ICETricklingSupported: true}})
|
||||
if err != nil {
|
||||
slog.Error("Failed to create answer for pushed stream", "room", room.Name, "err", err)
|
||||
continue
|
||||
|
||||
@@ -23,12 +23,15 @@ func (r *Relay) GetRoomByID(id ulid.ULID) *shared.Room {
|
||||
|
||||
// GetRoomByName retrieves a local Room struct by its name
|
||||
func (r *Relay) GetRoomByName(name string) *shared.Room {
|
||||
for _, room := range r.LocalRooms.Copy() {
|
||||
if room.Name == name {
|
||||
return room
|
||||
}
|
||||
}
|
||||
return nil
|
||||
var found *shared.Room
|
||||
r.LocalRooms.Range(func(id ulid.ULID, room *shared.Room) bool {
|
||||
if room.Name == name {
|
||||
found = room
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
// CreateRoom creates a new local Room struct with the given name
|
||||
|
||||
@@ -327,7 +327,7 @@ type ProtoMessage_KeyDown struct {
|
||||
}
|
||||
|
||||
type ProtoMessage_KeyUp struct {
|
||||
KeyUp *ProtoKeyUp `protobuf:"bytes,8,opt,name=key_up,json=keyUp,proto3,oneof"`
|
||||
KeyUp *ProtoKeyUp `protobuf:"bytes,8,opt,name=key_up,json=keyUp,proto3,oneof"` //ProtoClipboard clipboard = 9;
|
||||
}
|
||||
|
||||
type ProtoMessage_ControllerAttach struct {
|
||||
|
||||
@@ -3,6 +3,7 @@ package shared
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/pion/rtp"
|
||||
"io"
|
||||
"log/slog"
|
||||
"relay/internal/common"
|
||||
@@ -104,6 +105,17 @@ func (p *Participant) Close() {
|
||||
}
|
||||
|
||||
func (p *Participant) packetWriter() {
|
||||
flags := common.GetFlags()
|
||||
playoutExt := &rtp.PlayoutDelayExtension{
|
||||
MinDelay: uint16(flags.PlayoutDelayMin),
|
||||
MaxDelay: uint16(flags.PlayoutDelayMax),
|
||||
}
|
||||
playoutPayload, err := playoutExt.Marshal()
|
||||
if err != nil {
|
||||
slog.Error("Failed to marshal PlayoutDelayExtension for participant", "participant", p.ID, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for pkt := range p.packetQueue {
|
||||
var track *webrtc.TrackLocalStaticRTP
|
||||
|
||||
@@ -114,6 +126,14 @@ func (p *Participant) packetWriter() {
|
||||
track = p.VideoTrack
|
||||
}
|
||||
|
||||
// Use PlayoutDelayExtension for low latency, if set for this track kind
|
||||
if extID, ok := common.GetExtension(track.Kind(), common.ExtensionPlayoutDelay); ok {
|
||||
if err = pkt.packet.SetExtension(extID, playoutPayload); err != nil {
|
||||
slog.Error("Failed to set PlayoutDelayExtension for participant", "participant", p.ID, "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if track != nil {
|
||||
if err := track.WriteRTP(pkt.packet); err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
||||
slog.Error("WriteRTP failed", "participant", p.ID, "kind", pkt.kind, "err", err)
|
||||
|
||||
@@ -19,8 +19,8 @@ var participantPacketPool = sync.Pool{
|
||||
}
|
||||
|
||||
type participantPacket struct {
|
||||
kind webrtc.RTPCodecType
|
||||
packet *rtp.Packet
|
||||
kind webrtc.RTPCodecType
|
||||
packet *rtp.Packet
|
||||
}
|
||||
|
||||
type RoomInfo struct {
|
||||
@@ -31,11 +31,13 @@ type RoomInfo struct {
|
||||
|
||||
type Room struct {
|
||||
RoomInfo
|
||||
AudioCodec webrtc.RTPCodecCapability
|
||||
VideoCodec webrtc.RTPCodecCapability
|
||||
PeerConnection *webrtc.PeerConnection
|
||||
DataChannel *connections.NestriDataChannel
|
||||
|
||||
codecMu sync.RWMutex
|
||||
audioCodec webrtc.RTPCodecCapability
|
||||
videoCodec webrtc.RTPCodecCapability
|
||||
|
||||
// Atomic pointer to slice of participant channels
|
||||
participantChannels atomic.Pointer[[]chan<- *participantPacket]
|
||||
participantsMtx sync.Mutex // Use only for add/remove
|
||||
@@ -90,7 +92,6 @@ func (r *Room) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
// AddParticipant adds a Participant to a Room
|
||||
func (r *Room) AddParticipant(participant *Participant) {
|
||||
r.participantsMtx.Lock()
|
||||
defer r.participantsMtx.Unlock()
|
||||
@@ -108,7 +109,6 @@ func (r *Room) AddParticipant(participant *Participant) {
|
||||
slog.Debug("Added participant", "participant", participant.ID, "room", r.Name)
|
||||
}
|
||||
|
||||
// RemoveParticipantByID removes a Participant from a Room by participant's ID
|
||||
func (r *Room) RemoveParticipantByID(pID ulid.ULID) {
|
||||
r.participantsMtx.Lock()
|
||||
defer r.participantsMtx.Unlock()
|
||||
@@ -134,7 +134,6 @@ func (r *Room) RemoveParticipantByID(pID ulid.ULID) {
|
||||
slog.Debug("Removed participant", "participant", pID, "room", r.Name)
|
||||
}
|
||||
|
||||
// IsOnline checks if the room is online
|
||||
func (r *Room) IsOnline() bool {
|
||||
return r.PeerConnection != nil
|
||||
}
|
||||
@@ -153,7 +152,7 @@ func (r *Room) BroadcastPacket(kind webrtc.RTPCodecType, pkt *rtp.Packet) {
|
||||
// Get packet struct from pool
|
||||
pp := participantPacketPool.Get().(*participantPacket)
|
||||
pp.kind = kind
|
||||
pp.packet = pkt
|
||||
pp.packet = pkt.Clone()
|
||||
|
||||
select {
|
||||
case ch <- pp:
|
||||
@@ -165,3 +164,27 @@ func (r *Room) BroadcastPacket(kind webrtc.RTPCodecType, pkt *rtp.Packet) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Room) SetAudioCodec(c webrtc.RTPCodecCapability) {
|
||||
r.codecMu.Lock()
|
||||
defer r.codecMu.Unlock()
|
||||
r.audioCodec = c
|
||||
}
|
||||
|
||||
func (r *Room) GetAudioCodec() webrtc.RTPCodecCapability {
|
||||
r.codecMu.RLock()
|
||||
defer r.codecMu.RUnlock()
|
||||
return r.audioCodec
|
||||
}
|
||||
|
||||
func (r *Room) SetVideoCodec(c webrtc.RTPCodecCapability) {
|
||||
r.codecMu.Lock()
|
||||
defer r.codecMu.Unlock()
|
||||
r.videoCodec = c
|
||||
}
|
||||
|
||||
func (r *Room) GetVideoCodec() webrtc.RTPCodecCapability {
|
||||
r.codecMu.RLock()
|
||||
defer r.codecMu.RUnlock()
|
||||
return r.videoCodec
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user