mirror of
https://github.com/nestriness/nestri.git
synced 2025-12-13 01:05:37 +02:00
## Description

We are attempting to hook up maitred to the API.

Maitred's duties will be:

- [ ] Hook up to the API
- [ ] Wait for a signal (from the API) to start Steam
- [ ] Handle a stop signal to stop the gaming session, clean up Steam... and maybe do the backup

## Summary by CodeRabbit

- **New Features**
  - Introduced Docker-based deployment configurations for both the main and relay applications.
  - Added new API endpoints enabling real-time machine messaging and enhanced IoT operations.
  - Expanded database schema and actor types to support improved machine tracking.
- **Improvements**
  - Enhanced real-time communication and relay management with streamlined room handling.
  - Upgraded dependencies, logging, and error handling for greater stability and performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
203 lines
6.0 KiB
Go
203 lines
6.0 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/libp2p/go-reuseport"
|
|
"log/slog"
|
|
"net/http"
|
|
"relay/internal/common"
|
|
"relay/internal/connections"
|
|
"strconv"
|
|
)
|
|
|
|
// httpMux is the package-level request multiplexer. It stays nil until
// InitHTTPEndpoint allocates it and registers the relay's routes on it.
var (
	httpMux *http.ServeMux
)
|
|
|
|
func InitHTTPEndpoint(_ context.Context, ctxCancel context.CancelFunc) error {
|
|
// Create HTTP mux which serves our WS endpoint
|
|
httpMux = http.NewServeMux()
|
|
|
|
// Endpoints themselves
|
|
httpMux.Handle("/", http.NotFoundHandler())
|
|
// If control endpoint secret is set, enable the control endpoint
|
|
if len(common.GetFlags().ControlSecret) > 0 {
|
|
httpMux.HandleFunc("/api/control", corsAnyHandler(controlHandler))
|
|
}
|
|
// WS endpoint
|
|
httpMux.HandleFunc("/api/ws/{roomName}", corsAnyHandler(wsHandler))
|
|
|
|
// Get our serving port
|
|
port := common.GetFlags().EndpointPort
|
|
tlsCert := common.GetFlags().TLSCert
|
|
tlsKey := common.GetFlags().TLSKey
|
|
|
|
// Create re-usable listener port
|
|
httpListener, err := reuseport.Listen("tcp", ":"+strconv.Itoa(port))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create TCP listener: %w", err)
|
|
}
|
|
|
|
// Log and start the endpoint server
|
|
if len(tlsCert) <= 0 && len(tlsKey) <= 0 {
|
|
slog.Info("Starting HTTP endpoint server", "port", port)
|
|
go func() {
|
|
if err := http.Serve(httpListener, httpMux); err != nil {
|
|
slog.Error("Failed to start HTTP server", "err", err)
|
|
ctxCancel()
|
|
}
|
|
}()
|
|
} else if len(tlsCert) > 0 && len(tlsKey) > 0 {
|
|
slog.Info("Starting HTTPS endpoint server", "port", port)
|
|
go func() {
|
|
if err := http.ServeTLS(httpListener, httpMux, tlsCert, tlsKey); err != nil {
|
|
slog.Error("Failed to start HTTPS server", "err", err)
|
|
ctxCancel()
|
|
}
|
|
}()
|
|
} else {
|
|
return errors.New("no TLS certificate or TLS key provided")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// logHTTPError logs (if verbose) and sends an error code to requester
|
|
func logHTTPError(w http.ResponseWriter, err string, code int) {
|
|
if common.GetFlags().Verbose {
|
|
slog.Error("HTTP error", "code", code, "message", err)
|
|
}
|
|
http.Error(w, err, code)
|
|
}
|
|
|
|
// corsAnyHandler wraps next with fully permissive CORS headers: any origin,
// any method and any request header are allowed. OPTIONS requests are answered
// by the wrapper itself (headers only) and never reach next; every other
// method is passed through to next after the headers are set.
func corsAnyHandler(next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
	wildcardHeaders := [...]string{
		"Access-Control-Allow-Origin",
		"Access-Control-Allow-Methods",
		"Access-Control-Allow-Headers",
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		hdr := w.Header()
		for _, name := range wildcardHeaders {
			hdr.Set(name, "*")
		}

		// Preflight: the CORS headers alone are the answer.
		if r.Method == http.MethodOptions {
			return
		}
		next(w, r)
	})
}
|
|
|
|
// wsHandler is the handler for the /api/ws/{roomName} endpoint
|
|
func wsHandler(w http.ResponseWriter, r *http.Request) {
|
|
// Get given room name now
|
|
roomName := r.PathValue("roomName")
|
|
if len(roomName) <= 0 {
|
|
logHTTPError(w, "no room name given", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
rel := GetRelay()
|
|
// Get or create room in any case
|
|
room := rel.GetOrCreateRoom(roomName)
|
|
|
|
// Upgrade to WebSocket
|
|
upgrader := websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
wsConn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
logHTTPError(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Create SafeWebSocket
|
|
ws := connections.NewSafeWebSocket(wsConn)
|
|
// Assign message handler for join request
|
|
ws.RegisterMessageCallback("join", func(data []byte) {
|
|
var joinMsg connections.MessageJoin
|
|
if err = json.Unmarshal(data, &joinMsg); err != nil {
|
|
slog.Error("Failed to unmarshal join message", "err", err)
|
|
return
|
|
}
|
|
|
|
slog.Debug("Join message", "room", room.Name, "joinerType", joinMsg.JoinerType)
|
|
|
|
// Handle join request, depending if it's from ingest/node or participant/client
|
|
switch joinMsg.JoinerType {
|
|
case connections.JoinerNode:
|
|
// If room already online, send InUse answer
|
|
if room.Online {
|
|
if err = ws.SendAnswerMessageWS(connections.AnswerInUse); err != nil {
|
|
slog.Error("Failed to send InUse answer to node", "room", room.Name, "err", err)
|
|
}
|
|
return
|
|
}
|
|
room.AssignWebSocket(ws)
|
|
go IngestHandler(room)
|
|
case connections.JoinerClient:
|
|
// Create participant and add to room regardless of online status
|
|
participant := NewParticipant(ws)
|
|
room.AddParticipant(participant)
|
|
// If room not online, send Offline answer
|
|
if !room.Online {
|
|
if err = ws.SendAnswerMessageWS(connections.AnswerOffline); err != nil {
|
|
slog.Error("Failed to send offline answer to participant", "room", room.Name, "err", err)
|
|
}
|
|
}
|
|
go ParticipantHandler(participant, room, rel)
|
|
default:
|
|
slog.Error("Unknown joiner type", "joinerType", joinMsg.JoinerType)
|
|
}
|
|
|
|
// Unregister ourselves, if something happens on the other side they should just reconnect?
|
|
ws.UnregisterMessageCallback("join")
|
|
})
|
|
}
|
|
|
|
// controlMessage is the JSON payload accepted by the /api/control endpoint.
type controlMessage struct {
	// Type selects the control action; controlHandler recognizes "join_mesh".
	Type string `json:"type"`

	// Value is the action's argument; for "join_mesh" it carries the address
	// of the relay to connect to.
	Value string `json:"value"`
}
|
|
|
|
// controlHandler is the handler for the /api/control endpoint, for controlling this relay
|
|
func controlHandler(w http.ResponseWriter, r *http.Request) {
|
|
// Check for control secret in Authorization header
|
|
authHeader := r.Header.Get("Authorization")
|
|
if len(authHeader) <= 0 || authHeader != common.GetFlags().ControlSecret {
|
|
logHTTPError(w, "missing or invalid Authorization header", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
// Handle CORS preflight request
|
|
if r.Method == http.MethodOptions {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// Decode the control message
|
|
var msg controlMessage
|
|
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
|
|
logHTTPError(w, "failed to decode control message", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
//relay := GetRelay()
|
|
switch msg.Type {
|
|
case "join_mesh":
|
|
// Join the mesh network, get relay address from msg.Value
|
|
if len(msg.Value) <= 0 {
|
|
logHTTPError(w, "missing relay address", http.StatusBadRequest)
|
|
return
|
|
}
|
|
ctx := r.Context()
|
|
if err := GetRelay().ConnectToRelay(ctx, msg.Value); err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to connect: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Write([]byte("Successfully connected to relay"))
|
|
default:
|
|
logHTTPError(w, "unknown control message type", http.StatusBadRequest)
|
|
}
|
|
}
|