Server fixes: - Move H3Connection initialization to ProtocolNegotiated event (matches official aioquic pattern) - Fix datagram routing to use session_id instead of flow_id - Add max_datagram_frame_size=65536 to enable QUIC datagrams - Fix send_datagram() to use keyword arguments - Add certificate chain handling for Let's Encrypt - Add no-cache headers to static server Command-line improvements: - Move settings from environment variables to argparse - Add comprehensive CLI arguments with defaults - Default mode=wt, cert=cert.pem, key=key.pem Test clients: - Add test_webtransport_client.py - Python WebTransport client that successfully connects - Add test_http3.py - Basic HTTP/3 connectivity test Client updates: - Auto-configure server URL and certificate hash from /cert-hash.json - Add ES6 module support Status: ✅ Python WebTransport client works perfectly ✅ Server properly handles WebTransport connections and datagrams ❌ Chrome fails due to cached QUIC state (QUIC_IETF_GQUIC_ERROR_MISSING) 🔍 Firefox sends packets but fails differently - to be debugged next session 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
172 lines
8.1 KiB
Python
172 lines
8.1 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from typing import Callable, Dict, Optional
|
|
|
|
from .transport import DatagramServerTransport, OnDatagram, TransportPeer
|
|
import logging
|
|
|
|
|
|
try:
|
|
from aioquic.asyncio import QuicConnectionProtocol, serve
|
|
from aioquic.quic.configuration import QuicConfiguration
|
|
from aioquic.quic.events import ProtocolNegotiated
|
|
from aioquic.h3.connection import H3_ALPN, H3Connection
|
|
from aioquic.h3.events import HeadersReceived
|
|
# Datagram event names vary by aioquic version; try both
|
|
try:
|
|
from aioquic.h3.events import DatagramReceived as H3DatagramReceived # type: ignore
|
|
except Exception: # pragma: no cover
|
|
H3DatagramReceived = object # type: ignore
|
|
except Exception: # pragma: no cover - optional dependency not installed
|
|
QuicConnectionProtocol = object # type: ignore
|
|
QuicConfiguration = object # type: ignore
|
|
ProtocolNegotiated = object # type: ignore
|
|
H3_ALPN = [] # type: ignore
|
|
H3Connection = object # type: ignore
|
|
HeadersReceived = object # type: ignore
|
|
H3DatagramReceived = object # type: ignore
|
|
serve = None # type: ignore
|
|
|
|
|
|
@dataclass
|
|
class WTSession:
|
|
flow_id: int # HTTP/3 datagram flow id (usually CONNECT stream id)
|
|
proto: "GameWTProtocol"
|
|
|
|
|
|
class GameWTProtocol(QuicConnectionProtocol): # type: ignore[misc]
|
|
def __init__(self, *args, on_datagram: OnDatagram, sessions: Dict[int, WTSession], **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self._on_datagram = on_datagram
|
|
self._sessions = sessions
|
|
self._http: Optional[H3Connection] = None # Created after ProtocolNegotiated
|
|
logging.debug("GameWTProtocol initialized")
|
|
|
|
def http_event_received(self, event) -> None: # type: ignore[override]
|
|
logging.debug("HTTP event received: %s", type(event).__name__)
|
|
# Headers for CONNECT :protocol = webtransport open a session
|
|
if isinstance(event, HeadersReceived):
|
|
headers = {k.decode().lower(): v.decode() for k, v in event.headers}
|
|
logging.debug("HeadersReceived: %s", headers)
|
|
method = headers.get(":method")
|
|
protocol = headers.get(":protocol") or headers.get("sec-webtransport-protocol")
|
|
logging.debug("Method: %s, Protocol: %s", method, protocol)
|
|
if method == "CONNECT" and (protocol == "webtransport"):
|
|
# In WebTransport over H3, datagrams use the CONNECT stream id as session id
|
|
session_id = event.stream_id # type: ignore[attr-defined]
|
|
self._sessions[session_id] = WTSession(flow_id=session_id, proto=self)
|
|
logging.info("WT CONNECT accepted: session_id=%s", session_id)
|
|
# Send 2xx to accept the session
|
|
if self._http is not None:
|
|
self._http.send_headers(event.stream_id, [(b":status", b"200")])
|
|
self.transmit() # Actually send the response to complete handshake
|
|
logging.debug("Sent 200 response for WT CONNECT")
|
|
else:
|
|
logging.warning("Unexpected CONNECT: method=%s, protocol=%s", method, protocol)
|
|
elif isinstance(event, H3DatagramReceived): # type: ignore[misc]
|
|
# Route datagram to session by stream_id (WebTransport session ID)
|
|
# DatagramReceived has: stream_id (session), data (payload)
|
|
session_id = getattr(event, "stream_id", None)
|
|
data = getattr(event, "data", None)
|
|
logging.debug("DatagramReceived event: session_id=%s, data_len=%s, has_data=%s",
|
|
session_id, len(data) if data else None, data is not None)
|
|
if session_id is None or data is None:
|
|
logging.warning("Invalid datagram event: session_id=%s, data=%s", session_id, data)
|
|
return
|
|
sess = self._sessions.get(session_id)
|
|
if not sess:
|
|
logging.warning("No session found for datagram: session_id=%s", session_id)
|
|
return
|
|
peer = TransportPeer(addr=(self, session_id))
|
|
logging.info("WT datagram received: session_id=%s, %d bytes", session_id, len(data))
|
|
asyncio.ensure_future(self._on_datagram(bytes(data), peer))
|
|
|
|
def quic_event_received(self, event) -> None: # type: ignore[override]
|
|
event_name = type(event).__name__
|
|
logging.debug("QUIC event received: %s", event_name)
|
|
|
|
# Create H3Connection after ALPN protocol is negotiated
|
|
if isinstance(event, ProtocolNegotiated):
|
|
if event.alpn_protocol in H3_ALPN:
|
|
self._http = H3Connection(self._quic, enable_webtransport=True)
|
|
logging.info("H3Connection created with WebTransport support (ALPN: %s)", event.alpn_protocol)
|
|
else:
|
|
logging.warning("Unexpected ALPN protocol: %s", event.alpn_protocol)
|
|
|
|
# Pass event to HTTP layer if connection is established
|
|
if self._http is not None:
|
|
for http_event in self._http.handle_event(event):
|
|
self.http_event_received(http_event)
|
|
|
|
async def send_h3_datagram(self, flow_id: int, data: bytes) -> None:
|
|
try:
|
|
if self._http is None:
|
|
logging.warning("Cannot send datagram: H3Connection not established")
|
|
return
|
|
logging.debug("WT send datagram: flow_id=%s, %d bytes", flow_id, len(data))
|
|
self._http.send_datagram(stream_id=flow_id, data=data)
|
|
self.transmit() # Send queued QUIC packets immediately
|
|
except Exception as e:
|
|
logging.debug("Failed to send datagram: %s", e)
|
|
|
|
|
|
class WebTransportServer(DatagramServerTransport):
|
|
"""HTTP/3 WebTransport datagram server using aioquic.
|
|
|
|
Accepts CONNECT requests with :protocol = webtransport and exchanges
|
|
RFC 9297 HTTP/3 datagrams bound to the CONNECT stream id.
|
|
"""
|
|
|
|
def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram):
|
|
if serve is None:
|
|
raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.")
|
|
self.host = host
|
|
self.port = port
|
|
self.certfile = certfile
|
|
self.keyfile = keyfile
|
|
self._on_datagram = on_datagram
|
|
self._server = None
|
|
self._sessions: Dict[int, WTSession] = {}
|
|
|
|
async def send(self, data: bytes, peer: TransportPeer) -> None:
|
|
# peer.addr is (protocol, flow_id)
|
|
if isinstance(peer.addr, tuple) and len(peer.addr) == 2:
|
|
proto, flow_id = peer.addr
|
|
if isinstance(proto, GameWTProtocol) and isinstance(flow_id, int):
|
|
await proto.send_h3_datagram(flow_id, data)
|
|
|
|
async def run(self) -> None:
|
|
from aioquic.quic.logger import QuicFileLogger
|
|
import os
|
|
|
|
# Enable QUIC logging if debug level
|
|
quic_logger = None
|
|
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
|
|
log_dir = os.path.join(os.getcwd(), "quic_logs")
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
quic_logger = QuicFileLogger(log_dir)
|
|
logging.info("QUIC logging enabled in: %s", log_dir)
|
|
|
|
configuration = QuicConfiguration(
|
|
is_client=False,
|
|
alpn_protocols=["h3"], # HTTP/3 with WebTransport support
|
|
max_datagram_frame_size=65536, # Enable QUIC datagrams (required for WebTransport)
|
|
quic_logger=quic_logger
|
|
)
|
|
configuration.load_cert_chain(self.certfile, self.keyfile)
|
|
logging.debug("QUIC configuration: ALPN=%s, max_datagram_frame_size=%d",
|
|
configuration.alpn_protocols, configuration.max_datagram_frame_size or 0)
|
|
|
|
def _create_protocol(*args, **kwargs):
|
|
return GameWTProtocol(*args, on_datagram=self._on_datagram, sessions=self._sessions, **kwargs)
|
|
|
|
logging.info("WebTransport (H3) server listening on %s:%d", self.host, self.port)
|
|
self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol)
|
|
try:
|
|
# Wait indefinitely until cancelled (e.g., by KeyboardInterrupt or timeout)
|
|
await asyncio.Event().wait()
|
|
finally:
|
|
self._server.close()
|