Initial commit: Multiplayer Snake game with server discovery

Implemented a complete network multiplayer Snake game with the following features:

Core Game:
- Client-server architecture using asyncio for networking
- Pygame-based rendering at 60 FPS
- Server-authoritative game state with 10 TPS
- Collision detection (walls, self, other players)
- Food spawning and score tracking
- Support for multiple players with color-coded snakes

Server Discovery:
- UDP multicast-based automatic server discovery (239.255.0.1:9999)
- Server beacon broadcasts presence every 2 seconds
- Client discovery with 3-second timeout
- Server selection UI for multiple servers
- Auto-connect for single server
- Graceful fallback to manual connection

Project Structure:
- src/shared/ - Protocol, models, constants, discovery utilities
- src/server/ - Game server, game logic, server beacon
- src/client/ - Game client, renderer, discovery, server selector
- tests/ - Unit tests for game logic, models, and discovery

Command-line interface with argparse for both server and client.
Comprehensive documentation in README.md and CLAUDE.md.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Vladyslav Doloman
2025-10-04 13:50:16 +03:00
commit 0703561446
28 changed files with 2523 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
{
"permissions": {
"allow": [
"Bash(mkdir:*)",
"Bash(git init:*)",
"Bash(python:*)"
],
"deny": [],
"ask": []
}
}

41
.gitignore vendored Normal file
View File

@@ -0,0 +1,41 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
env/
ENV/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db

110
CLAUDE.md Normal file
View File

@@ -0,0 +1,110 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is a **network multiplayer Snake game** built with Python 3.11, using asyncio for networking, pygame for graphics, and UDP multicast for automatic server discovery.
**Project Structure:**
- `src/shared/` - Shared code (models, protocol, constants, discovery utilities)
- `src/server/` - Game server with authoritative game state and multicast beacon
- `src/client/` - Game client with pygame rendering, discovery, and server selection UI
- `tests/` - Test files using pytest
## Setup Commands
```bash
# Create and activate virtual environment
python -m venv venv
venv\Scripts\activate # Windows
# source venv/bin/activate # Linux/Mac
# Install dependencies
pip install -r requirements.txt
# Install development dependencies
pip install -r requirements-dev.txt
```
## Running the Game
```bash
# Start the server (with discovery beacon enabled by default)
python run_server.py --name "My Game" --port 8888
# Start clients with auto-discovery (no host needed)
python run_client.py --name Alice
# If multiple servers found, a selection UI appears
# Manual connection (skip discovery)
python run_client.py 192.168.1.100 --port 8888 --name Bob
# Server options:
# --host HOST Bind address (default: localhost)
# --port PORT Port number (default: 8888)
# --name NAME Server name for discovery
# --no-discovery Disable multicast beacon
# Client options:
# [host] Server host (omit for auto-discovery)
# --port PORT Server port
# --name NAME Player name
# --discover Force discovery mode
# Press SPACE to start the game, arrow keys/WASD to move
```
## Development Commands
```bash
# Run all tests
pytest
# Run tests with coverage
pytest --cov=src --cov-report=html
# Format code with black
black src/ tests/
# Lint code
flake8 src/ tests/
# Type check
mypy src/
```
## Architecture
**Client-Server Model:**
- Server (`src/server/game_server.py`) runs the authoritative game loop using asyncio
- Clients connect via TCP and send input commands (MOVE, START_GAME, etc.)
- Server broadcasts game state updates to all connected clients at 10 FPS
- Clients render the game state locally at 60 FPS using pygame
**Key Components:**
- `src/shared/protocol.py` - JSON-based message protocol (MessageType enum, Message class)
- `src/shared/models.py` - Data models (Snake, Position, Food, GameState) with serialization
- `src/shared/constants.py` - Game configuration (grid size, colors, tick rate, multicast settings)
- `src/shared/discovery.py` - Multicast discovery utilities (ServerInfo, DiscoveryMessage)
- `src/server/game_logic.py` - Game rules (movement, collision detection, food spawning)
- `src/server/server_beacon.py` - UDP multicast beacon for server discovery
- `src/client/renderer.py` - Pygame rendering of game state
- `src/client/server_discovery.py` - Client-side server discovery
- `src/client/server_selector.py` - Pygame UI for selecting from multiple servers
**Game Flow:**
1. Server starts on port 8888 and broadcasts presence on multicast group 239.255.0.1:9999
2. Clients send DISCOVER message to multicast group and collect SERVER_ANNOUNCE responses
3. If multiple servers found, client shows selection UI; if one found, auto-connects
4. Clients connect via TCP and receive WELCOME message with player_id
5. Any player can press SPACE to send START_GAME message
6. Server creates snakes for all connected players and spawns food
7. Server runs game loop: update positions → check collisions → broadcast state
8. Game ends when only 0-1 snakes remain alive
**Discovery Protocol:**
- Multicast group: 239.255.0.1:9999 (local network)
- Client → Multicast: DISCOVER (broadcast request)
- Server → Client: SERVER_ANNOUNCE (direct response with host, port, name, player count)
- Server also periodically broadcasts presence every 2 seconds

93
README.md Normal file
View File

@@ -0,0 +1,93 @@
# Multiplayer Snake Game
A network multiplayer Snake game built with Python, asyncio, and pygame.
## Features
- Real-time multiplayer gameplay with client-server architecture
- **Automatic server discovery** using multicast (zero-configuration LAN play)
- Support for multiple players simultaneously
- Classic Snake gameplay with collision detection
- Color-coded snakes for each player
- Score tracking and win conditions
## Setup
```bash
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# For development
pip install -r requirements-dev.txt
```
## Running the Game
### Quick Start (Auto-Discovery)
1. **Start the server** (in one terminal):
```bash
python run_server.py
# Optional: python run_server.py --name "My Server" --port 8888
```
2. **Start one or more clients** (in separate terminals):
```bash
python run_client.py --name Alice
# Clients will automatically discover servers on the local network
```
### Manual Connection
```bash
# Server
python run_server.py --host 0.0.0.0 --port 8888 --name "Game Room"
# Client (specify host directly)
python run_client.py 192.168.1.100 --port 8888 --name Bob
```
### Server Options
```bash
python run_server.py --help
# --host HOST Host address to bind to (default: localhost)
# --port PORT Port number (default: 8888)
# --name NAME Server name for discovery (default: Snake Server)
# --no-discovery Disable multicast beacon
```
### Client Options
```bash
python run_client.py --help
# [host] Server host (omit to use auto-discovery)
# --port PORT Server port (default: 8888)
# --name NAME Your player name (default: Player)
# --discover Force discovery mode
```
### Playing the Game
- Press **SPACE** to start the game (any player can start)
- Use **arrow keys** or **WASD** to control your snake
- Eat food to grow and score points
- Avoid walls and other snakes
## Testing
```bash
pytest
pytest --cov=src --cov-report=html # With coverage
```
## Project Structure
- `src/server/` - Game server with authoritative game state
- `src/client/` - Game client with pygame rendering
- `src/shared/` - Shared code (models, protocol, constants)
- `tests/` - Unit tests

15
pyproject.toml Normal file
View File

@@ -0,0 +1,15 @@
[tool.black]
line-length = 88
target-version = ['py311']
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true

8
requirements-dev.txt Normal file
View File

@@ -0,0 +1,8 @@
-r requirements.txt
# Development dependencies
pytest>=7.0.0
pytest-cov>=4.0.0
black>=23.0.0
flake8>=6.0.0
mypy>=1.0.0

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
# Production dependencies
pygame>=2.5.0

46
run_client.py Normal file
View File

@@ -0,0 +1,46 @@
"""Run the Snake game client."""
import asyncio
import argparse
from src.client.game_client import main
from src.shared.constants import DEFAULT_PORT
async def run_client() -> None:
"""Run the client with command line arguments."""
parser = argparse.ArgumentParser(description="Run the Snake game client")
parser.add_argument(
"host",
nargs="?",
default=None,
help="Server host address (omit to use auto-discovery)",
)
parser.add_argument(
"--port",
type=int,
default=DEFAULT_PORT,
help=f"Server port number (default: {DEFAULT_PORT})",
)
parser.add_argument(
"--name",
default="Player",
help="Your player name (default: Player)",
)
parser.add_argument(
"--discover",
action="store_true",
help="Force server discovery even if host is specified",
)
args = parser.parse_args()
await main(
host=args.host,
port=args.port,
name=args.name,
discover=args.discover,
)
if __name__ == "__main__":
asyncio.run(run_client())

47
run_server.py Normal file
View File

@@ -0,0 +1,47 @@
"""Run the Snake game server."""
import asyncio
import argparse
from src.server.game_server import GameServer
from src.shared.constants import DEFAULT_HOST, DEFAULT_PORT
async def main() -> None:
"""Run the server with command line arguments."""
parser = argparse.ArgumentParser(description="Run the Snake game server")
parser.add_argument(
"--host",
default=DEFAULT_HOST,
help=f"Host address to bind to (default: {DEFAULT_HOST})",
)
parser.add_argument(
"--port",
type=int,
default=DEFAULT_PORT,
help=f"Port number to bind to (default: {DEFAULT_PORT})",
)
parser.add_argument(
"--name",
default="Snake Server",
help="Server name for discovery (default: Snake Server)",
)
parser.add_argument(
"--no-discovery",
action="store_true",
help="Disable multicast discovery beacon",
)
args = parser.parse_args()
server = GameServer(
host=args.host,
port=args.port,
server_name=args.name,
enable_discovery=not args.no_discovery,
)
await server.start()
if __name__ == "__main__":
asyncio.run(main())

0
src/__init__.py Normal file
View File

0
src/client/__init__.py Normal file
View File

283
src/client/game_client.py Normal file
View File

@@ -0,0 +1,283 @@
"""Multiplayer Snake game client."""
import asyncio
import pygame
from typing import Optional
from ..shared.protocol import (
Message,
MessageType,
create_join_message,
create_move_message,
create_start_game_message,
)
from ..shared.models import GameState
from ..shared.constants import (
DEFAULT_HOST,
DEFAULT_PORT,
FPS,
UP,
DOWN,
LEFT,
RIGHT,
)
from .renderer import Renderer
from .server_discovery import discover_servers
from .server_selector import select_server
class GameClient:
"""Multiplayer Snake game client."""
def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, player_name: str = "Player"):
"""Initialize the game client.
Args:
host: Server host address
port: Server port number
player_name: Name of the player
"""
self.host = host
self.port = port
self.player_name = player_name
self.player_id: Optional[str] = None
self.renderer = Renderer()
self.game_state: Optional[GameState] = None
self.reader: Optional[asyncio.StreamReader] = None
self.writer: Optional[asyncio.StreamWriter] = None
self.running = True
self.clock = pygame.time.Clock()
async def connect(self) -> None:
"""Connect to the game server."""
try:
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
print(f"Connected to server at {self.host}:{self.port}")
# Send JOIN message
await self.send_message(create_join_message(self.player_name))
except Exception as e:
print(f"Failed to connect to server: {e}")
raise
async def send_message(self, message: Message) -> None:
"""Send a message to the server.
Args:
message: Message to send
"""
if self.writer is None:
return
try:
data = message.to_json() + "\n"
self.writer.write(data.encode())
await self.writer.drain()
except Exception as e:
print(f"Error sending message: {e}")
async def receive_messages(self) -> None:
"""Receive and process messages from the server."""
if self.reader is None:
return
try:
while self.running:
data = await self.reader.readline()
if not data:
print("Connection closed by server")
self.running = False
break
# Parse message
try:
message = Message.from_json(data.decode().strip())
await self.handle_message(message)
except Exception as e:
print(f"Error parsing message: {e}")
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Error receiving messages: {e}")
self.running = False
async def handle_message(self, message: Message) -> None:
"""Handle a message from the server.
Args:
message: Message received
"""
if message.type == MessageType.WELCOME:
self.player_id = message.data.get("player_id")
print(f"Assigned player ID: {self.player_id}")
elif message.type == MessageType.STATE_UPDATE:
state_dict = message.data.get("game_state")
self.game_state = GameState.from_dict(state_dict)
elif message.type == MessageType.PLAYER_JOINED:
player_id = message.data.get("player_id")
player_name = message.data.get("player_name")
print(f"Player {player_name} joined")
elif message.type == MessageType.PLAYER_LEFT:
player_id = message.data.get("player_id")
print(f"Player {player_id} left")
elif message.type == MessageType.GAME_STARTED:
print("Game started!")
elif message.type == MessageType.GAME_OVER:
winner_id = message.data.get("winner_id")
print(f"Game over! Winner: {winner_id}")
elif message.type == MessageType.ERROR:
error = message.data.get("error")
print(f"Error from server: {error}")
def handle_input(self) -> None:
"""Handle pygame input events."""
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.running = False
elif event.type == pygame.KEYDOWN:
# Movement keys
if event.key == pygame.K_UP or event.key == pygame.K_w:
asyncio.create_task(self.send_message(create_move_message(UP)))
elif event.key == pygame.K_DOWN or event.key == pygame.K_s:
asyncio.create_task(self.send_message(create_move_message(DOWN)))
elif event.key == pygame.K_LEFT or event.key == pygame.K_a:
asyncio.create_task(self.send_message(create_move_message(LEFT)))
elif event.key == pygame.K_RIGHT or event.key == pygame.K_d:
asyncio.create_task(self.send_message(create_move_message(RIGHT)))
# Start game with SPACE
elif event.key == pygame.K_SPACE:
asyncio.create_task(self.send_message(create_start_game_message()))
print("Requesting to start game...")
async def game_loop(self) -> None:
"""Main client game loop."""
# Start receiving messages in background
receive_task = asyncio.create_task(self.receive_messages())
try:
while self.running:
# Handle input
self.handle_input()
# Render current state
self.renderer.render(self.game_state, self.player_id)
# Maintain frame rate
self.clock.tick(FPS)
# Allow other async tasks to run
await asyncio.sleep(0)
finally:
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
async def run(self) -> None:
"""Run the client."""
try:
await self.connect()
await self.game_loop()
finally:
await self.close()
async def close(self) -> None:
"""Clean up resources."""
if self.writer:
self.writer.close()
await self.writer.wait_closed()
self.renderer.close()
@classmethod
async def from_discovery(cls, player_name: str = "Player") -> Optional["GameClient"]:
"""Create a client by discovering servers on the network.
Args:
player_name: Name of the player
Returns:
GameClient instance if server selected, None if cancelled
"""
print("Discovering servers on local network...")
servers = await discover_servers()
if not servers:
print("No servers found!")
return None
# If only one server, auto-connect
if len(servers) == 1:
server = servers[0]
print(f"Auto-connecting to {server.server_name} at {server.host}:{server.port}")
return cls(server.host, server.port, player_name)
# Multiple servers - show selection UI
print(f"Found {len(servers)} server(s). Opening selection menu...")
result = select_server(servers)
if result is None:
print("Server selection cancelled")
return None
host, port = result
print(f"Selected server at {host}:{port}")
return cls(host, port, player_name)
async def main(
host: str = None,
port: int = DEFAULT_PORT,
name: str = "Player",
discover: bool = False,
) -> None:
"""Run the game client.
Args:
host: Server host address (None to use discovery)
port: Server port number
name: Player name
discover: Use server discovery instead of direct connection
"""
# Use discovery if requested or if no host specified
if discover or host is None:
client = await GameClient.from_discovery(name)
if client is None:
print("Exiting...")
return
else:
client = GameClient(host, port, name)
await client.run()
if __name__ == "__main__":
import sys
# Check for --discover flag
discover_mode = "--discover" in sys.argv
if discover_mode:
sys.argv.remove("--discover")
host = sys.argv[1] if len(sys.argv) > 1 else None
port = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_PORT
name = sys.argv[3] if len(sys.argv) > 3 else "Player"
asyncio.run(main(host, port, name, discover=discover_mode or host is None))

169
src/client/renderer.py Normal file
View File

@@ -0,0 +1,169 @@
"""Pygame renderer for Snake game client."""
import pygame
from typing import Optional
from ..shared.models import GameState, Snake, Position
from ..shared.constants import (
GRID_WIDTH,
GRID_HEIGHT,
CELL_SIZE,
COLOR_BACKGROUND,
COLOR_GRID,
COLOR_FOOD,
COLOR_SNAKES,
)
class Renderer:
"""Handles rendering the game using pygame."""
def __init__(self) -> None:
"""Initialize the renderer."""
pygame.init()
self.screen_width = GRID_WIDTH * CELL_SIZE
self.screen_height = GRID_HEIGHT * CELL_SIZE
self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
pygame.display.set_caption("Multiplayer Snake")
self.font = pygame.font.Font(None, 36)
self.small_font = pygame.font.Font(None, 24)
def render(self, game_state: Optional[GameState], player_id: Optional[str] = None) -> None:
"""Render the current game state.
Args:
game_state: Current game state to render
player_id: ID of the current player (for highlighting)
"""
# Clear screen
self.screen.fill(COLOR_BACKGROUND)
if game_state is None:
self.render_waiting_screen()
pygame.display.flip()
return
# Draw grid
self.draw_grid()
# Draw food
for food in game_state.food:
self.draw_cell(food.position, COLOR_FOOD)
# Draw snakes
for i, snake in enumerate(game_state.snakes):
color = COLOR_SNAKES[i % len(COLOR_SNAKES)]
# Draw body
for segment in snake.body:
self.draw_cell(segment, color)
# Draw head slightly brighter
if snake.body and snake.alive:
head_color = tuple(min(c + 50, 255) for c in color)
self.draw_cell(snake.body[0], head_color)
# Draw scores
self.draw_scores(game_state, player_id)
# Draw game over message if needed
if not game_state.game_running and game_state.snakes:
self.draw_game_over(game_state)
pygame.display.flip()
def draw_grid(self) -> None:
"""Draw the game grid."""
for x in range(0, self.screen_width, CELL_SIZE):
pygame.draw.line(self.screen, COLOR_GRID, (x, 0), (x, self.screen_height))
for y in range(0, self.screen_height, CELL_SIZE):
pygame.draw.line(self.screen, COLOR_GRID, (0, y), (self.screen_width, y))
def draw_cell(self, position: Position, color: tuple) -> None:
"""Draw a single cell.
Args:
position: Grid position to draw
color: RGB color tuple
"""
rect = pygame.Rect(
position.x * CELL_SIZE,
position.y * CELL_SIZE,
CELL_SIZE,
CELL_SIZE
)
pygame.draw.rect(self.screen, color, rect)
pygame.draw.rect(self.screen, COLOR_BACKGROUND, rect, 1) # Border
def draw_scores(self, game_state: GameState, player_id: Optional[str]) -> None:
"""Draw player scores.
Args:
game_state: Current game state
player_id: Current player's ID
"""
y_offset = 10
for i, snake in enumerate(game_state.snakes):
color = COLOR_SNAKES[i % len(COLOR_SNAKES)]
# Prepare score text
prefix = "YOU: " if snake.player_id == player_id else f"P{i+1}: "
status = "" if snake.alive else " (DEAD)"
text = f"{prefix}Score {snake.score}{status}"
# Render text with background
text_surface = self.small_font.render(text, True, color)
text_rect = text_surface.get_rect(topleft=(10, y_offset))
# Draw semi-transparent background
bg_rect = text_rect.inflate(10, 5)
bg_surface = pygame.Surface(bg_rect.size, pygame.SRCALPHA)
bg_surface.fill((0, 0, 0, 180))
self.screen.blit(bg_surface, bg_rect)
# Draw text
self.screen.blit(text_surface, text_rect)
y_offset += 30
def draw_game_over(self, game_state: GameState) -> None:
"""Draw game over message.
Args:
game_state: Final game state
"""
alive_snakes = [s for s in game_state.snakes if s.alive]
if alive_snakes:
winner = alive_snakes[0]
text = f"Game Over! Winner: {winner.player_id[:8]}"
else:
text = "Game Over! No winner"
text_surface = self.font.render(text, True, (255, 255, 255))
text_rect = text_surface.get_rect(
center=(self.screen_width // 2, self.screen_height // 2)
)
# Draw background
bg_rect = text_rect.inflate(40, 20)
bg_surface = pygame.Surface(bg_rect.size, pygame.SRCALPHA)
bg_surface.fill((0, 0, 0, 200))
self.screen.blit(bg_surface, bg_rect)
# Draw text
self.screen.blit(text_surface, text_rect)
def render_waiting_screen(self) -> None:
"""Render waiting for connection screen."""
text = "Connecting to server..."
text_surface = self.font.render(text, True, (255, 255, 255))
text_rect = text_surface.get_rect(
center=(self.screen_width // 2, self.screen_height // 2)
)
self.screen.blit(text_surface, text_rect)
def close(self) -> None:
"""Clean up pygame resources."""
pygame.quit()

View File

@@ -0,0 +1,111 @@
"""Client-side server discovery using multicast."""
import asyncio
import socket
from typing import List
from ..shared.discovery import (
ServerInfo,
DiscoveryMessage,
create_multicast_socket,
)
from ..shared.constants import MULTICAST_GROUP, MULTICAST_PORT, DISCOVERY_TIMEOUT
class ServerDiscovery:
"""Discovers game servers on the local network."""
def __init__(self) -> None:
"""Initialize server discovery."""
self.discovered_servers: List[ServerInfo] = []
async def discover_servers(self, timeout: float = DISCOVERY_TIMEOUT) -> List[ServerInfo]:
"""Discover available game servers.
Args:
timeout: How long to wait for responses (seconds)
Returns:
List of discovered ServerInfo objects
"""
self.discovered_servers = []
# Create socket for sending and receiving
sock = create_multicast_socket(bind=True)
sock.setblocking(False)
try:
# Send DISCOVER message to multicast group
discover_msg = DiscoveryMessage.create_discover()
sock.sendto(discover_msg, (MULTICAST_GROUP, MULTICAST_PORT))
print(f"Sent discovery request to {MULTICAST_GROUP}:{MULTICAST_PORT}")
# Listen for responses
loop = asyncio.get_event_loop()
end_time = loop.time() + timeout
while loop.time() < end_time:
try:
# Calculate remaining timeout
remaining = end_time - loop.time()
if remaining <= 0:
break
# Receive with timeout
sock.settimeout(min(remaining, 0.5))
try:
data, addr = sock.recvfrom(1024)
except socket.timeout:
await asyncio.sleep(0.1)
continue
# Parse response
msg_type, msg_data = DiscoveryMessage.parse(data)
if msg_type == DiscoveryMessage.SERVER_ANNOUNCE and msg_data:
# Extract host from addr if not in data
host = addr[0]
server_info = ServerInfo.from_dict(msg_data, host=host)
# Add if not already discovered
if not self._is_duplicate(server_info):
self.discovered_servers.append(server_info)
print(f"Discovered server: {server_info.server_name} at {server_info.host}:{server_info.port}")
except Exception as e:
# Ignore errors and continue listening
await asyncio.sleep(0.1)
finally:
sock.close()
print(f"Discovery complete. Found {len(self.discovered_servers)} server(s)")
return self.discovered_servers
def _is_duplicate(self, server_info: ServerInfo) -> bool:
"""Check if server is already in discovered list.
Args:
server_info: Server to check
Returns:
True if duplicate found
"""
for existing in self.discovered_servers:
if (existing.host == server_info.host and
existing.port == server_info.port):
return True
return False
async def discover_servers(timeout: float = DISCOVERY_TIMEOUT) -> List[ServerInfo]:
"""Convenience function to discover servers.
Args:
timeout: How long to wait for responses (seconds)
Returns:
List of discovered servers
"""
discovery = ServerDiscovery()
return await discovery.discover_servers(timeout)

View File

@@ -0,0 +1,142 @@
"""Server selection UI using pygame."""
import pygame
from typing import List, Optional, Tuple
from ..shared.discovery import ServerInfo
from ..shared.constants import CELL_SIZE, GRID_WIDTH, GRID_HEIGHT
class ServerSelector:
"""Simple pygame UI for selecting a server from discovered list."""
def __init__(self, servers: List[ServerInfo]):
"""Initialize the server selector.
Args:
servers: List of discovered servers
"""
self.servers = servers
self.selected_index = 0
self.screen_width = GRID_WIDTH * CELL_SIZE
self.screen_height = GRID_HEIGHT * CELL_SIZE
pygame.init()
self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
pygame.display.set_caption("Select Server")
self.title_font = pygame.font.Font(None, 48)
self.server_font = pygame.font.Font(None, 32)
self.help_font = pygame.font.Font(None, 24)
self.clock = pygame.time.Clock()
def run(self) -> Optional[Tuple[str, int]]:
"""Run the server selection UI.
Returns:
Tuple of (host, port) if server selected, None if cancelled
"""
running = True
while running:
for event in pygame.event.get():
if event.type == pygame.QUIT:
return None
elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_ESCAPE:
return None
elif event.key == pygame.K_UP:
self.selected_index = max(0, self.selected_index - 1)
elif event.key == pygame.K_DOWN:
self.selected_index = min(len(self.servers) - 1, self.selected_index + 1)
elif event.key == pygame.K_RETURN or event.key == pygame.K_SPACE:
if self.servers:
selected = self.servers[self.selected_index]
return (selected.host, selected.port)
return None
self.render()
self.clock.tick(30)
return None
def render(self) -> None:
"""Render the server selection UI."""
self.screen.fill((20, 20, 30))
# Draw title
title = self.title_font.render("Select Server", True, (255, 255, 255))
title_rect = title.get_rect(center=(self.screen_width // 2, 60))
self.screen.blit(title, title_rect)
# Draw servers list
start_y = 140
for i, server in enumerate(self.servers):
color = (100, 200, 100) if i == self.selected_index else (180, 180, 180)
# Server name and info
server_text = f"{server.server_name} - {server.players_count} player(s)"
text_surface = self.server_font.render(server_text, True, color)
text_rect = text_surface.get_rect(center=(self.screen_width // 2, start_y + i * 50))
# Highlight selected
if i == self.selected_index:
highlight_rect = text_rect.inflate(20, 10)
pygame.draw.rect(self.screen, (50, 100, 50), highlight_rect, 2)
self.screen.blit(text_surface, text_rect)
# Server address (smaller text)
addr_text = f"{server.host}:{server.port}"
addr_surface = self.help_font.render(addr_text, True, (150, 150, 150))
addr_rect = addr_surface.get_rect(center=(self.screen_width // 2, start_y + i * 50 + 25))
self.screen.blit(addr_surface, addr_rect)
# Draw help text at bottom
help_y = self.screen_height - 80
help_texts = [
"↑↓ Select ENTER Join ESC Cancel",
]
for i, text in enumerate(help_texts):
help_surface = self.help_font.render(text, True, (150, 150, 150))
help_rect = help_surface.get_rect(center=(self.screen_width // 2, help_y + i * 30))
self.screen.blit(help_surface, help_rect)
# Show message if no servers found
if not self.servers:
no_server_text = "No servers found. Press ESC to exit."
text_surface = self.server_font.render(no_server_text, True, (255, 100, 100))
text_rect = text_surface.get_rect(center=(self.screen_width // 2, self.screen_height // 2))
self.screen.blit(text_surface, text_rect)
pygame.display.flip()
def close(self) -> None:
"""Clean up pygame resources."""
pygame.quit()
def select_server(servers: List[ServerInfo]) -> Optional[Tuple[str, int]]:
"""Show server selection UI and return selected server.
Args:
servers: List of discovered servers
Returns:
Tuple of (host, port) if selected, None if cancelled
"""
if not servers:
print("No servers available to select")
return None
selector = ServerSelector(servers)
try:
return selector.run()
finally:
selector.close()

0
src/server/__init__.py Normal file
View File

175
src/server/game_logic.py Normal file
View File

@@ -0,0 +1,175 @@
"""Core game logic for Snake game."""
import random
from typing import List, Tuple
from ..shared.models import Snake, Position, Food, GameState
from ..shared.constants import (
GRID_WIDTH,
GRID_HEIGHT,
INITIAL_SNAKE_LENGTH,
SNAKE_GROWTH,
OPPOSITE_DIRECTIONS,
RIGHT,
)
class GameLogic:
"""Handles game rules and state updates."""
def __init__(self) -> None:
"""Initialize game logic."""
self.state = GameState()
def create_snake(self, player_id: str) -> Snake:
"""Create a new snake for a player.
Args:
player_id: Unique identifier for the player
Returns:
New Snake instance
"""
# Find a random starting position
start_x = random.randint(INITIAL_SNAKE_LENGTH, GRID_WIDTH - INITIAL_SNAKE_LENGTH)
start_y = random.randint(INITIAL_SNAKE_LENGTH, GRID_HEIGHT - INITIAL_SNAKE_LENGTH)
# Create snake body (head first, tail last)
body = [
Position(start_x - i, start_y)
for i in range(INITIAL_SNAKE_LENGTH)
]
snake = Snake(player_id=player_id, body=body, direction=RIGHT)
return snake
def spawn_food(self) -> Food:
"""Spawn food at a random position not occupied by snakes.
Returns:
New Food instance
"""
# Get all occupied positions
occupied = set()
for snake in self.state.snakes:
for segment in snake.body:
occupied.add((segment.x, segment.y))
# Find random free position
max_attempts = 100
for _ in range(max_attempts):
x = random.randint(0, GRID_WIDTH - 1)
y = random.randint(0, GRID_HEIGHT - 1)
if (x, y) not in occupied:
return Food(position=Position(x, y))
# Fallback: return any position if grid is too full
return Food(position=Position(
random.randint(0, GRID_WIDTH - 1),
random.randint(0, GRID_HEIGHT - 1)
))
def update_snake_direction(self, player_id: str, direction: Tuple[int, int]) -> None:
"""Update a snake's direction if valid.
Args:
player_id: Player whose snake to update
direction: New direction tuple
"""
for snake in self.state.snakes:
if snake.player_id == player_id and snake.alive:
# Prevent 180-degree turns
if direction != OPPOSITE_DIRECTIONS.get(snake.direction):
snake.direction = direction
break
def move_snakes(self) -> None:
"""Move all alive snakes one step in their current direction."""
for snake in self.state.snakes:
if not snake.alive:
continue
# Calculate new head position
new_head = snake.get_head() + snake.direction
# Add new head
snake.body.insert(0, new_head)
# Check if snake ate food
ate_food = False
for food in self.state.food[:]:
if new_head.x == food.position.x and new_head.y == food.position.y:
self.state.food.remove(food)
snake.score += 10
ate_food = True
break
# Remove tail if didn't eat food (otherwise snake grows)
if not ate_food:
snake.body.pop()
def check_collisions(self) -> None:
"""Check for collisions and mark dead snakes."""
for snake in self.state.snakes:
if not snake.alive:
continue
head = snake.get_head()
# Check wall collision
if (head.x < 0 or head.x >= GRID_WIDTH or
head.y < 0 or head.y >= GRID_HEIGHT):
snake.alive = False
continue
# Check self-collision (head hits own body)
for segment in snake.body[1:]:
if head.x == segment.x and head.y == segment.y:
snake.alive = False
break
if not snake.alive:
continue
# Check collision with other snakes
for other_snake in self.state.snakes:
if other_snake.player_id == snake.player_id:
continue
# Check collision with other snake's body
for segment in other_snake.body:
if head.x == segment.x and head.y == segment.y:
snake.alive = False
break
if not snake.alive:
break
def update(self) -> None:
"""Perform one game tick: move snakes and check collisions."""
self.move_snakes()
self.check_collisions()
# Spawn food if needed
if len(self.state.food) < len([s for s in self.state.snakes if s.alive]):
self.state.food.append(self.spawn_food())
def is_game_over(self) -> bool:
"""Check if game is over (0 or 1 snakes alive).
Returns:
True if game should end
"""
alive_count = sum(1 for snake in self.state.snakes if snake.alive)
return alive_count <= 1 and len(self.state.snakes) > 1
def get_winner(self) -> str | None:
"""Get the winner's player_id if there is one.
Returns:
Winner's player_id or None
"""
alive_snakes = [s for s in self.state.snakes if s.alive]
if len(alive_snakes) == 1:
return alive_snakes[0].player_id
return None

299
src/server/game_server.py Normal file
View File

@@ -0,0 +1,299 @@
"""Multiplayer Snake game server using asyncio."""
import asyncio
import uuid
from typing import Dict, Set
from ..shared.protocol import (
Message,
MessageType,
create_welcome_message,
create_state_update_message,
create_player_joined_message,
create_player_left_message,
create_game_started_message,
create_game_over_message,
create_error_message,
)
from ..shared.constants import DEFAULT_HOST, DEFAULT_PORT, TICK_RATE
from .game_logic import GameLogic
from .server_beacon import ServerBeacon
class GameServer:
"""Multiplayer Snake game server."""
def __init__(
self,
host: str = DEFAULT_HOST,
port: int = DEFAULT_PORT,
server_name: str = "Snake Server",
enable_discovery: bool = True,
):
"""Initialize the game server.
Args:
host: Host address to bind to
port: Port number to bind to
server_name: Name of the server for discovery
enable_discovery: Enable multicast discovery beacon
"""
self.host = host
self.port = port
self.server_name = server_name
self.enable_discovery = enable_discovery
self.clients: Dict[str, asyncio.StreamWriter] = {}
self.player_names: Dict[str, str] = {}
self.game_logic = GameLogic()
self.game_task: asyncio.Task | None = None
self.beacon_task: asyncio.Task | None = None
self.beacon: ServerBeacon | None = None
async def handle_client(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
) -> None:
"""Handle a client connection.
Args:
reader: Stream reader for receiving data
writer: Stream writer for sending data
"""
player_id = str(uuid.uuid4())
addr = writer.get_extra_info('peername')
print(f"New connection from {addr}, assigned ID: {player_id}")
try:
while True:
# Read message length prefix (4 bytes)
data = await reader.readline()
if not data:
break
# Parse message
try:
message = Message.from_json(data.decode().strip())
await self.handle_message(player_id, message, writer)
except Exception as e:
print(f"Error parsing message from {player_id}: {e}")
await self.send_message(writer, create_error_message(str(e)))
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Error handling client {player_id}: {e}")
finally:
await self.remove_player(player_id)
writer.close()
await writer.wait_closed()
async def handle_message(
self,
player_id: str,
message: Message,
writer: asyncio.StreamWriter
) -> None:
"""Handle a message from a client.
Args:
player_id: ID of the player who sent the message
message: The message received
writer: Stream writer for responding
"""
if message.type == MessageType.JOIN:
await self.handle_join(player_id, message, writer)
elif message.type == MessageType.MOVE:
await self.handle_move(player_id, message)
elif message.type == MessageType.START_GAME:
await self.handle_start_game()
elif message.type == MessageType.LEAVE:
await self.remove_player(player_id)
async def handle_join(
self,
player_id: str,
message: Message,
writer: asyncio.StreamWriter
) -> None:
"""Handle a player joining.
Args:
player_id: ID of the joining player
message: JOIN message with player name
writer: Stream writer for the client
"""
player_name = message.data.get("player_name", f"Player_{player_id[:8]}")
self.clients[player_id] = writer
self.player_names[player_id] = player_name
# Send welcome message to new player
await self.send_message(writer, create_welcome_message(player_id))
# Notify all clients about new player
await self.broadcast(create_player_joined_message(player_id, player_name))
# Add snake to game if game is running
if self.game_logic.state.game_running:
snake = self.game_logic.create_snake(player_id)
self.game_logic.state.snakes.append(snake)
print(f"Player {player_name} ({player_id}) joined. Total players: {len(self.clients)}")
async def handle_move(self, player_id: str, message: Message) -> None:
"""Handle a player movement command.
Args:
player_id: ID of the player
message: MOVE message with direction
"""
direction = tuple(message.data.get("direction", (1, 0)))
self.game_logic.update_snake_direction(player_id, direction)
async def handle_start_game(self) -> None:
"""Start the game."""
if self.game_logic.state.game_running:
return
# Create snakes for all connected players
self.game_logic.state.snakes = []
for player_id in self.clients.keys():
snake = self.game_logic.create_snake(player_id)
self.game_logic.state.snakes.append(snake)
# Spawn initial food
for _ in range(3):
self.game_logic.state.food.append(self.game_logic.spawn_food())
self.game_logic.state.game_running = True
# Notify all clients
await self.broadcast(create_game_started_message())
# Start game loop
if self.game_task is None or self.game_task.done():
self.game_task = asyncio.create_task(self.game_loop())
print("Game started!")
async def remove_player(self, player_id: str) -> None:
"""Remove a player from the game.
Args:
player_id: ID of the player to remove
"""
if player_id in self.clients:
del self.clients[player_id]
if player_id in self.player_names:
player_name = self.player_names[player_id]
del self.player_names[player_id]
print(f"Player {player_name} ({player_id}) left. Total players: {len(self.clients)}")
# Remove snake from game
self.game_logic.state.snakes = [
s for s in self.game_logic.state.snakes
if s.player_id != player_id
]
# Notify remaining clients
await self.broadcast(create_player_left_message(player_id))
async def game_loop(self) -> None:
"""Main game loop."""
while self.game_logic.state.game_running:
# Update game state
self.game_logic.update()
# Check for game over
if self.game_logic.is_game_over():
winner_id = self.game_logic.get_winner()
await self.broadcast(create_game_over_message(winner_id))
self.game_logic.state.game_running = False
print(f"Game over! Winner: {winner_id}")
break
# Broadcast state to all clients
state_dict = self.game_logic.state.to_dict()
await self.broadcast(create_state_update_message(state_dict))
# Wait for next tick
await asyncio.sleep(TICK_RATE)
async def send_message(self, writer: asyncio.StreamWriter, message: Message) -> None:
"""Send a message to a client.
Args:
writer: Stream writer for the client
message: Message to send
"""
try:
data = message.to_json() + "\n"
writer.write(data.encode())
await writer.drain()
except Exception as e:
print(f"Error sending message: {e}")
async def broadcast(self, message: Message, exclude: Set[str] = None) -> None:
"""Broadcast a message to all connected clients.
Args:
message: Message to broadcast
exclude: Set of player IDs to exclude from broadcast
"""
exclude = exclude or set()
for player_id, writer in list(self.clients.items()):
if player_id not in exclude:
await self.send_message(writer, message)
def get_player_count(self) -> int:
"""Get the current number of connected players.
Returns:
Number of connected players
"""
return len(self.clients)
async def start(self) -> None:
"""Start the server."""
# Start discovery beacon if enabled
if self.enable_discovery:
self.beacon = ServerBeacon(
server_name=self.server_name,
server_port=self.port,
get_player_count=self.get_player_count,
)
self.beacon_task = asyncio.create_task(self.beacon.start())
server = await asyncio.start_server(
self.handle_client,
self.host,
self.port
)
addr = server.sockets[0].getsockname()
print(f"Snake game server '{self.server_name}' running on {addr[0]}:{addr[1]}")
try:
async with server:
await server.serve_forever()
finally:
# Clean up beacon on shutdown
if self.beacon:
await self.beacon.stop()
if self.beacon_task:
self.beacon_task.cancel()
try:
await self.beacon_task
except asyncio.CancelledError:
pass
async def main() -> None:
"""Run the server."""
server = GameServer()
await server.start()
if __name__ == "__main__":
asyncio.run(main())

125
src/server/server_beacon.py Normal file
View File

@@ -0,0 +1,125 @@
"""Server beacon for multicast discovery."""
import asyncio
import socket
from typing import Callable
from ..shared.discovery import (
ServerInfo,
DiscoveryMessage,
create_multicast_socket,
get_local_ip,
)
from ..shared.constants import MULTICAST_GROUP, MULTICAST_PORT, BEACON_INTERVAL
class ServerBeacon:
"""Broadcasts server presence and responds to discovery requests."""
def __init__(
self,
server_name: str,
server_port: int,
get_player_count: Callable[[], int],
):
"""Initialize the server beacon.
Args:
server_name: Name of the server
server_port: TCP port the game server is listening on
get_player_count: Callable that returns current player count
"""
self.server_name = server_name
self.server_port = server_port
self.get_player_count = get_player_count
self.running = False
self.sock: socket.socket | None = None
self.local_ip = get_local_ip()
def create_server_info(self) -> ServerInfo:
"""Create ServerInfo with current server state.
Returns:
Current server information
"""
return ServerInfo(
host=self.local_ip,
port=self.server_port,
server_name=self.server_name,
players_count=self.get_player_count(),
)
async def start(self) -> None:
"""Start the beacon service."""
self.running = True
self.sock = create_multicast_socket(bind=True)
self.sock.setblocking(False)
print(f"Server beacon started on {MULTICAST_GROUP}:{MULTICAST_PORT}")
print(f"Server IP: {self.local_ip}, Port: {self.server_port}")
# Run both listener and broadcaster concurrently
await asyncio.gather(
self.listen_for_discovery(),
self.broadcast_presence(),
)
async def listen_for_discovery(self) -> None:
"""Listen for DISCOVER messages and respond."""
loop = asyncio.get_event_loop()
while self.running:
try:
# Receive data (non-blocking with asyncio)
data, addr = await loop.sock_recvfrom(self.sock, 1024)
# Parse message
msg_type, msg_data = DiscoveryMessage.parse(data)
if msg_type == DiscoveryMessage.DISCOVER:
# Respond to discovery request
await self.send_announce(addr)
except Exception as e:
# Ignore errors and continue listening
await asyncio.sleep(0.1)
async def send_announce(self, addr: tuple) -> None:
"""Send SERVER_ANNOUNCE message to a specific address.
Args:
addr: (host, port) tuple to send to
"""
try:
server_info = self.create_server_info()
message = DiscoveryMessage.create_announce(server_info)
# Send directly to the requester
self.sock.sendto(message, addr)
print(f"Sent announcement to {addr}")
except Exception as e:
print(f"Error sending announcement: {e}")
async def broadcast_presence(self) -> None:
"""Periodically broadcast server presence to multicast group."""
while self.running:
try:
server_info = self.create_server_info()
message = DiscoveryMessage.create_announce(server_info)
# Broadcast to multicast group
self.sock.sendto(message, (MULTICAST_GROUP, MULTICAST_PORT))
except Exception as e:
print(f"Error broadcasting presence: {e}")
# Wait before next broadcast
await asyncio.sleep(BEACON_INTERVAL)
async def stop(self) -> None:
"""Stop the beacon service."""
self.running = False
if self.sock:
self.sock.close()
print("Server beacon stopped")

0
src/shared/__init__.py Normal file
View File

48
src/shared/constants.py Normal file
View File

@@ -0,0 +1,48 @@
"""Game constants shared between client and server."""
# Network settings
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8888
# Multicast discovery settings
MULTICAST_GROUP = "239.255.0.1"
MULTICAST_PORT = 9999
DISCOVERY_TIMEOUT = 3.0 # seconds to wait for server responses
BEACON_INTERVAL = 2.0 # how often server announces itself
# Game grid settings
GRID_WIDTH = 40
GRID_HEIGHT = 30
CELL_SIZE = 20 # pixels
# Game timing
TICK_RATE = 0.1 # seconds (10 ticks per second)
FPS = 60 # client rendering FPS
# Game rules
INITIAL_SNAKE_LENGTH = 3
SNAKE_GROWTH = 1 # segments to grow when eating food
# Colors (RGB)
COLOR_BACKGROUND = (0, 0, 0)
COLOR_GRID = (40, 40, 40)
COLOR_FOOD = (255, 0, 0)
COLOR_SNAKES = [
(0, 255, 0), # Green - Player 1
(0, 0, 255), # Blue - Player 2
(255, 255, 0), # Yellow - Player 3
(255, 0, 255), # Magenta - Player 4
]
# Directions
UP = (0, -1)
DOWN = (0, 1)
LEFT = (-1, 0)
RIGHT = (1, 0)
OPPOSITE_DIRECTIONS = {
UP: DOWN,
DOWN: UP,
LEFT: RIGHT,
RIGHT: LEFT,
}

142
src/shared/discovery.py Normal file
View File

@@ -0,0 +1,142 @@
"""Server discovery utilities using UDP multicast."""
import json
import socket
import struct
import time
from dataclasses import dataclass
from typing import Optional
from .constants import MULTICAST_GROUP, MULTICAST_PORT
@dataclass
class ServerInfo:
"""Information about a discovered server."""
host: str
port: int
server_name: str
players_count: int
last_seen: float = 0.0
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"host": self.host,
"port": self.port,
"server_name": self.server_name,
"players_count": self.players_count,
}
@classmethod
def from_dict(cls, data: dict, host: str = None) -> "ServerInfo":
"""Create ServerInfo from dictionary.
Args:
data: Dictionary with server information
host: Optional override for host address
"""
return cls(
host=host or data["host"],
port=data["port"],
server_name=data["server_name"],
players_count=data["players_count"],
last_seen=time.time(),
)
class DiscoveryMessage:
"""Discovery protocol messages."""
DISCOVER = "DISCOVER"
SERVER_ANNOUNCE = "SERVER_ANNOUNCE"
@staticmethod
def create_discover() -> bytes:
"""Create a DISCOVER message.
Returns:
Encoded message bytes
"""
msg = {"type": DiscoveryMessage.DISCOVER}
return json.dumps(msg).encode("utf-8")
@staticmethod
def create_announce(server_info: ServerInfo) -> bytes:
"""Create a SERVER_ANNOUNCE message.
Args:
server_info: Server information to announce
Returns:
Encoded message bytes
"""
msg = {
"type": DiscoveryMessage.SERVER_ANNOUNCE,
"data": server_info.to_dict(),
}
return json.dumps(msg).encode("utf-8")
@staticmethod
def parse(data: bytes) -> tuple[str, Optional[dict]]:
"""Parse a discovery message.
Args:
data: Raw message bytes
Returns:
Tuple of (message_type, data_dict)
"""
try:
msg = json.loads(data.decode("utf-8"))
msg_type = msg.get("type")
msg_data = msg.get("data")
return msg_type, msg_data
except Exception:
return None, None
def create_multicast_socket(bind: bool = False) -> socket.socket:
"""Create a UDP socket configured for multicast.
Args:
bind: If True, bind to multicast group (for receiving)
Returns:
Configured socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# Allow multiple sockets to bind to the same port
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if bind:
# Bind to the multicast port
sock.bind(("", MULTICAST_PORT))
# Join the multicast group
mreq = struct.pack("4sl", socket.inet_aton(MULTICAST_GROUP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# Set multicast TTL (time-to-live) for sending
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
return sock
def get_local_ip() -> str:
"""Get the local IP address for LAN communication.
Returns:
Local IP address as string
"""
try:
# Create a UDP socket and connect to a public DNS server
# This doesn't actually send data, just determines routing
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except Exception:
return "127.0.0.1"

98
src/shared/models.py Normal file
View File

@@ -0,0 +1,98 @@
"""Data models shared between client and server."""
from dataclasses import dataclass, field
from typing import List, Tuple
@dataclass
class Position:
"""Represents a position on the game grid."""
x: int
y: int
def __add__(self, other: Tuple[int, int]) -> "Position":
"""Add a direction tuple to position."""
return Position(self.x + other[0], self.y + other[1])
def to_tuple(self) -> Tuple[int, int]:
"""Convert to tuple for serialization."""
return (self.x, self.y)
@classmethod
def from_tuple(cls, pos: Tuple[int, int]) -> "Position":
"""Create Position from tuple."""
return cls(pos[0], pos[1])
@dataclass
class Snake:
"""Represents a snake in the game."""
player_id: str
body: List[Position] = field(default_factory=list)
direction: Tuple[int, int] = (1, 0) # Default: moving right
alive: bool = True
score: int = 0
def get_head(self) -> Position:
"""Get the head position of the snake."""
return self.body[0] if self.body else Position(0, 0)
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"player_id": self.player_id,
"body": [pos.to_tuple() for pos in self.body],
"direction": self.direction,
"alive": self.alive,
"score": self.score,
}
@classmethod
def from_dict(cls, data: dict) -> "Snake":
"""Create Snake from dictionary."""
snake = cls(player_id=data["player_id"])
snake.body = [Position.from_tuple(pos) for pos in data["body"]]
snake.direction = tuple(data["direction"])
snake.alive = data["alive"]
snake.score = data["score"]
return snake
@dataclass
class Food:
"""Represents food on the game grid."""
position: Position
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {"position": self.position.to_tuple()}
@classmethod
def from_dict(cls, data: dict) -> "Food":
"""Create Food from dictionary."""
return cls(position=Position.from_tuple(data["position"]))
@dataclass
class GameState:
"""Represents the complete game state."""
snakes: List[Snake] = field(default_factory=list)
food: List[Food] = field(default_factory=list)
game_running: bool = False
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"snakes": [snake.to_dict() for snake in self.snakes],
"food": [f.to_dict() for f in self.food],
"game_running": self.game_running,
}
@classmethod
def from_dict(cls, data: dict) -> "GameState":
"""Create GameState from dictionary."""
state = cls()
state.snakes = [Snake.from_dict(s) for s in data["snakes"]]
state.food = [Food.from_dict(f) for f in data["food"]]
state.game_running = data["game_running"]
return state

116
src/shared/protocol.py Normal file
View File

@@ -0,0 +1,116 @@
"""Network protocol for client-server communication."""
import json
from enum import Enum
from typing import Any, Dict
class MessageType(Enum):
"""Types of messages exchanged between client and server."""
# Client -> Server
JOIN = "JOIN"
MOVE = "MOVE"
START_GAME = "START_GAME"
LEAVE = "LEAVE"
# Server -> Client
WELCOME = "WELCOME"
STATE_UPDATE = "STATE_UPDATE"
PLAYER_JOINED = "PLAYER_JOINED"
PLAYER_LEFT = "PLAYER_LEFT"
GAME_STARTED = "GAME_STARTED"
GAME_OVER = "GAME_OVER"
ERROR = "ERROR"
class Message:
"""Represents a protocol message."""
def __init__(self, msg_type: MessageType, data: Dict[str, Any] = None):
"""Initialize a message.
Args:
msg_type: The type of message
data: Optional message data
"""
self.type = msg_type
self.data = data or {}
def to_json(self) -> str:
"""Serialize message to JSON string."""
return json.dumps({
"type": self.type.value,
"data": self.data
})
@classmethod
def from_json(cls, json_str: str) -> "Message":
"""Deserialize message from JSON string."""
obj = json.loads(json_str)
msg_type = MessageType(obj["type"])
data = obj.get("data", {})
return cls(msg_type, data)
def __repr__(self) -> str:
"""String representation of message."""
return f"Message({self.type.value}, {self.data})"
# Helper functions for creating specific messages
def create_join_message(player_name: str) -> Message:
"""Create a JOIN message."""
return Message(MessageType.JOIN, {"player_name": player_name})
def create_move_message(direction: tuple) -> Message:
"""Create a MOVE message."""
return Message(MessageType.MOVE, {"direction": direction})
def create_start_game_message() -> Message:
"""Create a START_GAME message."""
return Message(MessageType.START_GAME)
def create_leave_message() -> Message:
"""Create a LEAVE message."""
return Message(MessageType.LEAVE)
def create_welcome_message(player_id: str) -> Message:
"""Create a WELCOME message."""
return Message(MessageType.WELCOME, {"player_id": player_id})
def create_state_update_message(game_state: dict) -> Message:
"""Create a STATE_UPDATE message."""
return Message(MessageType.STATE_UPDATE, {"game_state": game_state})
def create_player_joined_message(player_id: str, player_name: str) -> Message:
"""Create a PLAYER_JOINED message."""
return Message(MessageType.PLAYER_JOINED, {
"player_id": player_id,
"player_name": player_name
})
def create_player_left_message(player_id: str) -> Message:
"""Create a PLAYER_LEFT message."""
return Message(MessageType.PLAYER_LEFT, {"player_id": player_id})
def create_game_started_message() -> Message:
"""Create a GAME_STARTED message."""
return Message(MessageType.GAME_STARTED)
def create_game_over_message(winner_id: str = None) -> Message:
"""Create a GAME_OVER message."""
return Message(MessageType.GAME_OVER, {"winner_id": winner_id})
def create_error_message(error: str) -> Message:
"""Create an ERROR message."""
return Message(MessageType.ERROR, {"error": error})

0
tests/__init__.py Normal file
View File

159
tests/test_discovery.py Normal file
View File

@@ -0,0 +1,159 @@
"""Tests for server discovery functionality."""
import pytest
from src.shared.discovery import (
ServerInfo,
DiscoveryMessage,
create_multicast_socket,
get_local_ip,
)
class TestServerInfo:
"""Test suite for ServerInfo class."""
def test_server_info_creation(self) -> None:
"""Test creating ServerInfo."""
info = ServerInfo(
host="192.168.1.100",
port=8888,
server_name="Test Server",
players_count=3,
)
assert info.host == "192.168.1.100"
assert info.port == 8888
assert info.server_name == "Test Server"
assert info.players_count == 3
def test_server_info_serialization(self) -> None:
"""Test ServerInfo to_dict and from_dict."""
info = ServerInfo(
host="10.0.0.1",
port=9999,
server_name="Game Room",
players_count=2,
)
# Serialize
data = info.to_dict()
assert data["host"] == "10.0.0.1"
assert data["port"] == 9999
assert data["server_name"] == "Game Room"
assert data["players_count"] == 2
# Deserialize
info2 = ServerInfo.from_dict(data)
assert info2.host == info.host
assert info2.port == info.port
assert info2.server_name == info.server_name
assert info2.players_count == info.players_count
def test_server_info_host_override(self) -> None:
"""Test ServerInfo from_dict with host override."""
data = {
"host": "1.2.3.4",
"port": 8888,
"server_name": "Test",
"players_count": 1,
}
# Override host
info = ServerInfo.from_dict(data, host="5.6.7.8")
assert info.host == "5.6.7.8"
assert info.port == 8888
class TestDiscoveryMessage:
"""Test suite for DiscoveryMessage class."""
def test_create_discover_message(self) -> None:
"""Test creating DISCOVER message."""
msg = DiscoveryMessage.create_discover()
assert isinstance(msg, bytes)
# Parse it back
msg_type, msg_data = DiscoveryMessage.parse(msg)
assert msg_type == DiscoveryMessage.DISCOVER
assert msg_data is None or msg_data == {}
def test_create_announce_message(self) -> None:
"""Test creating SERVER_ANNOUNCE message."""
server_info = ServerInfo(
host="192.168.1.1",
port=8888,
server_name="Test Server",
players_count=5,
)
msg = DiscoveryMessage.create_announce(server_info)
assert isinstance(msg, bytes)
# Parse it back
msg_type, msg_data = DiscoveryMessage.parse(msg)
assert msg_type == DiscoveryMessage.SERVER_ANNOUNCE
assert msg_data is not None
assert msg_data["host"] == "192.168.1.1"
assert msg_data["port"] == 8888
assert msg_data["server_name"] == "Test Server"
assert msg_data["players_count"] == 5
def test_parse_invalid_message(self) -> None:
"""Test parsing invalid message."""
msg_type, msg_data = DiscoveryMessage.parse(b"invalid json")
assert msg_type is None
assert msg_data is None
def test_message_round_trip(self) -> None:
"""Test encoding and decoding messages."""
# Test DISCOVER
discover = DiscoveryMessage.create_discover()
msg_type, _ = DiscoveryMessage.parse(discover)
assert msg_type == DiscoveryMessage.DISCOVER
# Test ANNOUNCE
info = ServerInfo(
host="10.0.0.1",
port=7777,
server_name="Round Trip",
players_count=0,
)
announce = DiscoveryMessage.create_announce(info)
msg_type, msg_data = DiscoveryMessage.parse(announce)
assert msg_type == DiscoveryMessage.SERVER_ANNOUNCE
assert msg_data["server_name"] == "Round Trip"
class TestDiscoveryUtilities:
"""Test suite for discovery utility functions."""
def test_create_multicast_socket(self) -> None:
"""Test creating multicast socket."""
# Test without binding
sock = create_multicast_socket(bind=False)
assert sock is not None
sock.close()
# Test with binding (may fail in some environments)
try:
sock = create_multicast_socket(bind=True)
assert sock is not None
sock.close()
except Exception:
# Binding may fail in restricted environments
pytest.skip("Cannot bind to multicast group in this environment")
def test_get_local_ip(self) -> None:
"""Test getting local IP address."""
ip = get_local_ip()
assert isinstance(ip, str)
assert len(ip) > 0
# Should be a valid IP format
parts = ip.split(".")
assert len(parts) == 4
# Each part should be a number 0-255
for part in parts:
num = int(part)
assert 0 <= num <= 255

148
tests/test_game_logic.py Normal file
View File

@@ -0,0 +1,148 @@
"""Tests for game logic."""
import pytest
from src.server.game_logic import GameLogic
from src.shared.models import Position, Snake
from src.shared.constants import GRID_WIDTH, GRID_HEIGHT, RIGHT, LEFT, UP, DOWN
class TestGameLogic:
"""Test suite for GameLogic class."""
def test_create_snake(self) -> None:
"""Test snake creation."""
logic = GameLogic()
snake = logic.create_snake("player1")
assert snake.player_id == "player1"
assert len(snake.body) == 3
assert snake.alive is True
assert snake.score == 0
assert snake.direction == RIGHT
def test_spawn_food(self) -> None:
"""Test food spawning."""
logic = GameLogic()
food = logic.spawn_food()
assert 0 <= food.position.x < GRID_WIDTH
assert 0 <= food.position.y < GRID_HEIGHT
def test_update_snake_direction(self) -> None:
"""Test updating snake direction."""
logic = GameLogic()
snake = logic.create_snake("player1")
logic.state.snakes.append(snake)
# Valid direction change
logic.update_snake_direction("player1", UP)
assert snake.direction == UP
# Invalid 180-degree turn (should be ignored)
logic.update_snake_direction("player1", DOWN)
assert snake.direction == UP # Should remain UP
def test_move_snakes(self) -> None:
"""Test snake movement."""
logic = GameLogic()
snake = Snake(player_id="player1", body=[
Position(5, 5),
Position(4, 5),
Position(3, 5),
], direction=RIGHT)
logic.state.snakes.append(snake)
initial_length = len(snake.body)
logic.move_snakes()
# Snake should have moved one cell to the right
assert snake.get_head().x == 6
assert snake.get_head().y == 5
assert len(snake.body) == initial_length
def test_collision_with_wall(self) -> None:
"""Test collision detection with walls."""
logic = GameLogic()
# Snake at left wall
snake = Snake(player_id="player1", body=[
Position(0, 5),
Position(1, 5),
Position(2, 5),
], direction=LEFT)
logic.state.snakes.append(snake)
logic.move_snakes()
logic.check_collisions()
assert snake.alive is False
def test_collision_with_self(self) -> None:
"""Test collision detection with self."""
logic = GameLogic()
# Create a snake that will hit itself
snake = Snake(player_id="player1", body=[
Position(5, 5),
Position(5, 6),
Position(6, 6),
Position(6, 5),
], direction=DOWN)
logic.state.snakes.append(snake)
logic.move_snakes()
logic.check_collisions()
assert snake.alive is False
def test_food_eating(self) -> None:
"""Test snake eating food and growing."""
logic = GameLogic()
snake = Snake(player_id="player1", body=[
Position(5, 5),
Position(4, 5),
Position(3, 5),
], direction=RIGHT)
logic.state.snakes.append(snake)
# Place food in front of snake
from src.shared.models import Food
food = Food(position=Position(6, 5))
logic.state.food.append(food)
initial_length = len(snake.body)
initial_score = snake.score
logic.move_snakes()
# Snake should have grown and scored
assert len(snake.body) == initial_length + 1
assert snake.score == initial_score + 10
assert food not in logic.state.food
def test_is_game_over(self) -> None:
"""Test game over detection."""
logic = GameLogic()
# No game over with multiple alive snakes
snake1 = Snake(player_id="player1", alive=True)
snake2 = Snake(player_id="player2", alive=True)
logic.state.snakes = [snake1, snake2]
assert logic.is_game_over() is False
# Game over when only one snake alive
snake2.alive = False
assert logic.is_game_over() is True
def test_get_winner(self) -> None:
"""Test winner determination."""
logic = GameLogic()
snake1 = Snake(player_id="player1", alive=True)
snake2 = Snake(player_id="player2", alive=False)
logic.state.snakes = [snake1, snake2]
winner = logic.get_winner()
assert winner == "player1"

135
tests/test_models.py Normal file
View File

@@ -0,0 +1,135 @@
"""Tests for data models."""
import pytest
from src.shared.models import Position, Snake, Food, GameState
class TestPosition:
"""Test suite for Position class."""
def test_position_creation(self) -> None:
"""Test creating a position."""
pos = Position(5, 10)
assert pos.x == 5
assert pos.y == 10
def test_position_addition(self) -> None:
"""Test adding a direction to a position."""
pos = Position(5, 10)
new_pos = pos + (1, -1)
assert new_pos.x == 6
assert new_pos.y == 9
def test_position_to_tuple(self) -> None:
"""Test converting position to tuple."""
pos = Position(3, 7)
assert pos.to_tuple() == (3, 7)
def test_position_from_tuple(self) -> None:
"""Test creating position from tuple."""
pos = Position.from_tuple((3, 7))
assert pos.x == 3
assert pos.y == 7
class TestSnake:
"""Test suite for Snake class."""
def test_snake_creation(self) -> None:
"""Test creating a snake."""
snake = Snake(player_id="test123")
assert snake.player_id == "test123"
assert snake.alive is True
assert snake.score == 0
assert snake.direction == (1, 0)
def test_get_head(self) -> None:
"""Test getting snake head position."""
snake = Snake(
player_id="test",
body=[Position(5, 5), Position(4, 5), Position(3, 5)]
)
head = snake.get_head()
assert head.x == 5
assert head.y == 5
def test_snake_serialization(self) -> None:
"""Test snake to_dict and from_dict."""
snake = Snake(
player_id="test123",
body=[Position(5, 5), Position(4, 5)],
direction=(0, 1),
alive=False,
score=100
)
# Serialize
data = snake.to_dict()
assert data["player_id"] == "test123"
assert data["body"] == [(5, 5), (4, 5)]
assert data["direction"] == (0, 1)
assert data["alive"] is False
assert data["score"] == 100
# Deserialize
snake2 = Snake.from_dict(data)
assert snake2.player_id == snake.player_id
assert len(snake2.body) == len(snake.body)
assert snake2.body[0].x == snake.body[0].x
assert snake2.direction == snake.direction
assert snake2.alive == snake.alive
assert snake2.score == snake.score
class TestFood:
"""Test suite for Food class."""
def test_food_creation(self) -> None:
"""Test creating food."""
food = Food(position=Position(10, 15))
assert food.position.x == 10
assert food.position.y == 15
def test_food_serialization(self) -> None:
"""Test food to_dict and from_dict."""
food = Food(position=Position(10, 15))
# Serialize
data = food.to_dict()
assert data["position"] == (10, 15)
# Deserialize
food2 = Food.from_dict(data)
assert food2.position.x == food.position.x
assert food2.position.y == food.position.y
class TestGameState:
"""Test suite for GameState class."""
def test_game_state_creation(self) -> None:
"""Test creating game state."""
state = GameState()
assert len(state.snakes) == 0
assert len(state.food) == 0
assert state.game_running is False
def test_game_state_serialization(self) -> None:
"""Test game state to_dict and from_dict."""
state = GameState()
state.snakes.append(Snake(player_id="p1", body=[Position(5, 5)]))
state.food.append(Food(position=Position(10, 10)))
state.game_running = True
# Serialize
data = state.to_dict()
assert len(data["snakes"]) == 1
assert len(data["food"]) == 1
assert data["game_running"] is True
# Deserialize
state2 = GameState.from_dict(data)
assert len(state2.snakes) == 1
assert len(state2.food) == 1
assert state2.game_running is True
assert state2.snakes[0].player_id == "p1"