Transport: integrate aioquic QUIC datagram server skeleton (QuicWebTransportServer) and QUIC mode in run.py

- New server/quic_transport.py using aioquic to accept QUIC connections and datagrams
- run.py: QUIC mode when QUIC_CERT/QUIC_KEY provided; else in-memory
- requirements.txt: aioquic + cryptography
This commit is contained in:
Vladyslav Doloman
2025-10-07 20:53:24 +03:00
parent 352da0ef54
commit e79c523034
10 changed files with 108 additions and 32 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

75
server/quic_transport.py Normal file
View File

@@ -0,0 +1,75 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional
from .transport import DatagramServerTransport, OnDatagram, TransportPeer
try:
from aioquic.asyncio import QuicConnectionProtocol, serve
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import DatagramFrameReceived, ProtocolNegotiated
except Exception: # pragma: no cover - optional dependency not installed in skeleton
QuicConnectionProtocol = object # type: ignore
QuicConfiguration = object # type: ignore
serve = None # type: ignore
DatagramFrameReceived = object # type: ignore
ProtocolNegotiated = object # type: ignore
class GameQuicProtocol(QuicConnectionProtocol): # type: ignore[misc]
def __init__(self, *args, on_datagram: OnDatagram, peers: Dict[int, "GameQuicProtocol"], **kwargs):
super().__init__(*args, **kwargs)
self._on_datagram = on_datagram
self._peers = peers
self._peer_id: Optional[int] = None
def quic_event_received(self, event) -> None: # type: ignore[override]
if isinstance(event, ProtocolNegotiated):
# Register by connection id
self._peer_id = int(self._quic.connection_id) # type: ignore[attr-defined]
self._peers[self._peer_id] = self
elif isinstance(event, DatagramFrameReceived):
# Schedule async callback
if self._peer_id is None:
return
peer = TransportPeer(addr=self)
asyncio.ensure_future(self._on_datagram(bytes(event.data), peer))
async def send_datagram(self, data: bytes) -> None:
self._quic.send_datagram_frame(data) # type: ignore[attr-defined]
await self._loop.run_in_executor(None, self.transmit) # type: ignore[attr-defined]
class QuicWebTransportServer(DatagramServerTransport):
def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram):
if serve is None:
raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.")
self.host = host
self.port = port
self.certfile = certfile
self.keyfile = keyfile
self._on_datagram = on_datagram
self._server = None
self._peers: Dict[int, GameQuicProtocol] = {}
async def send(self, data: bytes, peer: TransportPeer) -> None:
proto = peer.addr # expected GameQuicProtocol
if isinstance(proto, GameQuicProtocol):
await proto.send_datagram(data)
async def run(self) -> None:
configuration = QuicConfiguration(is_client=False, alpn_protocols=["h3"])
configuration.load_cert_chain(self.certfile, self.keyfile)
async def _create_protocol(*args, **kwargs):
return GameQuicProtocol(*args, on_datagram=self._on_datagram, peers=self._peers, **kwargs)
self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol)
try:
await self._server.wait_closed()
finally:
self._server.close()