Files
codexPySnake/server/transport.py
Vladyslav Doloman 9043ba81c0 Server scaffold: protocol + config + transport abstraction + tick loop skeleton
- Protocol: PacketType, TLV Body types, QUIC varint, header, input_broadcast
  and config_update builders, 2-bit body bitpacking helper
- Config/model: live-config ServerConfig, basic GameState/Snake/Session
- Transport: InMemoryTransport placeholder and QUIC server stub
- Server: asyncio tick loop, periodic config_update broadcast, immediate
  input_broadcast relay; main entry and run.py
2025-10-07 20:02:28 +03:00

62 lines
1.8 KiB
Python

from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, Tuple
OnDatagram = Callable[[bytes, object], Awaitable[None]]
@dataclass
class TransportPeer:
addr: object # opaque peer handle (e.g., QUIC session)
class DatagramServerTransport:
async def send(self, data: bytes, peer: TransportPeer) -> None:
raise NotImplementedError
async def run(self) -> None:
raise NotImplementedError
class InMemoryTransport(DatagramServerTransport):
"""A test transport that loops datagrams back to registered peers."""
def __init__(self, on_datagram: OnDatagram):
self._on_datagram = on_datagram
self._peers: list[TransportPeer] = []
def register_peer(self, peer: TransportPeer) -> None:
self._peers.append(peer)
async def send(self, data: bytes, peer: TransportPeer) -> None:
# In-memory: deliver to all except sender to simulate broadcast domain
for p in self._peers:
if p is peer:
continue
await self._on_datagram(data, p)
async def run(self) -> None:
# Nothing to do in in-memory test transport
await asyncio.Future()
class QuicWebTransportServer(DatagramServerTransport):
"""Placeholder for a real WebTransport (HTTP/3) datagram server.
Integrate with aioquic or another QUIC library and invoke the provided
on_datagram callback when a datagram arrives.
"""
def __init__(self, on_datagram: OnDatagram):
self._on_datagram = on_datagram
async def send(self, data: bytes, peer: TransportPeer) -> None:
raise NotImplementedError("QUIC server not implemented in skeleton")
async def run(self) -> None:
raise NotImplementedError("QUIC server not implemented in skeleton")