"""
Board class for Pykrieg - representing 20x25 game board.
This module implements Board class with territory divisions,
coordinate validation, piece management, and Lines of Communication (LOC) network system.
"""
import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from . import constants
if TYPE_CHECKING:
from .undo_redo import UndoRedoManager
[docs]
class Board:
"""
Represents 20x25 game board with territory divisions.
The board has:
- 20 rows (0-19)
- 25 columns (0-24)
- Total 500 squares
- Territory division at row 10 (North: 0-9, South: 10-19)
"""
TERRITORY_BOUNDARY = constants.TERRITORY_BOUNDARY # Row 10 is the boundary
[docs]
def __init__(self, enable_adjacency_relay_propagation: bool = True) -> None:
"""Initialize empty board with territory boundaries.
Args:
enable_adjacency_relay_propagation: When True (default), relays activated
by proximity to an online unit propagate their LOC rays. When False,
relays only propagate via arsenal rays and other relay rays
(original Debord behavior).
"""
from .undo_redo import UndoRedoManager
self._rows = constants.BOARD_ROWS
self._cols = constants.BOARD_COLS
self._undo_redo_manager = UndoRedoManager(max_history=100) # Undo/redo support
# Use Any to handle both None and Unit objects due to circular imports
self._board: List[List[Any]] = [[None for _ in range(self._cols)]
for _ in range(self._rows)]
self._turn = constants.PLAYER_NORTH # Starting player
self._turn_number = 1 # Track turn number
self._current_phase = constants.PHASE_MOVEMENT # Track current phase
# Track pending retreats (persisted in FEN)
self._pending_retreats: List[Tuple[int, int]] = []
# New for 0.1.5: Retreat enforcement
self._units_must_retreat: Set[Tuple[int, int]] = set() # Units forced to retreat this turn
# New for 0.1.4: Per-turn tracking
self._moved_units: Set[Tuple[int, int]] = set() # Positions units moved FROM this turn
self._moved_unit_ids: Set[int] = set() # IDs of units that moved this turn
# Complete moves: [(from_row, from_col, to_row, to_col), ...]
self._moves_made: List[Tuple[int, int, int, int]] = []
self._attacks_this_turn: int = 0 # Attacks made this turn
self._attack_target: Optional[Tuple[int, int]] = None # Target square attacked this turn
# New for 0.2.0: Lines of Communication (LOC) network tracking
self._terrain: List[List[Optional[str]]] = [[None for _ in range(self._cols)]
for _ in range(self._rows)]
self._active_north: Set[Tuple[int, int]] = set() # Active units for North
self._active_south: Set[Tuple[int, int]] = set() # Active units for South
# New for 0.2.1: Arsenals are terrain, not units
self._arsenal_owners: Dict[Tuple[int, int], str] = {} # Maps (row, col) -> player
self._relay_online_status: Dict[Tuple[int, int], bool] = {} # Track relay online status
self._proximity_checked: Set[
Tuple[int, int]
] = set() # Track squares proximity-checked this cycle
self._network_coverage_north: Set[
Tuple[int, int]
] = set() # All squares covered by North's network (rays + proximity)
self._network_coverage_south: Set[
Tuple[int, int]
] = set() # All squares covered by South's network (rays + proximity)
self._ray_coverage_north: Set[
Tuple[int, int]
] = set() # Squares covered by North's arsenal/relay rays only (for display)
self._ray_coverage_south: Set[
Tuple[int, int]
] = set() # Squares covered by South's arsenal/relay rays only (for display)
self._network_calculated: bool = False # Flag if calculate_network() was called
self._network_dirty: bool = True # Flag for lazy recalculation - network needs update
# Configuration for network rules
self._enable_adjacency_relay_propagation: bool = enable_adjacency_relay_propagation
# New for 0.2.2: Game state tracking
self._game_state: str = "ONGOING" # Track game state
self._victory_result: Optional[Dict[str, object]] = None # Store victory details
# KFEN metadata storage
self._kfen_metadata: Optional[Dict[str, object]] = None
@property
def rows(self) -> int:
"""Return number of rows."""
return self._rows
@property
def cols(self) -> int:
"""Return number of columns."""
return self._cols
@property
def undo_redo_manager(self) -> 'UndoRedoManager':
"""Return the undo/redo manager."""
return self._undo_redo_manager
# KFEN metadata methods
@property
def turn(self) -> str:
"""Return current player."""
return self._turn
[docs]
def turn_side(self) -> str:
"""Return current turn side (NORTH/SOUTH)."""
return self._turn
@property
def territory_boundary(self) -> int:
"""Return row number that separates territories."""
return self.TERRITORY_BOUNDARY
[docs]
def is_valid_square(self, row: int, col: int) -> bool:
"""Check if coordinates are within board bounds."""
return (0 <= row < self._rows) and (0 <= col < self._cols)
[docs]
def get_piece(self, row: int, col: int) -> Optional[object]:
"""Get piece at given coordinates.
.. deprecated:: 0.1.2
Use :meth:`get_unit` instead. Will be removed in version 0.3.0.
"""
warnings.warn(
"get_piece() is deprecated and will be removed in version 0.3.0. "
"Use get_unit() instead.",
DeprecationWarning,
stacklevel=2
)
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
return self._board[row][col] # type: ignore[no-any-return]
[docs]
def set_piece(self, row: int, col: int, piece: object) -> None:
"""Set piece at given coordinates.
.. deprecated:: 0.1.2
Use :meth:`place_unit` instead. Will be removed in version 0.3.0.
"""
warnings.warn(
"set_piece() is deprecated and will be removed in version 0.3.0. "
"Use place_unit() instead.",
DeprecationWarning,
stacklevel=2
)
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
self._board[row][col] = piece
[docs]
def clear_square(self, row: int, col: int) -> None:
"""Remove piece from square."""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
self._board[row][col] = None
self._network_dirty = True # Mark network as needing recalculation
# Unit placement methods
[docs]
def place_unit(self, row: int, col: int, unit: object) -> None:
"""Place a Unit object on the board.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
unit: Unit object to place
Raises:
ValueError: If coordinates are invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
self._board[row][col] = unit
self._network_dirty = True # Mark network as needing recalculation
[docs]
def create_and_place_unit(self, row: int, col: int,
unit_type: str, owner: str) -> object:
"""Create and place a unit on board in one step.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
unit_type: Unit type string
owner: 'NORTH' or 'SOUTH'
Returns:
The created Unit object
Raises:
ValueError: If coordinates, unit_type, or owner are invalid
"""
from .pieces import create_piece
unit = create_piece(unit_type, owner)
self.place_unit(row, col, unit)
return unit
# Unit query methods
[docs]
def get_unit(self, row: int, col: int) -> Optional[object]:
"""Get Unit object at given coordinates.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Returns:
Unit object or None if square is empty
Raises:
ValueError: If coordinates are invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
return self._board[row][col] # type: ignore[no-any-return]
[docs]
def get_unit_type(self, row: int, col: int) -> Optional[str]:
"""Get unit type string at given coordinates.
Returns unit type string or None if square is empty.
"""
unit = self.get_unit(row, col)
if unit is None:
return None
# Use getattr to avoid circular imports with Unit type
return getattr(unit, 'unit_type', None)
[docs]
def get_unit_owner(self, row: int, col: int) -> Optional[str]:
"""Get unit owner at given coordinates.
Returns 'NORTH', 'SOUTH', or None if square is empty.
"""
unit = self.get_unit(row, col)
if unit is None:
return None
# Use getattr to avoid circular imports with Unit type
return getattr(unit, 'owner', None)
[docs]
def count_units(self, unit_type: Optional[str] = None,
owner: Optional[str] = None) -> int:
"""Count units on the board with optional filters.
Args:
unit_type: Unit type to count, or None for all types
owner: Owner to count, or None for all owners
Returns:
Number of matching units
"""
count = 0
for row in range(self._rows):
for col in range(self._cols):
unit = self._board[row][col]
if unit:
if unit_type is None or unit.unit_type == unit_type:
if owner is None or unit.owner == owner:
count += 1
return count
[docs]
def get_units_by_type(self, unit_type: str) -> List[Tuple[int, int]]:
"""Get all coordinates containing a specific unit type.
Args:
unit_type: Unit type string
Returns:
List of (row, col) tuples containing unit type
"""
units: List[Tuple[int, int]] = []
for row in range(self._rows):
for col in range(self._cols):
unit = self._board[row][col]
if unit and unit.unit_type == unit_type:
units.append((row, col))
return units
[docs]
def get_units_by_owner(self, owner: str) -> List[Tuple[int, int]]:
"""Get all coordinates containing units owned by a player.
Args:
owner: 'NORTH' or 'SOUTH'
Returns:
List of (row, col) tuples containing player's units
"""
units: List[Tuple[int, int]] = []
for row in range(self._rows):
for col in range(self._cols):
unit = self._board[row][col]
if unit and unit.owner == owner:
units.append((row, col))
return units
[docs]
def get_all_units(self) -> Dict[Tuple[int, int], object]:
"""Get all units on the board.
Returns:
Dictionary mapping (row, col) tuples to Unit objects
"""
units: Dict[Tuple[int, int], object] = {}
for row in range(self._rows):
for col in range(self._cols):
unit = self._board[row][col]
if unit:
units[(row, col)] = unit
return units
# Validation methods
[docs]
def is_valid_unit_type(self, unit_type: str) -> bool:
"""Check if unit type is valid.
Returns True if unit_type is in ALL_UNIT_TYPES.
"""
return unit_type in constants.ALL_UNIT_TYPES
[docs]
def is_valid_owner(self, owner: str) -> bool:
"""Check if owner is valid.
Returns True if owner is NORTH or SOUTH.
"""
return owner in (constants.PLAYER_NORTH, constants.PLAYER_SOUTH)
[docs]
def get_territory(self, row: int, col: int) -> str:
"""
Determine which territory a square belongs to.
Returns:
'NORTH' if row < TERRITORY_BOUNDARY
'SOUTH' if row >= TERRITORY_BOUNDARY
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
return constants.PLAYER_NORTH if row < self.TERRITORY_BOUNDARY else constants.PLAYER_SOUTH
[docs]
def is_north_territory(self, row: int, col: int) -> bool:
"""Check if square is in North territory."""
return self.get_territory(row, col) == constants.PLAYER_NORTH
[docs]
def is_south_territory(self, row: int, col: int) -> bool:
"""Check if square is in South territory."""
return self.get_territory(row, col) == constants.PLAYER_SOUTH
[docs]
def get_territory_squares(self, territory: str) -> List[Tuple[int, int]]:
"""
Get all squares belonging to a territory.
Args:
territory: 'NORTH' or 'SOUTH'
Returns:
List of (row, col) tuples
"""
if territory not in [constants.PLAYER_NORTH, constants.PLAYER_SOUTH]:
raise ValueError(f"Invalid territory: {territory}")
squares = []
for row in range(self._rows):
for col in range(self._cols):
if self.get_territory(row, col) == territory:
squares.append((row, col))
return squares
[docs]
@staticmethod
def spreadsheet_to_tuple(coord: str) -> Tuple[int, int]:
"""
Convert spreadsheet-style coordinate to internal (row, col) tuple.
Args:
coord: String in spreadsheet format (e.g., "1A", "10AA", "25Y")
Returns:
Tuple (row, col) where:
- row: 0-based from top
- col: 0-based from left
Note: Coordinate origin is top-left (1A = top-left corner)
Format: NUMBERS (column) + LETTERS (row)
Debord's original convention: columns as numbers, rows as letters
Example:
"1A" -> (0, 0) (top-left corner)
"25T" -> (19, 24) (bottom-right on 25-col board)
"10AA" -> (26, 9) (27th column, 10th row)
"27A" -> (0, 26) (top row, 28th column)
"""
if not isinstance(coord, str):
raise TypeError(f"Coord must be string, got {type(coord)}")
# Check for empty or whitespace-only string
if not coord or coord.isspace():
raise ValueError(f"Invalid coord format: {coord}")
# Check for spaces in string
if ' ' in coord:
raise ValueError(f"Invalid coord format: {coord}")
# Separate numbers (column) and letters (row) - NUMBERS FIRST
i = 0
while i < len(coord) and coord[i].isdigit():
i += 1
col_number = coord[:i]
row_letters = coord[i:]
if not col_number or not row_letters:
raise ValueError(f"Invalid coord format: {coord}")
# Parse column (direct conversion from number)
col_index = int(col_number) - 1 # Convert 1-based to 0-based
# Validate column index (must be >= 0 after conversion)
if col_index < 0:
raise ValueError(f"Invalid coord format: {coord} (column must be >= 1)")
# Parse row (A=0, Z=25, AA=26, AZ=51, BA=52, etc.)
row_index = 0
for char in row_letters:
row_index = row_index * 26 + (ord(char.upper()) - ord('A') + 1)
row_index -= 1 # Convert to 0-based
return (row_index, col_index)
[docs]
@staticmethod
def tuple_to_spreadsheet(row: int, col: int) -> str:
"""
Convert internal (row, col) tuple to spreadsheet-style coordinate.
Args:
row: Row number (0-19, 0-based from top)
col: Column number (0-24, 0-based from left)
Returns:
String in spreadsheet format (e.g., "1A", "10AA", "25Y")
Note: Coordinate origin is top-left (1A = top-left corner)
Format: NUMBERS (column) + LETTERS (row)
Debord's original convention: columns as numbers, rows as letters
Example:
(0, 0) -> "1A" (top-left corner)
(19, 24) -> "25T" (bottom-right on 25-col board)
(9, 26) -> "27J" (10th row, 27th column)
(0, 27) -> "28A" (top row, 28th column)
"""
if not isinstance(row, int) or not isinstance(col, int):
raise TypeError("Row and col must be integers")
# Format column (direct number conversion)
col_number = col + 1 # Convert to 1-based
# Format row (0=A, 1=B, 19=T, 20=U, etc.)
row_index = row + 1 # Convert to 1-based
row_letters: List[str] = []
while row_index > 0:
row_index -= 1
row_letters.insert(0, chr(ord('A') + row_index % 26))
row_index //= 26
return f"{col_number}{''.join(row_letters)}"
[docs]
@staticmethod
def tuple_to_index(row: int, col: int, board_cols: int = 25) -> int:
"""
Convert row, col to square index (row-major order).
Args:
row: Row number (0-19)
col: Column number (0-24)
board_cols: Number of columns (default 25)
Returns:
Integer index (0-499)
Example:
(0, 0) -> 0
(0, 1) -> 1
(1, 0) -> 25
"""
if row < 0 or col < 0:
raise ValueError(f"Invalid coordinates: ({row}, {col})")
return row * board_cols + col
[docs]
@staticmethod
def index_to_tuple(index: int, board_cols: int = 25, board_rows: int = 20) -> Tuple[int, int]:
"""
Convert square index to row, col tuple.
Args:
index: Integer index (0-499)
board_cols: Number of columns (default 25)
board_rows: Number of rows (default 20)
Returns:
Tuple (row, col)
Example:
0 -> (0, 0)
1 -> (0, 1)
25 -> (1, 0)
"""
if not isinstance(index, int):
raise TypeError(f"Index must be integer, got {type(index)}")
max_index = board_cols * board_rows - 1
if not (0 <= index <= max_index):
raise ValueError(f"Invalid index: {index} (max: {max_index})")
row = index // board_cols
col = index % board_cols
return (row, col)
# Movement convenience methods
[docs]
def get_legal_moves(self, row: int, col: int) -> List[Tuple[int, int]]:
"""Get all legal moves for unit at given position.
Convenience method that wraps movement.generate_moves.
Args:
row: Row of unit (0-19)
col: Column of unit (0-24)
Returns:
List of (to_row, to_col) tuples
Raises:
ValueError: If no unit at position
"""
from .movement import generate_moves
return generate_moves(self, row, col)
[docs]
def is_legal_move(self, from_row: int, from_col: int,
to_row: int, to_col: int) -> bool:
"""Check if a move is legal.
Convenience method that wraps movement.is_valid_move.
Args:
from_row: Source row (0-19)
from_col: Source column (0-24)
to_row: Target row (0-19)
to_col: Target column (0-24)
Returns:
True if move is legal, False otherwise
"""
from .movement import is_valid_move
unit = self.get_unit(from_row, from_col)
if unit is None:
return False
player = getattr(unit, 'owner', None)
if player is None:
return False
return is_valid_move(self, from_row, from_col, to_row, to_col, player)
[docs]
def make_move(self, from_row: int, from_col: int,
to_row: int, to_col: int) -> object:
"""Make a move on the board.
Convenience method that wraps movement.execute_move.
Args:
from_row: Source row (0-19)
from_col: Source column (0-24)
to_row: Target row (0-19)
to_col: Target column (0-24)
Returns:
The Unit object that was moved
Raises:
ValueError: If move is invalid
"""
from .movement import execute_move
return execute_move(self, from_row, from_col, to_row, to_col)
# Combat convenience methods
[docs]
def calculate_combat(self, target_row: int, target_col: int,
attacker: str, defender: str) -> Dict[str, object]:
"""Calculate complete combat scenario.
Convenience method that wraps combat.calculate_combat.
Args:
target_row: Target row (0-19)
target_col: Target column (0-24)
attacker: 'NORTH' or 'SOUTH'
defender: 'NORTH' or 'SOUTH'
Returns:
Dictionary with combat results
"""
from .combat import calculate_combat
return calculate_combat(self, target_row, target_col, attacker, defender)
[docs]
def execute_capture(self, target_row: int, target_col: int) -> object:
"""Execute a capture (remove target unit from board).
Convenience method that wraps combat.execute_capture.
Args:
target_row: Target row (0-19)
target_col: Target column (0-24)
Returns:
The captured Unit object
"""
from .combat import execute_capture
return execute_capture(self, target_row, target_col)
# Retreat tracking methods
[docs]
def add_pending_retreat(self, row: int, col: int) -> None:
"""Add unit to pending retreats.
This is called when combat resolves to RETREAT outcome.
The actual retreat execution happens at the start of the defender's
next turn (enforced by turn management in 0.1.4).
Note: Retreats are tracked in-memory and not persisted in KFEN format.
They are resolved each turn by turn management system.
Args:
row: Row of unit that must retreat
col: Column of unit that must retreat
Raises:
ValueError: If no unit at position
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
unit = self.get_unit(row, col)
if unit is None:
raise ValueError(f"No unit at ({row}, {col}) to mark for retreat")
# Check if already in retreat list to avoid duplicates
if (row, col) not in self._pending_retreats:
self._pending_retreats.append((row, col))
[docs]
def get_pending_retreats(self) -> List[Tuple[int, int]]:
"""Get all pending retreats.
Returns:
List of (row, col) tuples for units that must retreat
"""
return list(self._pending_retreats)
[docs]
def clear_pending_retreats(self) -> None:
"""Clear all pending retreats after resolution.
This should be called after all retreats have been processed
at the start of a turn.
"""
self._pending_retreats.clear()
[docs]
def has_pending_retreat(self, row: int, col: int) -> bool:
"""Check if unit at position must retreat.
Args:
row: Row of unit to check
col: Column of unit to check
Returns:
True if unit at (row, col) must retreat, False otherwise
"""
return (row, col) in self._pending_retreats
# Retreat enforcement methods for 0.1.5
[docs]
def get_units_must_retreat(self) -> List[Tuple[int, int]]:
"""Get all units that must retreat this turn.
Returns:
List of (row, col) tuples for units that must retreat
before making other moves
"""
return list(self._units_must_retreat)
[docs]
def is_unit_in_retreat(self, row: int, col: int) -> bool:
"""Check if unit is currently in retreat mode.
Args:
row: Row of unit to check
col: Column of unit to check
Returns:
True if unit at (row, col) is in retreat mode, False otherwise
"""
return (row, col) in self._units_must_retreat
[docs]
def get_valid_retreat_positions(self, row: int, col: int) -> List[Tuple[int, int]]:
"""Get valid retreat positions for a unit.
Args:
row: Row of unit to check
col: Column of unit to check
Returns:
List of (to_row, to_col) tuples representing valid retreat destinations
Note:
This uses same movement rules as normal moves (terrain-independent in 0.1.4)
"""
from .movement import generate_moves
return generate_moves(self, row, col)
# Turn management methods for 0.1.4
[docs]
def has_moved_this_turn(self, row: int, col: int) -> bool:
"""Check if a move originated from this position this turn.
Args:
row: Row of position to check
col: Column of position to check
Returns:
True if a unit moved FROM this position this turn, False otherwise
"""
return (row, col) in self._moved_units
[docs]
def get_moves_this_turn(self) -> int:
"""Get number of units moved this turn.
Returns:
Number of units moved (0-5)
"""
return len(self._moved_units)
[docs]
def can_move_more(self) -> bool:
"""Check if player can move more units this turn.
Returns:
True if fewer than 5 units have been moved, False otherwise
"""
return len(self._moved_units) < constants.MAX_MOVES_PER_TURN
[docs]
def get_attacks_this_turn(self) -> int:
"""Get number of attacks made this turn.
Returns:
Number of attacks made (0 or 1)
"""
return self._attacks_this_turn
[docs]
def get_attack_target(self) -> Optional[Tuple[int, int]]:
"""Get attack target square for current turn.
Returns:
Tuple (row, col) of target, or None if no attack/pass yet
"""
return self._attack_target
[docs]
def can_attack_more(self) -> bool:
"""Check if player can attack more this turn.
Returns:
True if 0 attacks have been made, False otherwise
"""
return self._attacks_this_turn < constants.MAX_ATTACKS_PER_TURN
[docs]
def validate_move(self, from_row: int, from_col: int,
to_row: int, to_col: int) -> bool:
"""Validate a move according to turn rules.
Checks:
1. Game is not over
2. It's the movement phase
3. The unit belongs to the current player
4. The unit hasn't moved yet this turn
5. The player hasn't moved 5 units yet
6. If retreats are pending, only allow retreat moves
7. The move is legally valid (pseudo-legal check)
Args:
from_row: Source row (0-19)
from_col: Source column (0-24)
to_row: Target row (0-19)
to_col: Target column (0-24)
Returns:
True if move is valid according to turn rules, False otherwise
"""
# NEW: Check if game is over
if self.is_game_over():
return False
# Ensure network is up-to-date before checking movement
self._ensure_network_calculated()
# Check phase
if self._current_phase != constants.PHASE_MOVEMENT:
return False
# Check unit ownership
unit = self.get_unit(from_row, from_col)
if unit is None:
return False
if unit and unit.owner != self._turn: # type: ignore[attr-defined]
return False
# Check if unit already moved (by checking unit ID)
unit_id = id(unit)
if unit_id in self._moved_unit_ids:
return False
# NEW: If units must retreat, only allow those to move
if self._units_must_retreat:
if (from_row, from_col) not in self._units_must_retreat:
return False
# Check move limit
if len(self._moved_units) >= constants.MAX_MOVES_PER_TURN:
return False
# Check move legality
from .movement import is_valid_move
unit = self.get_unit(from_row, from_col)
if unit is None:
return False
player = getattr(unit, 'owner', None)
if player is None:
return False
return is_valid_move(self, from_row, from_col, to_row, to_col, player)
[docs]
def make_turn_move(self, from_row: int, from_col: int,
to_row: int, to_col: int) -> Tuple[object, bool]:
"""Make a move with turn validation and tracking.
Modified for 0.2.2: Check for enemy arsenal destruction.
This method:
1. Validates the move according to turn rules
2. Executes the move
3. Checks if destination is enemy arsenal and destroys it
4. Tracks that the unit has moved this turn
5. Clears the retreat flag if this was a retreat move
Args:
from_row: Source row (0-19)
from_col: Source column (0-24)
to_row: Target row (0-19)
to_col: Target column (0-24)
Returns:
Tuple of (unit that moved, arsenal_destroyed flag)
Raises:
ValueError: If move is invalid according to turn rules
"""
# Validate move
if not self.validate_move(from_row, from_col, to_row, to_col):
raise ValueError(
f"Invalid turn move from ({from_row}, {from_col}) to ({to_row}, {to_col})"
)
# Get unit before move to track its ID
unit = self.get_unit(from_row, from_col)
unit_id = id(unit)
# Check if destination is enemy arsenal before moving
arsenal_destroyed = False
destroyed_arsenal_info: Optional[Tuple[int, int, str]] = None
if self._terrain[to_row][to_col] == constants.TERRAIN_ARSENAL:
arsenal_owner = self._arsenal_owners.get((to_row, to_col))
if arsenal_owner and arsenal_owner != unit.owner: # type: ignore[attr-defined]
# Capture arsenal info before destruction
destroyed_arsenal_info = (to_row, to_col, arsenal_owner)
# Destroy enemy arsenal
self.destroy_arsenal(to_row, to_col)
arsenal_destroyed = True
# Execute move
from .movement import execute_move
moved_unit = execute_move(self, from_row, from_col, to_row, to_col)
# Track move - both position and unit ID, plus complete move tuple
self._moved_units.add((from_row, from_col))
self._moved_unit_ids.add(unit_id)
self._moves_made.append((from_row, from_col, to_row, to_col))
# Clear retreat flag if this was a retreat move
was_retreat = (from_row, from_col) in self._units_must_retreat
if was_retreat:
self._units_must_retreat.remove((from_row, from_col))
# Record the move action
self._record_move_action(
from_pos=(from_row, from_col),
to_pos=(to_row, to_col),
unit_id=unit_id,
unit_type=moved_unit.unit_type, # type: ignore[attr-defined]
owner=moved_unit.owner, # type: ignore[attr-defined]
was_retreat=was_retreat,
destroyed_arsenal=destroyed_arsenal_info
)
return (moved_unit, arsenal_destroyed)
[docs]
def validate_attack(self, target_row: int, target_col: int) -> bool:
"""Validate an attack according to turn rules.
Checks:
1. Game is not over
2. It's the battle phase
3. The current player hasn't attacked yet
4. There's at least one attacking unit
5. No units must retreat (retreat enforcement)
Args:
target_row: Target row (0-19)
target_col: Target column (0-24)
Returns:
True if attack is valid according to turn rules, False otherwise
"""
# NEW: Check if game is over
if self.is_game_over():
return False
# Ensure network is up-to-date before checking attack
self._ensure_network_calculated()
# Check phase
if self._current_phase != constants.PHASE_BATTLE:
return False
# Check attack limit
if self._attacks_this_turn >= constants.MAX_ATTACKS_PER_TURN:
return False
# NEW: Check if units must retreat - block attack until retreats resolved
if self._units_must_retreat:
return False
# Check if attacker has units (can_attack will check this)
from .combat import can_attack
return can_attack(self, target_row, target_col, self._turn)
[docs]
def make_turn_attack(self, target_row: int, target_col: int) -> Dict[str, object]:
"""Make an attack with turn validation and tracking.
This method:
1. Validates the attack according to turn rules
2. Calculates the combat result
3. Executes capture if applicable
4. Marks defender for retreat if applicable
5. Tracks that an attack has been made
Args:
target_row: Target row (0-19)
target_col: Target column (0-24)
Returns:
Dictionary with combat results
Raises:
ValueError: If attack is invalid according to turn rules
"""
# Validate attack
if not self.validate_attack(target_row, target_col):
raise ValueError(
f"Invalid turn attack at ({target_row}, {target_col})"
)
# Calculate combat
defender = (
constants.PLAYER_SOUTH
if self._turn == constants.PLAYER_NORTH
else constants.PLAYER_NORTH
)
result = self.calculate_combat(target_row, target_col, self._turn, defender)
# Handle outcome
from .combat import CombatOutcome
outcome = result['outcome']
captured_unit_info = None
retreat_positions_to_record = []
if outcome == CombatOutcome.CAPTURE:
# Get captured unit info before executing capture
target_unit = self.get_unit(target_row, target_col)
if target_unit:
captured_unit_info = {
'unit_type': target_unit.unit_type, # type: ignore[attr-defined]
'owner': target_unit.owner # type: ignore[attr-defined]
}
# Execute capture
self.execute_capture(target_row, target_col)
elif outcome == CombatOutcome.RETREAT:
# Mark defender for retreat
self.add_pending_retreat(target_row, target_col)
# Track which units must retreat due to this attack
# This includes the target and any nearby units in retreat mode
retreat_positions_to_record = list(self._units_must_retreat)
# Track attack and target
self._attacks_this_turn += 1
self._attack_target = (target_row, target_col)
# Record attack action for undo/redo
self._record_attack_action(
target_pos=(target_row, target_col),
outcome=str(outcome.value) if hasattr(outcome, 'value') else str(outcome),
attacker=self._turn,
captured_unit=captured_unit_info,
retreat_positions=retreat_positions_to_record
)
return result
[docs]
def pass_attack(self) -> None:
"""Pass attack phase.
This method:
1. Validates it's battle phase
2. Marks that an attack has been made (pass counts)
3. Records pass state for KFEN serialization (attack_target = None)
Raises:
ValueError: If not in battle phase or already attacked
"""
if self._current_phase != constants.PHASE_BATTLE:
raise ValueError("Cannot pass attack: not in battle phase")
if self._attacks_this_turn >= constants.MAX_ATTACKS_PER_TURN:
raise ValueError("Cannot pass attack: already attacked")
self._attacks_this_turn += 1
self._attack_target = None # Pass = no target
[docs]
def switch_to_battle_phase(self) -> None:
"""Switch from movement phase to battle phase.
Raises:
ValueError: If not in movement phase or units must retreat
"""
if self._current_phase != constants.PHASE_MOVEMENT:
raise ValueError("Cannot switch to battle phase: not in movement phase")
# NEW: Prevent switching to battle phase if units must retreat
if self._units_must_retreat:
raise ValueError("Cannot switch to battle phase: units must retreat first")
self._current_phase = constants.PHASE_BATTLE
[docs]
def resolve_retreats(self) -> List[Tuple[int, int, object, str]]:
"""Resolve pending retreats at the start of a turn.
This method checks for pending retreats and enforces retreat rules:
1. For each unit that must retreat, find valid retreat squares
2. If valid retreat exists, mark unit as "must retreat" (player must choose destination)
3. If no valid retreat exists, capture (destroy) the unit
4. Clear pending retreats after resolution
Returns:
List of tuples (row, col, unit, reason) for captured units:
- row, col: Position where unit was captured
- unit: The captured unit object
- reason: String explaining why unit was captured (e.g., "no valid retreat")
Note:
- This is called at the start of the defender's turn
- Retreating units must move before other units can move
- Units in retreat are tracked in _units_must_retreat
TODO:
- In 0.2.0, add terrain validation to retreat moves
- In 0.2.0, check online/offline status for retreat
"""
captured_units: List[Tuple[int, int, object, str]] = []
# Only resolve retreats for current player's units
retreat_positions = list(self._pending_retreats)
for row, col in retreat_positions:
unit = self.get_unit(row, col)
if unit is None:
# Unit already captured, skip
continue
# Only resolve retreats for current player
if unit and getattr(unit, 'owner', None) != self._turn:
continue
# Find valid retreat squares
# In 0.1.4, we use basic movement rules (terrain-independent)
from .movement import generate_moves
valid_moves = generate_moves(self, row, col)
if valid_moves:
# Mark unit as must retreat (player must choose destination)
self._units_must_retreat.add((row, col))
else:
# No valid retreat: capture the unit and record info
captured_unit = self.execute_capture(row, col)
captured_units.append((row, col, captured_unit, "no valid retreat"))
# Clear pending retreats for current player
self._pending_retreats = [
pos for pos in self._pending_retreats
if pos not in retreat_positions
]
return captured_units
[docs]
def end_turn(self) -> List[Tuple[int, int, object, str]]:
"""End current turn and switch to next player.
This method:
1. Clears moved unit tracking
2. Clears attack counter
3. Switches to other player
4. Resets phase to movement
5. Increments turn number
6. Resolves any pending retreats for new player
7. Recalculates networks to ensure victory checks have current state
8. Checks victory conditions (new for 0.2.2)
Returns:
List of tuples (row, col, unit, reason) for captured units during retreat resolution
Note:
- Retreat resolution happens at the start of the new player's turn
- Turn number increments on each player switch
- Retreat state (_units_must_retreat) is NOT cleared automatically
- It persists until the NEXT end_turn() call
- This allows retreat state to persist during a player's turn
"""
# Store old state before turn changes
old_turn = (self._turn, self._turn_number)
old_phase = self._current_phase
old_moves_made = list(self._moves_made)
old_attacks_this_turn = self._attacks_this_turn
old_attack_target = self._attack_target
old_units_must_retreat = set(self._units_must_retreat)
# Switch player and increment turn
self.increment_turn()
# Clear per-turn state for new player
self._moved_units.clear()
self._moved_unit_ids.clear()
self._moves_made.clear() # Clear complete move history
self._attacks_this_turn = 0
self._attack_target = None # Clear attack target
# Resolve retreats for new player and capture any that have no valid retreat
captured = self.resolve_retreats()
# Recalculate networks before checking victory conditions
# This ensures network collapse victory is detected after arsenal destruction
if self._network_calculated:
self._relay_online_status.clear()
self.calculate_network(constants.PLAYER_NORTH)
self.calculate_network(constants.PLAYER_SOUTH)
self._network_dirty = False
# NEW: Check victory conditions after turn
self.check_victory()
# Record turn boundary action for undo/redo
new_turn = (self._turn, self._turn_number)
self._record_turn_boundary(
old_turn=old_turn,
new_turn=new_turn,
old_phase=old_phase,
old_moves_made=old_moves_made,
old_attacks_this_turn=old_attacks_this_turn,
old_attack_target=old_attack_target,
old_units_must_retreat=old_units_must_retreat
)
return captured
[docs]
def reset_turn_state(self) -> None:
"""Reset turn state without changing turn number or player.
This method is useful for:
- Undo functionality
- Testing scenarios
- Loading from FEN
Resets:
- Moved units tracking
- Attack counter
- Current phase (to movement)
Does NOT reset:
- Turn number
- Current player
- Pending retreats
- Board position
"""
self._moved_units.clear()
self._moved_unit_ids.clear()
self._moves_made.clear() # Clear complete move history
self._attacks_this_turn = 0
self._attack_target = None # Clear attack target
self._current_phase = constants.PHASE_MOVEMENT
# Turn tracking methods
@property
def turn_number(self) -> int:
"""Return current turn number."""
return self._turn_number
@turn_number.setter
def turn_number(self, value: int) -> None:
"""Set turn number."""
self._turn_number = value
@property
def current_phase(self) -> str:
"""Return current turn phase."""
return self._current_phase
@current_phase.setter
def current_phase(self, value: str) -> None:
"""Set current turn phase."""
self._current_phase = value
[docs]
def increment_turn(self) -> None:
"""Increment turn number and switch player."""
self._turn_number += 1
self._turn = (
constants.PLAYER_SOUTH
if self._turn == constants.PLAYER_NORTH
else constants.PLAYER_NORTH
)
self._current_phase = constants.PHASE_MOVEMENT # Reset to movement phase
# =====================================================================
# Undo/Redo Support
# =====================================================================
[docs]
def can_undo(self) -> bool:
"""Check if undo is available.
Returns:
True if undo is available, False otherwise
"""
return self._undo_redo_manager.can_undo()
[docs]
def can_redo(self) -> bool:
"""Check if redo is available.
Returns:
True if redo is available, False otherwise
"""
return self._undo_redo_manager.can_redo()
[docs]
def undo(self, count: int = 1) -> None:
"""Undo one or more actions.
Args:
count: Number of actions to undo (default: 1)
Raises:
ValueError: If count exceeds available actions
"""
if count == 1:
self._undo_redo_manager.undo(self)
else:
self._undo_redo_manager.undo_multiple(self, count)
[docs]
def redo(self, count: int = 1) -> None:
"""Redo one or more actions.
Args:
count: Number of actions to redo (default: 1)
Raises:
ValueError: If count exceeds available actions
"""
if count == 1:
self._undo_redo_manager.redo(self)
else:
self._undo_redo_manager.redo_multiple(self, count)
[docs]
def set_max_undo_history(self, max_size: int) -> None:
"""Set maximum undo history size.
Args:
max_size: Maximum number of actions to keep. Set to 0 for unlimited.
"""
self._undo_redo_manager.set_max_history(max_size)
def _record_move_action(
self,
from_pos: Tuple[int, int],
to_pos: Tuple[int, int],
unit_id: int,
unit_type: str,
owner: str,
was_retreat: bool,
destroyed_arsenal: Optional[Tuple[int, int, str]] = None,
) -> None:
"""Record a move action for undo/redo.
Args:
from_pos: Source position (row, col)
to_pos: Destination position (row, col)
unit_id: Python ID of the unit
unit_type: Type of unit (e.g., 'INFANTRY')
owner: 'NORTH' or 'SOUTH'
was_retreat: True if this was a retreat move
destroyed_arsenal: Optional (row, col, owner) if arsenal was destroyed
"""
from .undo_redo import MoveAction
action = MoveAction(
from_pos=from_pos,
to_pos=to_pos,
unit_id=unit_id,
unit_type=unit_type,
owner=owner,
was_retreat=was_retreat,
destroyed_arsenal=destroyed_arsenal
)
self._undo_redo_manager.record_action(action)
def _record_attack_action(self, target_pos: Tuple[int, int], outcome: str,
attacker: str, captured_unit: Optional[Dict[str, Any]] = None,
retreat_positions: Optional[List[Tuple[int, int]]] = None) -> None:
"""Record an attack action for undo/redo.
Args:
target_pos: Target position (row, col)
outcome: 'CAPTURE', 'RETREAT', or 'FAIL'
attacker: 'NORTH' or 'SOUTH'
captured_unit: Optional dict with 'unit_type' and 'owner' if captured
retreat_positions: Optional list of positions marked for retreat
"""
from .undo_redo import AttackAction
action = AttackAction(
target_pos=target_pos,
outcome=outcome,
attacker=attacker,
captured_unit=captured_unit,
retreat_positions=retreat_positions if retreat_positions is not None else []
)
self._undo_redo_manager.record_action(action)
def _record_turn_boundary(self, old_turn: Tuple[str, int], new_turn: Tuple[str, int],
old_phase: str, old_moves_made: List[Tuple[int, int, int, int]],
old_attacks_this_turn: int, old_attack_target: Optional[Tuple[int, int]],
old_units_must_retreat: Set[Tuple[int, int]]) -> None:
"""Record a turn boundary action for undo/redo.
Args:
old_turn: (player, turn_number) before turn end
new_turn: (player, turn_number) after turn end
old_phase: Phase before turn end
old_moves_made: List of moves made before turn end
old_attacks_this_turn: Attack count before turn end
old_attack_target: Attack target before turn end
old_units_must_retreat: Set of units that must retreat before turn end
"""
from .undo_redo import TurnBoundary
action = TurnBoundary(
from_turn=old_turn,
to_turn=new_turn,
from_phase=old_phase,
from_moves_made=old_moves_made,
from_attacks_this_turn=old_attacks_this_turn,
from_attack_target=old_attack_target,
from_units_must_retreat=old_units_must_retreat
)
self._undo_redo_manager.record_action(action)
# =====================================================================
# 0.2.0: Lines of Communication (LOC) Network System
# =====================================================================
# Terrain management methods
[docs]
def set_terrain(self, row: int, col: int, terrain: Optional[str]) -> None:
"""Set terrain type for a square.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
terrain: Terrain type (None, 'MOUNTAIN', 'MOUNTAIN_PASS', 'FORTRESS', 'ARSENAL')
Raises:
ValueError: If coordinates are invalid or terrain type is invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
if terrain is not None and terrain not in constants.ALL_TERRAIN_TYPES:
raise ValueError(f"Invalid terrain type: {terrain}")
self._terrain[row][col] = terrain
[docs]
def get_terrain(self, row: int, col: int) -> Optional[str]:
"""Get terrain type for a square.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Returns:
Terrain type or None if no terrain
Raises:
ValueError: If coordinates are invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
return self._terrain[row][col]
# Network state helper methods
def _mark_square_covered(self, row: int, col: int, player: str) -> None:
"""Mark a square as covered by a player's network.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
player: 'NORTH' or 'SOUTH'
"""
if player == constants.PLAYER_NORTH:
self._network_coverage_north.add((row, col))
else:
self._network_coverage_south.add((row, col))
def _mark_unit_active(self, row: int, col: int, player: str) -> None:
"""Mark a unit as active in the network.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
player: 'NORTH' or 'SOUTH'
"""
if player == constants.PLAYER_NORTH:
self._active_north.add((row, col))
else:
self._active_south.add((row, col))
# Also mark the square as covered by the network
self._mark_square_covered(row, col, player)
# For relays/swift relays, also track online status
unit = self.get_unit(row, col)
if unit:
unit_type = getattr(unit, 'unit_type', None)
if unit_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
if (row, col) not in self._relay_online_status:
self._relay_online_status[(row, col)] = True
else:
# For combat units, also track in relay_online_status for consistency
# This ensures is_unit_online works correctly for all units
if (row, col) not in self._relay_online_status:
self._relay_online_status[(row, col)] = True
def _is_unit_active(self, row: int, col: int, player: str) -> bool:
"""Check if a unit is active in the network.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
player: 'NORTH' or 'SOUTH'
Returns:
True if unit is active, False otherwise
"""
if player == constants.PLAYER_NORTH:
return (row, col) in self._active_north
else:
return (row, col) in self._active_south
def _is_relay_online(self, row: int, col: int) -> bool:
"""Check if a relay/swift relay is online.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Returns:
True if relay is online, False otherwise
"""
return self._relay_online_status.get((row, col), False)
def _set_relay_online(self, row: int, col: int, online: bool) -> None:
"""Set a relay/swift relay's online status.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
online: True if online, False otherwise
"""
self._relay_online_status[(row, col)] = online
def _get_active_units(self, player: str) -> Set[Tuple[int, int]]:
"""Get all active units for a player.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
Set of (row, col) tuples
"""
if player == constants.PLAYER_NORTH:
return self._active_north.copy()
else:
return self._active_south.copy()
def _get_arsenals(self, player: str) -> List[Tuple[int, int]]:
"""Get all arsenal squares for a player.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
List of (row, col) tuples
"""
arsenals = []
for (row, col), owner in self._arsenal_owners.items():
if owner == player:
arsenals.append((row, col))
return arsenals
[docs]
def set_arsenal(self, row: int, col: int, owner: str) -> None:
"""Set arsenal terrain with owner.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
owner: 'NORTH' or 'SOUTH'
Raises:
ValueError: If coordinates are invalid or owner is invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
if owner not in (constants.PLAYER_NORTH, constants.PLAYER_SOUTH):
raise ValueError(f"Invalid owner: {owner}")
self._terrain[row][col] = constants.TERRAIN_ARSENAL
self._arsenal_owners[(row, col)] = owner
self._network_dirty = True # Mark network as needing recalculation
[docs]
def get_arsenal_owner(self, row: int, col: int) -> Optional[str]:
"""Get owner of arsenal at given coordinates.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Returns:
'NORTH', 'SOUTH', or None if no arsenal
Raises:
ValueError: If coordinates are invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
terrain = self._terrain[row][col]
if terrain == constants.TERRAIN_ARSENAL:
return self._arsenal_owners.get((row, col))
return None
[docs]
def remove_arsenal(self, row: int, col: int) -> None:
"""Remove arsenal from square.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Raises:
ValueError: If coordinates are invalid
"""
if not self.is_valid_square(row, col):
raise ValueError(f"Invalid coordinates: ({row}, {col})")
self._terrain[row][col] = None
if (row, col) in self._arsenal_owners:
del self._arsenal_owners[(row, col)]
self._network_dirty = True # Mark network as needing recalculation
def _get_unpropagated_relays(self, player: str) -> List[Tuple[int, int]]:
"""Get all online relays/swift relays that haven't propagated yet.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
List of (row, col) tuples
"""
relays = []
active_units = self._get_active_units(player)
for row, col in active_units:
unit = self.get_unit(row, col)
if unit:
unit_type = getattr(unit, 'unit_type', None)
if unit_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
# Only return relays that are online AND haven't propagated yet
if (
self._is_relay_online(row, col)
and (row, col) not in self._proximity_checked
):
relays.append((row, col))
return relays
def _get_newly_activated_relays(self, player: str) -> List[Tuple[int, int]]:
"""Get relays/swift relays activated in the most recent proximity check.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
List of (row, col) tuples
"""
relays = []
active_units = self._get_active_units(player)
for row, col in active_units:
# If it's a relay/swift relay that's online but hasn't propagated yet
unit = self.get_unit(row, col)
if unit:
unit_type = getattr(unit, 'unit_type', None)
if unit_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
if (
self._is_relay_online(row, col)
and (row, col) not in self._proximity_checked
):
relays.append((row, col))
return relays
def _reset_network_state(self, player: str) -> None:
"""Reset tracking state for a new network calculation.
Args:
player: 'NORTH' or 'SOUTH'
"""
if player == constants.PLAYER_NORTH:
self._active_north.clear()
self._network_coverage_north.clear()
self._ray_coverage_north.clear()
else:
self._active_south.clear()
self._network_coverage_south.clear()
self._ray_coverage_south.clear()
# Only clear relay_online_status when recalculating for both players
# This is handled in _ensure_network_calculated()
# Don't clear it here to preserve other player's relay status
self._proximity_checked.clear()
# Ray-casting algorithm
def _cast_ray(self, origin_row: int, origin_col: int, dx: int, dy: int,
player: str, source_is_arsenal: bool) -> bool:
"""Cast a ray from origin in direction (dx, dy) until hitting a blocking obstacle.
Args:
origin_row: Starting row (0-19)
origin_col: Starting column (0-24)
dx, dy: Direction vector
player: The player whose network we're calculating
source_is_arsenal: True if ray originates from an arsenal, False if from relay
Returns:
True if any relay/swift relay was activated along this ray
"""
x, y = origin_col, origin_row
relay_activated = False
# Extend ray to board edge
while True:
x += dx
y += dy
# Check board bounds
if not self.is_valid_square(y, x):
break
current_unit = self.get_unit(y, x)
current_terrain = self._terrain[y][x]
# Case 1: Empty square - continue ray
if current_unit is None:
# Check terrain at empty square
if current_terrain == constants.TERRAIN_MOUNTAIN:
break # Mountains block the ray
# Mountain passes and fortresses don't block
# Mark empty square as covered by network
self._mark_square_covered(y, x, player)
# Also mark in ray coverage for display purposes
if player == constants.PLAYER_NORTH:
self._ray_coverage_north.add((y, x))
else:
self._ray_coverage_south.add((y, x))
continue
# Case 2: Friendly unit - activate and continue (except relays may stop)
current_owner = getattr(current_unit, 'owner', None)
current_type = getattr(current_unit, 'unit_type', None)
if current_owner == player:
self._mark_unit_active(y, x, player)
# Also mark the square as ray-covered for display purposes
# This ensures occupied terrain squares (fortresses, passes) show correct colors
if player == constants.PLAYER_NORTH:
self._ray_coverage_north.add((y, x))
else:
self._ray_coverage_south.add((y, x))
# If it's a relay/swift relay, activate it and continue
if current_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
if not self._is_relay_online(y, x):
self._set_relay_online(y, x, True)
relay_activated = True
# Friendly non-relay units don't block the ray (they're transparent)
continue
# Case 3: Enemy unit
# All enemy units block the ray (including relays)
break
return relay_activated
# Network calculation steps
def _step1_arsenal_propagation(self, player: str) -> None:
"""Step 1: Ray-based propagation from arsenals.
Arsenals radiate lines of communication in all 8 directions to the board edge.
Args:
player: 'NORTH' or 'SOUTH'
"""
arsenals = self._get_arsenals(player)
for arsenal_row, arsenal_col in arsenals:
# Mark arsenal as active
self._mark_unit_active(arsenal_row, arsenal_col, player)
# Ray-cast in all 8 directions
for dx, dy in constants.DIRECTIONS:
self._cast_ray(arsenal_row, arsenal_col, dx, dy, player, source_is_arsenal=True)
def _step2_relay_propagation(self, player: str) -> None:
"""Step 2: Ray-based propagation from activated relays/swift relays.
This step repeats until no more valid relays are activated.
Args:
player: 'NORTH' or 'SOUTH'
"""
while True:
new_relays_activated = False
# Get all online relays/swift relays that haven't propagated yet
active_relays = self._get_unpropagated_relays(player)
for relay_row, relay_col in active_relays:
# Ray-cast in all 8 directions
for dx, dy in constants.DIRECTIONS:
did_activate = self._cast_ray(
relay_row, relay_col, dx, dy, player, source_is_arsenal=False
)
if did_activate:
new_relays_activated = True
# Mark this relay as propagated
self._proximity_checked.add((relay_row, relay_col))
# If no new relays were activated AND all relays have propagated, we're done
if not new_relays_activated and not self._get_unpropagated_relays(player):
break
def _step3_proximity_propagation(self, player: str) -> bool:
"""Step 3: Proximity-based propagation from active units.
This step repeats until all active units have been proximity-checked.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
True if new units were activated, False otherwise
"""
new_units_activated = False
units_to_check = list(
self._get_active_units(player)
) # Copy to avoid modification during iteration
for unit_row, unit_col in units_to_check:
if (unit_row, unit_col) in self._proximity_checked:
continue
# Mark this unit as proximity-checked
self._proximity_checked.add((unit_row, unit_col))
# Check all 8 adjacent squares
for dx, dy in constants.DIRECTIONS:
adj_row = unit_row + dy
adj_col = unit_col + dx
if not self.is_valid_square(adj_row, adj_col):
continue
adj_unit = self.get_unit(adj_row, adj_col)
# Case 1: Empty square - mark as covered by network
# This allows units with movement range 2+ to move through empty squares
# adjacent to friendly units (fixes path validation bug)
# BUT mountains block proximity propagation too
if adj_unit is None:
# Check terrain before marking as covered
adj_terrain = self._terrain[adj_row][adj_col]
if adj_terrain == constants.TERRAIN_MOUNTAIN:
# Mountain terrain - not covered by proximity
continue
# Passable terrain (None, MOUNTAIN_PASS, FORTRESS, ARSENAL)
self._mark_square_covered(adj_row, adj_col, player)
continue
# Case 2: Square has a unit
adj_owner = getattr(adj_unit, 'owner', None)
if adj_owner != player:
# Enemy unit - skip
continue
if self._is_unit_active(adj_row, adj_col, player):
# Already active - skip
continue
# Activate adjacent friendly unit
self._mark_unit_active(adj_row, adj_col, player)
new_units_activated = True
return new_units_activated
def _step4_relay_propagation_from_proximity(self, player: str) -> None:
"""Step 4: Ray-based propagation from relays/swift relays activated by proximity.
This step is optional and defaults to True.
Args:
player: 'NORTH' or 'SOUTH'
"""
# Get relays that were just activated by proximity but haven't propagated yet
newly_activated_relays = self._get_newly_activated_relays(player)
for relay_row, relay_col in newly_activated_relays:
# Mark as propagated
if (relay_row, relay_col) not in self._proximity_checked:
self._proximity_checked.add((relay_row, relay_col))
# Ray-cast in all 8 directions
for dx, dy in constants.DIRECTIONS:
self._cast_ray(relay_row, relay_col, dx, dy, player, source_is_arsenal=False)
# Main network calculation method
[docs]
def calculate_network(self, player: str, enable_step4: bool = True) -> None:
"""Calculate lines of communication network for specified player.
This must be called after any board state change.
Args:
player: The player to calculate network for ('NORTH' or 'SOUTH')
enable_step4: Whether to enable step 4 (ray propagation from proximity-activated relays)
"""
# Note: _network_calculated is NOT set here. It is set by enable_networks()
# to maintain backward compatibility - networks are disabled by default.
# Reset state for this calculation cycle
self._reset_network_state(player)
# Step 1: Initial ray-based propagation from arsenals
self._step1_arsenal_propagation(player)
# Step 2: Ray-based propagation from activated relays/swift relays
# Repeat until no more relays are activated
self._step2_relay_propagation(player)
# Step 3: Proximity-based propagation from all active units
# Repeat until all active units have been proximity-checked
self._step3_proximity_propagation(player)
# Step 4: Optional ray-based propagation from proximity-activated relays
if enable_step4:
self._step4_relay_propagation_from_proximity(player)
# Step 5: Return to step 3 if any units were activated in step 4
# This loop continues until no new units are activated
while True:
new_units_activated = self._step3_proximity_propagation(player)
if not new_units_activated:
break
self._step4_relay_propagation_from_proximity(player)
# Helper for lazy network calculation
def _ensure_network_calculated(self) -> None:
"""Ensure network is up-to-date before querying network state.
This lazy calculation approach recalculates networks only when needed,
avoiding unnecessary recalculations during batch operations like
test setup or FEN loading.
"""
# Only recalculate if network has been calculated before and is now dirty
# This ensures backward compatibility: if network has never been calculated,
# we don't force calculation (optimistic default applies)
if self._network_calculated and self._network_dirty:
# Clear relay status before recalculating both players
self._relay_online_status.clear()
# Recalculate when dirty (ensures online/offline status is correct)
from .constants import PLAYER_NORTH, PLAYER_SOUTH
self.calculate_network(PLAYER_NORTH)
self.calculate_network(PLAYER_SOUTH)
self._network_dirty = False
# Network configuration methods
[docs]
def set_adjacency_relay_propagation(self, enable: bool) -> None:
"""Enable/disable adjacency-based relay propagation (Step 4).
When enabled (default): Relays activated by proximity to an online unit
will propagate their own LOC rays in all 8 directions.
When disabled: Relays only propagate LOC rays when activated via
arsenal rays or other relay rays (original Debord behavior).
Args:
enable: True to enable, False to disable
"""
self._enable_adjacency_relay_propagation = enable
# Mark network as dirty so recalculation uses new setting
self._network_dirty = True
[docs]
def get_adjacency_relay_propagation(self) -> bool:
"""Check if adjacency-based relay propagation is enabled.
Returns:
True if Step 4 is enabled, False otherwise
"""
return self._enable_adjacency_relay_propagation
# Public API for network queries
[docs]
def enable_networks(self) -> None:
"""Enable network rules for both players.
This method activates Lines of Communication system, after which
units must be connected to arsenals through to network to function.
Once enabled, network system enforces:
- Units not connected to network have 0 attack/defense/range (except relays)
- Units not connected to network have 0 movement (except relays/swift relays)
Note: This should be called explicitly when network rules are desired.
The default behavior is that networks are disabled (all units online).
The optional Step 4 (adjacency-based relay propagation) is controlled
by Board constructor's enable_adjacency_relay_propagation parameter.
Example:
>>> board = Board() # Step 4 enabled (default)
>>> board.create_and_place_unit(5, 10, 'ARSENAL', 'NORTH')
>>> board.create_and_place_unit(5, 12, 'INFANTRY', 'NORTH')
>>> # Enable network rules
>>> board.enable_networks()
>>> board = Board(enable_adjacency_relay_propagation=False) # Step 4 disabled
"""
self._network_calculated = True
self.calculate_network(constants.PLAYER_NORTH,
enable_step4=self._enable_adjacency_relay_propagation)
self.calculate_network(constants.PLAYER_SOUTH,
enable_step4=self._enable_adjacency_relay_propagation)
# Clear dirty flag after manual network calculation
self._network_dirty = False
[docs]
def is_unit_online(self, row: int, col: int, player: str) -> bool:
"""Check if a square is covered by network for a player.
This checks if the square is within network coverage,
which includes both units and empty squares covered by rays.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
player: 'NORTH' or 'SOUTH'
Returns:
True if square is covered by network, False otherwise
Note:
If network has never been calculated (_network_calculated == False),
returns True for all squares (optimistic default). This ensures backward
compatibility and allows normal movement before network system is invoked.
"""
# Optimistic default: assume online if network hasn't been calculated
if not self._network_calculated:
return True
self._ensure_network_calculated() # Lazy recalculation if needed
if player == constants.PLAYER_NORTH:
return (row, col) in self._network_coverage_north
else:
return (row, col) in self._network_coverage_south
[docs]
def is_relay_online(self, row: int, col: int) -> bool:
"""Check if a relay/swift relay is online.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Returns:
True if relay is online, False otherwise
Note:
If network has never been calculated (_network_calculated == False),
returns True for all relays (optimistic default). This ensures backward
compatibility and allows normal movement before network system is invoked.
"""
# Optimistic default: assume online if network hasn't been calculated
if not self._network_calculated:
return True
self._ensure_network_calculated() # Lazy recalculation if needed
unit = self.get_unit(row, col)
if unit:
unit_type = getattr(unit, 'unit_type', None)
if unit_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
return self._is_relay_online(row, col)
return False
[docs]
def get_online_units(self, player: str) -> Set[Tuple[int, int]]:
"""Get all online units for a player.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
Set of (row, col) tuples
Note:
If network has never been calculated (_network_calculated == False),
returns all units for the player (optimistic default). This ensures
backward compatibility and allows normal movement before network system is invoked.
"""
# Optimistic default: return all units if network hasn't been calculated
if not self._network_calculated:
return set(self.get_units_by_owner(player))
self._ensure_network_calculated() # Lazy recalculation if needed
return self._get_active_units(player)
[docs]
def get_offline_units(self, player: str) -> Set[Tuple[int, int]]:
"""Get all offline units for a player.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
Set of (row, col) tuples
Note:
If network has never been calculated (_network_calculated == False),
returns empty set (optimistic default - no units are offline).
"""
# Optimistic default: no offline units if network hasn't been calculated
if not self._network_calculated:
return set()
self._ensure_network_calculated() # Lazy recalculation if needed
all_units = set(self.get_units_by_owner(player))
online_units = self.get_online_units(player)
return all_units - online_units
[docs]
def get_network_active_relays(self, player: str) -> Set[Tuple[int, int]]:
"""Get all relays/swift relays that are online and can propagate.
Args:
player: 'NORTH' or 'SOUTH'
Returns:
Set of (row, col) tuples
"""
self._ensure_network_calculated() # Lazy recalculation if needed
online_units = self.get_online_units(player)
active_relays = set()
for row, col in online_units:
unit = self.get_unit(row, col)
if unit:
unit_type = getattr(unit, 'unit_type', None)
if unit_type in (constants.UNIT_RELAY, constants.UNIT_SWIFT_RELAY):
if self._is_relay_online(row, col):
active_relays.add((row, col))
return active_relays
[docs]
def is_ray_covered(self, row: int, col: int, player: str) -> bool:
"""Check if a square is covered by arsenal/relay rays only (for display).
This method is used by display code to determine which squares should be
shown as green (online via rays) vs grey (online only via proximity).
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
player: 'NORTH' or 'SOUTH'
Returns:
True if square is covered by arsenal/relay rays, False otherwise
Note:
- This is purely for visual representation purposes
- Does not affect game mechanics (movement, combat, etc.)
- Use is_unit_online() for game mechanic queries
- Returns True for squares covered by ray propagation (steps 1 and 2)
- Returns False for squares only covered by proximity propagation (step 3)
Example:
>>> board = Board()
>>> board.set_arsenal(10, 10, 'NORTH')
>>> board.enable_networks()
>>> # Square covered by arsenal ray will be ray_covered
>>> board.is_ray_covered(10, 15, 'NORTH') # True
>>> # Square covered only by proximity won't be ray_covered
>>> board.is_ray_covered(11, 14, 'NORTH') # False
>>> # But it's still online for game mechanics
>>> board.is_unit_online(11, 14, 'NORTH') # True
"""
self._ensure_network_calculated() # Lazy recalculation if needed
if player == constants.PLAYER_NORTH:
return (row, col) in self._ray_coverage_north
else:
return (row, col) in self._ray_coverage_south
# =====================================================================
# 0.2.2: Victory Condition Detection
# =====================================================================
@property
def game_state(self) -> str:
"""Return current game state.
Returns:
'ONGOING', 'NORTH_WINS', 'SOUTH_WINS', or 'DRAW'
"""
return self._game_state
@property
def victory_result(self) -> Optional[Dict[str, object]]:
"""Return victory result if game has ended, None otherwise.
Returns:
Dictionary with victory details or None if game is ongoing.
Keys: 'game_state', 'winner', 'victory_condition', 'details'
"""
return self._victory_result
[docs]
def is_game_over(self) -> bool:
"""Check if game has ended.
Returns:
True if game is over (winner determined or draw), False otherwise
"""
return self._game_state != "ONGOING"
[docs]
def check_victory(self) -> Dict[str, object]:
"""Check victory conditions and update game state.
This method should be called after each turn to detect victory conditions.
Returns:
Dictionary with victory details:
- 'game_state': Current game state
- 'winner': Winning player or None
- 'victory_condition': Specific condition met or None
- 'details': Human-readable explanation
"""
from .victory import check_victory_conditions
result = check_victory_conditions(self)
self._game_state = result.game_state.value
self._victory_result = {
'game_state': result.game_state.value,
'winner': result.winner,
'victory_condition': (
result.victory_condition.value
if result.victory_condition
else None
),
'details': result.details
}
return self._victory_result
[docs]
def handle_surrender(self, player: str) -> None:
"""Handle a player's surrender.
This method sets the game state to indicate the surrendering player has lost.
Args:
player: 'NORTH' or 'SOUTH' - the player who is surrendering
Raises:
ValueError: If game is already over or player is invalid
"""
# Check if game is already over
if self.is_game_over():
raise ValueError("Cannot surrender: game is already over")
# Validate player
if player not in (constants.PLAYER_NORTH, constants.PLAYER_SOUTH):
raise ValueError(f"Invalid player: {player}")
# Determine winner (opponent of surrendering player)
winner = (
constants.PLAYER_SOUTH
if player == constants.PLAYER_NORTH
else constants.PLAYER_NORTH
)
# Update game state
self._game_state = f"{winner}_WINS"
self._victory_result = {
'game_state': self._game_state,
'winner': winner,
'victory_condition': 'SURRENDER',
'details': f"{winner} wins: {player} has surrendered"
}
[docs]
def destroy_arsenal(self, row: int, col: int) -> None:
"""Destroy an arsenal at the specified position.
This is called when a unit moves onto an enemy arsenal.
The arsenal is removed from the board and the terrain becomes empty.
Args:
row: Row coordinate (0-19)
col: Column coordinate (0-24)
Raises:
ValueError: If position doesn't contain an arsenal
"""
if self._terrain[row][col] != constants.TERRAIN_ARSENAL:
raise ValueError(f"No arsenal at position ({row}, {col})")
# Remove arsenal terrain (set to None for empty terrain)
self._terrain[row][col] = None
# Remove from arsenal owners dict
if (row, col) in self._arsenal_owners:
del self._arsenal_owners[(row, col)]
# Mark network as dirty for recalculation
self._network_dirty = True
# NOTE: Victory checking is deferred until end_turn() is called
# This ensures the turn state is properly updated (turn number, captured units, etc.)
# before determining victory conditions. The network will be recalculated
# as part of end_turn() -> resolve_retreats() -> check_victory() sequence.