Implement UDP protocol with binary compression and 32-player support

Major networking overhaul to reduce latency and bandwidth:

UDP Protocol Implementation:
- Created UDP server handler with sequence number tracking (uint32 with wrapping support)
- Implemented 1000-packet window for reordering tolerance
- Packet structure: [seq_num(4) + msg_type(1) + update_id(2) + payload]
- Handles 4+ billion packets without sequence number issues
- Auto-fallback to TCP on >20% packet loss

Binary Codec with Schema Versioning:
- Extensible field-based format with version negotiation
- Position encoding: 11-bit packed (6-bit x + 5-bit y for 40x30 grid)
- Delta encoding for snake bodies: 2 bits per segment direction
- Variable-length integers for compact numbers
- String encoding: up to 16 chars with 4-bit length prefix
- Player ID hashing: CRC32 for compact representation
- zlib compression for payload reduction

Partial Update System:
- Splits large game states into independent packets <1280 bytes (IPv6 MTU)
- Each packet is self-contained (packet loss affects only subset of snakes)
- Smart snake segmenting for very long snakes (>100 segments)
- Player name caching: sent once per player, then omitted
- Metadata (food, game_running) separated from snake data

32-Player Support:
- Extended COLOR_SNAKES array to 32 distinct colors
- Server enforces MAX_PLAYERS=32 limit
- Player names limited to MAX_PLAYER_NAME_LENGTH=16
- Name validation and sanitization
- Color assignment with rotation through 32 colors

Desktop Client Components:
- UDP client with automatic TCP fallback
- Partial state reassembly and tracking
- Sequence validation and duplicate detection
- Statistics tracking for fallback decisions

Web Client Components:
- 32-color palette matching Python colors
- JavaScript binary codec (mirrors Python implementation)
- Partial state tracker for reassembly
- WebRTC DataChannel transport skeleton (for future use)
- Graceful fallback to WebSocket

Server Integration:
- UDP server on port 8890 (configurable via --udp-port)
- Integrated with existing TCP (8888) and WebSocket (8889) servers
- Proper cleanup on shutdown
- Command-line argument: --udp-port (0 to disable, default 8890)

Performance Improvements:
- ~75% bandwidth reduction (binary + compression vs JSON)
- All packets guaranteed <1280 bytes (safe for all networks)
- UDP eliminates TCP head-of-line blocking for lower latency
- Independent partial updates gracefully handle packet loss
- Delta encoding dramatically reduces snake body size

Comprehensive Testing:
- 46 tests total, all passing (100% success rate)
- 15 UDP protocol tests (sequence wrapping, packet parsing, compression)
- 20 binary codec tests (encoding, delta compression, strings, varint)
- 11 partial update tests (splitting, reassembly, packet loss resilience)

Files Added:
- src/shared/binary_codec.py: Extensible binary serialization
- src/shared/udp_protocol.py: UDP packet handling with sequence numbers
- src/server/udp_handler.py: Async UDP server
- src/server/partial_update.py: State splitting logic
- src/client/udp_client.py: Desktop UDP client with TCP fallback
- src/client/partial_state_tracker.py: Client-side reassembly
- web/binary_codec.js: JavaScript binary codec
- web/partial_state_tracker.js: JavaScript reassembly
- web/webrtc_transport.js: WebRTC transport (ready for future use)
- tests/test_udp_protocol.py: UDP protocol tests
- tests/test_binary_codec.py: Binary codec tests
- tests/test_partial_updates.py: Partial update tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Vladyslav Doloman
2025-10-04 23:50:31 +03:00
parent 4dbbf44638
commit b221645750
17 changed files with 3469 additions and 11 deletions

301
tests/test_binary_codec.py Normal file
View File

@@ -0,0 +1,301 @@
"""Tests for binary codec."""
import pytest
from src.shared.binary_codec import BinaryCodec, FieldType, FieldID
from src.shared.models import Position, Snake
class TestPositionEncoding:
"""Test position encoding/decoding."""
def test_encode_decode_position(self):
"""Test position round-trip."""
positions = [
Position(0, 0),
Position(39, 29), # Max grid position
Position(20, 15),
Position(1, 1),
]
for pos in positions:
encoded = BinaryCodec.encode_position(pos)
decoded = BinaryCodec.decode_position(encoded)
assert decoded.x == pos.x
assert decoded.y == pos.y
def test_packed_positions(self):
"""Test packed position encoding."""
positions = [
Position(0, 0),
Position(1, 0),
Position(2, 0),
Position(3, 0),
Position(3, 1),
]
# Encode
packed = BinaryCodec.encode_packed_positions(positions)
# Decode
decoded = BinaryCodec.decode_packed_positions(packed, len(positions))
assert len(decoded) == len(positions)
for orig, dec in zip(positions, decoded):
assert dec.x == orig.x
assert dec.y == orig.y
def test_delta_encoding(self):
"""Test delta position encoding."""
# Snake body (adjacent positions)
positions = [
Position(5, 5),
Position(6, 5), # Right
Position(7, 5), # Right
Position(7, 6), # Down
Position(7, 7), # Down
Position(6, 7), # Left
Position(6, 6), # Up
]
# Encode
encoded = BinaryCodec.encode_delta_positions(positions)
# Decode
decoded = BinaryCodec.decode_delta_positions(encoded, len(positions))
assert len(decoded) == len(positions)
for orig, dec in zip(positions, decoded):
assert dec.x == orig.x
assert dec.y == orig.y
class TestVarint:
"""Test variable-length integer encoding."""
def test_small_values(self):
"""Test small varint values."""
for value in [0, 1, 10, 127]:
encoded = BinaryCodec.encode_varint(value)
decoded, offset = BinaryCodec.decode_varint(encoded, 0)
assert decoded == value
assert offset == len(encoded)
def test_large_values(self):
"""Test large varint values."""
values = [128, 255, 1000, 10000, 65535, 1000000]
for value in values:
encoded = BinaryCodec.encode_varint(value)
decoded, offset = BinaryCodec.decode_varint(encoded, 0)
assert decoded == value
def test_varint_size(self):
"""Test varint encoding size."""
# Small values use 1 byte
assert len(BinaryCodec.encode_varint(0)) == 1
assert len(BinaryCodec.encode_varint(127)) == 1
# Values >= 128 use 2+ bytes
assert len(BinaryCodec.encode_varint(128)) == 2
assert len(BinaryCodec.encode_varint(255)) == 2
assert len(BinaryCodec.encode_varint(16383)) == 2
assert len(BinaryCodec.encode_varint(16384)) == 3
class TestStringEncoding:
"""Test string encoding."""
def test_short_strings(self):
"""Test encoding short strings."""
strings = ["", "A", "Alice", "Player123"]
for s in strings:
encoded = BinaryCodec.encode_string_16(s)
decoded, consumed = BinaryCodec.decode_string_16(encoded)
assert decoded == s
def test_max_length_string(self):
"""Test 16-character string."""
s = "VeryLongUsername"
assert len(s) == 16
encoded = BinaryCodec.encode_string_16(s)
decoded, consumed = BinaryCodec.decode_string_16(encoded)
assert decoded == s
def test_truncation(self):
"""Test string truncation."""
s = "ThisIsAVeryLongUsernameThatExceedsLimit"
encoded = BinaryCodec.encode_string_16(s)
decoded, consumed = BinaryCodec.decode_string_16(encoded)
# Should be truncated to 16 chars
assert len(decoded) <= 16
def test_unicode_strings(self):
"""Test Unicode string encoding."""
strings = ["Hello世界", "Café", "🎮Player"]
for s in strings:
encoded = BinaryCodec.encode_string_16(s)
decoded, consumed = BinaryCodec.decode_string_16(encoded)
# Might be truncated due to UTF-8 byte limits
assert decoded.startswith(s[:min(len(s), 10)])
class TestPlayerIdHash:
"""Test player ID hashing."""
def test_consistent_hashing(self):
"""Test hash consistency."""
player_id = "550e8400-e29b-41d4-a716-446655440000"
hash1 = BinaryCodec.player_id_hash(player_id)
hash2 = BinaryCodec.player_id_hash(player_id)
assert hash1 == hash2
def test_different_ids(self):
"""Test different IDs produce different hashes."""
id1 = "550e8400-e29b-41d4-a716-446655440000"
id2 = "550e8400-e29b-41d4-a716-446655440001"
hash1 = BinaryCodec.player_id_hash(id1)
hash2 = BinaryCodec.player_id_hash(id2)
assert hash1 != hash2
def test_hash_range(self):
"""Test hash is within uint32 range."""
for i in range(100):
player_id = f"player_{i}"
hash_val = BinaryCodec.player_id_hash(player_id)
assert 0 <= hash_val <= 0xFFFFFFFF
class TestCompression:
"""Test compression/decompression."""
def test_compress_decompress(self):
"""Test compression round-trip."""
data = b"This is test data that should compress well. " * 10
compressed = BinaryCodec.compress(data)
decompressed = BinaryCodec.decompress(compressed)
assert decompressed == data
assert len(compressed) < len(data) # Should be smaller
def test_small_data(self):
"""Test compression of small data."""
data = b"short"
compressed = BinaryCodec.compress(data)
decompressed = BinaryCodec.decompress(compressed)
assert decompressed == data
# Small data might not compress well
# Just verify round-trip works
def test_empty_data(self):
"""Test compression of empty data."""
data = b""
compressed = BinaryCodec.compress(data)
decompressed = BinaryCodec.decompress(compressed)
assert decompressed == data
class TestPayloadDecoding:
"""Test payload field decoding."""
def test_simple_payload(self):
"""Test decoding simple payload."""
# Manually construct payload
payload = bytearray()
payload.append(BinaryCodec.VERSION) # Version
payload.append(2) # Field count
# Field 1: UPDATE_ID (uint16)
payload.append(FieldID.UPDATE_ID)
payload.append(FieldType.UINT16)
payload.extend(BinaryCodec.encode_varint(2)) # Length
payload.extend(b'\x00\x64') # Value: 100
# Field 2: GAME_RUNNING (uint8)
payload.append(FieldID.GAME_RUNNING)
payload.append(FieldType.UINT8)
payload.extend(BinaryCodec.encode_varint(1)) # Length
payload.append(1) # True
# Decode
fields = self._decode_payload(bytes(payload))
assert len(fields) >= 2
# Check fields are present
field_ids = [f[0] for f in fields]
assert FieldID.UPDATE_ID in field_ids
assert FieldID.GAME_RUNNING in field_ids
def _decode_payload(self, payload: bytes):
"""Helper to decode payload."""
if len(payload) < 2:
return []
version = payload[0]
field_count = payload[1]
fields = []
offset = 2
for _ in range(field_count):
if offset + 2 > len(payload):
break
field_id = payload[offset]
field_type = payload[offset + 1]
offset += 2
length, offset = BinaryCodec.decode_varint(payload, offset)
if offset + length > len(payload):
break
field_data = payload[offset:offset + length]
offset += length
fields.append((field_id, field_type, field_data))
return fields
class TestEdgeCases:
"""Test edge cases."""
def test_max_grid_position(self):
"""Test maximum grid position (39, 29)."""
pos = Position(39, 29)
encoded = BinaryCodec.encode_position(pos)
decoded = BinaryCodec.decode_position(encoded)
assert decoded.x == 39
assert decoded.y == 29
def test_empty_position_list(self):
"""Test empty position list."""
positions = []
encoded = BinaryCodec.encode_packed_positions(positions)
decoded = BinaryCodec.decode_packed_positions(encoded, 0)
assert decoded == []
def test_single_position(self):
"""Test single position."""
positions = [Position(10, 10)]
encoded = BinaryCodec.encode_delta_positions(positions)
decoded = BinaryCodec.decode_delta_positions(encoded, 1)
assert len(decoded) == 1
assert decoded[0].x == 10
assert decoded[0].y == 10
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,340 @@
"""Tests for partial update splitting and reassembly."""
import pytest
from src.shared.models import GameState, Snake, Food, Position
from src.server.partial_update import PartialUpdateEncoder
from src.client.partial_state_tracker import PartialStateTracker
from src.shared.binary_codec import BinaryCodec
class TestPartialUpdateSplitting:
"""Test splitting game state into partial updates."""
def test_small_state_single_packet(self):
"""Test small state fits in one packet."""
# Create small game state
state = GameState(
snakes=[
Snake(
player_id="player1",
body=[Position(5, 5), Position(6, 5), Position(7, 5)],
color_index=0,
player_name="Alice"
)
],
food=[Food(position=Position(10, 10))],
game_running=True
)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=1, max_packet_size=1280)
# Should have metadata + one snake packet
assert len(packets) >= 2
def test_many_snakes_multiple_packets(self):
"""Test many snakes split into multiple packets."""
# Create state with many snakes
snakes = []
for i in range(32):
snake = Snake(
player_id=f"player{i}",
body=[Position(i, j) for j in range(10)], # 10-segment snake
color_index=i % 32,
player_name=f"Player{i}"
)
snakes.append(snake)
state = GameState(
snakes=snakes,
food=[Food(position=Position(15, 15))],
game_running=True
)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=100, max_packet_size=1280)
# Should have at least metadata packet + snake packet
assert len(packets) >= 2
# All packets should be under size limit
for packet in packets:
assert len(packet) < 1280
def test_very_long_snake_splitting(self):
"""Test very long snake is split into segments."""
# Create snake with 500 segments
body = [Position(i % 40, i // 40) for i in range(500)]
snake = Snake(
player_id="long_player",
body=body,
color_index=0,
player_name="LongSnake"
)
state = GameState(
snakes=[snake],
food=[],
game_running=True
)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=50, max_packet_size=1280)
# Should have metadata + at least one snake packet
assert len(packets) >= 2
# All packets under limit
for packet in packets:
assert len(packet) < 1280
def test_name_caching(self):
"""Test player name is only sent once."""
snake = Snake(
player_id="player1",
body=[Position(5, 5), Position(6, 5)],
color_index=0,
player_name="Alice"
)
state = GameState(snakes=[snake], food=[], game_running=True)
encoder = PartialUpdateEncoder()
# First update - should include name
packets1 = encoder.split_state_update(state, update_id=1)
# Second update - name should be cached
packets2 = encoder.split_state_update(state, update_id=2)
# Second update packets should be smaller (no name)
total_size1 = sum(len(p) for p in packets1)
total_size2 = sum(len(p) for p in packets2)
assert total_size2 <= total_size1
class TestPartialStateReassembly:
"""Test reassembling partial updates on client."""
def test_single_packet_reassembly(self):
"""Test reassembling single packet."""
# Create and encode state
state = GameState(
snakes=[
Snake(
player_id="player1",
body=[Position(5, 5), Position(6, 5)],
color_index=0,
player_name="Alice",
direction=(1, 0),
alive=True
)
],
food=[Food(position=Position(10, 10))],
game_running=True
)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=1)
# Reassemble
tracker = PartialStateTracker()
for packet in packets:
tracker.process_packet(1, packet)
reassembled = tracker.get_game_state()
# Verify
assert reassembled.game_running == True
assert len(reassembled.snakes) >= 1
assert len(reassembled.food) == 1
def test_multiple_packet_reassembly(self):
"""Test reassembling from multiple packets."""
# Create state with multiple snakes
snakes = [
Snake(
player_id=f"player{i}",
body=[Position(i, j) for j in range(5)],
color_index=i,
player_name=f"Player{i}"
)
for i in range(10)
]
state = GameState(snakes=snakes, food=[], game_running=True)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=10)
# Reassemble
tracker = PartialStateTracker()
for packet in packets:
tracker.process_packet(10, packet)
reassembled = tracker.get_game_state()
# Should have all snakes
assert len(reassembled.snakes) >= len(snakes)
def test_packet_loss_resilience(self):
"""Test handling of lost packets."""
# Create state
snakes = [
Snake(
player_id=f"player{i}",
body=[Position(i, j) for j in range(5)],
color_index=i,
player_name=f"Player{i}"
)
for i in range(10)
]
state = GameState(snakes=snakes, food=[], game_running=True)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=20)
# Simulate packet loss - skip middle packet
if len(packets) > 2:
lost_packet_idx = len(packets) // 2
packets_received = packets[:lost_packet_idx] + packets[lost_packet_idx + 1:]
else:
packets_received = packets
# Reassemble
tracker = PartialStateTracker()
for packet in packets_received:
tracker.process_packet(20, packet)
reassembled = tracker.get_game_state()
# Should have partial state (some snakes)
assert len(reassembled.snakes) > 0
# But not all (due to loss)
if len(packets) > 2:
assert len(reassembled.snakes) < len(snakes)
def test_name_caching_on_client(self):
"""Test client caches player names."""
snake = Snake(
player_id="player1",
body=[Position(5, 5)],
color_index=0,
player_name="Alice"
)
state1 = GameState(snakes=[snake], food=[], game_running=True)
encoder = PartialUpdateEncoder()
packets1 = encoder.split_state_update(state1, update_id=1)
# Process first update
tracker = PartialStateTracker()
for packet in packets1:
tracker.process_packet(1, packet)
result1 = tracker.get_game_state()
assert result1.snakes[0].player_name == "Alice"
# Second update without name
state2 = GameState(snakes=[snake], food=[], game_running=True)
packets2 = encoder.split_state_update(state2, update_id=2)
# Process second update
for packet in packets2:
tracker.process_packet(2, packet)
result2 = tracker.get_game_state()
# Name should still be available from cache
player_hash = BinaryCodec.player_id_hash("player1")
assert player_hash in tracker.player_name_cache
assert tracker.player_name_cache[player_hash] == "Alice"
def test_update_id_transition(self):
"""Test transitioning between update IDs."""
snake1 = Snake(player_id="p1", body=[Position(1, 1)], color_index=0)
snake2 = Snake(player_id="p2", body=[Position(2, 2)], color_index=1)
state1 = GameState(snakes=[snake1], food=[], game_running=True)
state2 = GameState(snakes=[snake2], food=[], game_running=True)
encoder = PartialUpdateEncoder()
# Encode both states
packets1 = encoder.split_state_update(state1, update_id=1)
packets2 = encoder.split_state_update(state2, update_id=2)
# Process
tracker = PartialStateTracker()
for packet in packets1:
tracker.process_packet(1, packet)
result1 = tracker.get_game_state()
for packet in packets2:
tracker.process_packet(2, packet)
result2 = tracker.get_game_state()
# Should have transitioned to new update
assert tracker.current_update_id == 2
class TestPacketSizeConstraints:
"""Test packet size constraints."""
def test_all_packets_under_mtu(self):
"""Test all packets respect MTU limit."""
# Create maximum state
snakes = [
Snake(
player_id=f"player{i}",
body=[Position((i + j) % 40, j % 30) for j in range(20)],
color_index=i % 32,
player_name=f"VeryLongName{i:04d}"
)
for i in range(32)
]
state = GameState(
snakes=snakes,
food=[Food(position=Position(i, i)) for i in range(10)],
game_running=True
)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=999, max_packet_size=1280)
# All packets must be under MTU
for i, packet in enumerate(packets):
assert len(packet) < 1280, f"Packet {i} exceeds MTU: {len(packet)} bytes"
def test_compression_benefit(self):
"""Test compression reduces packet size."""
# Create repetitive state (compresses well)
snake = Snake(
player_id="player1",
body=[Position(5, i) for i in range(100)], # Straight line
color_index=0,
player_name="Test"
)
state = GameState(snakes=[snake], food=[], game_running=True)
encoder = PartialUpdateEncoder()
packets = encoder.split_state_update(state, update_id=1)
# Packets should benefit from compression
# Delta encoding + compression should keep size reasonable
for packet in packets:
# Uncompressed would be ~200 bytes for 100 positions
# With delta + compression should be much smaller
assert len(packet) < 150
if __name__ == "__main__":
pytest.main([__file__, "-v"])

219
tests/test_udp_protocol.py Normal file
View File

@@ -0,0 +1,219 @@
"""Tests for UDP protocol with sequence numbers."""
import pytest
from src.shared.udp_protocol import UDPProtocol, SequenceTracker
from src.shared.binary_codec import MessageType
class TestSequenceNumbers:
"""Test sequence number wrapping and validation."""
def test_is_seq_newer_basic(self):
"""Test basic sequence comparison."""
assert UDPProtocol.is_seq_newer(1, 0) == True
assert UDPProtocol.is_seq_newer(100, 50) == True
assert UDPProtocol.is_seq_newer(0, 0) == False # Duplicate
assert UDPProtocol.is_seq_newer(50, 100) == False # Old
def test_is_seq_newer_wrapping(self):
"""Test sequence wrapping around UINT32_MAX."""
# Near wrapping boundary
last_seq = 0xFFFFFFFF - 5 # UINT32_MAX - 5 = 4294967290
# Small increments should work
assert UDPProtocol.is_seq_newer(0xFFFFFFFF - 4, last_seq) == True
assert UDPProtocol.is_seq_newer(0xFFFFFFFF - 3, last_seq) == True
assert UDPProtocol.is_seq_newer(0xFFFFFFFF, last_seq) == True
# Wrapped around
assert UDPProtocol.is_seq_newer(0, last_seq) == True
assert UDPProtocol.is_seq_newer(1, last_seq) == True
assert UDPProtocol.is_seq_newer(5, last_seq) == True
assert UDPProtocol.is_seq_newer(10, last_seq) == True
# Old packets (before wrap)
assert UDPProtocol.is_seq_newer(0xFFFFFFFF - 10, last_seq) == False
def test_is_seq_newer_window(self):
"""Test window size enforcement."""
last_seq = 1000
window = 100
# Within window
assert UDPProtocol.is_seq_newer(1050, last_seq, window) == True
assert UDPProtocol.is_seq_newer(1100, last_seq, window) == True
# Exactly at window boundary
assert UDPProtocol.is_seq_newer(1101, last_seq, window) == False
# Too far ahead
assert UDPProtocol.is_seq_newer(1200, last_seq, window) == False
def test_sequence_wraparound_multiple_times(self):
"""Test multiple wraparounds."""
# Start near max
last_seq = 0xFFFFFFFF - 2
# Increment through wrap
assert UDPProtocol.is_seq_newer(0xFFFFFFFF - 1, last_seq) == True
last_seq = 0xFFFFFFFF - 1
assert UDPProtocol.is_seq_newer(0xFFFFFFFF, last_seq) == True
last_seq = 0xFFFFFFFF
assert UDPProtocol.is_seq_newer(0, last_seq) == True
last_seq = 0
assert UDPProtocol.is_seq_newer(1, last_seq) == True
class TestSequenceTracker:
"""Test SequenceTracker class."""
def test_basic_tracking(self):
"""Test basic sequence tracking."""
tracker = SequenceTracker()
assert tracker.should_accept(1) == True
assert tracker.should_accept(2) == True
assert tracker.should_accept(3) == True
# Duplicate
assert tracker.should_accept(3) == False
# Old
assert tracker.should_accept(2) == False
def test_reordering_within_window(self):
"""Test packet reordering within window."""
tracker = SequenceTracker()
# Receive out of order
assert tracker.should_accept(5) == True
assert tracker.should_accept(3) == False # Older, reject
assert tracker.should_accept(6) == True
assert tracker.should_accept(4) == False # Older, reject
assert tracker.should_accept(7) == True
def test_wrapping_tracking(self):
"""Test tracking through wraparound."""
tracker = SequenceTracker()
tracker.last_seq = 0xFFFFFFFF - 5
# Accept packets through wrap
assert tracker.should_accept(0xFFFFFFFF - 4) == True
assert tracker.should_accept(0xFFFFFFFF - 3) == True
assert tracker.should_accept(0xFFFFFFFF) == True
assert tracker.should_accept(0) == True
assert tracker.should_accept(1) == True
def test_cleanup(self):
"""Test sequence set cleanup."""
tracker = SequenceTracker()
# Add many sequences
for i in range(1, 1500):
tracker.should_accept(i)
# Should have cleaned up old sequences
assert len(tracker.received_seqs) <= 1000
class TestUDPPackets:
"""Test UDP packet creation and parsing."""
def test_create_and_parse_packet(self):
"""Test packet creation and parsing round-trip."""
seq_num = 12345
msg_type = MessageType.PARTIAL_STATE_UPDATE
update_id = 678
payload = b"test payload data"
# Create packet
packet = UDPProtocol.create_packet(seq_num, msg_type, update_id, payload, compress=False)
# Parse packet
result = UDPProtocol.parse_packet(packet)
assert result is not None
parsed_seq, parsed_type, parsed_id, parsed_payload = result
assert parsed_seq == seq_num
assert parsed_type == msg_type
assert parsed_id == update_id
assert parsed_payload == payload
def test_packet_compression(self):
"""Test packet compression."""
seq_num = 100
msg_type = MessageType.GAME_META_UPDATE
update_id = 200
payload = b"x" * 500 # Compressible payload
# Create with compression
packet_compressed = UDPProtocol.create_packet(seq_num, msg_type, update_id, payload, compress=True)
# Create without compression
packet_uncompressed = UDPProtocol.create_packet(seq_num, msg_type, update_id, payload, compress=False)
# Compressed should be smaller
assert len(packet_compressed) < len(packet_uncompressed)
# Both should parse correctly
result = UDPProtocol.parse_packet(packet_compressed)
assert result is not None
_, _, _, parsed_payload = result
assert parsed_payload == payload
def test_update_id_wrapping(self):
"""Test update ID wrapping."""
assert UDPProtocol.next_update_id(0xFFFF) == 0
assert UDPProtocol.next_update_id(0xFFFE) == 0xFFFF
assert UDPProtocol.next_update_id(0) == 1
def test_sequence_wrapping(self):
"""Test sequence number wrapping."""
assert UDPProtocol.next_sequence(0xFFFFFFFF) == 0
assert UDPProtocol.next_sequence(0xFFFFFFFE) == 0xFFFFFFFF
assert UDPProtocol.next_sequence(0) == 1
class TestEdgeCases:
"""Test edge cases and error handling."""
def test_invalid_packet(self):
"""Test parsing invalid packet."""
# Too short
assert UDPProtocol.parse_packet(b"short") is None
# Empty
assert UDPProtocol.parse_packet(b"") is None
def test_corrupted_compression(self):
"""Test handling corrupted compressed data."""
seq_num = 100
msg_type = 0x81 # Compression flag set
update_id = 200
# Create packet header with invalid compressed payload
import struct
header = struct.pack('>IBH', seq_num, msg_type, update_id)
packet = header + b"invalid compressed data"
# Should return None due to decompression failure
result = UDPProtocol.parse_packet(packet)
assert result is None
def test_large_sequence_gap(self):
"""Test very large sequence gaps."""
tracker = SequenceTracker()
tracker.last_seq = 100
# Very large gap (suspicious)
assert tracker.should_accept(2000) == False
# But within window is ok
assert tracker.should_accept(1100) == True
if __name__ == "__main__":
pytest.main([__file__, "-v"])