"""Datagram server transport abstractions.

Defines the minimal transport interface (``send`` / ``run``), an in-memory
implementation intended for tests, and a placeholder for a QUIC/WebTransport
backed server.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, Tuple

# Async callback invoked with the datagram payload and the peer it relates to.
OnDatagram = Callable[[bytes, object], Awaitable[None]]


@dataclass
class TransportPeer:
    """Identifies a remote endpoint of a transport."""

    addr: object  # opaque peer handle (e.g., QUIC session)


class DatagramServerTransport:
    """Interface for datagram server transports.

    Both methods raise ``NotImplementedError`` here; subclasses are expected
    to override them.
    """

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Send ``data`` to ``peer``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    async def run(self) -> None:
        """Run the transport.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError


class InMemoryTransport(DatagramServerTransport):
    """A test transport that loops datagrams back to registered peers.

    No network I/O is performed: ``send`` hands the payload straight to the
    ``on_datagram`` callback when the addressed peer has been registered.
    """

    def __init__(self, on_datagram: OnDatagram):
        """Store the delivery callback and start with no registered peers.

        Args:
            on_datagram: Awaitable callback called as
                ``on_datagram(data, peer)`` for every delivered datagram.
        """
        self._on_datagram = on_datagram
        self._peers: list[TransportPeer] = []

    def register_peer(self, peer: TransportPeer) -> None:
        """Add ``peer`` to the list of peers that ``send`` will deliver to."""
        self._peers.append(peer)

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Deliver ``data`` to the callback if ``peer`` is registered.

        Datagrams addressed to a peer that was never registered are dropped
        without raising.

        Args:
            data: Datagram payload.
            peer: Addressed peer; matched against registered peers using the
                dataclass equality of ``TransportPeer``.
        """
        # In-memory: deliver only to the addressed peer
        if peer in self._peers:
            await self._on_datagram(data, peer)

    async def run(self) -> None:
        """Wait indefinitely; there is no background work to perform.

        The awaited future is never resolved by this class, so the call only
        ends when the surrounding task is cancelled.
        """
        # Nothing to do in in-memory test transport
        await asyncio.Future()


class QuicWebTransportServer(DatagramServerTransport):
    """Placeholder for a real WebTransport (HTTP/3) datagram server.

    Integrate with aioquic or another QUIC library and invoke the provided
    on_datagram callback when a datagram arrives.
    """

    def __init__(self, on_datagram: OnDatagram):
        """Store the callback for use by a future implementation.

        Args:
            on_datagram: Awaitable callback for incoming datagrams; it is
                stored but not yet called anywhere in this skeleton.
        """
        self._on_datagram = on_datagram

    async def send(self, data: bytes, peer: TransportPeer) -> None:
        """Not implemented in this skeleton.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("QUIC server not implemented in skeleton")

    async def run(self) -> None:
        """Not implemented in this skeleton.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("QUIC server not implemented in skeleton")