WebTransport: add HTTP/3 WebTransport datagram server using aioquic; run mode switch via MODE=wt|quic|mem

This commit is contained in:
Vladyslav Doloman
2025-10-07 22:36:06 +03:00
parent 1f5ca02ca4
commit 088c36396b
2 changed files with 154 additions and 18 deletions

47
run.py
View File

@@ -3,34 +3,45 @@ import os
from server.server import GameServer
from server.config import ServerConfig
from server.transport import InMemoryTransport
async def main():
from server.server import GameServer
async def run_in_memory():
from server.transport import InMemoryTransport
cfg = ServerConfig()
server = GameServer(transport=InMemoryTransport(lambda d, p: server.on_datagram(d, p)), config=cfg)
await asyncio.gather(server.transport.run(), server.tick_loop())
async def run_quic():
from server.quic_transport import QuicWebTransportServer
cfg = ServerConfig()
host = os.environ.get("QUIC_HOST", "0.0.0.0")
port = int(os.environ.get("QUIC_PORT", "4433"))
cert = os.environ["QUIC_CERT"]
key = os.environ["QUIC_KEY"]
server = GameServer(transport=QuicWebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg)
await asyncio.gather(server.transport.run(), server.tick_loop())
async def run_webtransport():
from server.webtransport_server import WebTransportServer
cfg = ServerConfig()
host = os.environ.get("WT_HOST", os.environ.get("QUIC_HOST", "0.0.0.0"))
port = int(os.environ.get("WT_PORT", os.environ.get("QUIC_PORT", "4433")))
cert = os.environ.get("WT_CERT") or os.environ["QUIC_CERT"]
key = os.environ.get("WT_KEY") or os.environ["QUIC_KEY"]
server = GameServer(transport=WebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg)
await asyncio.gather(server.transport.run(), server.tick_loop())
if __name__ == "__main__":
try:
# Optional QUIC mode if env vars are provided
cert = os.environ.get("QUIC_CERT")
key = os.environ.get("QUIC_KEY")
host = os.environ.get("QUIC_HOST", "0.0.0.0")
port = int(os.environ.get("QUIC_PORT", "4433"))
if cert and key:
from server.quic_transport import QuicWebTransportServer
from server.server import GameServer
cfg = ServerConfig()
async def start_quic():
server = GameServer(transport=QuicWebTransportServer(host, port, cert, key, lambda d, p: server.on_datagram(d, p)), config=cfg)
await asyncio.gather(server.transport.run(), server.tick_loop())
asyncio.run(start_quic())
mode = os.environ.get("MODE", "mem").lower()
if mode == "wt":
asyncio.run(run_webtransport())
elif mode == "quic":
asyncio.run(run_quic())
else:
asyncio.run(main())
asyncio.run(run_in_memory())
except KeyboardInterrupt:
pass

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional
from .transport import DatagramServerTransport, OnDatagram, TransportPeer
try:
from aioquic.asyncio import QuicConnectionProtocol, serve
from aioquic.quic.configuration import QuicConfiguration
from aioquic.h3.connection import H3Connection
from aioquic.h3.events import HeadersReceived
# Datagram event names vary by aioquic version; try both
try:
from aioquic.h3.events import DatagramReceived as H3DatagramReceived # type: ignore
except Exception: # pragma: no cover
H3DatagramReceived = object # type: ignore
except Exception: # pragma: no cover - optional dependency not installed
QuicConnectionProtocol = object # type: ignore
QuicConfiguration = object # type: ignore
H3Connection = object # type: ignore
HeadersReceived = object # type: ignore
H3DatagramReceived = object # type: ignore
serve = None # type: ignore
@dataclass
class WTSession:
flow_id: int # HTTP/3 datagram flow id (usually CONNECT stream id)
proto: "GameWTProtocol"
class GameWTProtocol(QuicConnectionProtocol): # type: ignore[misc]
def __init__(self, *args, on_datagram: OnDatagram, sessions: Dict[int, WTSession], **kwargs):
super().__init__(*args, **kwargs)
self._on_datagram = on_datagram
self._sessions = sessions
self._http: Optional[H3Connection] = None
def http_event_received(self, event) -> None: # type: ignore[override]
# Headers for CONNECT :protocol = webtransport open a session
if isinstance(event, HeadersReceived):
headers = {k.decode().lower(): v.decode() for k, v in event.headers}
method = headers.get(":method")
protocol = headers.get(":protocol") or headers.get("sec-webtransport-protocol")
if method == "CONNECT" and (protocol == "webtransport"):
# In WebTransport over H3, datagrams use the CONNECT stream id as flow_id
flow_id = event.stream_id # type: ignore[attr-defined]
self._sessions[flow_id] = WTSession(flow_id=flow_id, proto=self)
# Send 2xx to accept the session
if self._http is not None:
self._http.send_headers(event.stream_id, [(b":status", b"200")])
elif isinstance(event, H3DatagramReceived): # type: ignore[misc]
# Route datagram to session by flow_id
flow_id = getattr(event, "flow_id", None)
data = getattr(event, "data", None)
if flow_id is None or data is None:
return
sess = self._sessions.get(flow_id)
if not sess:
return
peer = TransportPeer(addr=(self, flow_id))
asyncio.ensure_future(self._on_datagram(bytes(data), peer))
def quic_event_received(self, event) -> None: # type: ignore[override]
# Lazily create H3 connection wrapper
if self._http is None and hasattr(self, "_quic"):
try:
self._http = H3Connection(self._quic, enable_webtransport=True) # type: ignore[attr-defined]
except Exception:
self._http = None
if self._http is not None:
for http_event in self._http.handle_event(event):
self.http_event_received(http_event)
async def send_h3_datagram(self, flow_id: int, data: bytes) -> None:
if self._http is None:
return
try:
self._http.send_datagram(flow_id, data)
await self._loop.run_in_executor(None, self.transmit) # type: ignore[attr-defined]
except Exception:
pass
class WebTransportServer(DatagramServerTransport):
"""HTTP/3 WebTransport datagram server using aioquic.
Accepts CONNECT requests with :protocol = webtransport and exchanges
RFC 9297 HTTP/3 datagrams bound to the CONNECT stream id.
"""
def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram):
if serve is None:
raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.")
self.host = host
self.port = port
self.certfile = certfile
self.keyfile = keyfile
self._on_datagram = on_datagram
self._server = None
self._sessions: Dict[int, WTSession] = {}
async def send(self, data: bytes, peer: TransportPeer) -> None:
# peer.addr is (protocol, flow_id)
if isinstance(peer.addr, tuple) and len(peer.addr) == 2:
proto, flow_id = peer.addr
if isinstance(proto, GameWTProtocol) and isinstance(flow_id, int):
await proto.send_h3_datagram(flow_id, data)
async def run(self) -> None:
configuration = QuicConfiguration(is_client=False, alpn_protocols=["h3"])
configuration.load_cert_chain(self.certfile, self.keyfile)
async def _create_protocol(*args, **kwargs):
return GameWTProtocol(*args, on_datagram=self._on_datagram, sessions=self._sessions, **kwargs)
self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol)
try:
await self._server.wait_closed()
finally:
self._server.close()