- Configure logging via LOG_LEVEL env (default INFO) - Log when servers start listening (WT/QUIC/InMemory/HTTPS static) - Log WT CONNECT accept, QUIC peer connect, datagram traffic at DEBUG - Log GameServer creation, tick loop start, JOIN accept/deny, config_update broadcasts, and input reception
60 lines
1.8 KiB
Python
60 lines
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
import logging
|
|
from typing import Awaitable, Callable, Optional, Tuple
|
|
|
|
|
|
# Async callback type for a received datagram: called with the raw payload
# bytes and a peer object (InMemoryTransport passes the TransportPeer it was
# given); the awaited result is None.
OnDatagram = Callable[[bytes, object], Awaitable[None]]
|
|
|
|
|
|
@dataclass
class TransportPeer:
    """Identifies one remote endpoint as seen by a datagram transport."""

    # Opaque peer handle (e.g., QUIC session).
    addr: object
|
|
|
|
|
|
class DatagramServerTransport:
    """Base interface for server-side datagram transports.

    Both coroutines raise ``NotImplementedError`` here; concrete transports
    are expected to override them.
    """

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Send ``data`` to ``peer``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    async def run(self) -> None:
        """Run the transport.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
|
|
|
|
|
|
class InMemoryTransport(DatagramServerTransport):
    """Network-free transport intended for tests.

    ``send`` hands the datagram directly to the ``on_datagram`` callback,
    but only when the target peer was previously registered.
    """

    def __init__(self, on_datagram: OnDatagram):
        """Store the datagram callback and start with no registered peers."""
        self._peers: list[TransportPeer] = []
        self._on_datagram = on_datagram

    def register_peer(self, peer: TransportPeer) -> None:
        """Append ``peer`` to the list of peers that ``send`` will deliver to."""
        self._peers.append(peer)

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Deliver ``data`` to the callback for ``peer``.

        Datagrams addressed to a peer that was never registered are dropped
        without error.
        """
        if peer not in self._peers:
            return
        # In-memory: deliver only to the addressed peer.
        await self._on_datagram(data, peer)

    async def run(self) -> None:
        """Log startup, then wait on a future that this method never completes."""
        logging.info("InMemory transport started (no network)")
        never_done = asyncio.Future()
        await never_done
|
|
|
|
|
|
class QuicWebTransportServer(DatagramServerTransport):
    """Stub for a real WebTransport (HTTP/3) datagram server.

    A full implementation would integrate with aioquic or another QUIC
    library and call the supplied ``on_datagram`` callback for each datagram
    that arrives. In this skeleton both ``send`` and ``run`` raise
    ``NotImplementedError``.
    """

    # Shared message for every not-yet-implemented operation.
    _UNIMPLEMENTED = "QUIC server not implemented in skeleton"

    def __init__(self, on_datagram: OnDatagram):
        """Keep the datagram callback for use by a future implementation."""
        self._on_datagram = on_datagram

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError(self._UNIMPLEMENTED)

    async def run(self) -> None:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError(self._UNIMPLEMENTED)
|