Compare commits

...

22 Commits

Author SHA1 Message Date
Vladyslav Doloman
1de5a8f3e6 Fix WebTransport server implementation and add test client
Server fixes:
- Move H3Connection initialization to ProtocolNegotiated event (matches official aioquic pattern)
- Fix datagram routing to use session_id instead of flow_id
- Add max_datagram_frame_size=65536 to enable QUIC datagrams
- Fix send_datagram() to use keyword arguments
- Add certificate chain handling for Let's Encrypt
- Add no-cache headers to static server

Command-line improvements:
- Move settings from environment variables to argparse
- Add comprehensive CLI arguments with defaults
- Default mode=wt, cert=cert.pem, key=key.pem

Test clients:
- Add test_webtransport_client.py - Python WebTransport client that successfully connects
- Add test_http3.py - Basic HTTP/3 connectivity test

Client updates:
- Auto-configure server URL and certificate hash from /cert-hash.json
- Add ES6 module support

Status:
 Python WebTransport client works perfectly
 Server properly handles WebTransport connections and datagrams
 Chrome fails due to cached QUIC state (QUIC_IETF_GQUIC_ERROR_MISSING)
🔍 Firefox sends packets but fails differently - to be debugged next session

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 23:50:08 +00:00
Vladyslav Doloman
ed5cb14b30 WIP: Add input broadcasting and client-side prediction features
Changes include:
- Client: INPUT_BROADCAST packet handling and opponent prediction rendering
- Client: Protocol parsing for INPUT_BROADCAST packets
- Server: Input broadcasting to all clients except sender

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 15:17:16 +03:00
Vladyslav Doloman
c4a8501635 Fix: indent await send(full, peer) inside STATE_FULL if block to resolve SyntaxError 2025-10-08 01:05:47 +03:00
Vladyslav Doloman
56ac74e916 Mode: implement MODE=net to run both WebTransport (HTTP/3) and QUIC datagram servers concurrently via MultiTransport; add port envs WT_PORT/QUIC_PORT 2025-10-08 01:00:47 +03:00
Vladyslav Doloman
b94aac71f8 Mode: add scaffolding for combined network mode name (net) in help; introduce MultiTransport 2025-10-08 00:53:14 +03:00
Vladyslav Doloman
7337c27898 Transport: add MultiTransport to run WT and QUIC concurrently and route sends 2025-10-08 00:51:35 +03:00
Vladyslav Doloman
921cc42fd4 CLI: add --help to print usage and env-based configuration hints 2025-10-08 00:33:07 +03:00
Vladyslav Doloman
99e818e0fd Fix: define _run_tasks_with_optional_timeout before use; tidy run_webtransport block 2025-10-08 00:29:18 +03:00
Vladyslav Doloman
1bd4d9eac7 Fix: clean up run.py newlines and indentation; add optional RUN_SECONDS timeout helper for server tasks 2025-10-08 00:27:10 +03:00
Vladyslav Doloman
eeabda725e Logging: add informative console logs for startup, ports, connections, JOIN/deny, input, and periodic events
- Configure logging via LOG_LEVEL env (default INFO)
- Log when servers start listening (WT/QUIC/InMemory/HTTPS static)
- Log WT CONNECT accept, QUIC peer connect, datagram traffic at DEBUG
- Log GameServer creation, tick loop start, JOIN accept/deny, config_update broadcasts, and input reception
2025-10-08 00:01:05 +03:00
Vladyslav Doloman
c97c5c4723 Dev: add simple HTTPS static server (serves client/). Enabled in WT mode by default via STATIC=1; uses same cert/key as QUIC/WT 2025-10-07 23:23:29 +03:00
Vladyslav Doloman
0ceea925cd Client: minimal browser WebTransport client (HTML/JS)
- index.html with connect UI, canvas renderer, overlay
- protocol.js with header, QUIC varint, JOIN/INPUT builders, STATE_FULL parser
- client.js to connect, send JOIN, read datagrams, render snakes/apples; basic input handling
2025-10-07 22:49:51 +03:00
Vladyslav Doloman
088c36396b WebTransport: add HTTP/3 WebTransport datagram server using aioquic; run mode switch via MODE=wt|quic|mem 2025-10-07 22:36:06 +03:00
Vladyslav Doloman
1f5ca02ca4 Repo hygiene: restore IDEAS.md; add .gitignore for __pycache__ and pyc; untrack cached bytecode 2025-10-07 20:54:56 +03:00
Vladyslav Doloman
e79c523034 Transport: integrate aioquic QUIC datagram server skeleton (QuicWebTransportServer) and QUIC mode in run.py
- New server/quic_transport.py using aioquic to accept QUIC connections and datagrams
- run.py: QUIC mode when QUIC_CERT/QUIC_KEY provided; else in-memory
- requirements.txt: aioquic + cryptography
2025-10-07 20:53:24 +03:00
Vladyslav Doloman
352da0ef54 Plan: refine networking — hybrid per-snake TLV selection, PART framing details, apples only in first part, and 1200-byte safety budget
- Clarify TLV selection between 2-bit and RLE per snake
- Define PART inner_type for STATE_FULL/STATE_DELTA and merging by update_id
- Apples included only in the first part (full and delta)
- Recommend ~1200-byte post-compression budget (<=1280 hard cap)
2025-10-07 20:46:16 +03:00
Vladyslav Doloman
e555762c64 Protocol/Server: hybrid body TLV (2-bit vs RLE), state_full body builder, and partitioned snapshot on join
- Protocol: RLE packing, TLV chooser, body_2bit_chunk helper, state_full_body builder
- Server: on join, build snapshot body and partition across PART packets if over MTU; include apples only in first part; chunk single oversized snake with 2-bit chunking
2025-10-07 20:36:01 +03:00
Vladyslav Doloman
967784542d Protocol/Server: implement STATE_DELTA + PART partitioning and per-tick minimal changes
- Protocol: SnakeDelta structure; build_state_delta(_body); build_part
- Server: compute per-snake changes (move/grow/blocked-shrink), apples diffs
- Partition large deltas by snake changes with apples in first part; use update_id
- Send STATE_DELTA when under MTU; else PART packets referencing STATE_DELTA
2025-10-07 20:30:48 +03:00
Vladyslav Doloman
991b8f3660 Server: per-tick simulation skeleton (inputs, movement, blocking/shrink, apple growth, wrap behavior) and basic state delta broadcast
- Input buffer rules: enqueue+consume with 180° guard (len>1)
- Movement: wrap per config; allow moving into own tail when it vacates
- Blocking: head holds, tail shrinks to min 1
- Apples: eat= grow; ensure target apples after tick
- Broadcast: send current snakes/apples as delta (placeholder for real deltas)
2025-10-07 20:27:43 +03:00
Vladyslav Doloman
7a5f2d8794 Server: implement JOIN/JOIN_ACK/JOIN_DENY handling, input parsing/relay, spawn logic, apples maintenance; fix InMemoryTransport to address specific peer; add state_full encoder
- Protocol: join parser, join_ack/deny builders, input parser, state_full builder
- Server: on_datagram dispatch, spawn per rules (prefer length 3 else 1), join deny if no cell, immediate input_broadcast relay
- Model: occupancy map and helpers
- Transport: deliver to specified peer in in-memory mode
2025-10-07 20:22:22 +03:00
Vladyslav Doloman
9043ba81c0 Server scaffold: protocol + config + transport abstraction + tick loop skeleton
- Protocol: PacketType, TLV Body types, QUIC varint, header, input_broadcast
  and config_update builders, 2-bit body bitpacking helper
- Config/model: live-config ServerConfig, basic GameState/Snake/Session
- Transport: InMemoryTransport placeholder and QUIC server stub
- Server: asyncio tick loop, periodic config_update broadcast, immediate
  input_broadcast relay; main entry and run.py
2025-10-07 20:02:28 +03:00
Vladyslav Doloman
65bf835b8d Plan: adopt live config updates + spawn/edge/compression/browser decisions
- Tick rate default 10 TPS; live config via config_update, periodic resend
- Spawn policy: prefer length 3 if a straight strip fits; else length 1; deny join if no free cell
- Apples per snake: default 1; min 1, max 12; cap 255 total; live config
- Wrap edges: live-configurable; head-only enforcement on transitions
- Compression: DEFLATE is handshake-only (restart + reconnect)
- Browser targets: latest Firefox required; latest Chrome desirable
- Protocol: join_deny; config_update packet; input_broadcast includes base_tick + rel offsets and apply_at_tick
2025-10-07 19:15:50 +03:00
22 changed files with 3163 additions and 22 deletions

View File

@@ -0,0 +1,12 @@
{
"permissions": {
"allow": [
"Read(//g/Coding/code2/**)",
"Bash(python -m py_compile server/server.py)",
"Bash(node --check client/client.js)",
"Bash(node --check client/protocol.js)"
],
"deny": [],
"ask": []
}
}

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
# Byte-compiled / cache
__pycache__/
*.py[cod]
# Virtual environments
.venv/
venv/
# Certs (provide via env variables or local path)
*.pem
*.crt
*.key

241
CLAUDE.md Normal file
View File

@@ -0,0 +1,241 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is a real-time multiplayer Snake game using Python 3 (server) and vanilla JavaScript (web client), communicating via WebTransport datagrams (HTTP/3) for low-latency gameplay. The game features continuous persistent gameplay, unique collision mechanics (head-blocks-and-tail-shrinks instead of death), and sophisticated UDP packet handling with compression and partitioning.
## Running the Server
### Dependencies
```bash
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
```
### Server Modes
The server supports multiple transport modes via `MODE` environment variable:
```bash
# In-memory mode (testing, no network)
MODE=mem python run.py
# WebTransport mode (HTTP/3, default for production)
MODE=wt QUIC_CERT=cert.pem QUIC_KEY=key.pem python run.py
# QUIC datagram mode only
MODE=quic QUIC_CERT=cert.pem QUIC_KEY=key.pem python run.py
# Combined mode (both WebTransport and QUIC)
MODE=net WT_PORT=4433 QUIC_PORT=4443 QUIC_CERT=cert.pem QUIC_KEY=key.pem python run.py
```
### Environment Variables
- `MODE`: Transport mode (mem|wt|quic|net)
- `QUIC_CERT` / `QUIC_KEY`: TLS certificate and key paths (required for wt/quic/net)
- `WT_CERT` / `WT_KEY`: WebTransport-specific cert/key (falls back to QUIC_* if not set)
- `WT_HOST` / `WT_PORT`: WebTransport server host/port (default: 0.0.0.0:4433)
- `QUIC_HOST` / `QUIC_PORT`: QUIC server host/port (default: 0.0.0.0:4433, or 4443 in net mode)
- `STATIC`: Enable static HTTPS server for client (default: 1 in wt mode)
- `STATIC_HOST` / `STATIC_PORT` / `STATIC_ROOT`: Static server settings (default: same as WT host, port 8443, root "client")
- `LOG_LEVEL`: Logging level (DEBUG|INFO|WARNING|ERROR)
- `RUN_SECONDS`: Optional timeout for server shutdown (testing)
### Help
```bash
python run.py --help
```
## Architecture
### Server Architecture (Python 3 + asyncio)
**Core Components:**
1. **server/server.py** (`GameServer`): Authoritative game server
- Fixed tick rate simulation (default 10 TPS, configurable 5-30)
- Manages player sessions, snakes, apples, collision detection
- Handles join/spawn logic, input processing, state broadcasting
- Automatically partitions large updates across multiple datagrams (<1200 bytes each to avoid IP fragmentation)
2. **server/model.py**: Game state entities
- `GameState`: Field grid, snakes dict, apples list, occupancy map
- `Snake`: Deque-based body representation, input buffer (capacity 3), blocked flag
- `PlayerSession`: Per-player metadata (id, name, color, peer handle)
3. **server/protocol.py**: Wire protocol implementation
- Packet types: JOIN, JOIN_ACK, JOIN_DENY, INPUT, INPUT_BROADCAST, STATE_DELTA, STATE_FULL, PART, CONFIG_UPDATE
- TLV body encoding: 2-bit direction streams, RLE compression, chunked variants for oversized snakes
- QUIC varint encoding for efficient integer serialization
- Sequence number handling with wraparound logic (16-bit)
4. **server/config.py** (`ServerConfig`): Server configuration dataclass with validation
5. **Transport layer** (pluggable):
- `server/transport.py`: Abstract `DatagramServerTransport` interface
- `server/webtransport_server.py`: WebTransport (HTTP/3) implementation using aioquic
- `server/quic_transport.py`: Raw QUIC datagram implementation
- `server/multi_transport.py`: Multiplexer for running multiple transports concurrently
- `server/static_server.py`: Optional HTTPS static file server for serving client assets
### Client Architecture (Vanilla JavaScript)
**Files:**
- `client/index.html`: Main HTML page with canvas and UI controls
- `client/client.js`: Game client, WebTransport connection, input handling, rendering
- `client/protocol.js`: Wire protocol parsing/building (mirrors server protocol)
**Key Features:**
- WebTransport datagram API for unreliable, unordered messaging
- Canvas-based rendering with auto-scaling grid
- Input buffering (up to 3 direction changes) with 180° turn filtering
- State update handling: STATE_FULL (initial/recovery), STATE_DELTA (per-tick), PART (fragmented)
- Packet sequence tracking to discard late/out-of-order updates
### Game Mechanics (Unique Collision System)
**No Death on Collision:**
- When a snake's head hits an obstacle (wall, self, or another snake), the head stays in place (blocked state)
- While blocked, the tail shrinks by 1 cell per tick until the player turns to a free direction
- Minimum length is 1 (head only); snake cannot disappear while player is connected
**Input Buffer Rules:**
- Client-side buffer holds up to 3 upcoming direction changes
- Opposite direction to last buffered input replaces the last entry (not appended)
- Consecutive duplicates are dropped
- Overflow policy: replace last element
- Server consumption: At each tick, consume at most one input; skip 180° turns when length > 1
**Apples & Scoring:**
- Displayed as current snake length (not separate score)
- Eating an apple grows snake by 1 immediately
- Target apple count: max(3, min(apples_cap, connected_players × apples_per_snake))
- When 0 players remain, pre-populate field with exactly 3 apples
**Spawning:**
- Try to allocate a 3-cell straight strip in a free direction
- Fallback: spawn length 1 (head only) at any free cell
- Deny join if no free cells available
### Protocol Details
**Packet Structure:**
```
Header (all packets):
ver (u8) | type (u8) | flags (u8) | seq (u16 big-endian) | [tick (u16 optional)]
```
**Sequence Numbers:**
- 16-bit wraparound with half-range window comparison: `is_newer(a,b) = ((a-b) & 0xFFFF) < 0x8000`
- Clients discard packets with older seq than last applied per sender
**State Encoding:**
- **STATE_FULL**: Complete snapshot (on join, periodic recovery); includes all snakes + apples
- Per-snake: id (u8), len (u16), head (x,y), body TLV (BODY_2BIT or BODY_RLE)
- **STATE_DELTA**: Incremental update (per tick); includes only changed snakes + apple diffs
- Per-change: snake_id, flags (head_moved|tail_removed|grew|blocked), direction, new_head coords
- **PART**: Multi-part fragmentation for large updates (>1200 bytes)
- Fields: update_id (u16), part_index, parts_total, inner_type (STATE_FULL or STATE_DELTA), chunk_payload
**Body TLV Types:**
- `BODY_2BIT (0x00)`: 2-bit direction stream (up=00, right=01, down=10, left=11), packed LSB-first
- `BODY_RLE (0x01)`: Run-length encoding (dir u8, count QUIC varint) for straight segments
- `BODY_2BIT_CHUNK (0x10)` / `BODY_RLE_CHUNK (0x11)`: Chunked variants for splitting oversized snakes across parts
- Chunk header: start_index (u16), dirs_in_chunk (u16)
**Input Broadcasting:**
- Server relays client inputs immediately to all other clients as INPUT_BROADCAST packets
- Enables client-side opponent prediction during late/lost state updates
- Fields: player_id, input_seq, base_tick, events (rel_tick_offset + direction), optional apply_at_tick
**Partitioning Strategy:**
- Soft limit: 1200 bytes per datagram (avoid fragmentation; MTU ~1280-1500)
- Whole-snake-first: pack complete snakes greedily; if a single snake exceeds budget, split using chunked TLV
- Apples included only in first part; clients merge across parts using update_id
## Key Simulation Details
**Tick Order:**
1. Consume at most one valid input per snake (apply 180° rule for length > 1)
2. Compute intended head step; if blocked (wall/occupied), set `blocked=True` and keep head stationary
3. If blocked: shrink tail by 1 (min length 1)
4. If moving into apple: grow by 1 (don't shrink tail that tick) and respawn apple
5. Emit per-snake delta and global apple changes
**Collision Detection:**
- Occupancy map tracks all snake cells as (coord) -> (snake_id, index)
- Allow moving into own tail if it will vacate (i.e., not growing)
- Walls block only when `wrap_edges=False`; wrapped coordinates computed by `_step_from()`
**180° Turn Handling:**
- Checked at consumption time using XOR: `(int(new_dir) ^ int(current_dir)) == 2`
- Allowed only when snake length == 1 (head only)
## Development Notes
- **Testing**: No automated test suite currently exists. Manual testing via in-memory mode (`MODE=mem`) or live WebTransport server.
- **Logging**: Use `LOG_LEVEL=DEBUG` for verbose packet-level debugging.
- **Field Size**: Default 60×40; protocol supports 3×3 to 255×255 (negotiated in JOIN_ACK).
- **Players**: Max 32 concurrent (ids 0-31); names truncated to 16 UTF-8 bytes.
- **Colors**: 32-color palette; deterministic mapping from player_id to color_id (0-31).
- **Tick Rate Changes**: Server broadcasts CONFIG_UPDATE every 50 ticks; clients apply at next tick boundary.
- **Compression**: Optional global DEFLATE mode (handshake-only; requires server restart). Currently defaults to "none".
- **Browser Compatibility**: Requires WebTransport API support (latest Firefox/Chrome).
## Common Patterns
**Adding a New Packet Type:**
1. Add enum value to `PacketType` in server/protocol.py and client/protocol.js
2. Implement builder function in protocol (e.g., `build_foo()`) and parser (e.g., `parse_foo()`)
3. Handle in server's `on_datagram()` and client's `readLoop()` switch statements
4. Update header packing if tick field is required (pass tick to `pack_header()`)
**Modifying Simulation Logic:**
1. Edit `_simulate_tick()` in server/server.py for per-tick updates
2. Update `SnakeDelta` dataclass if new change types are needed
3. Adjust `build_state_delta_body()` if encoding changes
4. Mirror changes in client rendering logic (client/client.js)
**Changing Configuration:**
1. Update `ServerConfig` dataclass in server/config.py
2. Add runtime validation in `validate_runtime()`
3. If live-configurable: broadcast via CONFIG_UPDATE packet; otherwise require server restart
4. Update JOIN_ACK builder/parser if part of handshake
## File Organization
```
G:\Coding\code2\
├── run.py # Server entrypoint with mode switching
├── requirements.txt # Python dependencies (aioquic, cryptography)
├── PROJECT_PLAN.md # Detailed design spec (authoritative reference)
├── IDEAS.md # Original feature brainstorming
├── server/
│ ├── server.py # GameServer (tick loop, join/spawn, simulation)
│ ├── model.py # GameState, Snake, PlayerSession entities
│ ├── protocol.py # Wire protocol (packet builders/parsers, TLV encoding)
│ ├── config.py # ServerConfig dataclass
│ ├── transport.py # Abstract DatagramServerTransport interface
│ ├── webtransport_server.py # HTTP/3 WebTransport implementation
│ ├── quic_transport.py # QUIC datagram implementation
│ ├── multi_transport.py # Multi-transport multiplexer
│ ├── static_server.py # Optional HTTPS static file server
│ └── utils.py # Utility functions
└── client/
├── index.html # Main HTML page
├── client.js # Game client (WebTransport, input, rendering)
└── protocol.js # Wire protocol (mirrors server protocol.py)
```
## References
- **PROJECT_PLAN.md**: Comprehensive design document covering protocol, mechanics, limits, testing strategy, and milestones.
- **aioquic**: Python QUIC and HTTP/3 library (https://github.com/aiortc/aioquic)
- **WebTransport**: W3C spec for low-latency client-server messaging over QUIC (https://w3c.github.io/webtransport/)

View File

@@ -33,15 +33,17 @@
## Server Architecture (Python 3)
- Runtime: Python 3 with asyncio.
- Transport: WebTransport (HTTP/3 datagrams). Candidate library: aioquic for server-side H3 and datagrams.
- Model:
- Authoritative simulation on the server with a fixed tick rate (target 15-20 TPS; start with 15 for network headroom).
- Model:
- Authoritative simulation on the server with a fixed tick rate (default 10 TPS). Tick rate is reconfigurable at runtime by the server operator and changes are broadcast to clients.
- Each tick: process inputs, update snakes, resolve collisions (blocked/shrink), manage apples, generate state deltas.
- Entities:
- Field: width, height, random seed, apple set.
- Snake: id, name (<=16 chars), color id (0-31), deque of cells (head-first), current direction, input buffer, blocked flag, last-move tick, last-known client seq.
- Player session: transport bindings, last-received input seq per player, anti-spam counters.
- Spawn:
- New snake spawns at a random free cell with a random legal starting direction, length 3 by default (configurable).
- Spawn:
- New snake spawns at a random free cell with a random legal starting direction.
- Initial length policy: try to allocate a 3-cell straight strip (head + two body cells) in a legal direction; if not possible, spawn with length 1.
- If the field appears full, temporarily ignore apples as obstacles for the purpose of finding space. If no free single cell exists, deny the join and inform the client to wait.
- On apple eat: grow by 1; spawn a replacement apple at a free cell.
- Disconnect: Immediately remove snake and its length; apples persist. If all disconnect, ensure field contains 3 apples.
- Rate limiting: Cap input datagrams per second and buffer growth per client.
@@ -60,26 +62,31 @@
## Networking & Protocol
Constraints and goals:
- Use datagrams (unreliable, unordered). Include a packet sequence number for late-packet discard with wraparound handling.
- Keep payloads <=1280 bytes to avoid fragmentation; if larger, partition logically.
- Keep payloads <=1280 bytes to avoid fragmentation; prefer a safety budget of ~1200 bytes post-compression when partitioning.
- Support up to 32 concurrent players; names limited to 16 bytes UTF-8.
Header (all packets):
- ver (u8): protocol version.
- type (u8): packet type (join, join_ack, input, input_broadcast, state_delta, state_full, part, ping, pong, error).
- type (u8): packet type (join, join_ack, join_deny, input, input_broadcast, state_delta, state_full, part, config_update, ping, pong, error).
- flags (u8): bit flags (compression, is_last_part, etc.).
- seq (u16): sender's monotonically incrementing sequence number (wraps at 65535). Newer-than logic uses half-range window.
- Optional tick (u16): simulation tick the packet refers to (on server updates).
Handshake (join -> join_ack):
Handshake (join -> join_ack | join_deny):
- Client sends: desired name (<=16), optional preferred color id.
- Server replies: assigned player id (u8), color id (u8), field size (width u8, height u8), tick rate (u8), random seed (u32), palette, and initial full snapshot.
- Server replies (join_ack): assigned player id (u8), color id (u8), field size (width u8, height u8), tick rate (u8), wrap_edges (bool), apples_per_snake (u8), apples_cap (u8 up to 255), compression_mode (enum: none|deflate), random seed (u32), palette, and initial full snapshot.
- Server can reply (join_deny) with a reason (e.g., no free cell; please wait).
Inputs (client -> server):
- Packet input includes: last-acknowledged server seq (u16), local input_seq (u16), one or more direction events with timestamps or relative tick offsets. Client pre-filters per buffer rules.
Input broadcast (server -> clients):
- Upon receiving a player's input, the server immediately relays those input events to all other clients as input_broadcast packets.
- Contents: player_id (u8), the player's input_seq (u16), direction events, and apply_at_tick (u16) assigned by the server (typically current_tick+1) so clients can align predictions.
- Contents:
- player_id (u8), the player's input_seq (u16)
- base_tick (u16): server tick for alignment (typically current_tick)
- events: one or more direction changes, each with rel_tick_offset (u8/varint) from base_tick
- apply_at_tick (u16): optional absolute apply tick for convenience (both provided; clients may use either)
- Purpose: Enable client-side opponent prediction during late or lost state updates. Broadcasts are small; apply rate limits if needed.
State updates (server -> client):
@@ -100,21 +107,25 @@ State updates (server -> client):
- Chunked variants (T=0x10, 0x11): used only when a single snake must be split.
- Prefix V with `start_index` (u16, first direction offset from head) and `dirs_in_chunk` (u16); then the encoding for that range (2-bit or RLE respectively).
- Client buffers chunks by `(update_id, snake_id)` and assembles when the full [0..len-2] range is present; then applies.
- Selection: server chooses per snake whichever TLV (2-bit or RLE) yields fewer bytes.
Compression:
- Primary: domain-specific packing (bits + RLE for segments and apples), then optional DEFLATE flag (flags.compressed=1) if still large.
- Primary: domain-specific packing (bits + RLE for segments and apples).
- Optional global DEFLATE mode (server-configured): when enabled, server may set flags.compressed=1 and apply DEFLATE to payloads; this mode is negotiated in join_ack and is not changed on the fly (server restart required; clients must reconnect).
- Client decompresses if set; otherwise reads packed structs directly.
Packet partitioning (if >1280 bytes after compression):
Packet partitioning (if >~12001280 bytes after compression):
- Apply to state_full (join and periodic recovery) and large state_delta updates.
- Goal: avoid IP fragmentation. Ensure each compressed datagram payload is <1280 bytes.
- Goal: avoid IP fragmentation. Ensure each compressed datagram payload is <1280 bytes (use ~1200 target for headroom).
- Strategy (whole-snake first):
- Sort snakes by estimated compressed size (head + body TLV) descending.
- Greedily pack one or more complete snakes per packet while keeping the compressed payload <1280 bytes.
- If a snake does not fit with others, send it alone if it fits <1280 bytes.
- If a single snake still exceeds 1280 bytes by itself, split that snake into multiple similar-sized chunks using the chunked TLV types.
- Framing:
- Each part carries `update_id` (u16), `part_index` (u8), `parts_total` (u8), and a sequence of per-snake TLVs (body_2bit/body_rle) and/or chunk TLVs (body_2bit_chunk/body_rle_chunk).
- Each PART carries `update_id` (u16), `part_index` (u8), `parts_total` (u8), and `inner_type` (u8: STATE_FULL or STATE_DELTA) followed by the chunk payload.
- For STATE_FULL: the chunk payload is `snakes_count + [per-snake records] + apples`. Include apples only in the first part; clients merge across parts using `update_id`.
- For STATE_DELTA: the chunk payload is a subset of per-snake changes; include `apples_added/removed` only in the first part; clients merge across parts using `update_id`.
- Clients apply complete per-snake TLVs immediately. Chunk TLVs are buffered and assembled by `(update_id, snake_id)` using `start_index` and `dirs_in_chunk` before applying.
Sequence wrap & ordering:
@@ -122,6 +133,11 @@ Sequence wrap & ordering:
- Clients ignore updates older-than last applied per-sender.
- For input_broadcast, deduplicate per (player_id, input_seq) and apply in order per player; if apply_at_tick is in the past, apply immediately up to current tick.
Live config updates (server -> clients):
- Packet type: config_update, sent on change and periodically (low frequency).
- Fields: tick_rate (u8), wrap_edges (bool), apples_per_snake (u8), apples_cap (u8 up to 255).
- Clients apply changes at the next tick boundary. Compression mode is not changed via config_update (handshake-only).
## Simulation Details
- Tick order per tick:
1) Incorporate at most one valid buffered input per snake (with 180-degree rule).
@@ -129,7 +145,7 @@ Sequence wrap & ordering:
3) If blocked=true: shrink tail by 1 (to min length 1). If the blocking cell becomes free (due to own shrink or others moving), the next tick's step in current direction proceeds.
4) If moving into an apple: grow by 1 (do not shrink tail that tick) and respawn an apple.
5) Emit per-snake delta (move, grow, or shrink) and global apple changes.
- Blocking detection: walls (0..width-1, 0..height-1), any occupied snake cell (including heads and tails that are not about to vacate this tick).
- Blocking detection: walls (0..width-1, 0..height-1), any occupied snake cell (including heads and tails that are not about to vacate this tick). Wrap-around rules are applied only to the head; if wrap_edges is turned off while a body segment is mid-edge traversal, allow the body to continue, but prevent the head from crossing walls thereafter.
- 180-degree turn allowance when length == 1 only.
### Client-side Opponent Prediction Heuristics
@@ -147,7 +163,11 @@ Sequence wrap & ordering:
- Players: max 32 (ids 0..31); deny joins beyond capacity.
- Names: UTF-8, truncate to 16 bytes; filter control chars.
- Field: default 60x40; min 3x3; max 255x255; provided by server in join_ack.
- Tick rate: default 15 TPS; configurable 10-30 TPS.
- Tick rate: default 10 TPS; server-controlled, reconfigurable on the fly via config_update (reasonable range 5-30 TPS).
- Apples per snake: default 1; server-controlled, reconfigurable on the fly; min 1/snake, max 12/snake; total apples capped at 255.
- Wrap-around edges: default off (walls are blocking); server-controlled, reconfigurable on the fly; head obeys current rule; legacy body traversal allowed during transitions.
- Compression (DEFLATE): global mode negotiated at join; requires server restart to change; clients must reconnect; when enabled server may set flags.compressed=1.
- Browser targets: must work on latest Firefox; latest Chrome is optional but desirable; others optional.
## Error Handling & Resilience
- Invalid packets: drop and optionally send error with code.
@@ -172,13 +192,9 @@ Sequence wrap & ordering:
8) Polishing: error messages, reconnect, telemetry/logging, Docker/dev scripts.
## Open Questions
- Exact tick rate target and interpolation strategy on client.
- Initial snake length on spawn (3 is suggested) and spawn retry rules in crowded fields.
- Apple count baseline when players are present (fixed count vs proportional to players vs area).
- Whether to allow wrap-around at field edges (currently: walls are blocking).
- Compression choice tradeoffs: custom bitpacking only vs optional DEFLATE.
- Minimum viable browser targets (WebTransport support varies).
- Should input_broadcast include server-assigned apply_at_tick or relative tick offsets only?
- Interpolation/smoothing specifics on client for variable tick rates.
- Any additional anti-cheat measures for input_broadcast misuse.
- Exact UI/UX for join denial when field is full (messaging/timing).
## Next Steps
- Validate the protocol choices and mechanics with stakeholders.

312
client/client.js Normal file
View File

@@ -0,0 +1,312 @@
import { PacketType, Direction, packHeader, parseHeader, buildJoin, buildInput, parseJoinAck, parseStateFullBody, parseStateDeltaBody } from './protocol.js';
const ui = {
url: document.getElementById('url'),
hash: document.getElementById('hash'),
name: document.getElementById('name'),
connect: document.getElementById('connect'),
status: document.getElementById('status'),
overlay: document.getElementById('overlay'),
canvas: document.getElementById('view'),
};
let transport = null;
let writer = null;
let seq = 0;
let lastServerSeq = 0;
let inputSeq = 0;
let playerId = null;
let colorId = null;
let fieldW = 60, fieldH = 40;
let tickRate = 10;
const snakes = new Map(); // id -> {len, head:[x,y], dirs:[]}
let apples = [];
function setStatus(t) { ui.status.textContent = t; }
function nextSeq() { seq = (seq + 1) & 0xffff; return seq; }
function colorForId(id) {
const palette = [
'#e6194B','#3cb44b','#ffe119','#4363d8','#f58231','#911eb4','#46f0f0','#f032e6',
'#bcf60c','#fabebe','#008080','#e6beff','#9A6324','#fffac8','#800000','#aaffc3',
'#808000','#ffd8b1','#000075','#808080','#ffffff','#000000'
];
return palette[id % palette.length];
}
function decodeSnakeCells(s) {
const cells = [];
let x = s.hx, y = s.hy;
cells.push([x, y]);
for (let d of s.dirs) {
if (d === Direction.UP) y -= 1;
else if (d === Direction.RIGHT) x += 1;
else if (d === Direction.DOWN) y += 1;
else if (d === Direction.LEFT) x -= 1;
cells.push([x, y]);
}
return cells;
}
function render() {
const ctx = ui.canvas.getContext('2d');
const W = ui.canvas.width = window.innerWidth;
const H = ui.canvas.height = window.innerHeight;
ctx.fillStyle = '#111'; ctx.fillRect(0,0,W,H);
const cell = Math.floor(Math.min(W / fieldW, H / fieldH));
const ox = Math.floor((W - cell*fieldW)/2);
const oy = Math.floor((H - cell*fieldH)/2);
// grid (optional)
ctx.strokeStyle = 'rgba(255,255,255,0.04)';
for (let x=0;x<=fieldW;x++){ ctx.beginPath(); ctx.moveTo(ox + x*cell, oy); ctx.lineTo(ox + x*cell, oy + fieldH*cell); ctx.stroke(); }
for (let y=0;y<=fieldH;y++){ ctx.beginPath(); ctx.moveTo(ox, oy + y*cell); ctx.lineTo(ox + fieldW*cell, oy + y*cell); ctx.stroke(); }
// apples
ctx.fillStyle = '#f00';
for (const [ax, ay] of apples) {
ctx.fillRect(ox + ax*cell, oy + ay*cell, cell, cell);
}
// snakes
for (const [sid, s] of snakes) {
const cells = decodeSnakeCells({hx:s.hx, hy:s.hy, dirs:s.dirs});
ctx.fillStyle = colorForId(sid);
for (const [x,y] of cells) ctx.fillRect(ox + x*cell, oy + y*cell, cell, cell);
}
requestAnimationFrame(render);
}
async function readLoop() {
try {
for await (const datagram of transport.datagrams.readable) {
const dv = new DataView(datagram.buffer, datagram.byteOffset, datagram.byteLength);
if (dv.byteLength < 5) continue;
const type = dv.getUint8(1);
lastServerSeq = dv.getUint16(3);
const expectTick = (type === PacketType.STATE_FULL || type === PacketType.STATE_DELTA || type === PacketType.PART || type === PacketType.CONFIG_UPDATE);
const [ver, ptype, flags, seq, tick, off0] = parseHeader(dv, 0, expectTick);
let off = off0;
switch (ptype) {
case PacketType.JOIN_ACK: {
const info = parseJoinAck(dv, off);
playerId = info.playerId; colorId = info.colorId; fieldW = info.width; fieldH = info.height; tickRate = info.tickRate;
ui.overlay.textContent = 'connected — press space to join';
break;
}
case PacketType.JOIN_DENY: {
// read reason
const [len, p] = quicVarintDecode(dv, off); off = p;
const bytes = new Uint8Array(dv.buffer, dv.byteOffset + off, len);
const reason = new TextDecoder().decode(bytes);
setStatus('Join denied: ' + reason);
break;
}
case PacketType.STATE_FULL: {
const { snakes: sn, apples: ap } = parseStateFullBody(dv, off);
snakes.clear();
for (const s of sn) snakes.set(s.id, { len: s.len, hx: s.hx, hy: s.hy, dirs: s.dirs });
apples = ap;
ui.overlay.style.display = snakes.size ? 'none' : 'flex';
break;
}
case PacketType.STATE_DELTA: {
const delta = parseStateDeltaBody(dv, off);
// Apply changes to local snake state
for (const ch of delta.changes) {
let snake = snakes.get(ch.snakeId);
if (!snake) {
// New snake appeared; skip for now (will get it in next full update)
continue;
}
// Update direction (always present)
// Note: direction is the current direction, not necessarily the new head direction
if (ch.headMoved) {
// Snake moved: prepend new head position
// Compute direction from old head to new head
const oldHx = snake.hx;
const oldHy = snake.hy;
const newHx = ch.newHeadX;
const newHy = ch.newHeadY;
// Determine direction of movement
let moveDir = ch.direction;
if (newHx === oldHx && newHy === oldHy - 1) moveDir = Direction.UP;
else if (newHx === oldHx + 1 && newHy === oldHy) moveDir = Direction.RIGHT;
else if (newHx === oldHx && newHy === oldHy + 1) moveDir = Direction.DOWN;
else if (newHx === oldHx - 1 && newHy === oldHy) moveDir = Direction.LEFT;
// Update head position
snake.hx = newHx;
snake.hy = newHy;
// If tail was removed (normal move), keep dirs same length by prepending new dir
// If grew (ate apple), prepend without removing tail
if (ch.grew) {
// Growing: add direction to front, don't remove from back
snake.dirs = [moveDir, ...snake.dirs];
snake.len += 1;
} else if (ch.tailRemoved) {
// Normal move: head advances, tail shrinks
// Keep dirs array same size (represents len-1 directions)
// Actually, we need to be careful here about the representation
// dirs[i] tells us how to get from cell i to cell i+1
// When head moves and tail shrinks, we add new dir at front, remove old from back
if (snake.dirs.length > 0) {
snake.dirs.pop(); // remove tail direction
}
snake.dirs = [moveDir, ...snake.dirs];
} else {
// Head moved but tail didn't remove (shouldn't happen in normal gameplay)
snake.dirs = [moveDir, ...snake.dirs];
snake.len += 1;
}
} else if (ch.blocked) {
// Snake is blocked: head stayed in place, tail shrunk
if (ch.tailRemoved && snake.dirs.length > 0) {
snake.dirs.pop(); // remove last direction (tail shrinks)
snake.len = Math.max(1, snake.len - 1);
}
}
}
// Apply apple changes
const appleSet = new Set(apples.map(a => `${a[0]},${a[1]}`));
for (const [x, y] of delta.applesAdded) {
appleSet.add(`${x},${y}`);
}
for (const [x, y] of delta.applesRemoved) {
appleSet.delete(`${x},${y}`);
}
apples = Array.from(appleSet).map(s => {
const [x, y] = s.split(',').map(Number);
return [x, y];
});
break;
}
default: break;
}
}
} catch (e) {
setStatus('Read error: ' + e);
}
}
async function connectWT() {
// Check if WebTransport is available
if (typeof WebTransport === 'undefined') {
setStatus('ERROR: WebTransport not supported in this browser. Use Chrome/Edge 97+ or Firefox with flag enabled.');
return;
}
const url = ui.url.value.trim();
const hashHex = ui.hash.value.trim();
if (!url) {
setStatus('ERROR: Please enter a server URL');
return;
}
setStatus('connecting...');
console.log('Connecting to:', url);
const opts = {};
if (hashHex) {
try {
const bytes = new Uint8Array(hashHex.match(/.{1,2}/g).map(b => parseInt(b,16)));
opts.serverCertificateHashes = [{ algorithm: 'sha-256', value: bytes }];
console.log('Using certificate hash:', hashHex);
} catch (err) {
setStatus('ERROR: Invalid certificate hash format');
return;
}
} else {
console.warn('No certificate hash provided - connection may fail for self-signed certs');
}
try {
const wt = new WebTransport(url, opts);
await wt.ready;
transport = wt;
writer = wt.datagrams.writable.getWriter();
setStatus('connected to server');
console.log('WebTransport connected successfully');
readLoop();
requestAnimationFrame(render);
// Send JOIN immediately (spectator → player upon space handled below)
const pkt = buildJoin(1, nextSeq(), ui.name.value.trim());
await writer.write(pkt);
} catch (err) {
console.error('WebTransport connection failed:', err);
if (err.message.includes('certificate')) {
setStatus('ERROR: Certificate validation failed. Check cert hash or use valid CA cert.');
} else if (err.message.includes('net::')) {
setStatus('ERROR: Network error - is server running on ' + url + '?');
} else {
setStatus('ERROR: ' + err.message);
}
}
}
function dirFromKey(e) {
if (e.key === 'ArrowUp' || e.key === 'w') return Direction.UP;
if (e.key === 'ArrowRight' || e.key === 'd') return Direction.RIGHT;
if (e.key === 'ArrowDown' || e.key === 's') return Direction.DOWN;
if (e.key === 'ArrowLeft' || e.key === 'a') return Direction.LEFT;
return null;
}
async function onKey(e) {
const d = dirFromKey(e);
if (d == null) return;
if (!writer) return;
const pkt = buildInput(1, nextSeq(), lastServerSeq, (inputSeq = (inputSeq + 1) & 0xffff), 0, [{ rel: 0, dir: d }]);
try { await writer.write(pkt); } catch (err) { /* ignore */ }
}
// Auto-configure on page load
async function autoConfigureFromServer() {
try {
// Try to fetch cert hash from the static server
const response = await fetch('/cert-hash.json');
if (!response.ok) {
setStatus('Auto-config unavailable, please enter manually');
return;
}
const config = await response.json();
// Auto-populate fields
if (config.wtUrl) {
// Use the hostname from browser but with the WT port from server
const hostname = window.location.hostname || '127.0.0.1';
const wtPort = config.wtPort || 4433;
ui.url.value = `https://${hostname}:${wtPort}/`;
}
if (config.sha256) {
ui.hash.value = config.sha256;
}
setStatus('Auto-configured from server');
console.log('Auto-configured:', config);
} catch (err) {
console.warn('Auto-config failed:', err);
// Fallback: just populate URL from browser location
const hostname = window.location.hostname || '127.0.0.1';
ui.url.value = `https://${hostname}:4433/`;
setStatus('Manual configuration required');
}
}
ui.connect.onclick = () => { connectWT().catch(e => setStatus('connect failed: ' + e)); };
window.addEventListener('keydown', onKey);
window.addEventListener('resize', render);
// Auto-configure when page loads
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', autoConfigureFromServer);
} else {
autoConfigureFromServer();
}

29
client/index.html Normal file
View File

@@ -0,0 +1,29 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Multiplayer Snake (WebTransport)</title>
<style>
html, body { height: 100%; margin: 0; background: #111; color: #ddd; font-family: system-ui, sans-serif; }
#ui { position: absolute; top: 10px; left: 10px; background: rgba(0,0,0,0.6); padding: 10px; border-radius: 6px; }
label { display: block; margin: 4px 0; font-size: 12px; }
input { width: 360px; }
#overlay { position: absolute; inset: 0; display: flex; align-items: center; justify-content: center; pointer-events: none; color: rgba(255,255,255,0.85); font-size: 24px; font-weight: 600; }
canvas { display: block; width: 100%; height: 100%; image-rendering: pixelated; }
</style>
</head>
<body>
<div id="ui">
<label>Server URL (WebTransport): <input id="url" value="https://localhost:4433/" placeholder="Auto-configured..."/></label>
<label>Certificate Hash (SHA-256, auto-configured): <input id="hash" placeholder="Auto-configured from server..."/></label>
<label>Name: <input id="name" value="guest"/></label>
<button id="connect">Connect</button>
<span id="status"></span>
</div>
<div id="overlay">press space to join</div>
<canvas id="view"></canvas>
<script type="module" src="client.js"></script>
</body>
</html>

214
client/protocol.js Normal file
View File

@@ -0,0 +1,214 @@
// Minimal protocol helpers in JS (match server/protocol.py)
export const PacketType = {
JOIN: 0,
JOIN_ACK: 1,
JOIN_DENY: 2,
INPUT: 3,
INPUT_BROADCAST: 4,
STATE_DELTA: 5,
STATE_FULL: 6,
PART: 7,
CONFIG_UPDATE: 8,
PING: 9,
PONG: 10,
ERROR: 11,
};
export const Direction = { UP: 0, RIGHT: 1, DOWN: 2, LEFT: 3 };
export function quicVarintEncode(n) {
if (n <= 0x3f) return new Uint8Array([n & 0x3f]);
if (n <= 0x3fff) {
const v = 0x4000 | n;
return new Uint8Array([(v >> 8) & 0xff, v & 0xff]);
}
if (n <= 0x3fffffff) {
const v = 0x80000000 | n;
return new Uint8Array([(v >>> 24) & 0xff, (v >>> 16) & 0xff, (v >>> 8) & 0xff, v & 0xff]);
}
throw new Error("varint too large for demo");
}
export function quicVarintDecode(view, offset) {
const first = view.getUint8(offset);
const prefix = first >> 6;
if (prefix === 0) return [first & 0x3f, offset + 1];
if (prefix === 1) return [view.getUint16(offset) & 0x3fff, offset + 2];
if (prefix === 2) return [view.getUint32(offset) & 0x3fffffff, offset + 4];
throw new Error("8-byte varint not supported in demo");
}
export function packHeader(ver, type, flags, seq, tick /* or null */) {
const len = tick == null ? 5 : 7;
const buf = new ArrayBuffer(len);
const dv = new DataView(buf);
dv.setUint8(0, ver & 0xff);
dv.setUint8(1, type & 0xff);
dv.setUint8(2, flags & 0xff);
dv.setUint16(3, seq & 0xffff);
if (tick != null) dv.setUint16(5, tick & 0xffff);
return new Uint8Array(buf);
}
export function parseHeader(dv, offset = 0, expectTick = false) {
if (dv.byteLength - offset < 5) throw new Error("small header");
const ver = dv.getUint8(offset);
const type = dv.getUint8(offset + 1);
const flags = dv.getUint8(offset + 2);
const seq = dv.getUint16(offset + 3);
let tick = null;
let off = offset + 5;
if (expectTick) {
tick = dv.getUint16(off);
off += 2;
}
return [ver, type, flags, seq, tick, off];
}
export function buildJoin(ver, seq, name, preferredColor = null) {
const enc = new TextEncoder();
const nb = enc.encode(name);
const nameLen = quicVarintEncode(nb.length);
const header = packHeader(ver, PacketType.JOIN, 0, seq, null);
const extra = preferredColor == null ? new Uint8Array([]) : new Uint8Array([preferredColor & 0xff]);
const out = new Uint8Array(header.length + nameLen.length + nb.length + extra.length);
out.set(header, 0);
out.set(nameLen, header.length);
out.set(nb, header.length + nameLen.length);
out.set(extra, header.length + nameLen.length + nb.length);
return out;
}
export function buildInput(ver, seq, ackSeq, inputSeq, baseTick, events /* [{rel, dir}] */) {
const header = packHeader(ver, PacketType.INPUT, 0, seq, null);
const evParts = [];
evParts.push(...[ackSeq >> 8, ackSeq & 0xff, inputSeq >> 8, inputSeq & 0xff, baseTick >> 8, baseTick & 0xff]);
const evCount = quicVarintEncode(events.length);
const chunks = [header, new Uint8Array(evParts), evCount];
for (const e of events) {
const off = quicVarintEncode(e.rel);
chunks.push(off, new Uint8Array([e.dir & 0x03]));
}
const size = chunks.reduce((s, a) => s + a.length, 0);
const out = new Uint8Array(size);
let p = 0;
for (const c of chunks) { out.set(c, p); p += c.length; }
return out;
}
export function parseJoinAck(dv, off) {
const playerId = dv.getUint8(off); off += 1;
const colorId = dv.getUint8(off); off += 1;
const width = dv.getUint8(off); off += 1;
const height = dv.getUint8(off); off += 1;
const tickRate = dv.getUint8(off); off += 1;
const wrapEdges = dv.getUint8(off) !== 0; off += 1;
const applesPerSnake = dv.getUint8(off); off += 1;
const applesCap = dv.getUint8(off); off += 1;
const compressionMode = dv.getUint8(off); off += 1;
return { playerId, colorId, width, height, tickRate, wrapEdges, applesPerSnake, applesCap, compressionMode, off };
}
export function parseStateFullBody(dv, off) {
const snakes = [];
// snakes count
let [n, p] = quicVarintDecode(dv, off); off = p;
for (let i = 0; i < n; i++) {
const id = dv.getUint8(off); off += 1;
const len = dv.getUint16(off); off += 2;
const hx = dv.getUint8(off); off += 1;
const hy = dv.getUint8(off); off += 1;
// TLV
let [t, p1] = quicVarintDecode(dv, off); off = p1;
let [L, p2] = quicVarintDecode(dv, off); off = p2;
const tStart = off;
const tEnd = off + L;
let dirs = [];
if (t === 0 /* 2-bit */) {
const needed = Math.max(0, (len - 1));
let bits = 0, acc = 0, di = 0;
for (let iB = off; iB < tEnd && di < needed; iB++) {
acc |= dv.getUint8(iB) << bits;
bits += 8;
while (bits >= 2 && di < needed) {
dirs.push(acc & 0x03);
acc >>= 2; bits -= 2; di++;
}
}
} else if (t === 1 /* RLE */) {
let countAccum = 0;
while (off < tEnd && countAccum < (len - 1)) {
const dir = dv.getUint8(off); off += 1;
let c; [c, off] = quicVarintDecode(dv, off);
for (let k = 0; k < c; k++) dirs.push(dir & 0x03);
countAccum += c;
}
} else {
// chunk types not expected in full body for initial join; skip
off = tEnd;
}
off = tEnd;
snakes.push({ id, len, hx, hy, dirs });
}
// apples
let m; [m, off] = quicVarintDecode(dv, off);
const apples = [];
for (let i = 0; i < m; i++) { apples.push([dv.getUint8(off++), dv.getUint8(off++)]); }
return { snakes, apples, off };
}
export function parseStateDeltaBody(dv, off) {
// Parse STATE_DELTA body format (mirrors server/protocol.py build_state_delta_body)
// update_id (u16)
const updateId = dv.getUint16(off); off += 2;
// changes count (QUIC varint)
let [changesCount, p1] = quicVarintDecode(dv, off); off = p1;
const changes = [];
for (let i = 0; i < changesCount; i++) {
const snakeId = dv.getUint8(off); off += 1;
const flags = dv.getUint8(off); off += 1;
const direction = dv.getUint8(off) & 0x03; off += 1;
const headMoved = (flags & 0x01) !== 0;
const tailRemoved = (flags & 0x02) !== 0;
const grew = (flags & 0x04) !== 0;
const blocked = (flags & 0x08) !== 0;
let newHeadX = 0, newHeadY = 0;
if (headMoved) {
newHeadX = dv.getUint8(off); off += 1;
newHeadY = dv.getUint8(off); off += 1;
}
changes.push({
snakeId,
headMoved,
tailRemoved,
grew,
blocked,
direction,
newHeadX,
newHeadY
});
}
// apples added (count + coords)
let [applesAddedCount, p2] = quicVarintDecode(dv, off); off = p2;
const applesAdded = [];
for (let i = 0; i < applesAddedCount; i++) {
applesAdded.push([dv.getUint8(off++), dv.getUint8(off++)]);
}
// apples removed (count + coords)
let [applesRemovedCount, p3] = quicVarintDecode(dv, off); off = p3;
const applesRemoved = [];
for (let i = 0; i < applesRemovedCount; i++) {
applesRemoved.push([dv.getUint8(off++), dv.getUint8(off++)]);
}
return { updateId, changes, applesAdded, applesRemoved, off };
}

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
aioquic>=1.2.0
cryptography>=41.0.0

338
run.py Normal file
View File

@@ -0,0 +1,338 @@
import asyncio
import argparse
import logging
import sys
from server.server import GameServer
from server.config import ServerConfig
async def _run_tasks_with_optional_timeout(tasks, timeout_s=None):
"""Await tasks, optionally with a timeout to cancel after specified seconds."""
if not timeout_s:
await asyncio.gather(*tasks)
return
try:
await asyncio.wait_for(asyncio.gather(*tasks), timeout=float(timeout_s))
except asyncio.TimeoutError:
logging.info("Timeout reached (%s seconds); stopping server tasks...", timeout_s)
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
async def run_in_memory(args):
from server.transport import InMemoryTransport
cfg = ServerConfig(
width=args.width,
height=args.height,
tick_rate=args.tick_rate,
wrap_edges=args.wrap_edges,
apples_per_snake=args.apples_per_snake,
apples_cap=args.apples_cap,
players_max=args.players_max,
)
server = GameServer(transport=InMemoryTransport(lambda d, p: server.on_datagram(d, p)), config=cfg)
tasks = [asyncio.create_task(server.transport.run()), asyncio.create_task(server.tick_loop())]
await _run_tasks_with_optional_timeout(tasks, args.run_seconds)
async def run_quic(args):
import os
from server.quic_transport import QuicWebTransportServer
from server.utils import combine_cert_chain
cfg = ServerConfig(
width=args.width,
height=args.height,
tick_rate=args.tick_rate,
wrap_edges=args.wrap_edges,
apples_per_snake=args.apples_per_snake,
apples_cap=args.apples_cap,
players_max=args.players_max,
)
# Handle certificate chain
# Check if cert file contains multiple certificates (like Let's Encrypt fullchain.pem)
with open(args.cert, 'r') as f:
cert_content = f.read()
from server.utils import split_pem_certificates
certs_in_file = split_pem_certificates(cert_content)
cert_file = args.cert
temp_cert_file = None
if args.cert_chain:
# Combine cert + chain files into single file
temp_cert_file = combine_cert_chain(args.cert, args.cert_chain)
cert_file = temp_cert_file
logging.info("Combined certificate with %d chain file(s)", len(args.cert_chain))
elif len(certs_in_file) > 1:
# Certificate file contains full chain - reformat it for aioquic
temp_cert_file = combine_cert_chain(args.cert, [])
cert_file = temp_cert_file
logging.info("Reformatted certificate chain from fullchain file (%d certs)", len(certs_in_file))
server = GameServer(transport=QuicWebTransportServer(args.quic_host, args.quic_port, cert_file, args.key, lambda d, p: server.on_datagram(d, p)), config=cfg)
logging.info("QUIC server: %s:%d", args.quic_host, args.quic_port)
try:
tasks = [asyncio.create_task(server.transport.run()), asyncio.create_task(server.tick_loop())]
await _run_tasks_with_optional_timeout(tasks, args.run_seconds)
finally:
if temp_cert_file:
os.unlink(temp_cert_file) # Clean up temp combined cert file
async def run_webtransport(args):
import json
import os
from server.webtransport_server import WebTransportServer
from server.static_server import start_https_static
from server.utils import get_cert_sha256_hash, combine_cert_chain, extract_server_cert
cfg = ServerConfig(
width=args.width,
height=args.height,
tick_rate=args.tick_rate,
wrap_edges=args.wrap_edges,
apples_per_snake=args.apples_per_snake,
apples_cap=args.apples_cap,
players_max=args.players_max,
)
# Handle certificate chain
# Check if cert file contains multiple certificates (like Let's Encrypt fullchain.pem)
with open(args.cert, 'r') as f:
cert_content = f.read()
from server.utils import split_pem_certificates
certs_in_file = split_pem_certificates(cert_content)
cert_file = args.cert
temp_cert_file = None
if args.cert_chain:
# Combine cert + chain files into single file
temp_cert_file = combine_cert_chain(args.cert, args.cert_chain)
cert_file = temp_cert_file
logging.info("Combined certificate with %d chain file(s)", len(args.cert_chain))
elif len(certs_in_file) > 1:
# Certificate file contains full chain - reformat it for aioquic
temp_cert_file = combine_cert_chain(args.cert, [])
cert_file = temp_cert_file
logging.info("Reformatted certificate chain from fullchain file (%d certs)", len(certs_in_file))
# Calculate certificate hash for WebTransport client (only hash server cert, not chain)
server_cert_file = extract_server_cert(args.cert)
cert_hash = get_cert_sha256_hash(server_cert_file)
if server_cert_file != args.cert:
os.unlink(server_cert_file) # Clean up temp file
# Prepare cert-hash.json content
cert_hash_json = json.dumps({
"sha256": cert_hash,
"wtUrl": f"https://{args.wt_host}:{args.wt_port}/",
"wtPort": args.wt_port
})
# Optional static HTTPS server for client assets
httpd = None
if args.static:
httpd, _t = start_https_static(
args.static_host,
args.static_port,
cert_file,
args.key,
args.static_root,
cert_hash_json=cert_hash_json
)
print(f"HTTPS static server: https://{args.static_host}:{args.static_port}/ serving '{args.static_root}'")
print(f"Certificate SHA-256: {cert_hash}")
server = GameServer(transport=WebTransportServer(args.wt_host, args.wt_port, cert_file, args.key, lambda d, p: server.on_datagram(d, p)), config=cfg)
print(f"WebTransport server: https://{args.wt_host}:{args.wt_port}/ (HTTP/3)")
try:
tasks = [asyncio.create_task(server.transport.run()), asyncio.create_task(server.tick_loop())]
await _run_tasks_with_optional_timeout(tasks, args.run_seconds)
finally:
if httpd is not None:
httpd.shutdown()
if temp_cert_file:
os.unlink(temp_cert_file) # Clean up temp combined cert file
async def run_net(args):
import os
from server.webtransport_server import WebTransportServer
from server.quic_transport import QuicWebTransportServer
from server.multi_transport import MultiTransport
from server.utils import combine_cert_chain
cfg = ServerConfig(
width=args.width,
height=args.height,
tick_rate=args.tick_rate,
wrap_edges=args.wrap_edges,
apples_per_snake=args.apples_per_snake,
apples_cap=args.apples_cap,
players_max=args.players_max,
)
# Handle certificate chain
# Check if cert file contains multiple certificates (like Let's Encrypt fullchain.pem)
with open(args.cert, 'r') as f:
cert_content = f.read()
from server.utils import split_pem_certificates
certs_in_file = split_pem_certificates(cert_content)
cert_file = args.cert
temp_cert_file = None
if args.cert_chain:
# Combine cert + chain files into single file
temp_cert_file = combine_cert_chain(args.cert, args.cert_chain)
cert_file = temp_cert_file
logging.info("Combined certificate with %d chain file(s)", len(args.cert_chain))
elif len(certs_in_file) > 1:
# Certificate file contains full chain - reformat it for aioquic
temp_cert_file = combine_cert_chain(args.cert, [])
cert_file = temp_cert_file
logging.info("Reformatted certificate chain from fullchain file (%d certs)", len(certs_in_file))
server: GameServer
wt = WebTransportServer(args.wt_host, args.wt_port, cert_file, args.key, lambda d, p: server.on_datagram(d, p))
qu = QuicWebTransportServer(args.quic_host, args.quic_port, cert_file, args.key, lambda d, p: server.on_datagram(d, p))
m = MultiTransport(wt, qu)
server = GameServer(transport=m, config=cfg)
logging.info("WebTransport server: %s:%d", args.wt_host, args.wt_port)
logging.info("QUIC server: %s:%d", args.quic_host, args.quic_port)
try:
tasks = [asyncio.create_task(m.run()), asyncio.create_task(server.tick_loop())]
await _run_tasks_with_optional_timeout(tasks, args.run_seconds)
finally:
if temp_cert_file:
os.unlink(temp_cert_file) # Clean up temp combined cert file
def parse_args():
parser = argparse.ArgumentParser(
description="Real-time multiplayer Snake game server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python run.py # WebTransport mode with defaults
python run.py --mode mem # In-memory mode (testing)
python run.py --mode quic --quic-port 5000 # QUIC mode on custom port
python run.py --cert my.pem --key my.key # Custom TLS certificate
python run.py --no-static # Disable static HTTPS server
python run.py --width 80 --height 60 # Custom field size
"""
)
# Transport mode
parser.add_argument("--mode", choices=["mem", "wt", "quic", "net"], default="wt",
help="Transport mode: mem (in-memory), wt (WebTransport/HTTP3), quic (QUIC datagrams), net (both wt+quic). Default: wt")
# TLS certificate/key
parser.add_argument("--cert", default="cert.pem",
help="TLS certificate file path (may contain full chain). Default: cert.pem")
parser.add_argument("--key", default="key.pem",
help="TLS private key file path. Default: key.pem")
parser.add_argument("--cert-chain", action="append", dest="cert_chain",
help="Intermediate certificate file (can be used multiple times for long chains)")
# WebTransport server settings
parser.add_argument("--wt-host", default="0.0.0.0",
help="WebTransport server host. Default: 0.0.0.0")
parser.add_argument("--wt-port", type=int, default=4433,
help="WebTransport server port. Default: 4433")
# QUIC server settings
parser.add_argument("--quic-host", default="0.0.0.0",
help="QUIC server host. Default: 0.0.0.0")
parser.add_argument("--quic-port", type=int, default=4433,
help="QUIC server port. Default: 4433 (or 4443 in net mode)")
# Static HTTPS server settings
parser.add_argument("--static", dest="static", action="store_true", default=True,
help="Enable static HTTPS server for client files (default in wt mode)")
parser.add_argument("--no-static", dest="static", action="store_false",
help="Disable static HTTPS server")
parser.add_argument("--static-host", default=None,
help="Static server host. Default: same as wt-host")
parser.add_argument("--static-port", type=int, default=8443,
help="Static server port. Default: 8443")
parser.add_argument("--static-root", default="client",
help="Static files directory. Default: client")
# Game configuration
parser.add_argument("--width", type=int, default=60,
help="Field width (3-255). Default: 60")
parser.add_argument("--height", type=int, default=40,
help="Field height (3-255). Default: 40")
parser.add_argument("--tick-rate", type=int, default=10,
help="Server tick rate in TPS (5-30). Default: 10")
parser.add_argument("--wrap-edges", action="store_true", default=False,
help="Enable edge wrapping (default: disabled)")
parser.add_argument("--apples-per-snake", type=int, default=1,
help="Apples per connected snake (1-12). Default: 1")
parser.add_argument("--apples-cap", type=int, default=255,
help="Maximum total apples (0-255). Default: 255")
parser.add_argument("--players-max", type=int, default=32,
help="Maximum concurrent players. Default: 32")
# Logging and testing
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level. Default: INFO")
parser.add_argument("--run-seconds", type=float, default=None,
help="Optional timeout in seconds for testing")
args = parser.parse_args()
# Post-process: static-host defaults to wt-host
if args.static_host is None:
args.static_host = args.wt_host
# In net mode, default quic-port to 4443 if not explicitly set
if args.mode == "net" and "--quic-port" not in sys.argv:
args.quic_port = 4443
# Validate TLS files for modes that need them
if args.mode in ("wt", "quic", "net"):
import os
if not os.path.exists(args.cert):
parser.error(f"Certificate file not found: {args.cert}")
if not os.path.exists(args.key):
parser.error(f"Key file not found: {args.key}")
return args
if __name__ == "__main__":
try:
args = parse_args()
# Logging setup
logging.basicConfig(
level=getattr(logging, args.log_level.upper()),
format="[%(asctime)s] %(levelname)s: %(message)s"
)
# Run appropriate mode
if args.mode == "wt":
logging.info("Starting in WebTransport mode")
asyncio.run(run_webtransport(args))
elif args.mode == "quic":
logging.info("Starting in QUIC datagram mode")
asyncio.run(run_quic(args))
elif args.mode == "net":
logging.info("Starting in combined WebTransport+QUIC mode")
asyncio.run(run_net(args))
else: # mem
logging.info("Starting in in-memory transport mode")
asyncio.run(run_in_memory(args))
except KeyboardInterrupt:
pass

2
server/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
__all__ = []

28
server/config.py Normal file
View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class ServerConfig:
width: int = 60
height: int = 40
tick_rate: int = 10 # TPS, server-controlled, live-configurable
wrap_edges: bool = False # default off
apples_per_snake: int = 1 # min 1, max 12
apples_cap: int = 255 # absolute cap
compression_mode: str = "none" # "none" | "deflate" (handshake-only)
players_max: int = 32
def validate_runtime(self) -> None:
if not (3 <= self.width <= 255 and 3 <= self.height <= 255):
raise ValueError("field size must be within 3..255")
if not (5 <= self.tick_rate <= 30):
raise ValueError("tick_rate must be 5..30 TPS")
if not (1 <= self.apples_per_snake <= 12):
raise ValueError("apples_per_snake must be 1..12")
if not (0 <= self.apples_cap <= 255):
raise ValueError("apples_cap must be 0..255")
if self.compression_mode not in ("none", "deflate"):
raise ValueError("compression_mode must be 'none' or 'deflate'")

77
server/model.py Normal file
View File

@@ -0,0 +1,77 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Deque, Dict, List, Optional, Tuple
from collections import deque
from .protocol import Direction
Coord = Tuple[int, int]
@dataclass
class Snake:
snake_id: int
head: Coord
direction: Direction
body: Deque[Coord] = field(default_factory=deque) # includes head at index 0
input_buf: Deque[Direction] = field(default_factory=deque)
blocked: bool = False
@property
def length(self) -> int:
return len(self.body)
def enqueue_direction(self, new_dir: Direction, capacity: int = 3) -> None:
"""Apply input buffer rules: size<=capacity, replace last on overflow/opposite, drop duplicates."""
last_dir = self.input_buf[-1] if self.input_buf else self.direction
# Drop duplicates
if int(new_dir) == int(last_dir):
return
# Opposite of last? replace last
if (int(new_dir) ^ int(last_dir)) == 2: # 0^2,1^3,2^0,3^1 are opposites
if self.input_buf:
self.input_buf[-1] = new_dir
else:
# No buffered inputs; just add new_dir (consumption will handle 180° rule)
self.input_buf.append(new_dir)
return
# Normal append with overflow replacement
if len(self.input_buf) >= capacity:
self.input_buf[-1] = new_dir
else:
self.input_buf.append(new_dir)
@dataclass
class PlayerSession:
player_id: int
name: str
color_id: int
peer: object # transport-specific handle
input_seq: int = 0
@dataclass
class GameState:
width: int
height: int
snakes: Dict[int, Snake] = field(default_factory=dict)
apples: List[Coord] = field(default_factory=list)
tick: int = 0
occupancy: Dict[Coord, Tuple[int, int]] = field(default_factory=dict) # cell -> (snake_id, index)
def in_bounds(self, x: int, y: int) -> bool:
return 0 <= x < self.width and 0 <= y < self.height
def cell_free(self, x: int, y: int) -> bool:
return (x, y) not in self.occupancy
def occupy_snake(self, snake: Snake) -> None:
for idx, (cx, cy) in enumerate(snake.body):
self.occupancy[(cx, cy)] = (snake.snake_id, idx)
def clear_snake(self, snake: Snake) -> None:
for (cx, cy) in list(snake.body):
self.occupancy.pop((cx, cy), None)

26
server/multi_transport.py Normal file
View File

@@ -0,0 +1,26 @@
from __future__ import annotations
import asyncio
from .transport import DatagramServerTransport, TransportPeer
class MultiTransport(DatagramServerTransport):
"""Run both WebTransport and QUIC transports and route sends.
Inbound datagrams share the same on_datagram callback from GameServer,
so GameServer sees a unified source of datagrams.
"""
def __init__(self, wt_transport: DatagramServerTransport, quic_transport: DatagramServerTransport):
self._wt = wt_transport
self._quic = quic_transport
async def send(self, data: bytes, peer: TransportPeer) -> None:
# WebTransport peers are tuples (proto, flow_id); QUIC uses protocol instance
if isinstance(peer.addr, tuple) and len(peer.addr) == 2:
await self._wt.send(data, peer)
else:
await self._quic.send(data, peer)
async def run(self) -> None:
await asyncio.gather(self._wt.run(), self._quic.run())

428
server/protocol.py Normal file
View File

@@ -0,0 +1,428 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import IntEnum
from typing import Iterable, List, Sequence, Tuple
class PacketType(IntEnum):
JOIN = 0
JOIN_ACK = 1
JOIN_DENY = 2
INPUT = 3
INPUT_BROADCAST = 4
STATE_DELTA = 5
STATE_FULL = 6
PART = 7
CONFIG_UPDATE = 8
PING = 9
PONG = 10
ERROR = 11
class BodyTLV(IntEnum):
BODY_2BIT = 0x00
BODY_RLE = 0x01
BODY_2BIT_CHUNK = 0x10
BODY_RLE_CHUNK = 0x11
class Direction(IntEnum):
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
def quic_varint_encode(value: int) -> bytes:
"""Encode QUIC varint (RFC 9000)."""
if value < 0:
raise ValueError("varint must be non-negative")
if value <= 0x3F: # 6 bits, 1 byte, 00xx xxxx
return bytes([value & 0x3F])
if value <= 0x3FFF: # 14 bits, 2 bytes, 01xx xxxx
v = 0x4000 | value
return v.to_bytes(2, "big")
if value <= 0x3FFFFFFF: # 30 bits, 4 bytes, 10xx xxxx
v = 0x80000000 | value
return v.to_bytes(4, "big")
if value <= 0x3FFFFFFFFFFFFFFF: # 62 bits, 8 bytes, 11xx xxxx
v = 0xC000000000000000 | value
return v.to_bytes(8, "big")
raise ValueError("varint too large")
def quic_varint_decode(buf: bytes, offset: int = 0) -> Tuple[int, int]:
"""Decode QUIC varint starting at offset. Returns (value, next_offset)."""
first = buf[offset]
prefix = first >> 6
if prefix == 0:
return (first & 0x3F, offset + 1)
if prefix == 1:
v = int.from_bytes(buf[offset : offset + 2], "big") & 0x3FFF
return (v, offset + 2)
if prefix == 2:
v = int.from_bytes(buf[offset : offset + 4], "big") & 0x3FFFFFFF
return (v, offset + 4)
v = int.from_bytes(buf[offset : offset + 8], "big") & 0x3FFFFFFFFFFFFFFF
return (v, offset + 8)
def pack_header(version: int, ptype: PacketType, flags: int, seq: int, tick: int | None) -> bytes:
"""Pack common header fields.
Layout:
- ver: u8
- type: u8
- flags: u8
- seq: u16 (network order)
- tick: optional u16
"""
if not (0 <= version <= 255):
raise ValueError("version out of range")
if not (0 <= flags <= 255):
raise ValueError("flags out of range")
if not (0 <= seq <= 0xFFFF):
raise ValueError("seq out of range")
parts = bytearray()
parts.append(version & 0xFF)
parts.append(int(ptype) & 0xFF)
parts.append(flags & 0xFF)
parts.extend(seq.to_bytes(2, "big"))
if tick is not None:
if not (0 <= tick <= 0xFFFF):
raise ValueError("tick out of range")
parts.extend(tick.to_bytes(2, "big"))
return bytes(parts)
def unpack_header(buf: bytes, expect_tick: bool) -> Tuple[int, PacketType, int, int, int | None, int]:
"""Unpack header; returns (ver, type, flags, seq, tick, next_offset)."""
if len(buf) < 5:
raise ValueError("buffer too small for header")
ver = buf[0]
ptype = PacketType(buf[1])
flags = buf[2]
seq = int.from_bytes(buf[3:5], "big")
off = 5
tick = None
if expect_tick:
if len(buf) < 7:
raise ValueError("buffer too small for tick")
tick = int.from_bytes(buf[5:7], "big")
off = 7
return ver, ptype, flags, seq, tick, off
# Message builders/parsers (subset)
def build_config_update(
*,
version: int,
seq: int,
tick: int,
tick_rate: int,
wrap_edges: bool,
apples_per_snake: int,
apples_cap: int,
) -> bytes:
header = pack_header(version, PacketType.CONFIG_UPDATE, 0, seq, tick)
body = bytearray()
body.append(tick_rate & 0xFF) # u8
body.append(1 if wrap_edges else 0) # bool u8
body.append(apples_per_snake & 0xFF) # u8
body.append(apples_cap & 0xFF) # u8
return header + bytes(body)
@dataclass
class InputEvent:
rel_tick_offset: int # relative to base_tick
direction: Direction
def build_input_broadcast(
*,
version: int,
seq: int,
tick: int,
player_id: int,
input_seq: int,
base_tick: int,
events: Sequence[InputEvent],
apply_at_tick: int | None = None,
) -> bytes:
header = pack_header(version, PacketType.INPUT_BROADCAST, 0, seq, tick)
body = bytearray()
body.append(player_id & 0xFF)
body.extend(int(input_seq & 0xFFFF).to_bytes(2, "big"))
body.extend(int(base_tick & 0xFFFF).to_bytes(2, "big"))
# number of events as QUIC varint
body.extend(quic_varint_encode(len(events)))
for ev in events:
# rel offset as QUIC varint, direction as u8 (low 2 bits)
body.extend(quic_varint_encode(int(ev.rel_tick_offset)))
body.append(int(ev.direction) & 0x03)
# Optional absolute apply_at_tick presence flag + value
if apply_at_tick is None:
body.append(0)
else:
body.append(1)
body.extend(int(apply_at_tick & 0xFFFF).to_bytes(2, "big"))
return header + bytes(body)
def pack_body_tlv(t: BodyTLV, payload: bytes) -> bytes:
return quic_varint_encode(int(t)) + quic_varint_encode(len(payload)) + payload
def bitpack_2bit_directions(directions: Iterable[Direction]) -> bytes:
out = bytearray()
acc = 0
bits = 0
for d in directions:
acc |= (int(d) & 0x03) << bits
bits += 2
if bits >= 8:
out.append(acc & 0xFF)
acc = acc >> 8
bits -= 8
if bits:
out.append(acc & 0xFF) # zero-padded high bits
return bytes(out)
def rle_pack_directions(directions: Sequence[Direction]) -> bytes:
"""Pack directions as runs: dir (u8) + count (QUIC varint >=1)."""
if not directions:
return b""
out = bytearray()
run_dir = directions[0]
run_len = 1
for d in directions[1:]:
if d == run_dir:
run_len += 1
else:
out.append(int(run_dir) & 0xFF)
out.extend(quic_varint_encode(run_len))
run_dir = d
run_len = 1
out.append(int(run_dir) & 0xFF)
out.extend(quic_varint_encode(run_len))
return bytes(out)
def build_body_tlv_for_dirs(directions: Sequence[Direction]) -> bytes:
"""Choose the more compact of 2-bit stream vs RLE for given directions and return its TLV."""
# 2-bit body size
two_bit_payload = bitpack_2bit_directions(directions)
two_bit = pack_body_tlv(BodyTLV.BODY_2BIT, two_bit_payload)
# RLE
rle_payload = rle_pack_directions(directions)
rle = pack_body_tlv(BodyTLV.BODY_RLE, rle_payload)
return two_bit if len(two_bit) <= len(rle) else rle
# Join / Ack / Deny
def build_join(name_utf8: bytes, preferred_color_id: int | None = None) -> bytes:
# Client-side helper (not used in server)
raise NotImplementedError
def parse_join(buf: bytes, offset: int) -> Tuple[str, int | None, int]:
name_len, off = quic_varint_decode(buf, offset)
name_b = buf[off : off + name_len]
off += name_len
preferred = None
if off < len(buf):
preferred = buf[off]
off += 1
name = name_b.decode("utf-8", errors="ignore")
return name, preferred, off
def build_join_ack(
*,
version: int,
seq: int,
player_id: int,
color_id: int,
width: int,
height: int,
tick_rate: int,
wrap_edges: bool,
apples_per_snake: int,
apples_cap: int,
compression_mode: int,
) -> bytes:
header = pack_header(version, PacketType.JOIN_ACK, 0, seq, None)
body = bytearray()
body.append(player_id & 0xFF)
body.append(color_id & 0xFF)
body.append(width & 0xFF)
body.append(height & 0xFF)
body.append(tick_rate & 0xFF)
body.append(1 if wrap_edges else 0)
body.append(apples_per_snake & 0xFF)
body.append(apples_cap & 0xFF)
body.append(compression_mode & 0xFF)
return header + bytes(body)
def build_join_deny(*, version: int, seq: int, reason: str) -> bytes:
header = pack_header(version, PacketType.JOIN_DENY, 0, seq, None)
rb = reason.encode("utf-8")[:64]
return header + quic_varint_encode(len(rb)) + rb
# Input (client -> server)
def parse_input(buf: bytes, offset: int) -> Tuple[int, int, int, List[InputEvent], int]:
ack_seq = int.from_bytes(buf[offset : offset + 2], "big")
offset += 2
input_seq = int.from_bytes(buf[offset : offset + 2], "big")
offset += 2
base_tick = int.from_bytes(buf[offset : offset + 2], "big")
offset += 2
n_ev, offset = quic_varint_decode(buf, offset)
events: List[InputEvent] = []
for _ in range(n_ev):
rel, offset = quic_varint_decode(buf, offset)
d = Direction(buf[offset] & 0x03)
offset += 1
events.append(InputEvent(rel_tick_offset=int(rel), direction=d))
return ack_seq, input_seq, base_tick, events, offset
# State snapshot (server -> client)
def build_state_full(
*,
version: int,
seq: int,
tick: int,
snakes: Sequence[Tuple[int, int, int, int, Sequence[Direction]]],
apples: Sequence[Tuple[int, int]],
) -> bytes:
"""Build a minimal state_full: per-snake header + BODY_2BIT TLV; apples list.
snakes: sequence of (snake_id, len, head_x, head_y, body_dirs_from_head)
apples: sequence of (x, y)
"""
header = pack_header(version, PacketType.STATE_FULL, 0, seq, tick & 0xFFFF)
body = build_state_full_body(snakes=snakes, apples=apples)
return header + body
def build_state_full_body(
*, snakes: Sequence[Tuple[int, int, int, int, Sequence[Direction]]], apples: Sequence[Tuple[int, int]]
) -> bytes:
body = bytearray()
# snakes count
body.extend(quic_varint_encode(len(snakes)))
for sid, slen, hx, hy, dirs in snakes:
body.append(sid & 0xFF)
body.extend(int(slen & 0xFFFF).to_bytes(2, "big"))
body.append(hx & 0xFF)
body.append(hy & 0xFF)
tlv = build_body_tlv_for_dirs(dirs)
body.extend(tlv)
# apples
body.extend(quic_varint_encode(len(apples)))
for ax, ay in apples:
body.append(ax & 0xFF)
body.append(ay & 0xFF)
return bytes(body)
def build_body_2bit_chunk(directions: Sequence[Direction], start_index: int, dirs_in_chunk: int) -> bytes:
"""Build chunk TLV for a sub-range of directions (2-bit)."""
sub = directions[start_index : start_index + dirs_in_chunk]
payload = bytearray()
payload.extend(int(start_index & 0xFFFF).to_bytes(2, "big"))
payload.extend(int(dirs_in_chunk & 0xFFFF).to_bytes(2, "big"))
payload.extend(bitpack_2bit_directions(sub))
return pack_body_tlv(BodyTLV.BODY_2BIT_CHUNK, bytes(payload))
def estimate_body_2bit_bytes(snake_len: int) -> int:
used_bits = max(0, (snake_len - 1) * 2)
return (used_bits + 7) // 8
# --- State Delta ---
@dataclass
class SnakeDelta:
snake_id: int
head_moved: bool
tail_removed: bool
grew: bool
blocked: bool
new_head_x: int = 0
new_head_y: int = 0
direction: Direction = Direction.RIGHT
def build_state_delta_body(
*, update_id: int, changes: Sequence[SnakeDelta], apples_added: Sequence[Tuple[int, int]], apples_removed: Sequence[Tuple[int, int]]
) -> bytes:
body = bytearray()
body.extend(int(update_id & 0xFFFF).to_bytes(2, "big"))
# snakes
body.extend(quic_varint_encode(len(changes)))
for ch in changes:
body.append(ch.snake_id & 0xFF)
flags = (
(1 if ch.head_moved else 0)
| ((1 if ch.tail_removed else 0) << 1)
| ((1 if ch.grew else 0) << 2)
| ((1 if ch.blocked else 0) << 3)
)
body.append(flags & 0xFF)
body.append(int(ch.direction) & 0x03)
if ch.head_moved:
body.append(ch.new_head_x & 0xFF)
body.append(ch.new_head_y & 0xFF)
# apples added
body.extend(quic_varint_encode(len(apples_added)))
for ax, ay in apples_added:
body.append(ax & 0xFF)
body.append(ay & 0xFF)
# apples removed
body.extend(quic_varint_encode(len(apples_removed)))
for rx, ry in apples_removed:
body.append(rx & 0xFF)
body.append(ry & 0xFF)
return bytes(body)
def build_state_delta(
*, version: int, seq: int, tick: int, update_id: int, changes: Sequence[SnakeDelta], apples_added: Sequence[Tuple[int, int]], apples_removed: Sequence[Tuple[int, int]]
) -> bytes:
header = pack_header(version, PacketType.STATE_DELTA, 0, seq, tick & 0xFFFF)
body = build_state_delta_body(update_id=update_id, changes=changes, apples_added=apples_added, apples_removed=apples_removed)
return header + body
def build_part(
*,
version: int,
seq: int,
tick: int,
update_id: int,
part_index: int,
parts_total: int,
inner_type: PacketType,
chunk_payload: bytes,
) -> bytes:
header = pack_header(version, PacketType.PART, 0, seq, tick & 0xFFFF)
body = bytearray()
body.extend(int(update_id & 0xFFFF).to_bytes(2, "big"))
body.append(part_index & 0xFF)
body.append(parts_total & 0xFF)
body.append(int(inner_type) & 0xFF)
# include the chunk payload bytes
body.extend(chunk_payload)
return header + bytes(body)

84
server/quic_transport.py Normal file
View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional
from .transport import DatagramServerTransport, OnDatagram, TransportPeer
import logging
try:
from aioquic.asyncio import QuicConnectionProtocol, serve
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import DatagramFrameReceived, ProtocolNegotiated
except Exception: # pragma: no cover - optional dependency not installed in skeleton
QuicConnectionProtocol = object # type: ignore
QuicConfiguration = object # type: ignore
serve = None # type: ignore
DatagramFrameReceived = object # type: ignore
ProtocolNegotiated = object # type: ignore
class GameQuicProtocol(QuicConnectionProtocol): # type: ignore[misc]
def __init__(self, *args, on_datagram: OnDatagram, peers: Dict[int, "GameQuicProtocol"], **kwargs):
super().__init__(*args, **kwargs)
self._on_datagram = on_datagram
self._peers = peers
self._peer_id: Optional[int] = None
def quic_event_received(self, event) -> None: # type: ignore[override]
if isinstance(event, ProtocolNegotiated):
# Register by connection id
self._peer_id = int(self._quic.connection_id) # type: ignore[attr-defined]
self._peers[self._peer_id] = self
logging.info("QUIC peer connected: id=%s", self._peer_id)
elif isinstance(event, DatagramFrameReceived):
# Schedule async callback
if self._peer_id is None:
return
peer = TransportPeer(addr=self)
logging.debug("QUIC datagram received from id=%s, %d bytes", self._peer_id, len(event.data))
asyncio.ensure_future(self._on_datagram(bytes(event.data), peer))
async def send_datagram(self, data: bytes) -> None:
logging.debug("QUIC send datagram: %d bytes", len(data))
self._quic.send_datagram_frame(data) # type: ignore[attr-defined]
self.transmit() # Send queued QUIC packets immediately
class QuicWebTransportServer(DatagramServerTransport):
def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram):
if serve is None:
raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.")
self.host = host
self.port = port
self.certfile = certfile
self.keyfile = keyfile
self._on_datagram = on_datagram
self._server = None
self._peers: Dict[int, GameQuicProtocol] = {}
async def send(self, data: bytes, peer: TransportPeer) -> None:
proto = peer.addr # expected GameQuicProtocol
if isinstance(proto, GameQuicProtocol):
await proto.send_datagram(data)
async def run(self) -> None:
configuration = QuicConfiguration(
is_client=False,
alpn_protocols=["h3"],
max_datagram_frame_size=65536 # Enable QUIC datagrams
)
configuration.load_cert_chain(self.certfile, self.keyfile)
def _create_protocol(*args, **kwargs):
return GameQuicProtocol(*args, on_datagram=self._on_datagram, peers=self._peers, **kwargs)
logging.info("QUIC datagram server listening on %s:%d", self.host, self.port)
self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol)
try:
# Wait indefinitely until cancelled (e.g., by KeyboardInterrupt or timeout)
await asyncio.Event().wait()
finally:
self._server.close()

624
server/server.py Normal file
View File

@@ -0,0 +1,624 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import logging
from typing import Dict, List, Optional, Tuple
from collections import deque
from .config import ServerConfig
from .model import GameState, PlayerSession, Snake, Coord
from .protocol import (
Direction,
InputEvent,
PacketType,
build_join_ack,
build_join_deny,
build_config_update,
build_input_broadcast,
build_state_full,
build_state_full_body,
build_body_tlv_for_dirs,
build_body_2bit_chunk,
build_state_delta,
build_state_delta_body,
build_part,
SnakeDelta,
pack_header,
parse_input,
parse_join,
)
from .transport import DatagramServerTransport, TransportPeer
@dataclass
class ServerRuntime:
config: ServerConfig
state: GameState
seq: int = 0
version: int = 1
update_id: int = 0
def next_seq(self) -> int:
self.seq = (self.seq + 1) & 0xFFFF
return self.seq
def next_update_id(self) -> int:
self.update_id = (self.update_id + 1) & 0xFFFF
return self.update_id
class GameServer:
def __init__(self, transport: DatagramServerTransport, config: ServerConfig):
self.transport = transport
self.runtime = ServerRuntime(config=config, state=GameState(config.width, config.height))
self.sessions: Dict[int, PlayerSession] = {}
self._config_update_interval_ticks = 50 # periodic resend
logging.info("GameServer created: field=%dx%d, tick_rate=%d TPS, wrap=%s", config.width, config.height, config.tick_rate, config.wrap_edges)
async def on_datagram(self, data: bytes, peer: TransportPeer) -> None:
# Minimal header parse
if len(data) < 5:
return
ver = data[0]
ptype = PacketType(data[1])
flags = data[2]
# seq = int.from_bytes(data[3:5], 'big') # currently unused
off = 5
if ptype == PacketType.JOIN:
await self._handle_join(data, off, peer)
elif ptype == PacketType.INPUT:
await self._handle_input(data, off, peer)
else:
# ignore others in skeleton
return
async def broadcast_config_update(self) -> None:
r = self.runtime
payload = build_config_update(
version=r.version,
seq=r.next_seq(),
tick=r.state.tick & 0xFFFF,
tick_rate=r.config.tick_rate,
wrap_edges=r.config.wrap_edges,
apples_per_snake=r.config.apples_per_snake,
apples_cap=r.config.apples_cap,
)
# Broadcast to all sessions (requires real peer handles)
for session in list(self.sessions.values()):
await self.transport.send(payload, TransportPeer(session.peer))
logging.debug("Broadcasted config_update to %d sessions", len(self.sessions))
async def relay_input_broadcast(
self,
*,
from_player_id: int,
input_seq: int,
base_tick: int,
events: List[InputEvent],
apply_at_tick: int | None,
) -> None:
r = self.runtime
payload = build_input_broadcast(
version=r.version,
seq=r.next_seq(),
tick=r.state.tick & 0xFFFF,
player_id=from_player_id,
input_seq=input_seq,
base_tick=base_tick & 0xFFFF,
events=events,
apply_at_tick=apply_at_tick,
)
for session in list(self.sessions.values()):
if session.player_id == from_player_id:
continue
await self.transport.send(payload, TransportPeer(session.peer))
async def tick_loop(self) -> None:
r = self.runtime
tick_duration = 1.0 / max(1, r.config.tick_rate)
next_cfg_resend = self._config_update_interval_ticks
logging.info("Tick loop started at %.2f TPS", 1.0 / tick_duration)
while True:
start = asyncio.get_event_loop().time()
# process inputs, update snakes, collisions, apples, deltas
await self._simulate_tick()
r.state.tick = (r.state.tick + 1) & 0xFFFFFFFF
next_cfg_resend -= 1
if next_cfg_resend <= 0:
await self.broadcast_config_update()
next_cfg_resend = self._config_update_interval_ticks
elapsed = asyncio.get_event_loop().time() - start
await asyncio.sleep(max(0.0, tick_duration - elapsed))
# --- Simulation ---
def _consume_input_for_snake(self, s: Snake) -> None:
# Consume at most one input; skip 180° turns when length>1
while s.input_buf:
nd = s.input_buf[0]
# 180-degree check
if s.length > 1 and ((int(nd) ^ int(s.direction)) == 2):
s.input_buf.popleft()
continue
# Accept
s.direction = nd
s.input_buf.popleft()
break
def _step_from(self, x: int, y: int, d: Direction) -> Tuple[int, int, bool]:
dx = 1 if d == Direction.RIGHT else -1 if d == Direction.LEFT else 0
dy = 1 if d == Direction.DOWN else -1 if d == Direction.UP else 0
nx, ny = x + dx, y + dy
st = self.runtime.state
wrap = self.runtime.config.wrap_edges
wrapped = False
if wrap:
if nx < 0:
nx = st.width - 1; wrapped = True
elif nx >= st.width:
nx = 0; wrapped = True
if ny < 0:
ny = st.height - 1; wrapped = True
elif ny >= st.height:
ny = 0; wrapped = True
return nx, ny, wrapped
async def _simulate_tick(self) -> None:
st = self.runtime.state
cfg = self.runtime.config
apples_eaten: List[Coord] = []
apples_before = set(st.apples)
changes: List[SnakeDelta] = []
# Prepare snapshot of tails to allow moving into own tail when it vacates
tails: Dict[int, Coord] = {}
for sid, s in st.snakes.items():
if s.length > 1:
tails[sid] = s.body[-1]
# Process snakes
for sid, s in st.snakes.items():
# Consume one input if available
self._consume_input_for_snake(s)
hx, hy = s.body[0]
nx, ny, wrapped = self._step_from(hx, hy, s.direction)
# Check wall if no wrap
obstacle = False
if not self.runtime.config.wrap_edges and not st.in_bounds(nx, ny):
obstacle = True
else:
# Bounds correction already handled by _step_from
# Occupancy check
occ = st.occupancy.get((nx, ny))
if occ is not None:
# Allow moving into own tail if it will vacate (not growing)
own_tail_ok = (s.length > 1 and (nx, ny) == tails.get(sid))
if not own_tail_ok:
obstacle = True
if obstacle:
s.blocked = True
# shrink tail by 1 (to min 1)
if s.length > 1:
tx, ty = s.body.pop()
st.occupancy.pop((tx, ty), None)
changes.append(
SnakeDelta(
snake_id=sid,
head_moved=False,
tail_removed=(s.length > 1),
grew=False,
blocked=True,
new_head_x=hx,
new_head_y=hy,
direction=s.direction,
)
)
continue
# Move or grow
s.blocked = False
will_grow = (nx, ny) in st.apples
# Add new head
s.body.appendleft((nx, ny))
st.occupancy[(nx, ny)] = (sid, 0)
if will_grow:
# eat apple; no tail removal this tick
st.apples.remove((nx, ny))
apples_eaten.append((nx, ny))
changes.append(
SnakeDelta(
snake_id=sid,
head_moved=True,
tail_removed=False,
grew=True,
blocked=False,
new_head_x=nx,
new_head_y=ny,
direction=s.direction,
)
)
else:
# normal move: remove tail (unless length==0)
tx, ty = s.body.pop()
if (tx, ty) in st.occupancy:
st.occupancy.pop((tx, ty), None)
changes.append(
SnakeDelta(
snake_id=sid,
head_moved=True,
tail_removed=True,
grew=False,
blocked=False,
new_head_x=nx,
new_head_y=ny,
direction=s.direction,
)
)
# Replenish apples to target
self._ensure_apples()
apples_after = set(st.apples)
apples_added = sorted(list(apples_after - apples_before))
apples_removed = sorted(list(apples_before - apples_after))
# Broadcast a basic delta (currently full snakes + apples as delta)
update_id = self.runtime.next_update_id()
# Serialize full delta body once, then partition if large
full_body = build_state_delta_body(
update_id=update_id,
changes=changes,
apples_added=apples_added,
apples_removed=apples_removed,
)
MTU = 1200 # soft limit for payload to avoid fragmentation
if len(full_body) <= MTU:
packet = pack_header(self.runtime.version, PacketType.STATE_DELTA, 0, self.runtime.next_seq(), st.tick & 0xFFFF) + full_body
for session in list(self.sessions.values()):
await self.transport.send(packet, TransportPeer(session.peer))
else:
# Partition by splitting snake changes across parts; include apples only in the first part
parts: List[bytes] = []
remaining = list(changes)
idx = 0
part_index = 0
while remaining:
chunk: List[SnakeDelta] = []
# greedy pack until size would exceed MTU
# start with apples only for first part
add_ap = apples_added if part_index == 0 else []
rem_ap = apples_removed if part_index == 0 else []
# Try adding changes one by one
for i, ch in enumerate(remaining):
tmp_body = build_state_delta_body(update_id=update_id, changes=chunk + [ch], apples_added=add_ap, apples_removed=rem_ap)
if len(tmp_body) > MTU and chunk:
break
if len(tmp_body) > MTU:
# even a single change + apples doesn't fit; force single
chunk.append(ch)
i += 1
break
chunk.append(ch)
# Remove chunked items
remaining = remaining[len(chunk) :]
# Build chunk body
chunk_body = build_state_delta_body(update_id=update_id, changes=chunk, apples_added=add_ap, apples_removed=rem_ap)
parts.append(chunk_body)
part_index += 1
# Emit PART packets
total = len(parts)
for i, body in enumerate(parts):
pkt = build_part(
version=self.runtime.version,
seq=self.runtime.next_seq(),
tick=st.tick & 0xFFFF,
update_id=update_id,
part_index=i,
parts_total=total,
inner_type=PacketType.STATE_DELTA,
chunk_payload=body,
)
for session in list(self.sessions.values()):
await self.transport.send(pkt, TransportPeer(session.peer))
# --- Join / Spawn ---
def _allocate_player_id(self) -> Optional[int]:
used = {s.player_id for s in self.sessions.values()}
for pid in range(self.runtime.config.players_max):
if pid not in used:
return pid
return None
def _choose_color_id(self) -> int:
used = {s.color_id for s in self.sessions.values()}
for cid in range(32):
if cid not in used:
return cid
return 0
def _neighbors(self, x: int, y: int) -> List[Tuple[Direction, Coord]]:
return [
(Direction.UP, (x, y - 1)),
(Direction.RIGHT, (x + 1, y)),
(Direction.DOWN, (x, y + 1)),
(Direction.LEFT, (x - 1, y)),
]
def _find_spawn(self) -> Optional[Snake]:
st = self.runtime.state
# Try to find a 3-cell straight strip
for y in range(st.height):
for x in range(st.width):
if not st.cell_free(x, y):
continue
for d, (nx, ny) in self._neighbors(x, y):
# check two cells in direction
x2, y2 = nx, ny
x3, y3 = nx + (1 if d == Direction.RIGHT else -1 if d == Direction.LEFT else 0), ny + (1 if d == Direction.DOWN else -1 if d == Direction.UP else 0)
if st.in_bounds(x2, y2) and st.in_bounds(x3, y3) and st.cell_free(x2, y2) and st.cell_free(x3, y3):
body = [
(x, y),
(x2, y2),
(x3, y3),
]
return Snake(snake_id=-1, head=(x, y), direction=d, body=deque(body))
# Fallback: any single free cell
for y in range(st.height):
for x in range(st.width):
if st.cell_free(x, y):
return Snake(snake_id=-1, head=(x, y), direction=Direction.RIGHT, body=deque([(x, y)]))
return None
def _ensure_apples(self) -> None:
st = self.runtime.state
cfg = self.runtime.config
import random
random.seed(0xC0FFEE)
target = 3 if not self.sessions else min(cfg.apples_cap, max(0, len(self.sessions) * cfg.apples_per_snake))
# grow apples up to target
while len(st.apples) < target:
x = random.randrange(st.width)
y = random.randrange(st.height)
if st.cell_free(x, y) and (x, y) not in st.apples:
st.apples.append((x, y))
# shrink if too many
if len(st.apples) > target:
st.apples = st.apples[:target]
async def _handle_join(self, buf: bytes, off: int, peer: TransportPeer) -> None:
name, preferred, off2 = parse_join(buf, off)
# enforce name <=16 bytes utf-8
name = name.encode("utf-8")[:16].decode("utf-8", errors="ignore")
pid = self._allocate_player_id()
if pid is None:
payload = build_join_deny(version=self.runtime.version, seq=self.runtime.next_seq(), reason="Server full")
await self.transport.send(payload, peer)
logging.warning("JOIN denied: server full (name=%s)", name)
return
# Spawn snake
snake = self._find_spawn()
if snake is None:
payload = build_join_deny(version=self.runtime.version, seq=self.runtime.next_seq(), reason="No free cell, please wait")
await self.transport.send(payload, peer)
logging.warning("JOIN denied: no free cell (name=%s)", name)
return
# Register session and snake
color_id = preferred if (preferred is not None) else self._choose_color_id()
session = PlayerSession(player_id=pid, name=name, color_id=color_id, peer=peer.addr)
self.sessions[pid] = session
snake.snake_id = pid
self.runtime.state.snakes[pid] = snake
self.runtime.state.occupy_snake(snake)
self._ensure_apples()
logging.info("JOIN accepted: player_id=%d name=%s color=%d len=%d at=%s", pid, name, color_id, snake.length, snake.body[0])
# Send join_ack
cfg = self.runtime.config
ack = build_join_ack(
version=self.runtime.version,
seq=self.runtime.next_seq(),
player_id=pid,
color_id=color_id,
width=cfg.width,
height=cfg.height,
tick_rate=cfg.tick_rate,
wrap_edges=cfg.wrap_edges,
apples_per_snake=cfg.apples_per_snake,
apples_cap=cfg.apples_cap,
compression_mode=0 if cfg.compression_mode == "none" else 1,
)
await self.transport.send(ack, peer)
# Send initial state_full (partitioned if needed)
snakes_dirs: List[Tuple[int, int, int, int, List[Direction]]] = []
for s in self.runtime.state.snakes.values():
dirs: List[Direction] = []
# build directions from consecutive coords: from head toward tail
coords = list(s.body)
for i in range(len(coords) - 1):
x0, y0 = coords[i]
x1, y1 = coords[i + 1]
if x1 == x0 and y1 == y0 - 1:
dirs.append(Direction.UP)
elif x1 == x0 + 1 and y1 == y0:
dirs.append(Direction.RIGHT)
elif x1 == x0 and y1 == y0 + 1:
dirs.append(Direction.DOWN)
elif x1 == x0 - 1 and y1 == y0:
dirs.append(Direction.LEFT)
hx, hy = coords[0]
snakes_dirs.append((s.snake_id, s.length, hx, hy, dirs))
# Build body and partition across parts if needed
body = build_state_full_body(snakes=snakes_dirs, apples=self.runtime.state.apples)
MTU = 1200
if len(body) <= MTU:
full = pack_header(
self.runtime.version, PacketType.STATE_FULL, 0, self.runtime.next_seq(), self.runtime.state.tick & 0xFFFF
) + body
await self.transport.send(full, peer)
else:
# Partition by packing whole snakes first, apples only in first part; chunk a single oversized snake using 2-bit chunks
update_id = self.runtime.next_update_id()
parts: List[bytes] = []
# Prepare apples buffer for first part
def encode_apples(apples: List[Coord]) -> bytes:
from .protocol import quic_varint_encode
b = bytearray()
b.extend(quic_varint_encode(len(apples)))
for ax, ay in apples:
b.append(ax & 0xFF)
b.append(ay & 0xFF)
return bytes(b)
apples_encoded = encode_apples(self.runtime.state.apples)
# Cursor over snakes, building per part bodies
remaining = list(snakes_dirs)
first = True
while remaining:
# Start part body with snakes count placeholder; we'll rebuild as we pack
part_snakes: List[bytes] = []
packed_snakes = 0
budget = MTU
# Reserve apples size on first part
apples_this = apples_encoded if first else encode_apples([])
budget -= len(apples_this)
# Greedily add snakes
i = 0
while i < len(remaining):
sid, slen, hx, hy, dirs = remaining[i]
# Build this snake record with chosen TLV
tlv = build_body_tlv_for_dirs(dirs)
record = bytearray()
# one snake: id u8, len u16, head x y, TLV
record.append(sid & 0xFF)
record.extend(int(slen & 0xFFFF).to_bytes(2, "big"))
record.append(hx & 0xFF)
record.append(hy & 0xFF)
record.extend(tlv)
if len(record) <= budget:
part_snakes.append(bytes(record))
budget -= len(record)
packed_snakes += 1
i += 1
continue
# If single snake doesn't fit and no snakes packed yet, chunk this snake
if packed_snakes == 0:
# Use 2-bit chunking; compute directions chunk size to fit budget minus fixed headers (id/len/head + TLV type/len + chunk header)
# Fixed snake header = 1+2+1+1 = 5 bytes; TLV type/len (worst-case 2 bytes for small values) + chunk header 4 bytes
overhead = 5 + 2 + 4
dir_capacity_bytes = max(0, budget - overhead)
if dir_capacity_bytes <= 0:
break
# Convert capacity bytes to number of directions (each 2 bits → 4 dirs per byte)
dir_capacity = dir_capacity_bytes * 4
if dir_capacity <= 0:
break
# Emit at least one direction
dirs_in_chunk = max(1, min(dir_capacity, len(dirs)))
tlv_chunk = build_body_2bit_chunk(dirs, start_index=0, dirs_in_chunk=dirs_in_chunk)
record = bytearray()
record.append(sid & 0xFF)
record.extend(int(slen & 0xFFFF).to_bytes(2, "big"))
record.append(hx & 0xFF)
record.append(hy & 0xFF)
record.extend(tlv_chunk)
if len(record) <= budget:
part_snakes.append(bytes(record))
budget -= len(record)
# Replace remaining snake with truncated version (advance dirs)
remaining[i] = (sid, slen, hx, hy, list(dirs)[dirs_in_chunk:])
packed_snakes += 1
else:
break
else:
break
# Build part body: snakes_count + snakes + apples
body_part = bytearray()
from .protocol import quic_varint_encode
body_part.extend(quic_varint_encode(len(part_snakes)))
for rec in part_snakes:
body_part.extend(rec)
body_part.extend(apples_this)
parts.append(bytes(body_part))
# Advance remaining list
remaining = remaining[packed_snakes:]
first = False
# Emit part packets
total = len(parts)
for i, chunk_body in enumerate(parts):
pkt = build_part(
version=self.runtime.version,
seq=self.runtime.next_seq(),
tick=self.runtime.state.tick & 0xFFFF,
update_id=update_id,
part_index=i,
parts_total=total,
inner_type=PacketType.STATE_FULL,
chunk_payload=chunk_body,
)
await self.transport.send(pkt, peer)
async def _handle_input(self, buf: bytes, off: int, peer: TransportPeer) -> None:
try:
ack_seq, input_seq, base_tick, events, off2 = parse_input(buf, off)
except Exception:
return
# Find player by peer
player_id = None
for pid, sess in self.sessions.items():
if sess.peer is peer.addr:
player_id = pid
break
if player_id is None:
return
logging.debug("INPUT from player_id=%d: %d events (base_tick=%d)", player_id, len(events), base_tick)
# Apply inputs to snake buffer (client-side filtering already applied)
snake = self.runtime.state.snakes.get(player_id)
if snake is not None:
for ev in events:
# Apply input buffer rules from project plan:
# - Max capacity 3
# - If opposite to last buffered, replace last
# - Drop duplicates
# - Overflow: replace last
if len(snake.input_buf) > 0:
last_dir = snake.input_buf[-1]
# Check if opposite (XOR == 2)
if (int(ev.direction) ^ int(last_dir)) == 2:
# Replace last
snake.input_buf[-1] = ev.direction
continue
# Check if duplicate
if ev.direction == last_dir:
continue
# Add to buffer
if len(snake.input_buf) < 3:
snake.input_buf.append(ev.direction)
else:
# Overflow: replace last
snake.input_buf[-1] = ev.direction
# Relay to others immediately for prediction
await self.relay_input_broadcast(
from_player_id=player_id,
input_seq=input_seq,
base_tick=base_tick,
events=events,
apply_at_tick=None,
)
async def main() -> None:
from .transport import InMemoryTransport
cfg = ServerConfig()
server = GameServer(transport=InMemoryTransport(lambda d, p: server.on_datagram(d, p)), config=cfg)
# In-memory transport never returns; run tick loop in parallel
await asyncio.gather(server.transport.run(), server.tick_loop())
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass

77
server/static_server.py Normal file
View File

@@ -0,0 +1,77 @@
from __future__ import annotations
import ssl
import logging
import threading
import json
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Tuple, Optional
class _Handler(SimpleHTTPRequestHandler):
# Allow passing a base directory and cert_hash_json at construction time
cert_hash_json: Optional[str] = None
def __init__(self, *args, directory: str | None = None, **kwargs):
super().__init__(*args, directory=directory, **kwargs)
def end_headers(self):
# Add no-cache headers for all static files to force browser to fetch fresh versions
self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
super().end_headers()
def do_GET(self):
# Intercept /cert-hash.json requests
if self.path == '/cert-hash.json' and self.cert_hash_json:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(self.cert_hash_json.encode('utf-8'))
else:
# Serve regular static files
super().do_GET()
def start_https_static(
host: str,
port: int,
certfile: str,
keyfile: str,
docroot: str,
cert_hash_json: Optional[str] = None
) -> Tuple[ThreadingHTTPServer, threading.Thread]:
"""Start a simple HTTPS static file server in a background thread.
Args:
host: Host to bind to
port: Port to bind to
certfile: Path to TLS certificate
keyfile: Path to TLS private key
docroot: Document root directory
cert_hash_json: Optional JSON string to serve at /cert-hash.json
Returns the (httpd, thread). Caller is responsible for calling httpd.shutdown()
to stop the server on application exit.
"""
docroot_path = str(Path(docroot).resolve())
# Set class variable for the handler
if cert_hash_json:
_Handler.cert_hash_json = cert_hash_json
def handler(*args, **kwargs):
return _Handler(*args, directory=docroot_path, **kwargs)
httpd = ThreadingHTTPServer((host, port), handler)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
httpd.socket = ctx.wrap_socket(httpd.socket, server_side=True)
t = threading.Thread(target=httpd.serve_forever, name="https-static", daemon=True)
t.start()
logging.info("HTTPS static server listening on https://%s:%d serving '%s'", host, port, docroot_path)
return httpd, t

59
server/transport.py Normal file
View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import logging
from typing import Awaitable, Callable, Optional, Tuple
OnDatagram = Callable[[bytes, object], Awaitable[None]]
@dataclass
class TransportPeer:
addr: object # opaque peer handle (e.g., QUIC session)
class DatagramServerTransport:
async def send(self, data: bytes, peer: TransportPeer) -> None:
raise NotImplementedError
async def run(self) -> None:
raise NotImplementedError
class InMemoryTransport(DatagramServerTransport):
"""A test transport that loops datagrams back to registered peers."""
def __init__(self, on_datagram: OnDatagram):
self._on_datagram = on_datagram
self._peers: list[TransportPeer] = []
def register_peer(self, peer: TransportPeer) -> None:
self._peers.append(peer)
async def send(self, data: bytes, peer: TransportPeer) -> None:
# In-memory: deliver only to the addressed peer
if peer in self._peers:
await self._on_datagram(data, peer)
async def run(self) -> None:
logging.info("InMemory transport started (no network)")
await asyncio.Future()
class QuicWebTransportServer(DatagramServerTransport):
"""Placeholder for a real WebTransport (HTTP/3) datagram server.
Integrate with aioquic or another QUIC library and invoke the provided
on_datagram callback when a datagram arrives.
"""
def __init__(self, on_datagram: OnDatagram):
self._on_datagram = on_datagram
async def send(self, data: bytes, peer: TransportPeer) -> None:
raise NotImplementedError("QUIC server not implemented in skeleton")
async def run(self) -> None:
raise NotImplementedError("QUIC server not implemented in skeleton")

142
server/utils.py Normal file
View File

@@ -0,0 +1,142 @@
from __future__ import annotations
import hashlib
import re
import tempfile
from typing import List, Optional
from pathlib import Path
def is_newer(a: int, b: int) -> bool:
"""Return True if 16-bit sequence number a is newer than b (wrap-aware).
Uses half-range window on unsigned 16-bit arithmetic.
"""
return ((a - b) & 0xFFFF) < 0x8000
def get_cert_sha256_hash(cert_path: str) -> str:
"""Calculate SHA-256 hash of a certificate file.
Returns the hash as a lowercase hex string (64 characters).
This hash can be used by WebTransport clients for certificate pinning.
"""
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Read the certificate file
with open(cert_path, 'rb') as f:
cert_data = f.read()
# Parse the certificate
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
# Get the DER-encoded certificate and hash it
cert_der = cert.public_bytes(encoding=x509.Encoding.DER)
hash_bytes = hashlib.sha256(cert_der).digest()
# Return as hex string
return hash_bytes.hex()
except Exception as e:
# Fallback: just hash the file contents (less accurate but works)
with open(cert_path, 'rb') as f:
return hashlib.sha256(f.read()).digest().hex()
def split_pem_certificates(pem_data: str) -> List[str]:
"""Split a PEM file containing multiple certificates into individual certs.
Args:
pem_data: String containing one or more PEM certificates
Returns:
List of individual certificate strings (each includes BEGIN/END markers)
"""
# Match all certificate blocks
cert_pattern = r'(-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----)'
certificates = re.findall(cert_pattern, pem_data, re.DOTALL)
return certificates
def combine_cert_chain(cert_path: str, chain_paths: List[str]) -> str:
"""Combine server cert and chain certs into a single properly-formatted PEM file.
Args:
cert_path: Path to server certificate (may contain full chain)
chain_paths: List of paths to intermediate/root certificates (optional)
Returns:
Path to temporary combined certificate file (caller should clean up)
"""
combined_content = []
# Read server certificate file
with open(cert_path, 'r') as f:
cert_data = f.read().strip()
certs = split_pem_certificates(cert_data)
if chain_paths:
# If chain files provided separately, only use first cert from cert_path
if certs:
combined_content.append(certs[0].strip())
else:
combined_content.append(cert_data)
else:
# No separate chain files - include all certs from cert_path (fullchain case)
if certs:
combined_content.extend([c.strip() for c in certs])
else:
combined_content.append(cert_data)
# Read separate chain certificate files
for chain_path in chain_paths:
with open(chain_path, 'r') as f:
chain_data = f.read().strip()
# Each chain file might contain multiple certs
chain_certs = split_pem_certificates(chain_data)
if chain_certs:
combined_content.extend([c.strip() for c in chain_certs])
else:
combined_content.append(chain_data)
# Write to temporary file with proper formatting
# Each cert should be separated by a newline
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
temp_file.write('\n'.join(combined_content))
temp_file.write('\n') # End with newline
temp_file.close()
return temp_file.name
def extract_server_cert(cert_path: str) -> str:
"""Extract just the server certificate from a file that may contain a full chain.
Args:
cert_path: Path to certificate file (may contain chain)
Returns:
Path to temporary file containing only the server certificate
"""
with open(cert_path, 'r') as f:
cert_data = f.read()
# Split into individual certificates
certs = split_pem_certificates(cert_data)
if not certs:
# No certs found, return original path
return cert_path
# First cert is the server cert
server_cert = certs[0].strip()
# Write to temporary file
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
temp_file.write(server_cert)
temp_file.write('\n')
temp_file.close()
return temp_file.name

View File

@@ -0,0 +1,171 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Callable, Dict, Optional
from .transport import DatagramServerTransport, OnDatagram, TransportPeer
import logging
try:
from aioquic.asyncio import QuicConnectionProtocol, serve
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import ProtocolNegotiated
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import HeadersReceived
# Datagram event names vary by aioquic version; try both
try:
from aioquic.h3.events import DatagramReceived as H3DatagramReceived # type: ignore
except Exception: # pragma: no cover
H3DatagramReceived = object # type: ignore
except Exception: # pragma: no cover - optional dependency not installed
QuicConnectionProtocol = object # type: ignore
QuicConfiguration = object # type: ignore
ProtocolNegotiated = object # type: ignore
H3_ALPN = [] # type: ignore
H3Connection = object # type: ignore
HeadersReceived = object # type: ignore
H3DatagramReceived = object # type: ignore
serve = None # type: ignore
@dataclass
class WTSession:
flow_id: int # HTTP/3 datagram flow id (usually CONNECT stream id)
proto: "GameWTProtocol"
class GameWTProtocol(QuicConnectionProtocol): # type: ignore[misc]
def __init__(self, *args, on_datagram: OnDatagram, sessions: Dict[int, WTSession], **kwargs):
super().__init__(*args, **kwargs)
self._on_datagram = on_datagram
self._sessions = sessions
self._http: Optional[H3Connection] = None # Created after ProtocolNegotiated
logging.debug("GameWTProtocol initialized")
def http_event_received(self, event) -> None: # type: ignore[override]
logging.debug("HTTP event received: %s", type(event).__name__)
# Headers for CONNECT :protocol = webtransport open a session
if isinstance(event, HeadersReceived):
headers = {k.decode().lower(): v.decode() for k, v in event.headers}
logging.debug("HeadersReceived: %s", headers)
method = headers.get(":method")
protocol = headers.get(":protocol") or headers.get("sec-webtransport-protocol")
logging.debug("Method: %s, Protocol: %s", method, protocol)
if method == "CONNECT" and (protocol == "webtransport"):
# In WebTransport over H3, datagrams use the CONNECT stream id as session id
session_id = event.stream_id # type: ignore[attr-defined]
self._sessions[session_id] = WTSession(flow_id=session_id, proto=self)
logging.info("WT CONNECT accepted: session_id=%s", session_id)
# Send 2xx to accept the session
if self._http is not None:
self._http.send_headers(event.stream_id, [(b":status", b"200")])
self.transmit() # Actually send the response to complete handshake
logging.debug("Sent 200 response for WT CONNECT")
else:
logging.warning("Unexpected CONNECT: method=%s, protocol=%s", method, protocol)
elif isinstance(event, H3DatagramReceived): # type: ignore[misc]
# Route datagram to session by stream_id (WebTransport session ID)
# DatagramReceived has: stream_id (session), data (payload)
session_id = getattr(event, "stream_id", None)
data = getattr(event, "data", None)
logging.debug("DatagramReceived event: session_id=%s, data_len=%s, has_data=%s",
session_id, len(data) if data else None, data is not None)
if session_id is None or data is None:
logging.warning("Invalid datagram event: session_id=%s, data=%s", session_id, data)
return
sess = self._sessions.get(session_id)
if not sess:
logging.warning("No session found for datagram: session_id=%s", session_id)
return
peer = TransportPeer(addr=(self, session_id))
logging.info("WT datagram received: session_id=%s, %d bytes", session_id, len(data))
asyncio.ensure_future(self._on_datagram(bytes(data), peer))
def quic_event_received(self, event) -> None: # type: ignore[override]
event_name = type(event).__name__
logging.debug("QUIC event received: %s", event_name)
# Create H3Connection after ALPN protocol is negotiated
if isinstance(event, ProtocolNegotiated):
if event.alpn_protocol in H3_ALPN:
self._http = H3Connection(self._quic, enable_webtransport=True)
logging.info("H3Connection created with WebTransport support (ALPN: %s)", event.alpn_protocol)
else:
logging.warning("Unexpected ALPN protocol: %s", event.alpn_protocol)
# Pass event to HTTP layer if connection is established
if self._http is not None:
for http_event in self._http.handle_event(event):
self.http_event_received(http_event)
async def send_h3_datagram(self, flow_id: int, data: bytes) -> None:
try:
if self._http is None:
logging.warning("Cannot send datagram: H3Connection not established")
return
logging.debug("WT send datagram: flow_id=%s, %d bytes", flow_id, len(data))
self._http.send_datagram(stream_id=flow_id, data=data)
self.transmit() # Send queued QUIC packets immediately
except Exception as e:
logging.debug("Failed to send datagram: %s", e)
class WebTransportServer(DatagramServerTransport):
"""HTTP/3 WebTransport datagram server using aioquic.
Accepts CONNECT requests with :protocol = webtransport and exchanges
RFC 9297 HTTP/3 datagrams bound to the CONNECT stream id.
"""
def __init__(self, host: str, port: int, certfile: str, keyfile: str, on_datagram: OnDatagram):
if serve is None:
raise RuntimeError("aioquic is not installed. Please `pip install aioquic`.")
self.host = host
self.port = port
self.certfile = certfile
self.keyfile = keyfile
self._on_datagram = on_datagram
self._server = None
self._sessions: Dict[int, WTSession] = {}
async def send(self, data: bytes, peer: TransportPeer) -> None:
# peer.addr is (protocol, flow_id)
if isinstance(peer.addr, tuple) and len(peer.addr) == 2:
proto, flow_id = peer.addr
if isinstance(proto, GameWTProtocol) and isinstance(flow_id, int):
await proto.send_h3_datagram(flow_id, data)
async def run(self) -> None:
from aioquic.quic.logger import QuicFileLogger
import os
# Enable QUIC logging if debug level
quic_logger = None
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
log_dir = os.path.join(os.getcwd(), "quic_logs")
os.makedirs(log_dir, exist_ok=True)
quic_logger = QuicFileLogger(log_dir)
logging.info("QUIC logging enabled in: %s", log_dir)
configuration = QuicConfiguration(
is_client=False,
alpn_protocols=["h3"], # HTTP/3 with WebTransport support
max_datagram_frame_size=65536, # Enable QUIC datagrams (required for WebTransport)
quic_logger=quic_logger
)
configuration.load_cert_chain(self.certfile, self.keyfile)
logging.debug("QUIC configuration: ALPN=%s, max_datagram_frame_size=%d",
configuration.alpn_protocols, configuration.max_datagram_frame_size or 0)
def _create_protocol(*args, **kwargs):
return GameWTProtocol(*args, on_datagram=self._on_datagram, sessions=self._sessions, **kwargs)
logging.info("WebTransport (H3) server listening on %s:%d", self.host, self.port)
self._server = await serve(self.host, self.port, configuration=configuration, create_protocol=_create_protocol)
try:
# Wait indefinitely until cancelled (e.g., by KeyboardInterrupt or timeout)
await asyncio.Event().wait()
finally:
self._server.close()

59
test_http3.py Normal file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""Simple HTTP/3 client to test the server."""
import asyncio
import logging
from aioquic.asyncio import connect
from aioquic.quic.configuration import QuicConfiguration
logging.basicConfig(level=logging.DEBUG)
async def test_http3(url: str):
"""Test HTTP/3 connection to the server."""
print(f"Testing HTTP/3 connection to: {url}")
# Parse URL
if url.startswith("https://"):
url = url[8:]
host, port = url.split(":")
port = int(port.rstrip("/"))
print(f"Connecting to {host}:{port}...")
# Create QUIC configuration
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=["h3"],
verify_mode=0 # Skip certificate verification for testing
)
try:
async with connect(
host,
port,
configuration=configuration,
) as protocol:
print(f"✓ QUIC connection established!")
print(f" ALPN protocol: {protocol._quic.tls.alpn_negotiated}")
print(f" Remote address: {protocol._quic.remote_address}")
# Just test the connection, don't send HTTP requests yet
await asyncio.sleep(1)
print("✓ Connection successful!")
return True
except Exception as e:
print(f"✗ Connection failed: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
import sys
url = sys.argv[1] if len(sys.argv) > 1 else "https://127.0.0.1:4433"
success = asyncio.run(test_http3(url))
sys.exit(0 if success else 1)

188
test_webtransport_client.py Normal file
View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""
WebTransport test client to verify server is working correctly.
"""
import asyncio
import logging
import sys
from typing import Optional
from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3Connection
from aioquic.h3.events import DatagramReceived, H3Event, HeadersReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import QuicEvent
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
class WebTransportClient(QuicConnectionProtocol):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._http: Optional[H3Connection] = None
self._session_id: Optional[int] = None
self._session_established = asyncio.Event()
self._received_datagrams = []
def quic_event_received(self, event: QuicEvent) -> None:
logger.debug(f"QUIC event: {type(event).__name__}")
# Create H3Connection on first event
if self._http is None:
self._http = H3Connection(self._quic, enable_webtransport=True)
logger.info("H3Connection created with WebTransport support")
# Process event through HTTP/3 layer
if self._http is not None:
for http_event in self._http.handle_event(event):
self.http_event_received(http_event)
def http_event_received(self, event: H3Event) -> None:
logger.debug(f"HTTP event: {type(event).__name__}")
if isinstance(event, HeadersReceived):
headers = dict(event.headers)
status = headers.get(b":status", b"").decode()
logger.info(f"Received headers: status={status}")
if status == "200" and self._session_id is None:
self._session_id = event.stream_id
logger.info(f"WebTransport session established! session_id={self._session_id}")
self._session_established.set()
else:
logger.error(f"WebTransport session rejected: status={status}, headers={headers}")
elif isinstance(event, DatagramReceived):
logger.info(f"Received datagram: {len(event.data)} bytes, flow_id={event.flow_id}")
self._received_datagrams.append(event.data)
async def establish_session(self, path: str = "/") -> None:
"""Send WebTransport CONNECT request."""
logger.info(f"Sending WebTransport CONNECT request for path: {path}")
# Allocate stream ID for CONNECT request
stream_id = self._quic.get_next_available_stream_id()
# Send CONNECT request with WebTransport protocol
headers = [
(b":method", b"CONNECT"),
(b":scheme", b"https"),
(b":authority", b"np.vaku.org.ua:4433"),
(b":path", path.encode()),
(b":protocol", b"webtransport"),
]
self._http.send_headers(stream_id=stream_id, headers=headers, end_stream=False)
self.transmit()
logger.info(f"WebTransport CONNECT sent on stream {stream_id}")
# Wait for session to be established
try:
await asyncio.wait_for(self._session_established.wait(), timeout=5.0)
logger.info("✓ WebTransport session established successfully!")
return True
except asyncio.TimeoutError:
logger.error("✗ Timeout waiting for WebTransport session acceptance")
return False
async def send_datagram(self, data: bytes) -> None:
"""Send a datagram over the WebTransport session."""
if self._session_id is None:
logger.error("Cannot send datagram: session not established")
return
logger.info(f"Sending datagram: {len(data)} bytes")
self._http.send_datagram(stream_id=self._session_id, data=data)
self.transmit()
async def wait_for_datagram(self, timeout: float = 2.0) -> Optional[bytes]:
"""Wait for a datagram response."""
logger.info(f"Waiting for datagram response (timeout={timeout}s)...")
start = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start < timeout:
if self._received_datagrams:
return self._received_datagrams.pop(0)
await asyncio.sleep(0.1)
logger.warning("No datagram received within timeout")
return None
async def test_webtransport(host: str, port: int, verify_mode: bool = False):
"""Test WebTransport connection to the server."""
logger.info(f"Testing WebTransport connection to {host}:{port}")
# Configure QUIC
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=["h3"], # HTTP/3
max_datagram_frame_size=65536, # Enable QUIC datagrams (required for WebTransport)
verify_mode=0 if not verify_mode else 2, # Skip cert verification for self-signed
)
try:
async with connect(
host,
port,
configuration=configuration,
create_protocol=WebTransportClient,
) as client:
logger.info("✓ QUIC connection established")
client = client # type: WebTransportClient
# Establish WebTransport session
if not await client.establish_session("/"):
logger.error("✗ Failed to establish WebTransport session")
return False
# Send a test datagram (simple game JOIN packet)
# Format: version(1) | type(1) | flags(1) | seq(2) | name_len(1) | name
test_packet = bytes([
0x01, # version
0x01, # JOIN packet type
0x00, # flags
0x00, 0x01, # seq=1
0x04, # name length
ord('T'), ord('E'), ord('S'), ord('T'), # name="TEST"
])
await client.send_datagram(test_packet)
logger.info("✓ Test JOIN packet sent")
# Wait for response
response = await client.wait_for_datagram(timeout=3.0)
if response:
logger.info(f"✓ Received response: {len(response)} bytes")
logger.info(f" Response hex: {response.hex()}")
return True
else:
logger.warning("Server did not respond to JOIN packet (might be normal if not implemented)")
return True # Connection worked even if no response
except Exception as e:
logger.error(f"✗ Connection failed: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python test_webtransport_client.py <host> [port]")
print("Example: python test_webtransport_client.py np.vaku.org.ua 4433")
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2]) if len(sys.argv) > 2 else 4433
success = asyncio.run(test_webtransport(host, port))
sys.exit(0 if success else 1)