commit 070356144652ab88dd25d93b9dac625e8b9f5bc2 Author: Vladyslav Doloman Date: Sat Oct 4 13:50:16 2025 +0300 Initial commit: Multiplayer Snake game with server discovery Implemented a complete network multiplayer Snake game with the following features: Core Game: - Client-server architecture using asyncio for networking - Pygame-based rendering at 60 FPS - Server-authoritative game state with 10 TPS - Collision detection (walls, self, other players) - Food spawning and score tracking - Support for multiple players with color-coded snakes Server Discovery: - UDP multicast-based automatic server discovery (239.255.0.1:9999) - Server beacon broadcasts presence every 2 seconds - Client discovery with 3-second timeout - Server selection UI for multiple servers - Auto-connect for single server - Graceful fallback to manual connection Project Structure: - src/shared/ - Protocol, models, constants, discovery utilities - src/server/ - Game server, game logic, server beacon - src/client/ - Game client, renderer, discovery, server selector - tests/ - Unit tests for game logic, models, and discovery Command-line interface with argparse for both server and client. Comprehensive documentation in README.md and CLAUDE.md. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..54aab38 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(mkdir:*)", + "Bash(git init:*)", + "Bash(python:*)" + ], + "deny": [], + "ask": [] + } +} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b5e9f5e --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +env/ +ENV/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..abc6f19 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,110 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +This is a **network multiplayer Snake game** built with Python 3.11, using asyncio for networking, pygame for graphics, and UDP multicast for automatic server discovery. + +**Project Structure:** +- `src/shared/` - Shared code (models, protocol, constants, discovery utilities) +- `src/server/` - Game server with authoritative game state and multicast beacon +- `src/client/` - Game client with pygame rendering, discovery, and server selection UI +- `tests/` - Test files using pytest + +## Setup Commands + +```bash +# Create and activate virtual environment +python -m venv venv +venv\Scripts\activate # Windows +# source venv/bin/activate # Linux/Mac + +# Install dependencies +pip install -r requirements.txt + +# Install development dependencies +pip install -r requirements-dev.txt +``` + +## Running the Game + +```bash +# Start the server (with discovery beacon enabled by default) +python run_server.py --name "My Game" --port 8888 + +# Start clients with auto-discovery (no host needed) +python run_client.py --name Alice +# If multiple servers found, a selection UI appears + +# Manual connection (skip discovery) +python run_client.py 192.168.1.100 --port 8888 --name Bob + +# Server options: +# --host HOST Bind address (default: localhost) +# --port PORT Port number (default: 8888) +# --name NAME Server name for discovery +# --no-discovery Disable multicast beacon + +# Client options: +# [host] Server host (omit for auto-discovery) +# --port PORT Server port +# --name NAME Player name +# --discover Force discovery mode + +# Press SPACE to start the game, arrow keys/WASD to move +``` + +## Development Commands + +```bash +# Run all tests +pytest + +# Run tests with coverage +pytest --cov=src --cov-report=html + +# Format code with black +black src/ tests/ + +# Lint code +flake8 src/ tests/ + +# Type check +mypy src/ +``` + +## Architecture + +**Client-Server Model:** +- Server (`src/server/game_server.py`) runs the authoritative game loop using asyncio +- Clients connect via TCP and send input commands (MOVE, START_GAME, etc.) +- Server broadcasts game state updates to all connected clients at 10 FPS +- Clients render the game state locally at 60 FPS using pygame + +**Key Components:** +- `src/shared/protocol.py` - JSON-based message protocol (MessageType enum, Message class) +- `src/shared/models.py` - Data models (Snake, Position, Food, GameState) with serialization +- `src/shared/constants.py` - Game configuration (grid size, colors, tick rate, multicast settings) +- `src/shared/discovery.py` - Multicast discovery utilities (ServerInfo, DiscoveryMessage) +- `src/server/game_logic.py` - Game rules (movement, collision detection, food spawning) +- `src/server/server_beacon.py` - UDP multicast beacon for server discovery +- `src/client/renderer.py` - Pygame rendering of game state +- `src/client/server_discovery.py` - Client-side server discovery +- `src/client/server_selector.py` - Pygame UI for selecting from multiple servers + +**Game Flow:** +1. Server starts on port 8888 and broadcasts presence on multicast group 239.255.0.1:9999 +2. Clients send DISCOVER message to multicast group and collect SERVER_ANNOUNCE responses +3. If multiple servers found, client shows selection UI; if one found, auto-connects +4. Clients connect via TCP and receive WELCOME message with player_id +5. Any player can press SPACE to send START_GAME message +6. Server creates snakes for all connected players and spawns food +7. Server runs game loop: update positions → check collisions → broadcast state +8. Game ends when only 0-1 snakes remain alive + +**Discovery Protocol:** +- Multicast group: 239.255.0.1:9999 (local network) +- Client → Multicast: DISCOVER (broadcast request) +- Server → Client: SERVER_ANNOUNCE (direct response with host, port, name, player count) +- Server also periodically broadcasts presence every 2 seconds diff --git a/README.md b/README.md new file mode 100644 index 0000000..d465e54 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# Multiplayer Snake Game + +A network multiplayer Snake game built with Python, asyncio, and pygame. + +## Features + +- Real-time multiplayer gameplay with client-server architecture +- **Automatic server discovery** using multicast (zero-configuration LAN play) +- Support for multiple players simultaneously +- Classic Snake gameplay with collision detection +- Color-coded snakes for each player +- Score tracking and win conditions + +## Setup + +```bash +# Create and activate virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt + +# For development +pip install -r requirements-dev.txt +``` + +## Running the Game + +### Quick Start (Auto-Discovery) + +1. **Start the server** (in one terminal): + ```bash + python run_server.py + # Optional: python run_server.py --name "My Server" --port 8888 + ``` + +2. **Start one or more clients** (in separate terminals): + ```bash + python run_client.py --name Alice + # Clients will automatically discover servers on the local network + ``` + +### Manual Connection + +```bash +# Server +python run_server.py --host 0.0.0.0 --port 8888 --name "Game Room" + +# Client (specify host directly) +python run_client.py 192.168.1.100 --port 8888 --name Bob +``` + +### Server Options + +```bash +python run_server.py --help +# --host HOST Host address to bind to (default: localhost) +# --port PORT Port number (default: 8888) +# --name NAME Server name for discovery (default: Snake Server) +# --no-discovery Disable multicast beacon +``` + +### Client Options + +```bash +python run_client.py --help +# [host] Server host (omit to use auto-discovery) +# --port PORT Server port (default: 8888) +# --name NAME Your player name (default: Player) +# --discover Force discovery mode +``` + +### Playing the Game + +- Press **SPACE** to start the game (any player can start) +- Use **arrow keys** or **WASD** to control your snake +- Eat food to grow and score points +- Avoid walls and other snakes + +## Testing + +```bash +pytest +pytest --cov=src --cov-report=html # With coverage +``` + +## Project Structure + +- `src/server/` - Game server with authoritative game state +- `src/client/` - Game client with pygame rendering +- `src/shared/` - Shared code (models, protocol, constants) +- `tests/` - Unit tests diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..69cbf53 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[tool.black] +line-length = 88 +target-version = ['py311'] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] + +[tool.mypy] +python_version = "3.11" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..b53b9cc --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,8 @@ +-r requirements.txt + +# Development dependencies +pytest>=7.0.0 +pytest-cov>=4.0.0 +black>=23.0.0 +flake8>=6.0.0 +mypy>=1.0.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..58cb28d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +# Production dependencies +pygame>=2.5.0 diff --git a/run_client.py b/run_client.py new file mode 100644 index 0000000..791a59f --- /dev/null +++ b/run_client.py @@ -0,0 +1,46 @@ +"""Run the Snake game client.""" + +import asyncio +import argparse +from src.client.game_client import main +from src.shared.constants import DEFAULT_PORT + + +async def run_client() -> None: + """Run the client with command line arguments.""" + parser = argparse.ArgumentParser(description="Run the Snake game client") + parser.add_argument( + "host", + nargs="?", + default=None, + help="Server host address (omit to use auto-discovery)", + ) + parser.add_argument( + "--port", + type=int, + default=DEFAULT_PORT, + help=f"Server port number (default: {DEFAULT_PORT})", + ) + parser.add_argument( + "--name", + default="Player", + help="Your player name (default: Player)", + ) + parser.add_argument( + "--discover", + action="store_true", + help="Force server discovery even if host is specified", + ) + + args = parser.parse_args() + + await main( + host=args.host, + port=args.port, + name=args.name, + discover=args.discover, + ) + + +if __name__ == "__main__": + asyncio.run(run_client()) diff --git a/run_server.py b/run_server.py new file mode 100644 index 0000000..c3eb726 --- /dev/null +++ b/run_server.py @@ -0,0 +1,47 @@ +"""Run the Snake game server.""" + +import asyncio +import argparse +from src.server.game_server import GameServer +from src.shared.constants import DEFAULT_HOST, DEFAULT_PORT + + +async def main() -> None: + """Run the server with command line arguments.""" + parser = argparse.ArgumentParser(description="Run the Snake game server") + parser.add_argument( + "--host", + default=DEFAULT_HOST, + help=f"Host address to bind to (default: {DEFAULT_HOST})", + ) + parser.add_argument( + "--port", + type=int, + default=DEFAULT_PORT, + help=f"Port number to bind to (default: {DEFAULT_PORT})", + ) + parser.add_argument( + "--name", + default="Snake Server", + help="Server name for discovery (default: Snake Server)", + ) + parser.add_argument( + "--no-discovery", + action="store_true", + help="Disable multicast discovery beacon", + ) + + args = parser.parse_args() + + server = GameServer( + host=args.host, + port=args.port, + server_name=args.name, + enable_discovery=not args.no_discovery, + ) + + await server.start() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/client/__init__.py b/src/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/client/game_client.py b/src/client/game_client.py new file mode 100644 index 0000000..e29d553 --- /dev/null +++ b/src/client/game_client.py @@ -0,0 +1,283 @@ +"""Multiplayer Snake game client.""" + +import asyncio +import pygame +from typing import Optional + +from ..shared.protocol import ( + Message, + MessageType, + create_join_message, + create_move_message, + create_start_game_message, +) +from ..shared.models import GameState +from ..shared.constants import ( + DEFAULT_HOST, + DEFAULT_PORT, + FPS, + UP, + DOWN, + LEFT, + RIGHT, +) +from .renderer import Renderer +from .server_discovery import discover_servers +from .server_selector import select_server + + +class GameClient: + """Multiplayer Snake game client.""" + + def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, player_name: str = "Player"): + """Initialize the game client. + + Args: + host: Server host address + port: Server port number + player_name: Name of the player + """ + self.host = host + self.port = port + self.player_name = player_name + self.player_id: Optional[str] = None + + self.renderer = Renderer() + self.game_state: Optional[GameState] = None + + self.reader: Optional[asyncio.StreamReader] = None + self.writer: Optional[asyncio.StreamWriter] = None + + self.running = True + self.clock = pygame.time.Clock() + + async def connect(self) -> None: + """Connect to the game server.""" + try: + self.reader, self.writer = await asyncio.open_connection( + self.host, self.port + ) + print(f"Connected to server at {self.host}:{self.port}") + + # Send JOIN message + await self.send_message(create_join_message(self.player_name)) + + except Exception as e: + print(f"Failed to connect to server: {e}") + raise + + async def send_message(self, message: Message) -> None: + """Send a message to the server. + + Args: + message: Message to send + """ + if self.writer is None: + return + + try: + data = message.to_json() + "\n" + self.writer.write(data.encode()) + await self.writer.drain() + except Exception as e: + print(f"Error sending message: {e}") + + async def receive_messages(self) -> None: + """Receive and process messages from the server.""" + if self.reader is None: + return + + try: + while self.running: + data = await self.reader.readline() + if not data: + print("Connection closed by server") + self.running = False + break + + # Parse message + try: + message = Message.from_json(data.decode().strip()) + await self.handle_message(message) + except Exception as e: + print(f"Error parsing message: {e}") + + except asyncio.CancelledError: + pass + except Exception as e: + print(f"Error receiving messages: {e}") + self.running = False + + async def handle_message(self, message: Message) -> None: + """Handle a message from the server. + + Args: + message: Message received + """ + if message.type == MessageType.WELCOME: + self.player_id = message.data.get("player_id") + print(f"Assigned player ID: {self.player_id}") + + elif message.type == MessageType.STATE_UPDATE: + state_dict = message.data.get("game_state") + self.game_state = GameState.from_dict(state_dict) + + elif message.type == MessageType.PLAYER_JOINED: + player_id = message.data.get("player_id") + player_name = message.data.get("player_name") + print(f"Player {player_name} joined") + + elif message.type == MessageType.PLAYER_LEFT: + player_id = message.data.get("player_id") + print(f"Player {player_id} left") + + elif message.type == MessageType.GAME_STARTED: + print("Game started!") + + elif message.type == MessageType.GAME_OVER: + winner_id = message.data.get("winner_id") + print(f"Game over! Winner: {winner_id}") + + elif message.type == MessageType.ERROR: + error = message.data.get("error") + print(f"Error from server: {error}") + + def handle_input(self) -> None: + """Handle pygame input events.""" + for event in pygame.event.get(): + if event.type == pygame.QUIT: + self.running = False + + elif event.type == pygame.KEYDOWN: + # Movement keys + if event.key == pygame.K_UP or event.key == pygame.K_w: + asyncio.create_task(self.send_message(create_move_message(UP))) + elif event.key == pygame.K_DOWN or event.key == pygame.K_s: + asyncio.create_task(self.send_message(create_move_message(DOWN))) + elif event.key == pygame.K_LEFT or event.key == pygame.K_a: + asyncio.create_task(self.send_message(create_move_message(LEFT))) + elif event.key == pygame.K_RIGHT or event.key == pygame.K_d: + asyncio.create_task(self.send_message(create_move_message(RIGHT))) + + # Start game with SPACE + elif event.key == pygame.K_SPACE: + asyncio.create_task(self.send_message(create_start_game_message())) + print("Requesting to start game...") + + async def game_loop(self) -> None: + """Main client game loop.""" + # Start receiving messages in background + receive_task = asyncio.create_task(self.receive_messages()) + + try: + while self.running: + # Handle input + self.handle_input() + + # Render current state + self.renderer.render(self.game_state, self.player_id) + + # Maintain frame rate + self.clock.tick(FPS) + + # Allow other async tasks to run + await asyncio.sleep(0) + + finally: + receive_task.cancel() + try: + await receive_task + except asyncio.CancelledError: + pass + + async def run(self) -> None: + """Run the client.""" + try: + await self.connect() + await self.game_loop() + finally: + await self.close() + + async def close(self) -> None: + """Clean up resources.""" + if self.writer: + self.writer.close() + await self.writer.wait_closed() + + self.renderer.close() + + @classmethod + async def from_discovery(cls, player_name: str = "Player") -> Optional["GameClient"]: + """Create a client by discovering servers on the network. + + Args: + player_name: Name of the player + + Returns: + GameClient instance if server selected, None if cancelled + """ + print("Discovering servers on local network...") + servers = await discover_servers() + + if not servers: + print("No servers found!") + return None + + # If only one server, auto-connect + if len(servers) == 1: + server = servers[0] + print(f"Auto-connecting to {server.server_name} at {server.host}:{server.port}") + return cls(server.host, server.port, player_name) + + # Multiple servers - show selection UI + print(f"Found {len(servers)} server(s). Opening selection menu...") + result = select_server(servers) + + if result is None: + print("Server selection cancelled") + return None + + host, port = result + print(f"Selected server at {host}:{port}") + return cls(host, port, player_name) + + +async def main( + host: str = None, + port: int = DEFAULT_PORT, + name: str = "Player", + discover: bool = False, +) -> None: + """Run the game client. + + Args: + host: Server host address (None to use discovery) + port: Server port number + name: Player name + discover: Use server discovery instead of direct connection + """ + # Use discovery if requested or if no host specified + if discover or host is None: + client = await GameClient.from_discovery(name) + if client is None: + print("Exiting...") + return + else: + client = GameClient(host, port, name) + + await client.run() + + +if __name__ == "__main__": + import sys + + # Check for --discover flag + discover_mode = "--discover" in sys.argv + if discover_mode: + sys.argv.remove("--discover") + + host = sys.argv[1] if len(sys.argv) > 1 else None + port = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_PORT + name = sys.argv[3] if len(sys.argv) > 3 else "Player" + + asyncio.run(main(host, port, name, discover=discover_mode or host is None)) diff --git a/src/client/renderer.py b/src/client/renderer.py new file mode 100644 index 0000000..23ef918 --- /dev/null +++ b/src/client/renderer.py @@ -0,0 +1,169 @@ +"""Pygame renderer for Snake game client.""" + +import pygame +from typing import Optional + +from ..shared.models import GameState, Snake, Position +from ..shared.constants import ( + GRID_WIDTH, + GRID_HEIGHT, + CELL_SIZE, + COLOR_BACKGROUND, + COLOR_GRID, + COLOR_FOOD, + COLOR_SNAKES, +) + + +class Renderer: + """Handles rendering the game using pygame.""" + + def __init__(self) -> None: + """Initialize the renderer.""" + pygame.init() + + self.screen_width = GRID_WIDTH * CELL_SIZE + self.screen_height = GRID_HEIGHT * CELL_SIZE + self.screen = pygame.display.set_mode((self.screen_width, self.screen_height)) + pygame.display.set_caption("Multiplayer Snake") + + self.font = pygame.font.Font(None, 36) + self.small_font = pygame.font.Font(None, 24) + + def render(self, game_state: Optional[GameState], player_id: Optional[str] = None) -> None: + """Render the current game state. + + Args: + game_state: Current game state to render + player_id: ID of the current player (for highlighting) + """ + # Clear screen + self.screen.fill(COLOR_BACKGROUND) + + if game_state is None: + self.render_waiting_screen() + pygame.display.flip() + return + + # Draw grid + self.draw_grid() + + # Draw food + for food in game_state.food: + self.draw_cell(food.position, COLOR_FOOD) + + # Draw snakes + for i, snake in enumerate(game_state.snakes): + color = COLOR_SNAKES[i % len(COLOR_SNAKES)] + + # Draw body + for segment in snake.body: + self.draw_cell(segment, color) + + # Draw head slightly brighter + if snake.body and snake.alive: + head_color = tuple(min(c + 50, 255) for c in color) + self.draw_cell(snake.body[0], head_color) + + # Draw scores + self.draw_scores(game_state, player_id) + + # Draw game over message if needed + if not game_state.game_running and game_state.snakes: + self.draw_game_over(game_state) + + pygame.display.flip() + + def draw_grid(self) -> None: + """Draw the game grid.""" + for x in range(0, self.screen_width, CELL_SIZE): + pygame.draw.line(self.screen, COLOR_GRID, (x, 0), (x, self.screen_height)) + for y in range(0, self.screen_height, CELL_SIZE): + pygame.draw.line(self.screen, COLOR_GRID, (0, y), (self.screen_width, y)) + + def draw_cell(self, position: Position, color: tuple) -> None: + """Draw a single cell. + + Args: + position: Grid position to draw + color: RGB color tuple + """ + rect = pygame.Rect( + position.x * CELL_SIZE, + position.y * CELL_SIZE, + CELL_SIZE, + CELL_SIZE + ) + pygame.draw.rect(self.screen, color, rect) + pygame.draw.rect(self.screen, COLOR_BACKGROUND, rect, 1) # Border + + def draw_scores(self, game_state: GameState, player_id: Optional[str]) -> None: + """Draw player scores. + + Args: + game_state: Current game state + player_id: Current player's ID + """ + y_offset = 10 + for i, snake in enumerate(game_state.snakes): + color = COLOR_SNAKES[i % len(COLOR_SNAKES)] + + # Prepare score text + prefix = "YOU: " if snake.player_id == player_id else f"P{i+1}: " + status = "" if snake.alive else " (DEAD)" + text = f"{prefix}Score {snake.score}{status}" + + # Render text with background + text_surface = self.small_font.render(text, True, color) + text_rect = text_surface.get_rect(topleft=(10, y_offset)) + + # Draw semi-transparent background + bg_rect = text_rect.inflate(10, 5) + bg_surface = pygame.Surface(bg_rect.size, pygame.SRCALPHA) + bg_surface.fill((0, 0, 0, 180)) + self.screen.blit(bg_surface, bg_rect) + + # Draw text + self.screen.blit(text_surface, text_rect) + y_offset += 30 + + def draw_game_over(self, game_state: GameState) -> None: + """Draw game over message. + + Args: + game_state: Final game state + """ + alive_snakes = [s for s in game_state.snakes if s.alive] + + if alive_snakes: + winner = alive_snakes[0] + text = f"Game Over! Winner: {winner.player_id[:8]}" + else: + text = "Game Over! No winner" + + text_surface = self.font.render(text, True, (255, 255, 255)) + text_rect = text_surface.get_rect( + center=(self.screen_width // 2, self.screen_height // 2) + ) + + # Draw background + bg_rect = text_rect.inflate(40, 20) + bg_surface = pygame.Surface(bg_rect.size, pygame.SRCALPHA) + bg_surface.fill((0, 0, 0, 200)) + self.screen.blit(bg_surface, bg_rect) + + # Draw text + self.screen.blit(text_surface, text_rect) + + def render_waiting_screen(self) -> None: + """Render waiting for connection screen.""" + text = "Connecting to server..." + text_surface = self.font.render(text, True, (255, 255, 255)) + text_rect = text_surface.get_rect( + center=(self.screen_width // 2, self.screen_height // 2) + ) + self.screen.blit(text_surface, text_rect) + + def close(self) -> None: + """Clean up pygame resources.""" + pygame.quit() diff --git a/src/client/server_discovery.py b/src/client/server_discovery.py new file mode 100644 index 0000000..606f143 --- /dev/null +++ b/src/client/server_discovery.py @@ -0,0 +1,111 @@ +"""Client-side server discovery using multicast.""" + +import asyncio +import socket +from typing import List + +from ..shared.discovery import ( + ServerInfo, + DiscoveryMessage, + create_multicast_socket, +) +from ..shared.constants import MULTICAST_GROUP, MULTICAST_PORT, DISCOVERY_TIMEOUT + + +class ServerDiscovery: + """Discovers game servers on the local network.""" + + def __init__(self) -> None: + """Initialize server discovery.""" + self.discovered_servers: List[ServerInfo] = [] + + async def discover_servers(self, timeout: float = DISCOVERY_TIMEOUT) -> List[ServerInfo]: + """Discover available game servers. + + Args: + timeout: How long to wait for responses (seconds) + + Returns: + List of discovered ServerInfo objects + """ + self.discovered_servers = [] + + # Create socket for sending and receiving + sock = create_multicast_socket(bind=True) + sock.setblocking(False) + + try: + # Send DISCOVER message to multicast group + discover_msg = DiscoveryMessage.create_discover() + sock.sendto(discover_msg, (MULTICAST_GROUP, MULTICAST_PORT)) + print(f"Sent discovery request to {MULTICAST_GROUP}:{MULTICAST_PORT}") + + # Listen for responses + loop = asyncio.get_event_loop() + end_time = loop.time() + timeout + + while loop.time() < end_time: + try: + # Calculate remaining timeout + remaining = end_time - loop.time() + if remaining <= 0: + break + + # Receive with timeout + sock.settimeout(min(remaining, 0.5)) + try: + data, addr = sock.recvfrom(1024) + except socket.timeout: + await asyncio.sleep(0.1) + continue + + # Parse response + msg_type, msg_data = DiscoveryMessage.parse(data) + + if msg_type == DiscoveryMessage.SERVER_ANNOUNCE and msg_data: + # Extract host from addr if not in data + host = addr[0] + server_info = ServerInfo.from_dict(msg_data, host=host) + + # Add if not already discovered + if not self._is_duplicate(server_info): + self.discovered_servers.append(server_info) + print(f"Discovered server: {server_info.server_name} at {server_info.host}:{server_info.port}") + + except Exception as e: + # Ignore errors and continue listening + await asyncio.sleep(0.1) + + finally: + sock.close() + + print(f"Discovery complete. Found {len(self.discovered_servers)} server(s)") + return self.discovered_servers + + def _is_duplicate(self, server_info: ServerInfo) -> bool: + """Check if server is already in discovered list. + + Args: + server_info: Server to check + + Returns: + True if duplicate found + """ + for existing in self.discovered_servers: + if (existing.host == server_info.host and + existing.port == server_info.port): + return True + return False + + +async def discover_servers(timeout: float = DISCOVERY_TIMEOUT) -> List[ServerInfo]: + """Convenience function to discover servers. + + Args: + timeout: How long to wait for responses (seconds) + + Returns: + List of discovered servers + """ + discovery = ServerDiscovery() + return await discovery.discover_servers(timeout) diff --git a/src/client/server_selector.py b/src/client/server_selector.py new file mode 100644 index 0000000..31aa590 --- /dev/null +++ b/src/client/server_selector.py @@ -0,0 +1,142 @@ +"""Server selection UI using pygame.""" + +import pygame +from typing import List, Optional, Tuple + +from ..shared.discovery import ServerInfo +from ..shared.constants import CELL_SIZE, GRID_WIDTH, GRID_HEIGHT + + +class ServerSelector: + """Simple pygame UI for selecting a server from discovered list.""" + + def __init__(self, servers: List[ServerInfo]): + """Initialize the server selector. + + Args: + servers: List of discovered servers + """ + self.servers = servers + self.selected_index = 0 + self.screen_width = GRID_WIDTH * CELL_SIZE + self.screen_height = GRID_HEIGHT * CELL_SIZE + + pygame.init() + self.screen = pygame.display.set_mode((self.screen_width, self.screen_height)) + pygame.display.set_caption("Select Server") + + self.title_font = pygame.font.Font(None, 48) + self.server_font = pygame.font.Font(None, 32) + self.help_font = pygame.font.Font(None, 24) + + self.clock = pygame.time.Clock() + + def run(self) -> Optional[Tuple[str, int]]: + """Run the server selection UI. + + Returns: + Tuple of (host, port) if server selected, None if cancelled + """ + running = True + + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + return None + + elif event.type == pygame.KEYDOWN: + if event.key == pygame.K_ESCAPE: + return None + + elif event.key == pygame.K_UP: + self.selected_index = max(0, self.selected_index - 1) + + elif event.key == pygame.K_DOWN: + self.selected_index = min(len(self.servers) - 1, self.selected_index + 1) + + elif event.key == pygame.K_RETURN or event.key == pygame.K_SPACE: + if self.servers: + selected = self.servers[self.selected_index] + return (selected.host, selected.port) + return None + + self.render() + self.clock.tick(30) + + return None + + def render(self) -> None: + """Render the server selection UI.""" + self.screen.fill((20, 20, 30)) + + # Draw title + title = self.title_font.render("Select Server", True, (255, 255, 255)) + title_rect = title.get_rect(center=(self.screen_width // 2, 60)) + self.screen.blit(title, title_rect) + + # Draw servers list + start_y = 140 + for i, server in enumerate(self.servers): + color = (100, 200, 100) if i == self.selected_index else (180, 180, 180) + + # Server name and info + server_text = f"{server.server_name} - {server.players_count} player(s)" + text_surface = self.server_font.render(server_text, True, color) + text_rect = text_surface.get_rect(center=(self.screen_width // 2, start_y + i * 50)) + + # Highlight selected + if i == self.selected_index: + highlight_rect = text_rect.inflate(20, 10) + pygame.draw.rect(self.screen, (50, 100, 50), highlight_rect, 2) + + self.screen.blit(text_surface, text_rect) + + # Server address (smaller text) + addr_text = f"{server.host}:{server.port}" + addr_surface = self.help_font.render(addr_text, True, (150, 150, 150)) + addr_rect = addr_surface.get_rect(center=(self.screen_width // 2, start_y + i * 50 + 25)) + self.screen.blit(addr_surface, addr_rect) + + # Draw help text at bottom + help_y = self.screen_height - 80 + help_texts = [ + "↑↓ Select ENTER Join ESC Cancel", + ] + + for i, text in enumerate(help_texts): + help_surface = self.help_font.render(text, True, (150, 150, 150)) + help_rect = help_surface.get_rect(center=(self.screen_width // 2, help_y + i * 30)) + self.screen.blit(help_surface, help_rect) + + # Show message if no servers found + if not self.servers: + no_server_text = "No servers found. Press ESC to exit." + text_surface = self.server_font.render(no_server_text, True, (255, 100, 100)) + text_rect = text_surface.get_rect(center=(self.screen_width // 2, self.screen_height // 2)) + self.screen.blit(text_surface, text_rect) + + pygame.display.flip() + + def close(self) -> None: + """Clean up pygame resources.""" + pygame.quit() + + +def select_server(servers: List[ServerInfo]) -> Optional[Tuple[str, int]]: + """Show server selection UI and return selected server. + + Args: + servers: List of discovered servers + + Returns: + Tuple of (host, port) if selected, None if cancelled + """ + if not servers: + print("No servers available to select") + return None + + selector = ServerSelector(servers) + try: + return selector.run() + finally: + selector.close() diff --git a/src/server/__init__.py b/src/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/game_logic.py b/src/server/game_logic.py new file mode 100644 index 0000000..967a5e8 --- /dev/null +++ b/src/server/game_logic.py @@ -0,0 +1,175 @@ +"""Core game logic for Snake game.""" + +import random +from typing import List, Tuple + +from ..shared.models import Snake, Position, Food, GameState +from ..shared.constants import ( + GRID_WIDTH, + GRID_HEIGHT, + INITIAL_SNAKE_LENGTH, + SNAKE_GROWTH, + OPPOSITE_DIRECTIONS, + RIGHT, +) + + +class GameLogic: + """Handles game rules and state updates.""" + + def __init__(self) -> None: + """Initialize game logic.""" + self.state = GameState() + + def create_snake(self, player_id: str) -> Snake: + """Create a new snake for a player. + + Args: + player_id: Unique identifier for the player + + Returns: + New Snake instance + """ + # Find a random starting position + start_x = random.randint(INITIAL_SNAKE_LENGTH, GRID_WIDTH - INITIAL_SNAKE_LENGTH) + start_y = random.randint(INITIAL_SNAKE_LENGTH, GRID_HEIGHT - INITIAL_SNAKE_LENGTH) + + # Create snake body (head first, tail last) + body = [ + Position(start_x - i, start_y) + for i in range(INITIAL_SNAKE_LENGTH) + ] + + snake = Snake(player_id=player_id, body=body, direction=RIGHT) + return snake + + def spawn_food(self) -> Food: + """Spawn food at a random position not occupied by snakes. + + Returns: + New Food instance + """ + # Get all occupied positions + occupied = set() + for snake in self.state.snakes: + for segment in snake.body: + occupied.add((segment.x, segment.y)) + + # Find random free position + max_attempts = 100 + for _ in range(max_attempts): + x = random.randint(0, GRID_WIDTH - 1) + y = random.randint(0, GRID_HEIGHT - 1) + if (x, y) not in occupied: + return Food(position=Position(x, y)) + + # Fallback: return any position if grid is too full + return Food(position=Position( + random.randint(0, GRID_WIDTH - 1), + random.randint(0, GRID_HEIGHT - 1) + )) + + def update_snake_direction(self, player_id: str, direction: Tuple[int, int]) -> None: + """Update a snake's direction if valid. + + Args: + player_id: Player whose snake to update + direction: New direction tuple + """ + for snake in self.state.snakes: + if snake.player_id == player_id and snake.alive: + # Prevent 180-degree turns + if direction != OPPOSITE_DIRECTIONS.get(snake.direction): + snake.direction = direction + break + + def move_snakes(self) -> None: + """Move all alive snakes one step in their current direction.""" + for snake in self.state.snakes: + if not snake.alive: + continue + + # Calculate new head position + new_head = snake.get_head() + snake.direction + + # Add new head + snake.body.insert(0, new_head) + + # Check if snake ate food + ate_food = False + for food in self.state.food[:]: + if new_head.x == food.position.x and new_head.y == food.position.y: + self.state.food.remove(food) + snake.score += 10 + ate_food = True + break + + # Remove tail if didn't eat food (otherwise snake grows) + if not ate_food: + snake.body.pop() + + def check_collisions(self) -> None: + """Check for collisions and mark dead snakes.""" + for snake in self.state.snakes: + if not snake.alive: + continue + + head = snake.get_head() + + # Check wall collision + if (head.x < 0 or head.x >= GRID_WIDTH or + head.y < 0 or head.y >= GRID_HEIGHT): + snake.alive = False + continue + + # Check self-collision (head hits own body) + for segment in snake.body[1:]: + if head.x == segment.x and head.y == segment.y: + snake.alive = False + break + + if not snake.alive: + continue + + # Check collision with other snakes + for other_snake in self.state.snakes: + if other_snake.player_id == snake.player_id: + continue + + # Check collision with other snake's body + for segment in other_snake.body: + if head.x == segment.x and head.y == segment.y: + snake.alive = False + break + + if not snake.alive: + break + + def update(self) -> None: + """Perform one game tick: move snakes and check collisions.""" + self.move_snakes() + self.check_collisions() + + # Spawn food if needed + if len(self.state.food) < len([s for s in self.state.snakes if s.alive]): + self.state.food.append(self.spawn_food()) + + def is_game_over(self) -> bool: + """Check if game is over (0 or 1 snakes alive). + + Returns: + True if game should end + """ + alive_count = sum(1 for snake in self.state.snakes if snake.alive) + return alive_count <= 1 and len(self.state.snakes) > 1 + + def get_winner(self) -> str | None: + """Get the winner's player_id if there is one. + + Returns: + Winner's player_id or None + """ + alive_snakes = [s for s in self.state.snakes if s.alive] + if len(alive_snakes) == 1: + return alive_snakes[0].player_id + return None diff --git a/src/server/game_server.py b/src/server/game_server.py new file mode 100644 index 0000000..0f7c237 --- /dev/null +++ b/src/server/game_server.py @@ -0,0 +1,299 @@ +"""Multiplayer Snake game server using asyncio.""" + +import asyncio +import uuid +from typing import Dict, Set + +from ..shared.protocol import ( + Message, + MessageType, + create_welcome_message, + create_state_update_message, + create_player_joined_message, + create_player_left_message, + create_game_started_message, + create_game_over_message, + create_error_message, +) +from ..shared.constants import DEFAULT_HOST, DEFAULT_PORT, TICK_RATE +from .game_logic import GameLogic +from .server_beacon import ServerBeacon + + +class GameServer: + """Multiplayer Snake game server.""" + + def __init__( + self, + host: str = DEFAULT_HOST, + port: int = DEFAULT_PORT, + server_name: str = "Snake Server", + enable_discovery: bool = True, + ): + """Initialize the game server. + + Args: + host: Host address to bind to + port: Port number to bind to + server_name: Name of the server for discovery + enable_discovery: Enable multicast discovery beacon + """ + self.host = host + self.port = port + self.server_name = server_name + self.enable_discovery = enable_discovery + self.clients: Dict[str, asyncio.StreamWriter] = {} + self.player_names: Dict[str, str] = {} + self.game_logic = GameLogic() + self.game_task: asyncio.Task | None = None + self.beacon_task: asyncio.Task | None = None + self.beacon: ServerBeacon | None = None + + async def handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter + ) -> None: + """Handle a client connection. + + Args: + reader: Stream reader for receiving data + writer: Stream writer for sending data + """ + player_id = str(uuid.uuid4()) + addr = writer.get_extra_info('peername') + print(f"New connection from {addr}, assigned ID: {player_id}") + + try: + while True: + # Read message length prefix (4 bytes) + data = await reader.readline() + if not data: + break + + # Parse message + try: + message = Message.from_json(data.decode().strip()) + await self.handle_message(player_id, message, writer) + except Exception as e: + print(f"Error parsing message from {player_id}: {e}") + await self.send_message(writer, create_error_message(str(e))) + + except asyncio.CancelledError: + pass + except Exception as e: + print(f"Error handling client {player_id}: {e}") + finally: + await self.remove_player(player_id) + writer.close() + await writer.wait_closed() + + async def handle_message( + self, + player_id: str, + message: Message, + writer: asyncio.StreamWriter + ) -> None: + """Handle a message from a client. + + Args: + player_id: ID of the player who sent the message + message: The message received + writer: Stream writer for responding + """ + if message.type == MessageType.JOIN: + await self.handle_join(player_id, message, writer) + elif message.type == MessageType.MOVE: + await self.handle_move(player_id, message) + elif message.type == MessageType.START_GAME: + await self.handle_start_game() + elif message.type == MessageType.LEAVE: + await self.remove_player(player_id) + + async def handle_join( + self, + player_id: str, + message: Message, + writer: asyncio.StreamWriter + ) -> None: + """Handle a player joining. + + Args: + player_id: ID of the joining player + message: JOIN message with player name + writer: Stream writer for the client + """ + player_name = message.data.get("player_name", f"Player_{player_id[:8]}") + self.clients[player_id] = writer + self.player_names[player_id] = player_name + + # Send welcome message to new player + await self.send_message(writer, create_welcome_message(player_id)) + + # Notify all clients about new player + await self.broadcast(create_player_joined_message(player_id, player_name)) + + # Add snake to game if game is running + if self.game_logic.state.game_running: + snake = self.game_logic.create_snake(player_id) + self.game_logic.state.snakes.append(snake) + + print(f"Player {player_name} ({player_id}) joined. Total players: {len(self.clients)}") + + async def handle_move(self, player_id: str, message: Message) -> None: + """Handle a player movement command. + + Args: + player_id: ID of the player + message: MOVE message with direction + """ + direction = tuple(message.data.get("direction", (1, 0))) + self.game_logic.update_snake_direction(player_id, direction) + + async def handle_start_game(self) -> None: + """Start the game.""" + if self.game_logic.state.game_running: + return + + # Create snakes for all connected players + self.game_logic.state.snakes = [] + for player_id in self.clients.keys(): + snake = self.game_logic.create_snake(player_id) + self.game_logic.state.snakes.append(snake) + + # Spawn initial food + for _ in range(3): + self.game_logic.state.food.append(self.game_logic.spawn_food()) + + self.game_logic.state.game_running = True + + # Notify all clients + await self.broadcast(create_game_started_message()) + + # Start game loop + if self.game_task is None or self.game_task.done(): + self.game_task = asyncio.create_task(self.game_loop()) + + print("Game started!") + + async def remove_player(self, player_id: str) -> None: + """Remove a player from the game. + + Args: + player_id: ID of the player to remove + """ + if player_id in self.clients: + del self.clients[player_id] + + if player_id in self.player_names: + player_name = self.player_names[player_id] + del self.player_names[player_id] + print(f"Player {player_name} ({player_id}) left. Total players: {len(self.clients)}") + + # Remove snake from game + self.game_logic.state.snakes = [ + s for s in self.game_logic.state.snakes + if s.player_id != player_id + ] + + # Notify remaining clients + await self.broadcast(create_player_left_message(player_id)) + + async def game_loop(self) -> None: + """Main game loop.""" + while self.game_logic.state.game_running: + # Update game state + self.game_logic.update() + + # Check for game over + if self.game_logic.is_game_over(): + winner_id = self.game_logic.get_winner() + await self.broadcast(create_game_over_message(winner_id)) + self.game_logic.state.game_running = False + print(f"Game over! Winner: {winner_id}") + break + + # Broadcast state to all clients + state_dict = self.game_logic.state.to_dict() + await self.broadcast(create_state_update_message(state_dict)) + + # Wait for next tick + await asyncio.sleep(TICK_RATE) + + async def send_message(self, writer: asyncio.StreamWriter, message: Message) -> None: + """Send a message to a client. + + Args: + writer: Stream writer for the client + message: Message to send + """ + try: + data = message.to_json() + "\n" + writer.write(data.encode()) + await writer.drain() + except Exception as e: + print(f"Error sending message: {e}") + + async def broadcast(self, message: Message, exclude: Set[str] = None) -> None: + """Broadcast a message to all connected clients. + + Args: + message: Message to broadcast + exclude: Set of player IDs to exclude from broadcast + """ + exclude = exclude or set() + for player_id, writer in list(self.clients.items()): + if player_id not in exclude: + await self.send_message(writer, message) + + def get_player_count(self) -> int: + """Get the current number of connected players. + + Returns: + Number of connected players + """ + return len(self.clients) + + async def start(self) -> None: + """Start the server.""" + # Start discovery beacon if enabled + if self.enable_discovery: + self.beacon = ServerBeacon( + server_name=self.server_name, + server_port=self.port, + get_player_count=self.get_player_count, + ) + self.beacon_task = asyncio.create_task(self.beacon.start()) + + server = await asyncio.start_server( + self.handle_client, + self.host, + self.port + ) + + addr = server.sockets[0].getsockname() + print(f"Snake game server '{self.server_name}' running on {addr[0]}:{addr[1]}") + + try: + async with server: + await server.serve_forever() + finally: + # Clean up beacon on shutdown + if self.beacon: + await self.beacon.stop() + if self.beacon_task: + self.beacon_task.cancel() + try: + await self.beacon_task + except asyncio.CancelledError: + pass + + +async def main() -> None: + """Run the server.""" + server = GameServer() + await server.start() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/server/server_beacon.py b/src/server/server_beacon.py new file mode 100644 index 0000000..06c0b1d --- /dev/null +++ b/src/server/server_beacon.py @@ -0,0 +1,125 @@ +"""Server beacon for multicast discovery.""" + +import asyncio +import socket +from typing import Callable + +from ..shared.discovery import ( + ServerInfo, + DiscoveryMessage, + create_multicast_socket, + get_local_ip, +) +from ..shared.constants import MULTICAST_GROUP, MULTICAST_PORT, BEACON_INTERVAL + + +class ServerBeacon: + """Broadcasts server presence and responds to discovery requests.""" + + def __init__( + self, + server_name: str, + server_port: int, + get_player_count: Callable[[], int], + ): + """Initialize the server beacon. + + Args: + server_name: Name of the server + server_port: TCP port the game server is listening on + get_player_count: Callable that returns current player count + """ + self.server_name = server_name + self.server_port = server_port + self.get_player_count = get_player_count + self.running = False + self.sock: socket.socket | None = None + self.local_ip = get_local_ip() + + def create_server_info(self) -> ServerInfo: + """Create ServerInfo with current server state. + + Returns: + Current server information + """ + return ServerInfo( + host=self.local_ip, + port=self.server_port, + server_name=self.server_name, + players_count=self.get_player_count(), + ) + + async def start(self) -> None: + """Start the beacon service.""" + self.running = True + self.sock = create_multicast_socket(bind=True) + self.sock.setblocking(False) + + print(f"Server beacon started on {MULTICAST_GROUP}:{MULTICAST_PORT}") + print(f"Server IP: {self.local_ip}, Port: {self.server_port}") + + # Run both listener and broadcaster concurrently + await asyncio.gather( + self.listen_for_discovery(), + self.broadcast_presence(), + ) + + async def listen_for_discovery(self) -> None: + """Listen for DISCOVER messages and respond.""" + loop = asyncio.get_event_loop() + + while self.running: + try: + # Receive data (non-blocking with asyncio) + data, addr = await loop.sock_recvfrom(self.sock, 1024) + + # Parse message + msg_type, msg_data = DiscoveryMessage.parse(data) + + if msg_type == DiscoveryMessage.DISCOVER: + # Respond to discovery request + await self.send_announce(addr) + + except Exception as e: + # Ignore errors and continue listening + await asyncio.sleep(0.1) + + async def send_announce(self, addr: tuple) -> None: + """Send SERVER_ANNOUNCE message to a specific address. + + Args: + addr: (host, port) tuple to send to + """ + try: + server_info = self.create_server_info() + message = DiscoveryMessage.create_announce(server_info) + + # Send directly to the requester + self.sock.sendto(message, addr) + print(f"Sent announcement to {addr}") + + except Exception as e: + print(f"Error sending announcement: {e}") + + async def broadcast_presence(self) -> None: + """Periodically broadcast server presence to multicast group.""" + while self.running: + try: + server_info = self.create_server_info() + message = DiscoveryMessage.create_announce(server_info) + + # Broadcast to multicast group + self.sock.sendto(message, (MULTICAST_GROUP, MULTICAST_PORT)) + + except Exception as e: + print(f"Error broadcasting presence: {e}") + + # Wait before next broadcast + await asyncio.sleep(BEACON_INTERVAL) + + async def stop(self) -> None: + """Stop the beacon service.""" + self.running = False + if self.sock: + self.sock.close() + print("Server beacon stopped") diff --git a/src/shared/__init__.py b/src/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/shared/constants.py b/src/shared/constants.py new file mode 100644 index 0000000..f4d4054 --- /dev/null +++ b/src/shared/constants.py @@ -0,0 +1,48 @@ +"""Game constants shared between client and server.""" + +# Network settings +DEFAULT_HOST = "localhost" +DEFAULT_PORT = 8888 + +# Multicast discovery settings +MULTICAST_GROUP = "239.255.0.1" +MULTICAST_PORT = 9999 +DISCOVERY_TIMEOUT = 3.0 # seconds to wait for server responses +BEACON_INTERVAL = 2.0 # how often server announces itself + +# Game grid settings +GRID_WIDTH = 40 +GRID_HEIGHT = 30 +CELL_SIZE = 20 # pixels + +# Game timing +TICK_RATE = 0.1 # seconds (10 ticks per second) +FPS = 60 # client rendering FPS + +# Game rules +INITIAL_SNAKE_LENGTH = 3 +SNAKE_GROWTH = 1 # segments to grow when eating food + +# Colors (RGB) +COLOR_BACKGROUND = (0, 0, 0) +COLOR_GRID = (40, 40, 40) +COLOR_FOOD = (255, 0, 0) +COLOR_SNAKES = [ + (0, 255, 0), # Green - Player 1 + (0, 0, 255), # Blue - Player 2 + (255, 255, 0), # Yellow - Player 3 + (255, 0, 255), # Magenta - Player 4 +] + +# Directions +UP = (0, -1) +DOWN = (0, 1) +LEFT = (-1, 0) +RIGHT = (1, 0) + +OPPOSITE_DIRECTIONS = { + UP: DOWN, + DOWN: UP, + LEFT: RIGHT, + RIGHT: LEFT, +} diff --git a/src/shared/discovery.py b/src/shared/discovery.py new file mode 100644 index 0000000..c068f18 --- /dev/null +++ b/src/shared/discovery.py @@ -0,0 +1,142 @@ +"""Server discovery utilities using UDP multicast.""" + +import json +import socket +import struct +import time +from dataclasses import dataclass +from typing import Optional + +from .constants import MULTICAST_GROUP, MULTICAST_PORT + + +@dataclass +class ServerInfo: + """Information about a discovered server.""" + host: str + port: int + server_name: str + players_count: int + last_seen: float = 0.0 + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + "host": self.host, + "port": self.port, + "server_name": self.server_name, + "players_count": self.players_count, + } + + @classmethod + def from_dict(cls, data: dict, host: str = None) -> "ServerInfo": + """Create ServerInfo from dictionary. + + Args: + data: Dictionary with server information + host: Optional override for host address + """ + return cls( + host=host or data["host"], + port=data["port"], + server_name=data["server_name"], + players_count=data["players_count"], + last_seen=time.time(), + ) + + +class DiscoveryMessage: + """Discovery protocol messages.""" + + DISCOVER = "DISCOVER" + SERVER_ANNOUNCE = "SERVER_ANNOUNCE" + + @staticmethod + def create_discover() -> bytes: + """Create a DISCOVER message. + + Returns: + Encoded message bytes + """ + msg = {"type": DiscoveryMessage.DISCOVER} + return json.dumps(msg).encode("utf-8") + + @staticmethod + def create_announce(server_info: ServerInfo) -> bytes: + """Create a SERVER_ANNOUNCE message. + + Args: + server_info: Server information to announce + + Returns: + Encoded message bytes + """ + msg = { + "type": DiscoveryMessage.SERVER_ANNOUNCE, + "data": server_info.to_dict(), + } + return json.dumps(msg).encode("utf-8") + + @staticmethod + def parse(data: bytes) -> tuple[str, Optional[dict]]: + """Parse a discovery message. + + Args: + data: Raw message bytes + + Returns: + Tuple of (message_type, data_dict) + """ + try: + msg = json.loads(data.decode("utf-8")) + msg_type = msg.get("type") + msg_data = msg.get("data") + return msg_type, msg_data + except Exception: + return None, None + + +def create_multicast_socket(bind: bool = False) -> socket.socket: + """Create a UDP socket configured for multicast. + + Args: + bind: If True, bind to multicast group (for receiving) + + Returns: + Configured socket + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + + # Allow multiple sockets to bind to the same port + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + if bind: + # Bind to the multicast port + sock.bind(("", MULTICAST_PORT)) + + # Join the multicast group + mreq = struct.pack("4sl", socket.inet_aton(MULTICAST_GROUP), socket.INADDR_ANY) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + # Set multicast TTL (time-to-live) for sending + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + + return sock + + +def get_local_ip() -> str: + """Get the local IP address for LAN communication. + + Returns: + Local IP address as string + """ + try: + # Create a UDP socket and connect to a public DNS server + # This doesn't actually send data, just determines routing + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + s.close() + return local_ip + except Exception: + return "127.0.0.1" diff --git a/src/shared/models.py b/src/shared/models.py new file mode 100644 index 0000000..d8a2995 --- /dev/null +++ b/src/shared/models.py @@ -0,0 +1,98 @@ +"""Data models shared between client and server.""" + +from dataclasses import dataclass, field +from typing import List, Tuple + + +@dataclass +class Position: + """Represents a position on the game grid.""" + x: int + y: int + + def __add__(self, other: Tuple[int, int]) -> "Position": + """Add a direction tuple to position.""" + return Position(self.x + other[0], self.y + other[1]) + + def to_tuple(self) -> Tuple[int, int]: + """Convert to tuple for serialization.""" + return (self.x, self.y) + + @classmethod + def from_tuple(cls, pos: Tuple[int, int]) -> "Position": + """Create Position from tuple.""" + return cls(pos[0], pos[1]) + + +@dataclass +class Snake: + """Represents a snake in the game.""" + player_id: str + body: List[Position] = field(default_factory=list) + direction: Tuple[int, int] = (1, 0) # Default: moving right + alive: bool = True + score: int = 0 + + def get_head(self) -> Position: + """Get the head position of the snake.""" + return self.body[0] if self.body else Position(0, 0) + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + "player_id": self.player_id, + "body": [pos.to_tuple() for pos in self.body], + "direction": self.direction, + "alive": self.alive, + "score": self.score, + } + + @classmethod + def from_dict(cls, data: dict) -> "Snake": + """Create Snake from dictionary.""" + snake = cls(player_id=data["player_id"]) + snake.body = [Position.from_tuple(pos) for pos in data["body"]] + snake.direction = tuple(data["direction"]) + snake.alive = data["alive"] + snake.score = data["score"] + return snake + + +@dataclass +class Food: + """Represents food on the game grid.""" + position: Position + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return {"position": self.position.to_tuple()} + + @classmethod + def from_dict(cls, data: dict) -> "Food": + """Create Food from dictionary.""" + return cls(position=Position.from_tuple(data["position"])) + + +@dataclass +class GameState: + """Represents the complete game state.""" + snakes: List[Snake] = field(default_factory=list) + food: List[Food] = field(default_factory=list) + game_running: bool = False + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + "snakes": [snake.to_dict() for snake in self.snakes], + "food": [f.to_dict() for f in self.food], + "game_running": self.game_running, + } + + @classmethod + def from_dict(cls, data: dict) -> "GameState": + """Create GameState from dictionary.""" + state = cls() + state.snakes = [Snake.from_dict(s) for s in data["snakes"]] + state.food = [Food.from_dict(f) for f in data["food"]] + state.game_running = data["game_running"] + return state diff --git a/src/shared/protocol.py b/src/shared/protocol.py new file mode 100644 index 0000000..322ca28 --- /dev/null +++ b/src/shared/protocol.py @@ -0,0 +1,116 @@ +"""Network protocol for client-server communication.""" + +import json +from enum import Enum +from typing import Any, Dict + + +class MessageType(Enum): + """Types of messages exchanged between client and server.""" + # Client -> Server + JOIN = "JOIN" + MOVE = "MOVE" + START_GAME = "START_GAME" + LEAVE = "LEAVE" + + # Server -> Client + WELCOME = "WELCOME" + STATE_UPDATE = "STATE_UPDATE" + PLAYER_JOINED = "PLAYER_JOINED" + PLAYER_LEFT = "PLAYER_LEFT" + GAME_STARTED = "GAME_STARTED" + GAME_OVER = "GAME_OVER" + ERROR = "ERROR" + + +class Message: + """Represents a protocol message.""" + + def __init__(self, msg_type: MessageType, data: Dict[str, Any] = None): + """Initialize a message. + + Args: + msg_type: The type of message + data: Optional message data + """ + self.type = msg_type + self.data = data or {} + + def to_json(self) -> str: + """Serialize message to JSON string.""" + return json.dumps({ + "type": self.type.value, + "data": self.data + }) + + @classmethod + def from_json(cls, json_str: str) -> "Message": + """Deserialize message from JSON string.""" + obj = json.loads(json_str) + msg_type = MessageType(obj["type"]) + data = obj.get("data", {}) + return cls(msg_type, data) + + def __repr__(self) -> str: + """String representation of message.""" + return f"Message({self.type.value}, {self.data})" + + +# Helper functions for creating specific messages + +def create_join_message(player_name: str) -> Message: + """Create a JOIN message.""" + return Message(MessageType.JOIN, {"player_name": player_name}) + + +def create_move_message(direction: tuple) -> Message: + """Create a MOVE message.""" + return Message(MessageType.MOVE, {"direction": direction}) + + +def create_start_game_message() -> Message: + """Create a START_GAME message.""" + return Message(MessageType.START_GAME) + + +def create_leave_message() -> Message: + """Create a LEAVE message.""" + return Message(MessageType.LEAVE) + + +def create_welcome_message(player_id: str) -> Message: + """Create a WELCOME message.""" + return Message(MessageType.WELCOME, {"player_id": player_id}) + + +def create_state_update_message(game_state: dict) -> Message: + """Create a STATE_UPDATE message.""" + return Message(MessageType.STATE_UPDATE, {"game_state": game_state}) + + +def create_player_joined_message(player_id: str, player_name: str) -> Message: + """Create a PLAYER_JOINED message.""" + return Message(MessageType.PLAYER_JOINED, { + "player_id": player_id, + "player_name": player_name + }) + + +def create_player_left_message(player_id: str) -> Message: + """Create a PLAYER_LEFT message.""" + return Message(MessageType.PLAYER_LEFT, {"player_id": player_id}) + + +def create_game_started_message() -> Message: + """Create a GAME_STARTED message.""" + return Message(MessageType.GAME_STARTED) + + +def create_game_over_message(winner_id: str = None) -> Message: + """Create a GAME_OVER message.""" + return Message(MessageType.GAME_OVER, {"winner_id": winner_id}) + + +def create_error_message(error: str) -> Message: + """Create an ERROR message.""" + return Message(MessageType.ERROR, {"error": error}) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..e5d840b --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,159 @@ +"""Tests for server discovery functionality.""" + +import pytest +from src.shared.discovery import ( + ServerInfo, + DiscoveryMessage, + create_multicast_socket, + get_local_ip, +) + + +class TestServerInfo: + """Test suite for ServerInfo class.""" + + def test_server_info_creation(self) -> None: + """Test creating ServerInfo.""" + info = ServerInfo( + host="192.168.1.100", + port=8888, + server_name="Test Server", + players_count=3, + ) + + assert info.host == "192.168.1.100" + assert info.port == 8888 + assert info.server_name == "Test Server" + assert info.players_count == 3 + + def test_server_info_serialization(self) -> None: + """Test ServerInfo to_dict and from_dict.""" + info = ServerInfo( + host="10.0.0.1", + port=9999, + server_name="Game Room", + players_count=2, + ) + + # Serialize + data = info.to_dict() + assert data["host"] == "10.0.0.1" + assert data["port"] == 9999 + assert data["server_name"] == "Game Room" + assert data["players_count"] == 2 + + # Deserialize + info2 = ServerInfo.from_dict(data) + assert info2.host == info.host + assert info2.port == info.port + assert info2.server_name == info.server_name + assert info2.players_count == info.players_count + + def test_server_info_host_override(self) -> None: + """Test ServerInfo from_dict with host override.""" + data = { + "host": "1.2.3.4", + "port": 8888, + "server_name": "Test", + "players_count": 1, + } + + # Override host + info = ServerInfo.from_dict(data, host="5.6.7.8") + assert info.host == "5.6.7.8" + assert info.port == 8888 + + +class TestDiscoveryMessage: + """Test suite for DiscoveryMessage class.""" + + def test_create_discover_message(self) -> None: + """Test creating DISCOVER message.""" + msg = DiscoveryMessage.create_discover() + assert isinstance(msg, bytes) + + # Parse it back + msg_type, msg_data = DiscoveryMessage.parse(msg) + assert msg_type == DiscoveryMessage.DISCOVER + assert msg_data is None or msg_data == {} + + def test_create_announce_message(self) -> None: + """Test creating SERVER_ANNOUNCE message.""" + server_info = ServerInfo( + host="192.168.1.1", + port=8888, + server_name="Test Server", + players_count=5, + ) + + msg = DiscoveryMessage.create_announce(server_info) + assert isinstance(msg, bytes) + + # Parse it back + msg_type, msg_data = DiscoveryMessage.parse(msg) + assert msg_type == DiscoveryMessage.SERVER_ANNOUNCE + assert msg_data is not None + assert msg_data["host"] == "192.168.1.1" + assert msg_data["port"] == 8888 + assert msg_data["server_name"] == "Test Server" + assert msg_data["players_count"] == 5 + + def test_parse_invalid_message(self) -> None: + """Test parsing invalid message.""" + msg_type, msg_data = DiscoveryMessage.parse(b"invalid json") + assert msg_type is None + assert msg_data is None + + def test_message_round_trip(self) -> None: + """Test encoding and decoding messages.""" + # Test DISCOVER + discover = DiscoveryMessage.create_discover() + msg_type, _ = DiscoveryMessage.parse(discover) + assert msg_type == DiscoveryMessage.DISCOVER + + # Test ANNOUNCE + info = ServerInfo( + host="10.0.0.1", + port=7777, + server_name="Round Trip", + players_count=0, + ) + announce = DiscoveryMessage.create_announce(info) + msg_type, msg_data = DiscoveryMessage.parse(announce) + assert msg_type == DiscoveryMessage.SERVER_ANNOUNCE + assert msg_data["server_name"] == "Round Trip" + + +class TestDiscoveryUtilities: + """Test suite for discovery utility functions.""" + + def test_create_multicast_socket(self) -> None: + """Test creating multicast socket.""" + # Test without binding + sock = create_multicast_socket(bind=False) + assert sock is not None + sock.close() + + # Test with binding (may fail in some environments) + try: + sock = create_multicast_socket(bind=True) + assert sock is not None + sock.close() + except Exception: + # Binding may fail in restricted environments + pytest.skip("Cannot bind to multicast group in this environment") + + def test_get_local_ip(self) -> None: + """Test getting local IP address.""" + ip = get_local_ip() + assert isinstance(ip, str) + assert len(ip) > 0 + + # Should be a valid IP format + parts = ip.split(".") + assert len(parts) == 4 + + # Each part should be a number 0-255 + for part in parts: + num = int(part) + assert 0 <= num <= 255 diff --git a/tests/test_game_logic.py b/tests/test_game_logic.py new file mode 100644 index 0000000..676e279 --- /dev/null +++ b/tests/test_game_logic.py @@ -0,0 +1,148 @@ +"""Tests for game logic.""" + +import pytest +from src.server.game_logic import GameLogic +from src.shared.models import Position, Snake +from src.shared.constants import GRID_WIDTH, GRID_HEIGHT, RIGHT, LEFT, UP, DOWN + + +class TestGameLogic: + """Test suite for GameLogic class.""" + + def test_create_snake(self) -> None: + """Test snake creation.""" + logic = GameLogic() + snake = logic.create_snake("player1") + + assert snake.player_id == "player1" + assert len(snake.body) == 3 + assert snake.alive is True + assert snake.score == 0 + assert snake.direction == RIGHT + + def test_spawn_food(self) -> None: + """Test food spawning.""" + logic = GameLogic() + food = logic.spawn_food() + + assert 0 <= food.position.x < GRID_WIDTH + assert 0 <= food.position.y < GRID_HEIGHT + + def test_update_snake_direction(self) -> None: + """Test updating snake direction.""" + logic = GameLogic() + snake = logic.create_snake("player1") + logic.state.snakes.append(snake) + + # Valid direction change + logic.update_snake_direction("player1", UP) + assert snake.direction == UP + + # Invalid 180-degree turn (should be ignored) + logic.update_snake_direction("player1", DOWN) + assert snake.direction == UP # Should remain UP + + def test_move_snakes(self) -> None: + """Test snake movement.""" + logic = GameLogic() + snake = Snake(player_id="player1", body=[ + Position(5, 5), + Position(4, 5), + Position(3, 5), + ], direction=RIGHT) + logic.state.snakes.append(snake) + + initial_length = len(snake.body) + logic.move_snakes() + + # Snake should have moved one cell to the right + assert snake.get_head().x == 6 + assert snake.get_head().y == 5 + assert len(snake.body) == initial_length + + def test_collision_with_wall(self) -> None: + """Test collision detection with walls.""" + logic = GameLogic() + + # Snake at left wall + snake = Snake(player_id="player1", body=[ + Position(0, 5), + Position(1, 5), + Position(2, 5), + ], direction=LEFT) + logic.state.snakes.append(snake) + + logic.move_snakes() + logic.check_collisions() + + assert snake.alive is False + + def test_collision_with_self(self) -> None: + """Test collision detection with self.""" + logic = GameLogic() + + # Create a snake that will hit itself + snake = Snake(player_id="player1", body=[ + Position(5, 5), + Position(5, 6), + Position(6, 6), + Position(6, 5), + ], direction=DOWN) + logic.state.snakes.append(snake) + + logic.move_snakes() + logic.check_collisions() + + assert snake.alive is False + + def test_food_eating(self) -> None: + """Test snake eating food and growing.""" + logic = GameLogic() + + snake = Snake(player_id="player1", body=[ + Position(5, 5), + Position(4, 5), + Position(3, 5), + ], direction=RIGHT) + logic.state.snakes.append(snake) + + # Place food in front of snake + from src.shared.models import Food + food = Food(position=Position(6, 5)) + logic.state.food.append(food) + + initial_length = len(snake.body) + initial_score = snake.score + + logic.move_snakes() + + # Snake should have grown and scored + assert len(snake.body) == initial_length + 1 + assert snake.score == initial_score + 10 + assert food not in logic.state.food + + def test_is_game_over(self) -> None: + """Test game over detection.""" + logic = GameLogic() + + # No game over with multiple alive snakes + snake1 = Snake(player_id="player1", alive=True) + snake2 = Snake(player_id="player2", alive=True) + logic.state.snakes = [snake1, snake2] + + assert logic.is_game_over() is False + + # Game over when only one snake alive + snake2.alive = False + assert logic.is_game_over() is True + + def test_get_winner(self) -> None: + """Test winner determination.""" + logic = GameLogic() + + snake1 = Snake(player_id="player1", alive=True) + snake2 = Snake(player_id="player2", alive=False) + logic.state.snakes = [snake1, snake2] + + winner = logic.get_winner() + assert winner == "player1" diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..44166b6 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,135 @@ +"""Tests for data models.""" + +import pytest +from src.shared.models import Position, Snake, Food, GameState + + +class TestPosition: + """Test suite for Position class.""" + + def test_position_creation(self) -> None: + """Test creating a position.""" + pos = Position(5, 10) + assert pos.x == 5 + assert pos.y == 10 + + def test_position_addition(self) -> None: + """Test adding a direction to a position.""" + pos = Position(5, 10) + new_pos = pos + (1, -1) + assert new_pos.x == 6 + assert new_pos.y == 9 + + def test_position_to_tuple(self) -> None: + """Test converting position to tuple.""" + pos = Position(3, 7) + assert pos.to_tuple() == (3, 7) + + def test_position_from_tuple(self) -> None: + """Test creating position from tuple.""" + pos = Position.from_tuple((3, 7)) + assert pos.x == 3 + assert pos.y == 7 + + +class TestSnake: + """Test suite for Snake class.""" + + def test_snake_creation(self) -> None: + """Test creating a snake.""" + snake = Snake(player_id="test123") + assert snake.player_id == "test123" + assert snake.alive is True + assert snake.score == 0 + assert snake.direction == (1, 0) + + def test_get_head(self) -> None: + """Test getting snake head position.""" + snake = Snake( + player_id="test", + body=[Position(5, 5), Position(4, 5), Position(3, 5)] + ) + head = snake.get_head() + assert head.x == 5 + assert head.y == 5 + + def test_snake_serialization(self) -> None: + """Test snake to_dict and from_dict.""" + snake = Snake( + player_id="test123", + body=[Position(5, 5), Position(4, 5)], + direction=(0, 1), + alive=False, + score=100 + ) + + # Serialize + data = snake.to_dict() + assert data["player_id"] == "test123" + assert data["body"] == [(5, 5), (4, 5)] + assert data["direction"] == (0, 1) + assert data["alive"] is False + assert data["score"] == 100 + + # Deserialize + snake2 = Snake.from_dict(data) + assert snake2.player_id == snake.player_id + assert len(snake2.body) == len(snake.body) + assert snake2.body[0].x == snake.body[0].x + assert snake2.direction == snake.direction + assert snake2.alive == snake.alive + assert snake2.score == snake.score + + +class TestFood: + """Test suite for Food class.""" + + def test_food_creation(self) -> None: + """Test creating food.""" + food = Food(position=Position(10, 15)) + assert food.position.x == 10 + assert food.position.y == 15 + + def test_food_serialization(self) -> None: + """Test food to_dict and from_dict.""" + food = Food(position=Position(10, 15)) + + # Serialize + data = food.to_dict() + assert data["position"] == (10, 15) + + # Deserialize + food2 = Food.from_dict(data) + assert food2.position.x == food.position.x + assert food2.position.y == food.position.y + + +class TestGameState: + """Test suite for GameState class.""" + + def test_game_state_creation(self) -> None: + """Test creating game state.""" + state = GameState() + assert len(state.snakes) == 0 + assert len(state.food) == 0 + assert state.game_running is False + + def test_game_state_serialization(self) -> None: + """Test game state to_dict and from_dict.""" + state = GameState() + state.snakes.append(Snake(player_id="p1", body=[Position(5, 5)])) + state.food.append(Food(position=Position(10, 10))) + state.game_running = True + + # Serialize + data = state.to_dict() + assert len(data["snakes"]) == 1 + assert len(data["food"]) == 1 + assert data["game_running"] is True + + # Deserialize + state2 = GameState.from_dict(data) + assert len(state2.snakes) == 1 + assert len(state2.food) == 1 + assert state2.game_running is True + assert state2.snakes[0].player_id == "p1"