From 088c36396b5682e71eaad0106f0b5001835a5d4a Mon Sep 17 00:00:00 2001 From: Vladyslav Doloman Date: Tue, 7 Oct 2025 22:36:06 +0300 Subject: [PATCH] WebTransport: add HTTP/3 WebTransport datagram server using aioquic; run mode switch via MODE=wt|quic|mem --- run.py | 47 ++++++++----- server/webtransport_server.py | 125 ++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 18 deletions(-) create mode 100644 server/webtransport_server.py diff --git a/run.py b/run.py index 6b00d39..77c08ba 100644 --- a/run.py +++ b/run.py @@ -3,34 +3,45 @@ import os from server.server import GameServer from server.config import ServerConfig -from server.transport import InMemoryTransport -async def main(): - from server.server import GameServer +async def run_in_memory(): from server.transport import InMemoryTransport - cfg = ServerConfig() server = GameServer(transport=InMemoryTransport(lambda d, p: server.on_datagram(d, p)), config=cfg) await asyncio.gather(server.transport.run(), server.tick_loop()) +async def run_quic(): + from server.quic_transport import QuicWebTransportServer + cfg = ServerConfig() + host = os.environ.get("QUIC_HOST", "0.0.0.0") + port = int(os.environ.get("QUIC_PORT", "4433")) + cert = os.environ["QUIC_CERT"] + key = os.environ["QUIC_KEY"] + server = GameServer(transport=QuicWebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg) + await asyncio.gather(server.transport.run(), server.tick_loop()) + + +async def run_webtransport(): + from server.webtransport_server import WebTransportServer + cfg = ServerConfig() + host = os.environ.get("WT_HOST", os.environ.get("QUIC_HOST", "0.0.0.0")) + port = int(os.environ.get("WT_PORT", os.environ.get("QUIC_PORT", "4433"))) + cert = os.environ.get("WT_CERT") or os.environ["QUIC_CERT"] + key = os.environ.get("WT_KEY") or os.environ["QUIC_KEY"] + server = GameServer(transport=WebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg) + await asyncio.gather(server.transport.run(), server.tick_loop()) + + if __name__ == "__main__": try: - # Optional QUIC mode if env vars are provided - cert = os.environ.get("QUIC_CERT") - key = os.environ.get("QUIC_KEY") - host = os.environ.get("QUIC_HOST", "0.0.0.0") - port = int(os.environ.get("QUIC_PORT", "4433")) - if cert and key: - from server.quic_transport import QuicWebTransportServer - from server.server import GameServer - cfg = ServerConfig() - async def start_quic(): - server = GameServer(transport=QuicWebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg) - await asyncio.gather(server.transport.run(), server.tick_loop()) - asyncio.run(start_quic()) + mode = os.environ.get("MODE", "mem").lower() + if mode == "wt": + asyncio.run(run_webtransport()) + elif mode == "quic": + asyncio.run(run_quic()) else: - asyncio.run(main()) + asyncio.run(run_in_memory()) except KeyboardInterrupt: pass diff --git a/server/webtransport_server.py b/server/webtransport_server.py new file mode 100644 index 0000000..bbb429f --- /dev/null +++ b/server/webtransport_server.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from typing import Awaitable, Callable, Dict, Optional + +from .transport import DatagramServerTransport, OnDatagram, TransportPeer + + +try: + from aioquic.asyncio import QuicConnectionProtocol, serve + from aioquic.quic.configuration import QuicConfiguration + from aioquic.h3.connection import H3Connection + from aioquic.h3.events import HeadersReceived + # Datagram event names vary by aioquic version; try both + try: + from aioquic.h3.events import DatagramReceived as H3DatagramReceived # type: ignore + except Exception: # pragma: no cover + H3DatagramReceived = object # type: ignore +except Exception: # pragma: no cover - optional dependency not installed + QuicConnectionProtocol = object # type: ignore + QuicConfiguration = object # type: ignore + H3Connection = object # type: ignore + HeadersReceived = object # type: ignore + H3DatagramReceived = object # type: ignore + serve = None # type: ignore + + +@dataclass +class WTSession: + flow_id: int # HTTP/3 datagram flow id (usually CONNECT stream id) + proto: "GameWTProtocol" + + +class GameWTProtocol(QuicConnectionProtocol): # type: ignore[misc] + def __init__(self, *args, on_datagram: OnDatagram, sessions: Dict[int, WTSession], **kwargs): + super().__init__(*args, **kwargs) + self._on_datagram = on_datagram + self._sessions = sessions + self._http: Optional[H3Connection] = None + + def http_event_received(self, event) -> None: # type: ignore[override] + # Headers for CONNECT :protocol = webtransport open a session + if isinstance(event, HeadersReceived): + headers = {k.decode().lower(): v.decode() for k, v in event.headers} + method = headers.get(":method") + protocol = headers.get(":protocol") or headers.get("sec-webtransport-protocol") + if method == "CONNECT" and (protocol == "webtransport"): + # In WebTransport over H3, datagrams use the CONNECT stream id as flow_id + flow_id = event.stream_id # type: ignore[attr-defined] + self._sessions[flow_id] = WTSession(flow_id=flow_id, proto=self) + # Send 2xx to accept the session + if self._http is not None: + self._http.send_headers(event.stream_id, [(b":status", b"200")]) + elif isinstance(event, H3DatagramReceived): # type: ignore[misc] + # Route datagram to session by flow_id + flow_id = getattr(event, "flow_id", None) + data = getattr(event, "data", None) + if flow_id is None or data is None: + return + sess = self._sessions.get(flow_id) + if not sess: + return + peer = TransportPeer(addr=(self, flow_id)) + asyncio.ensure_future(self._on_datagram(bytes(data), peer)) + + def quic_event_received(self, event) -> None: # type: ignore[override] + # Lazily create H3 connection wrapper + if self._http is None and hasattr(self, "_quic"): + try: + self._http = H3Connection(self._quic, enable_webtransport=True) # type: ignore[attr-defined] + except Exception: + self._http = None + if self._http is not None: + for http_event in self._http.handle_event(event): + self.http_event_received(http_event) + + async def send_h3_datagram(self, flow_id: int, data: bytes) -> None: + if self._http is None: + return + try: + self._http.send_datagram(flow_id, data) + await self._loop.run_in_executor(None, self.transmit) # type: ignore[attr-defined] + except Exception: + pass + + +class WebTransportServer(DatagramServerTransport): + """HTTP/3 WebTransport datagram server using aioquic. + + Accepts CONNECT requests with :protocol = webtransport and exchanges + RFC 9297 HTTP/3 datagrams bound to the CONNECT stream id. + """ + + def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram): + if serve is None: + raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.") + self.host = host + self.port = port + self.certfile = certfile + self.keyfile = keyfile + self._on_datagram = on_datagram + self._server = None + self._sessions: Dict[int, WTSession] = {} + + async def send(self, data: bytes, peer: TransportPeer) -> None: + # peer.addr is (protocol, flow_id) + if isinstance(peer.addr, tuple) and len(peer.addr) == 2: + proto, flow_id = peer.addr + if isinstance(proto, GameWTProtocol) and isinstance(flow_id, int): + await proto.send_h3_datagram(flow_id, data) + + async def run(self) -> None: + configuration = QuicConfiguration(is_client=False, alpn_protocols=["h3"]) + configuration.load_cert_chain(self.certfile, self.keyfile) + + async def _create_protocol(*args, **kwargs): + return GameWTProtocol(*args, on_datagram=self._on_datagram, sessions=self._sessions, **kwargs) + + self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol) + try: + await self._server.wait_closed() + finally: + self._server.close() +