from __future__ import annotations import asyncio from dataclasses import dataclass from typing import Dict, List, Optional, Tuple from .config import ServerConfig from .model import GameState, PlayerSession, Snake, Coord from .protocol import ( Direction, InputEvent, PacketType, build_join_ack, build_join_deny, build_config_update, build_input_broadcast, build_state_full, pack_header, parse_input, parse_join, ) from .transport import DatagramServerTransport, TransportPeer @dataclass class ServerRuntime: config: ServerConfig state: GameState seq: int = 0 version: int = 1 def next_seq(self) -> int: self.seq = (self.seq + 1) & 0xFFFF return self.seq class GameServer: def __init__(self, transport: DatagramServerTransport, config: ServerConfig): self.transport = transport self.runtime = ServerRuntime(config=config, state=GameState(config.width, config.height)) self.sessions: Dict[int, PlayerSession] = {} self._config_update_interval_ticks = 50 # periodic resend async def on_datagram(self, data: bytes, peer: TransportPeer) -> None: # Minimal header parse if len(data) < 5: return ver = data[0] ptype = PacketType(data[1]) flags = data[2] # seq = int.from_bytes(data[3:5], 'big') # currently unused off = 5 if ptype == PacketType.JOIN: await self._handle_join(data, off, peer) elif ptype == PacketType.INPUT: await self._handle_input(data, off, peer) else: # ignore others in skeleton return async def broadcast_config_update(self) -> None: r = self.runtime payload = build_config_update( version=r.version, seq=r.next_seq(), tick=r.state.tick & 0xFFFF, tick_rate=r.config.tick_rate, wrap_edges=r.config.wrap_edges, apples_per_snake=r.config.apples_per_snake, apples_cap=r.config.apples_cap, ) # Broadcast to all sessions (requires real peer handles) for session in list(self.sessions.values()): await self.transport.send(payload, TransportPeer(session.peer)) async def relay_input_broadcast( self, *, from_player_id: int, input_seq: int, base_tick: int, events: List[InputEvent], apply_at_tick: int | None, ) -> None: r = self.runtime payload = build_input_broadcast( version=r.version, seq=r.next_seq(), tick=r.state.tick & 0xFFFF, player_id=from_player_id, input_seq=input_seq, base_tick=base_tick & 0xFFFF, events=events, apply_at_tick=apply_at_tick, ) for session in list(self.sessions.values()): if session.player_id == from_player_id: continue await self.transport.send(payload, TransportPeer(session.peer)) async def tick_loop(self) -> None: r = self.runtime tick_duration = 1.0 / max(1, r.config.tick_rate) next_cfg_resend = self._config_update_interval_ticks while True: start = asyncio.get_event_loop().time() # TODO: process inputs, update snakes, collisions, apples, deltas r.state.tick = (r.state.tick + 1) & 0xFFFFFFFF next_cfg_resend -= 1 if next_cfg_resend <= 0: await self.broadcast_config_update() next_cfg_resend = self._config_update_interval_ticks elapsed = asyncio.get_event_loop().time() - start await asyncio.sleep(max(0.0, tick_duration - elapsed)) # --- Join / Spawn --- def _allocate_player_id(self) -> Optional[int]: used = {s.player_id for s in self.sessions.values()} for pid in range(self.runtime.config.players_max): if pid not in used: return pid return None def _choose_color_id(self) -> int: used = {s.color_id for s in self.sessions.values()} for cid in range(32): if cid not in used: return cid return 0 def _neighbors(self, x: int, y: int) -> List[Tuple[Direction, Coord]]: return [ (Direction.UP, (x, y - 1)), (Direction.RIGHT, (x + 1, y)), (Direction.DOWN, (x, y + 1)), (Direction.LEFT, (x - 1, y)), ] def _find_spawn(self) -> Optional[Snake]: st = self.runtime.state # Try to find a 3-cell straight strip for y in range(st.height): for x in range(st.width): if not st.cell_free(x, y): continue for d, (nx, ny) in self._neighbors(x, y): # check two cells in direction x2, y2 = nx, ny x3, y3 = nx + (1 if d == Direction.RIGHT else -1 if d == Direction.LEFT else 0), ny + (1 if d == Direction.DOWN else -1 if d == Direction.UP else 0) if st.in_bounds(x2, y2) and st.in_bounds(x3, y3) and st.cell_free(x2, y2) and st.cell_free(x3, y3): body = [ (x, y), (x2, y2), (x3, y3), ] return Snake(snake_id=-1, head=(x, y), direction=d, body=deque(body)) # Fallback: any single free cell for y in range(st.height): for x in range(st.width): if st.cell_free(x, y): return Snake(snake_id=-1, head=(x, y), direction=Direction.RIGHT, body=deque([(x, y)])) return None def _ensure_apples(self) -> None: st = self.runtime.state cfg = self.runtime.config import random random.seed(0xC0FFEE) target = 3 if not self.sessions else min(cfg.apples_cap, max(0, len(self.sessions) * cfg.apples_per_snake)) # grow apples up to target while len(st.apples) < target: x = random.randrange(st.width) y = random.randrange(st.height) if st.cell_free(x, y) and (x, y) not in st.apples: st.apples.append((x, y)) # shrink if too many if len(st.apples) > target: st.apples = st.apples[:target] async def _handle_join(self, buf: bytes, off: int, peer: TransportPeer) -> None: name, preferred, off2 = parse_join(buf, off) # enforce name <=16 bytes utf-8 name = name.encode("utf-8")[:16].decode("utf-8", errors="ignore") pid = self._allocate_player_id() if pid is None: payload = build_join_deny(version=self.runtime.version, seq=self.runtime.next_seq(), reason="Server full") await self.transport.send(payload, peer) return # Spawn snake snake = self._find_spawn() if snake is None: payload = build_join_deny(version=self.runtime.version, seq=self.runtime.next_seq(), reason="No free cell, please wait") await self.transport.send(payload, peer) return # Register session and snake color_id = preferred if (preferred is not None) else self._choose_color_id() session = PlayerSession(player_id=pid, name=name, color_id=color_id, peer=peer.addr) self.sessions[pid] = session snake.snake_id = pid self.runtime.state.snakes[pid] = snake self.runtime.state.occupy_snake(snake) self._ensure_apples() # Send join_ack cfg = self.runtime.config ack = build_join_ack( version=self.runtime.version, seq=self.runtime.next_seq(), player_id=pid, color_id=color_id, width=cfg.width, height=cfg.height, tick_rate=cfg.tick_rate, wrap_edges=cfg.wrap_edges, apples_per_snake=cfg.apples_per_snake, apples_cap=cfg.apples_cap, compression_mode=0 if cfg.compression_mode == "none" else 1, ) await self.transport.send(ack, peer) # Send initial state_full snakes_dirs: List[Tuple[int, int, int, int, List[Direction]]] = [] for s in self.runtime.state.snakes.values(): dirs: List[Direction] = [] # build directions from consecutive coords: from head toward tail coords = list(s.body) for i in range(len(coords) - 1): x0, y0 = coords[i] x1, y1 = coords[i + 1] if x1 == x0 and y1 == y0 - 1: dirs.append(Direction.UP) elif x1 == x0 + 1 and y1 == y0: dirs.append(Direction.RIGHT) elif x1 == x0 and y1 == y0 + 1: dirs.append(Direction.DOWN) elif x1 == x0 - 1 and y1 == y0: dirs.append(Direction.LEFT) hx, hy = coords[0] snakes_dirs.append((s.snake_id, s.length, hx, hy, dirs)) full = build_state_full( version=self.runtime.version, seq=self.runtime.next_seq(), tick=self.runtime.state.tick, snakes=snakes_dirs, apples=self.runtime.state.apples, ) await self.transport.send(full, peer) async def _handle_input(self, buf: bytes, off: int, peer: TransportPeer) -> None: try: ack_seq, input_seq, base_tick, events, off2 = parse_input(buf, off) except Exception: return # Find player by peer player_id = None for pid, sess in self.sessions.items(): if sess.peer is peer.addr: player_id = pid break if player_id is None: return # Relay to others immediately for prediction await self.relay_input_broadcast( from_player_id=player_id, input_seq=input_seq, base_tick=base_tick, events=events, apply_at_tick=None, ) async def main() -> None: from .transport import InMemoryTransport cfg = ServerConfig() server = GameServer(transport=InMemoryTransport(lambda d, p: server.on_datagram(d, p)), config=cfg) # In-memory transport never returns; run tick loop in parallel await asyncio.gather(server.transport.run(), server.tick_loop()) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass