#!/usr/bin/env python3 """ WebTransport test client to verify server is working correctly. """ import asyncio import logging import sys from typing import Optional from aioquic.asyncio.client import connect from aioquic.asyncio.protocol import QuicConnectionProtocol from aioquic.h3.connection import H3Connection from aioquic.h3.events import DatagramReceived, H3Event, HeadersReceived from aioquic.quic.configuration import QuicConfiguration from aioquic.quic.events import QuicEvent logging.basicConfig( level=logging.DEBUG, format="[%(asctime)s] %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) class WebTransportClient(QuicConnectionProtocol): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._http: Optional[H3Connection] = None self._session_id: Optional[int] = None self._session_established = asyncio.Event() self._received_datagrams = [] def quic_event_received(self, event: QuicEvent) -> None: logger.debug(f"QUIC event: {type(event).__name__}") # Create H3Connection on first event if self._http is None: self._http = H3Connection(self._quic, enable_webtransport=True) logger.info("H3Connection created with WebTransport support") # Process event through HTTP/3 layer if self._http is not None: for http_event in self._http.handle_event(event): self.http_event_received(http_event) def http_event_received(self, event: H3Event) -> None: logger.debug(f"HTTP event: {type(event).__name__}") if isinstance(event, HeadersReceived): headers = dict(event.headers) status = headers.get(b":status", b"").decode() logger.info(f"Received headers: status={status}") if status == "200" and self._session_id is None: self._session_id = event.stream_id logger.info(f"WebTransport session established! session_id={self._session_id}") self._session_established.set() else: logger.error(f"WebTransport session rejected: status={status}, headers={headers}") elif isinstance(event, DatagramReceived): logger.info(f"Received datagram: {len(event.data)} bytes, flow_id={event.flow_id}") self._received_datagrams.append(event.data) async def establish_session(self, path: str = "/") -> None: """Send WebTransport CONNECT request.""" logger.info(f"Sending WebTransport CONNECT request for path: {path}") # Allocate stream ID for CONNECT request stream_id = self._quic.get_next_available_stream_id() # Send CONNECT request with WebTransport protocol headers = [ (b":method", b"CONNECT"), (b":scheme", b"https"), (b":authority", b"np.vaku.org.ua:4433"), (b":path", path.encode()), (b":protocol", b"webtransport"), ] self._http.send_headers(stream_id=stream_id, headers=headers, end_stream=False) self.transmit() logger.info(f"WebTransport CONNECT sent on stream {stream_id}") # Wait for session to be established try: await asyncio.wait_for(self._session_established.wait(), timeout=5.0) logger.info("✓ WebTransport session established successfully!") return True except asyncio.TimeoutError: logger.error("✗ Timeout waiting for WebTransport session acceptance") return False async def send_datagram(self, data: bytes) -> None: """Send a datagram over the WebTransport session.""" if self._session_id is None: logger.error("Cannot send datagram: session not established") return logger.info(f"Sending datagram: {len(data)} bytes") self._http.send_datagram(stream_id=self._session_id, data=data) self.transmit() async def wait_for_datagram(self, timeout: float = 2.0) -> Optional[bytes]: """Wait for a datagram response.""" logger.info(f"Waiting for datagram response (timeout={timeout}s)...") start = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start < timeout: if self._received_datagrams: return self._received_datagrams.pop(0) await asyncio.sleep(0.1) logger.warning("No datagram received within timeout") return None async def test_webtransport(host: str, port: int, verify_mode: bool = False): """Test WebTransport connection to the server.""" logger.info(f"Testing WebTransport connection to {host}:{port}") # Configure QUIC configuration = QuicConfiguration( is_client=True, alpn_protocols=["h3"], # HTTP/3 max_datagram_frame_size=65536, # Enable QUIC datagrams (required for WebTransport) verify_mode=0 if not verify_mode else 2, # Skip cert verification for self-signed ) try: async with connect( host, port, configuration=configuration, create_protocol=WebTransportClient, ) as client: logger.info("✓ QUIC connection established") client = client # type: WebTransportClient # Establish WebTransport session if not await client.establish_session("/"): logger.error("✗ Failed to establish WebTransport session") return False # Send a test datagram (simple game JOIN packet) # Format: version(1) | type(1) | flags(1) | seq(2) | name_len(1) | name test_packet = bytes([ 0x01, # version 0x01, # JOIN packet type 0x00, # flags 0x00, 0x01, # seq=1 0x04, # name length ord('T'), ord('E'), ord('S'), ord('T'), # name="TEST" ]) await client.send_datagram(test_packet) logger.info("✓ Test JOIN packet sent") # Wait for response response = await client.wait_for_datagram(timeout=3.0) if response: logger.info(f"✓ Received response: {len(response)} bytes") logger.info(f" Response hex: {response.hex()}") return True else: logger.warning("Server did not respond to JOIN packet (might be normal if not implemented)") return True # Connection worked even if no response except Exception as e: logger.error(f"✗ Connection failed: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python test_webtransport_client.py [port]") print("Example: python test_webtransport_client.py np.vaku.org.ua 4433") sys.exit(1) host = sys.argv[1] port = int(sys.argv[2]) if len(sys.argv) > 2 else 4433 success = asyncio.run(test_webtransport(host, port)) sys.exit(0 if success else 1)