Files
netris-nestri/packages/relay/internal/http.go
Wanjohi de80f3e6ab feat(maitred): Update maitred - hookup to the API (#198)
## Description
We are attempting to hookup maitred to the API
Maitred duties will be:
- [ ] Hookup to the API
- [ ]  Wait for signal (from the API) to start Steam
- [ ] Stop signal to stop the gaming session, clean up Steam... and
maybe do the backup

## Summary by CodeRabbit

- **New Features**
- Introduced Docker-based deployment configurations for both the main
and relay applications.
- Added new API endpoints enabling real-time machine messaging and
enhanced IoT operations.
- Expanded database schema and actor types to support improved machine
tracking.

- **Improvements**
- Enhanced real-time communication and relay management with streamlined
room handling.
- Upgraded dependencies, logging, and error handling for greater
stability and performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
2025-04-07 23:23:53 +03:00

203 lines
6.0 KiB
Go

package internal
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/libp2p/go-reuseport"
"log/slog"
"net/http"
"relay/internal/common"
"relay/internal/connections"
"strconv"
)
var httpMux *http.ServeMux
func InitHTTPEndpoint(_ context.Context, ctxCancel context.CancelFunc) error {
// Create HTTP mux which serves our WS endpoint
httpMux = http.NewServeMux()
// Endpoints themselves
httpMux.Handle("/", http.NotFoundHandler())
// If control endpoint secret is set, enable the control endpoint
if len(common.GetFlags().ControlSecret) > 0 {
httpMux.HandleFunc("/api/control", corsAnyHandler(controlHandler))
}
// WS endpoint
httpMux.HandleFunc("/api/ws/{roomName}", corsAnyHandler(wsHandler))
// Get our serving port
port := common.GetFlags().EndpointPort
tlsCert := common.GetFlags().TLSCert
tlsKey := common.GetFlags().TLSKey
// Create re-usable listener port
httpListener, err := reuseport.Listen("tcp", ":"+strconv.Itoa(port))
if err != nil {
return fmt.Errorf("failed to create TCP listener: %w", err)
}
// Log and start the endpoint server
if len(tlsCert) <= 0 && len(tlsKey) <= 0 {
slog.Info("Starting HTTP endpoint server", "port", port)
go func() {
if err := http.Serve(httpListener, httpMux); err != nil {
slog.Error("Failed to start HTTP server", "err", err)
ctxCancel()
}
}()
} else if len(tlsCert) > 0 && len(tlsKey) > 0 {
slog.Info("Starting HTTPS endpoint server", "port", port)
go func() {
if err := http.ServeTLS(httpListener, httpMux, tlsCert, tlsKey); err != nil {
slog.Error("Failed to start HTTPS server", "err", err)
ctxCancel()
}
}()
} else {
return errors.New("no TLS certificate or TLS key provided")
}
return nil
}
// logHTTPError logs (if verbose) and sends an error code to requester
func logHTTPError(w http.ResponseWriter, err string, code int) {
if common.GetFlags().Verbose {
slog.Error("HTTP error", "code", code, "message", err)
}
http.Error(w, err, code)
}
// corsAnyHandler allows any origin to access the endpoint
func corsAnyHandler(next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
// Allow all origins
res.Header().Set("Access-Control-Allow-Origin", "*")
res.Header().Set("Access-Control-Allow-Methods", "*")
res.Header().Set("Access-Control-Allow-Headers", "*")
if req.Method != http.MethodOptions {
next(res, req)
}
}
}
// wsHandler is the handler for the /api/ws/{roomName} endpoint
func wsHandler(w http.ResponseWriter, r *http.Request) {
// Get given room name now
roomName := r.PathValue("roomName")
if len(roomName) <= 0 {
logHTTPError(w, "no room name given", http.StatusBadRequest)
return
}
rel := GetRelay()
// Get or create room in any case
room := rel.GetOrCreateRoom(roomName)
// Upgrade to WebSocket
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
wsConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logHTTPError(w, err.Error(), http.StatusInternalServerError)
return
}
// Create SafeWebSocket
ws := connections.NewSafeWebSocket(wsConn)
// Assign message handler for join request
ws.RegisterMessageCallback("join", func(data []byte) {
var joinMsg connections.MessageJoin
if err = json.Unmarshal(data, &joinMsg); err != nil {
slog.Error("Failed to unmarshal join message", "err", err)
return
}
slog.Debug("Join message", "room", room.Name, "joinerType", joinMsg.JoinerType)
// Handle join request, depending if it's from ingest/node or participant/client
switch joinMsg.JoinerType {
case connections.JoinerNode:
// If room already online, send InUse answer
if room.Online {
if err = ws.SendAnswerMessageWS(connections.AnswerInUse); err != nil {
slog.Error("Failed to send InUse answer to node", "room", room.Name, "err", err)
}
return
}
room.AssignWebSocket(ws)
go IngestHandler(room)
case connections.JoinerClient:
// Create participant and add to room regardless of online status
participant := NewParticipant(ws)
room.AddParticipant(participant)
// If room not online, send Offline answer
if !room.Online {
if err = ws.SendAnswerMessageWS(connections.AnswerOffline); err != nil {
slog.Error("Failed to send offline answer to participant", "room", room.Name, "err", err)
}
}
go ParticipantHandler(participant, room, rel)
default:
slog.Error("Unknown joiner type", "joinerType", joinMsg.JoinerType)
}
// Unregister ourselves, if something happens on the other side they should just reconnect?
ws.UnregisterMessageCallback("join")
})
}
// controlMessage is the JSON struct for the control messages
type controlMessage struct {
Type string `json:"type"`
Value string `json:"value"`
}
// controlHandler is the handler for the /api/control endpoint, for controlling this relay
func controlHandler(w http.ResponseWriter, r *http.Request) {
// Check for control secret in Authorization header
authHeader := r.Header.Get("Authorization")
if len(authHeader) <= 0 || authHeader != common.GetFlags().ControlSecret {
logHTTPError(w, "missing or invalid Authorization header", http.StatusUnauthorized)
return
}
// Handle CORS preflight request
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
// Decode the control message
var msg controlMessage
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
logHTTPError(w, "failed to decode control message", http.StatusBadRequest)
return
}
//relay := GetRelay()
switch msg.Type {
case "join_mesh":
// Join the mesh network, get relay address from msg.Value
if len(msg.Value) <= 0 {
logHTTPError(w, "missing relay address", http.StatusBadRequest)
return
}
ctx := r.Context()
if err := GetRelay().ConnectToRelay(ctx, msg.Value); err != nil {
http.Error(w, fmt.Sprintf("Failed to connect: %v", err), http.StatusInternalServerError)
return
}
w.Write([]byte("Successfully connected to relay"))
default:
logHTTPError(w, "unknown control message type", http.StatusBadRequest)
}
}