from __future__ import annotations import asyncio from dataclasses import dataclass from typing import Awaitable, Callable, Dict, Optional from .transport import DatagramServerTransport, OnDatagram, TransportPeer import logging try: from aioquic.asyncio import QuicConnectionProtocol, serve from aioquic.quic.configuration import QuicConfiguration from aioquic.quic.events import DatagramFrameReceived, ProtocolNegotiated except Exception: # pragma: no cover - optional dependency not installed in skeleton QuicConnectionProtocol = object # type: ignore QuicConfiguration = object # type: ignore serve = None # type: ignore DatagramFrameReceived = object # type: ignore ProtocolNegotiated = object # type: ignore class GameQuicProtocol(QuicConnectionProtocol): # type: ignore[misc] def __init__(self, *args, on_datagram: OnDatagram, peers: Dict[int, "GameQuicProtocol"], **kwargs): super().__init__(*args, **kwargs) self._on_datagram = on_datagram self._peers = peers self._peer_id: Optional[int] = None def quic_event_received(self, event) -> None: # type: ignore[override] if isinstance(event, ProtocolNegotiated): # Register by connection id self._peer_id = int(self._quic.connection_id) # type: ignore[attr-defined] self._peers[self._peer_id] = self logging.info("QUIC peer connected: id=%s", self._peer_id) elif isinstance(event, DatagramFrameReceived): # Schedule async callback if self._peer_id is None: return peer = TransportPeer(addr=self) logging.debug("QUIC datagram received from id=%s, %d bytes", self._peer_id, len(event.data)) asyncio.ensure_future(self._on_datagram(bytes(event.data), peer)) async def send_datagram(self, data: bytes) -> None: logging.debug("QUIC send datagram: %d bytes", len(data)) self._quic.send_datagram_frame(data) # type: ignore[attr-defined] await self._loop.run_in_executor(None, self.transmit) # type: ignore[attr-defined] class QuicWebTransportServer(DatagramServerTransport): def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram): if serve is None: raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.") self.host = host self.port = port self.certfile = certfile self.keyfile = keyfile self._on_datagram = on_datagram self._server = None self._peers: Dict[int, GameQuicProtocol] = {} async def send(self, data: bytes, peer: TransportPeer) -> None: proto = peer.addr # expected GameQuicProtocol if isinstance(proto, GameQuicProtocol): await proto.send_datagram(data) async def run(self) -> None: configuration = QuicConfiguration(is_client=False, alpn_protocols=["h3"]) configuration.load_cert_chain(self.certfile, self.keyfile) async def _create_protocol(*args, **kwargs): return GameQuicProtocol(*args, on_datagram=self._on_datagram, peers=self._peers, **kwargs) logging.info("QUIC datagram server listening on %s:%d", self.host, self.port) self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol) try: await self._server.wait_closed() finally: self._server.close()