feat(maitred): Update maitred - hookup to the API (#198)

## Description
We are hooking maitred up to the API.
Maitred's duties will be:
- [ ] Hook up to the API
- [ ] Wait for the signal (from the API) to start Steam (see the wire-format sketch below)
- [ ] On the stop signal, end the gaming session, clean up Steam... and maybe do the backup
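
These signals travel as small JSON envelopes over MQTT; the message types are defined in the realtime package in this diff. A minimal sketch of the wire format (the container ID is a placeholder):

```go
// Sketch: the "start" signal maitred expects from the API.
// The envelope shape matches BaseMessage in this PR; "abc123" is hypothetical.
msg := []byte(`{"type": "start", "payload": {"container_id": "abc123"}}`)
```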

## Summary by CodeRabbit

- **New Features**
  - Introduced Docker-based deployment configurations for both the main and relay applications.
  - Added new API endpoints enabling real-time machine messaging and enhanced IoT operations.
  - Expanded database schema and actor types to support improved machine tracking.

- **Improvements**
  - Enhanced real-time communication and relay management with streamlined room handling.
  - Upgraded dependencies, logging, and error handling for greater stability and performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: DatCaptainHorse <DatCaptainHorse@users.noreply.github.com>
Co-authored-by: Kristian Ollikainen <14197772+DatCaptainHorse@users.noreply.github.com>
Authored by Wanjohi on 2025-04-07 23:23:53 +03:00, committed by GitHub
parent 6990494b34, commit de80f3e6ab
84 changed files with 7357 additions and 1331 deletions


@@ -0,0 +1,366 @@
package realtime
import (
"context"
"fmt"
"log/slog"
"nestri/maitred/internal"
"nestri/maitred/internal/containers"
"strings"
"sync"
"time"
)
var (
nestriRunnerImage = "ghcr.io/nestrilabs/nestri/runner:nightly"
nestriRelayImage = "ghcr.io/nestrilabs/nestri/relay:nightly"
)
type ManagedContainerType int
const (
// Runner is the nestri runner container
Runner ManagedContainerType = iota
// Relay is the nestri relay container
Relay
)
// ManagedContainer type with extra information fields
type ManagedContainer struct {
containers.Container
Type ManagedContainerType
}
// managedContainers is a map of containers that are managed by us (maitred)
var (
managedContainers = make(map[string]ManagedContainer)
managedContainersMutex sync.RWMutex
)
// InitializeManager handles the initialization of the managed containers and pulls their latest images
func InitializeManager(ctx context.Context, ctrEngine containers.ContainerEngine) error {
// If debug, override the images
if internal.GetFlags().Debug {
nestriRunnerImage = "ghcr.io/datcaptainhorse/nestri-cachyos:latest-v3"
nestriRelayImage = "ghcr.io/datcaptainhorse/nestri-relay:latest"
}
// Look for existing stopped runner containers and remove them
slog.Info("Checking and removing old runner containers")
oldRunners, err := ctrEngine.ListContainersByImage(ctx, nestriRunnerImage)
if err != nil {
return err
}
for _, c := range oldRunners {
// If running, stop first
if strings.Contains(strings.ToLower(c.State), "running") {
slog.Info("Stopping old runner container", "id", c.ID)
if err = ctrEngine.StopContainer(ctx, c.ID); err != nil {
return err
}
}
slog.Info("Removing old runner container", "id", c.ID)
if err = ctrEngine.RemoveContainer(ctx, c.ID); err != nil {
return err
}
}
// Pull the runner image if not in debug mode
if !internal.GetFlags().Debug {
slog.Info("Pulling runner image", "image", nestriRunnerImage)
if err := ctrEngine.PullImage(ctx, nestriRunnerImage); err != nil {
return fmt.Errorf("failed to pull runner image: %w", err)
}
}
// Look for existing stopped relay containers and remove them
slog.Info("Checking and removing old relay containers")
oldRelays, err := ctrEngine.ListContainersByImage(ctx, nestriRelayImage)
if err != nil {
return err
}
for _, c := range oldRelays {
// If running, stop first
if strings.Contains(strings.ToLower(c.State), "running") {
slog.Info("Stopping old relay container", "id", c.ID)
if err = ctrEngine.StopContainer(ctx, c.ID); err != nil {
return err
}
}
slog.Info("Removing old relay container", "id", c.ID)
if err = ctrEngine.RemoveContainer(ctx, c.ID); err != nil {
return err
}
}
// Pull the relay image if not in debug mode
if !internal.GetFlags().Debug {
slog.Info("Pulling relay image", "image", nestriRelayImage)
if err := ctrEngine.PullImage(ctx, nestriRelayImage); err != nil {
return fmt.Errorf("failed to pull relay image: %w", err)
}
}
return nil
}
// CreateRunner creates a new runner image container
func CreateRunner(ctx context.Context, ctrEngine containers.ContainerEngine) (string, error) {
// For safety, limit to 4 runners (best-effort check; the count and creation are not atomic)
if CountRunners() >= 4 {
return "", fmt.Errorf("maximum number of runners reached")
}
// Create the container
containerID, err := ctrEngine.NewContainer(ctx, nestriRunnerImage, nil)
if err != nil {
return "", err
}
// Add the container to the managed list
managedContainersMutex.Lock()
defer managedContainersMutex.Unlock()
managedContainers[containerID] = ManagedContainer{
Container: containers.Container{
ID: containerID,
},
Type: Runner,
}
return containerID, nil
}
// StartRunner starts a runner container, keeping track of its state
func StartRunner(ctx context.Context, ctrEngine containers.ContainerEngine, id string) error {
// Verify the container is part of the managed list
managedContainersMutex.RLock()
if _, ok := managedContainers[id]; !ok {
managedContainersMutex.RUnlock()
return fmt.Errorf("container %s is not managed", id)
}
managedContainersMutex.RUnlock()
// Start the container
if err := ctrEngine.StartContainer(ctx, id); err != nil {
return err
}
// Check container status in the background at 10-second intervals; if it exits, print its logs
go func() {
err := monitorContainer(ctx, ctrEngine, id)
if err != nil {
slog.Error("failure while monitoring runner container", "id", id, "err", err)
return
}
}()
return nil
}
// RemoveRunner removes a runner container
func RemoveRunner(ctx context.Context, ctrEngine containers.ContainerEngine, id string) error {
// Stop the container if it's running; read its state under the read lock
managedContainersMutex.RLock()
ctr, ok := managedContainers[id]
managedContainersMutex.RUnlock()
if ok && strings.Contains(strings.ToLower(ctr.State), "running") {
if err := ctrEngine.StopContainer(ctx, id); err != nil {
return err
}
}
// Remove the container
if err := ctrEngine.RemoveContainer(ctx, id); err != nil {
return err
}
// Remove the container from the managed list
managedContainersMutex.Lock()
defer managedContainersMutex.Unlock()
delete(managedContainers, id)
return nil
}
// ListRunners returns a list of all runner containers
func ListRunners() []ManagedContainer {
managedContainersMutex.RLock()
defer managedContainersMutex.RUnlock()
var runners []ManagedContainer
for _, v := range managedContainers {
if v.Type == Runner {
runners = append(runners, v)
}
}
return runners
}
// CountRunners returns the number of runner containers
func CountRunners() int {
return len(ListRunners())
}
// CreateRelay creates a new relay image container
func CreateRelay(ctx context.Context, ctrEngine containers.ContainerEngine) (string, error) {
// Limit to 1 relay (best-effort check; the count and creation are not atomic)
if CountRelays() >= 1 {
return "", fmt.Errorf("maximum number of relays reached")
}
// TODO: Placeholder for control secret, should be generated at runtime
secretEnv := fmt.Sprintf("CONTROL_SECRET=%s", "1234")
// Create the container
containerID, err := ctrEngine.NewContainer(ctx, nestriRelayImage, []string{secretEnv})
if err != nil {
return "", err
}
// Add the container to the managed list
managedContainersMutex.Lock()
defer managedContainersMutex.Unlock()
managedContainers[containerID] = ManagedContainer{
Container: containers.Container{
ID: containerID,
},
Type: Relay,
}
return containerID, nil
}
// StartRelay starts a relay container, keeping track of its state
func StartRelay(ctx context.Context, ctrEngine containers.ContainerEngine, id string) error {
// Verify the container is part of the managed list
managedContainersMutex.RLock()
if _, ok := managedContainers[id]; !ok {
managedContainersMutex.RUnlock()
return fmt.Errorf("container %s is not managed", id)
}
managedContainersMutex.RUnlock()
// Start the container
if err := ctrEngine.StartContainer(ctx, id); err != nil {
return err
}
// Check container status in the background at 10-second intervals; if it exits, print its logs
go func() {
err := monitorContainer(ctx, ctrEngine, id)
if err != nil {
slog.Error("failure while monitoring relay container", "id", id, "err", err)
return
}
}()
return nil
}
// RemoveRelay removes a relay container
func RemoveRelay(ctx context.Context, ctrEngine containers.ContainerEngine, id string) error {
// Stop the container if it's running; read its state under the read lock
managedContainersMutex.RLock()
ctr, ok := managedContainers[id]
managedContainersMutex.RUnlock()
if ok && strings.Contains(strings.ToLower(ctr.State), "running") {
if err := ctrEngine.StopContainer(ctx, id); err != nil {
return err
}
}
// Remove the container
if err := ctrEngine.RemoveContainer(ctx, id); err != nil {
return err
}
// Remove the container from the managed list
managedContainersMutex.Lock()
defer managedContainersMutex.Unlock()
delete(managedContainers, id)
return nil
}
// ListRelays returns a list of all relay containers
func ListRelays() []ManagedContainer {
managedContainersMutex.RLock()
defer managedContainersMutex.RUnlock()
var relays []ManagedContainer
for _, v := range managedContainers {
if v.Type == Relay {
relays = append(relays, v)
}
}
return relays
}
// CountRelays returns the number of relay containers
func CountRelays() int {
return len(ListRelays())
}
// CleanupManaged stops and removes all managed containers
func CleanupManaged(ctx context.Context, ctrEngine containers.ContainerEngine) error {
managedContainersMutex.Lock()
defer managedContainersMutex.Unlock()
if len(managedContainers) == 0 {
return nil
}
slog.Info("Cleaning up managed containers")
for id := range managedContainers {
// If running, stop first
if strings.Contains(strings.ToLower(managedContainers[id].State), "running") {
slog.Info("Stopping managed container", "id", id)
if err := ctrEngine.StopContainer(ctx, id); err != nil {
return err
}
}
// Remove the container
slog.Info("Removing managed container", "id", id)
if err := ctrEngine.RemoveContainer(ctx, id); err != nil {
return err
}
// Remove from the managed list
delete(managedContainers, id)
}
return nil
}
func monitorContainer(ctx context.Context, ctrEngine containers.ContainerEngine, id string) error {
for {
select {
case <-ctx.Done():
return nil
default:
// Check the container status
ctr, err := ctrEngine.InspectContainer(ctx, id)
if err != nil {
return fmt.Errorf("failed to inspect container: %w", err)
}
// Update the container state in the managed list, preserving the entry's type
// (stamping a fixed type here would mislabel runner containers as relays)
managedContainersMutex.Lock()
if entry, ok := managedContainers[id]; ok {
entry.Container = containers.Container{
ID: ctr.ID,
Name: ctr.Name,
State: ctr.State,
Image: ctr.Image,
}
managedContainers[id] = entry
}
managedContainersMutex.Unlock()
if !strings.Contains(strings.ToLower(ctr.State), "running") {
// Container is not running, print logs
logs, err := ctrEngine.LogsContainer(ctx, id)
if err != nil {
return fmt.Errorf("failed to get container logs: %w", err)
}
return fmt.Errorf("container %s stopped running: %s", id, logs)
}
}
// Sleep for 10 seconds
select {
case <-ctx.Done():
return nil
case <-time.After(10 * time.Second):
}
}
}
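
For reviewers, a minimal sketch of how these lifecycle helpers compose, assuming the package imports as nestri/maitred/internal/realtime and that an engine satisfying containers.ContainerEngine is constructed elsewhere:

```go
package example

import (
	"context"
	"log/slog"

	"nestri/maitred/internal/containers"
	"nestri/maitred/internal/realtime" // assumed import path for this package
)

// runSession is a hypothetical driver showing the intended call order.
func runSession(ctx context.Context, engine containers.ContainerEngine) error {
	// Remove stale containers and pull fresh images first.
	if err := realtime.InitializeManager(ctx, engine); err != nil {
		return err
	}
	// Tear down everything we created on the way out; use a fresh context
	// so cleanup still runs after ctx has been cancelled.
	defer func() {
		if err := realtime.CleanupManaged(context.Background(), engine); err != nil {
			slog.Error("cleanup failed", "err", err)
		}
	}()
	// Create and start one runner; StartRunner begins background monitoring.
	id, err := realtime.CreateRunner(ctx, engine)
	if err != nil {
		return err
	}
	if err := realtime.StartRunner(ctx, engine, id); err != nil {
		return err
	}
	<-ctx.Done() // hold the session open until cancelled
	return nil
}
```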


@@ -0,0 +1,52 @@
package realtime
import (
"encoding/json"
)
// BaseMessage is the generic top-level message structure
type BaseMessage struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
type CreatePayload struct{}
type StartPayload struct {
ContainerID string `json:"container_id"`
}
type StopPayload struct {
ContainerID string `json:"container_id"`
}
// ParseMessage parses a BaseMessage and returns the specific payload
func ParseMessage(data []byte) (BaseMessage, interface{}, error) {
var base BaseMessage
if err := json.Unmarshal(data, &base); err != nil {
return base, nil, err
}
switch base.Type {
case "create":
var payload CreatePayload
if err := json.Unmarshal(base.Payload, &payload); err != nil {
return base, nil, err
}
return base, payload, nil
case "start":
var payload StartPayload
if err := json.Unmarshal(base.Payload, &payload); err != nil {
return base, nil, err
}
return base, payload, nil
case "stop":
var payload StopPayload
if err := json.Unmarshal(base.Payload, &payload); err != nil {
return base, nil, err
}
return base, payload, nil
default:
// Unknown message type: hand back the raw payload for the caller to handle
return base, base.Payload, nil
}
}
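
A usage sketch for ParseMessage: callers type-switch on the second return value, and unknown types fall through with the raw payload. The handler name and sample bytes are illustrative:

```go
package realtime

import "fmt"

// handleSignal is a hypothetical dispatcher over ParseMessage's typed payload.
func handleSignal(raw []byte) {
	base, payload, err := ParseMessage(raw)
	if err != nil {
		fmt.Println("malformed message:", err)
		return
	}
	switch p := payload.(type) {
	case StartPayload:
		fmt.Println("start requested for", p.ContainerID)
	case StopPayload:
		fmt.Println("stop requested for", p.ContainerID)
	default:
		fmt.Println("unhandled message type:", base.Type)
	}
}
```

Note that the payloads come back as values, not pointers, which is why the Run handlers below assert with payload.(StartPayload) rather than payload.(*StartPayload).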


@@ -0,0 +1,182 @@
package realtime
import (
"context"
"fmt"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"log/slog"
"nestri/maitred/internal/auth"
"nestri/maitred/internal/containers"
"nestri/maitred/internal/resource"
"net/url"
"os"
"time"
)
func Run(ctx context.Context, machineID string, containerEngine containers.ContainerEngine, resource *resource.Resource) error {
var clientID = generateClientID()
var topic = fmt.Sprintf("%s/%s/%s", resource.App.Name, resource.App.Stage, machineID)
var serverURL = fmt.Sprintf("wss://%s/mqtt?x-amz-customauthorizer-name=%s", resource.Realtime.Endpoint, resource.Realtime.Authorizer)
slog.Info("Realtime", "topic", topic)
userTokens, err := auth.FetchUserToken(machineID, resource)
if err != nil {
return err
}
slog.Info("Realtime", "token", userTokens.AccessToken)
u, err := url.Parse(serverURL)
if err != nil {
return err
}
router := paho.NewStandardRouter()
router.DefaultHandler(func(p *paho.Publish) {
slog.Debug("DefaultHandler", "topic", p.Topic, "message", fmt.Sprintf("default handler received message: %s - with topic: %s", p.Payload, p.Topic))
})
createTopic := fmt.Sprintf("%s/create", topic)
slog.Debug("Registering handler", "topic", createTopic)
router.RegisterHandler(createTopic, func(p *paho.Publish) {
slog.Debug("Router", "message", "received create message with payload", fmt.Sprintf("%s", p.Payload))
base, _, err := ParseMessage(p.Payload)
if err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to parse message: %s", err))
return
}
if base.Type != "create" {
slog.Error("Router", "err", "unexpected message type")
return
}
// Create runner container
containerID, err := CreateRunner(ctx, containerEngine)
if err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to create runner container: %s", err))
return
}
slog.Info("Router", "info", fmt.Sprintf("created runner container: %s", containerID))
})
startTopic := fmt.Sprintf("%s/start", topic)
slog.Debug("Registering handler", "topic", startTopic)
router.RegisterHandler(startTopic, func(p *paho.Publish) {
slog.Debug("Router", "message", "received start message with payload", fmt.Sprintf("%s", p.Payload))
base, payload, err := ParseMessage(p.Payload)
if err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to parse message: %s", err))
return
}
if base.Type != "start" {
slog.Error("Router", "err", "unexpected message type")
return
}
// Get container ID
startPayload, ok := payload.(StartPayload)
if !ok {
slog.Error("Router", "err", "failed to get payload")
return
}
// Start runner container
if err = containerEngine.StartContainer(ctx, startPayload.ContainerID); err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to start runner container: %s", err))
return
}
slog.Info("Router", "info", fmt.Sprintf("started runner container: %s", startPayload.ContainerID))
})
stopTopic := fmt.Sprintf("%s/stop", topic)
slog.Debug("Registering handler", "topic", stopTopic)
router.RegisterHandler(stopTopic, func(p *paho.Publish) {
slog.Debug("Router", "message", "received stop message with payload", fmt.Sprintf("%s", p.Payload))
base, payload, err := ParseMessage(p.Payload)
if err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to parse message: %s", err))
return
}
if base.Type != "stop" {
slog.Error("Router", "err", "unexpected message type")
return
}
// Get container ID
stopPayload, ok := payload.(StopPayload)
if !ok {
slog.Error("Router", "err", "failed to get payload")
return
}
// Stop runner container
if err = containerEngine.StopContainer(ctx, stopPayload.ContainerID); err != nil {
slog.Error("Router", "err", fmt.Sprintf("failed to stop runner container: %s", err))
return
}
slog.Info("Router", "info", fmt.Sprintf("stopped runner container: %s", stopPayload.ContainerID))
})
legacyLogger := slog.NewLogLogger(slog.NewTextHandler(os.Stdout, nil), slog.LevelError)
cliCfg := autopaho.ClientConfig{
ServerUrls: []*url.URL{u},
ConnectUsername: "",
ConnectPassword: []byte(userTokens.AccessToken),
KeepAlive: 20,
CleanStartOnInitialConnection: true,
SessionExpiryInterval: 60,
ReconnectBackoff: autopaho.NewConstantBackoff(time.Second),
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
slog.Info("Router", "info", "MQTT connection is up and running")
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: fmt.Sprintf("%s/#", topic), QoS: 1},
},
}); err != nil {
slog.Error("Router", "err", fmt.Sprint("failed to subscribe, likely no messages will be received: ", err))
}
},
Errors: legacyLogger,
OnConnectError: func(err error) {
slog.Error("Router", "err", fmt.Sprintf("error whilst attempting connection: %s", err))
},
ClientConfig: paho.ClientConfig{
ClientID: clientID,
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) {
router.Route(pr.Packet.Packet())
return true, nil
}},
OnClientError: func(err error) { slog.Error("Router", "err", fmt.Sprintf("client error: %s", err)) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
slog.Info("Router", "info", fmt.Sprintf("server requested disconnect: %s", d.Properties.ReasonString))
} else {
slog.Info("Router", "info", fmt.Sprintf("server requested disconnect; reason code: %d", d.ReasonCode))
}
},
},
}
c, err := autopaho.NewConnection(ctx, cliCfg)
if err != nil {
return err
}
// Block until the initial connection is up; autopaho then maintains the
// session in the background for the lifetime of ctx, so Run can return
if err = c.AwaitConnection(ctx); err != nil {
return err
}
return nil
}
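
For the other side of the conversation, a hedged sketch of what a publisher (for example, the API) might send to the start topic, assuming an already-established autopaho connection; the topic parts and container ID are placeholders:

```go
package example

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/eclipse/paho.golang/autopaho"
	"github.com/eclipse/paho.golang/paho"
)

// publishStart is a hypothetical publisher-side helper; cm is assumed to be
// connected, and the topic mirrors Run's app/stage/machineID layout.
func publishStart(ctx context.Context, cm *autopaho.ConnectionManager, appName, stage, machineID, containerID string) error {
	payload, err := json.Marshal(map[string]any{
		"type":    "start",
		"payload": map[string]string{"container_id": containerID},
	})
	if err != nil {
		return err
	}
	_, err = cm.Publish(ctx, &paho.Publish{
		QoS:     1, // matches the subscriber's QoS in Run
		Topic:   fmt.Sprintf("%s/%s/%s/start", appName, stage, machineID),
		Payload: payload,
	})
	return err
}
```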


@@ -0,0 +1,17 @@
package realtime
import (
"crypto/rand"
"fmt"
"github.com/oklog/ulid/v2"
"time"
)
func generateClientID() string {
// Create a source of entropy (cryptographically secure)
entropy := ulid.Monotonic(rand.Reader, 0)
// Generate a new ULID
id := ulid.MustNew(ulid.Timestamp(time.Now()), entropy)
// Create the client ID string
return fmt.Sprintf("mch_%s", id.String())
}
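
The result is "mch_" followed by a 26-character ULID, so client IDs are time-sortable and unique per connection; that uniqueness matters because an MQTT broker disconnects an existing session when a second client connects with the same ID.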