# Source code for pipeworks_mud_mapper.services.validation_service

"""Validation service for map integrity checks.

This module provides validation functions that check map files for common
issues. Validations are non-destructive - they return warnings but don't
modify the map.

Validation Categories
---------------------
**Connectivity**
    - Unreachable rooms (no path from spawn)
    - Dead-end rooms (no exits)
    - Broken exit references

**Exit Consistency**
    - Asymmetric exits (A→B exists but B→A doesn't)
    - Direction conflicts (exit says "north" but coords say "south")

**Language-Direction**
    - Room names with vertical words ("Upper", "Basement") that don't
      match their navigational reality
    - This catches the "Upper Landing" problem from goblin_cartography.md

Warning Severity
----------------
Warnings have severity levels:

- **ERROR**: Must be fixed before export (broken references)
- **WARNING**: Should be reviewed but may be intentional (unreachable
  rooms, direction/coordinate mismatches)
- **INFO**: Informational only (dead ends, asymmetric exits and
  name/direction conflicts may all be intended)

Usage
-----
Run all validations::

    from pipeworks_mud_mapper.services import validation_service

    warnings = validation_service.validate_all(map_file)
    for warning in warnings:
        print(f"[{warning.severity}] {warning.room_id}: {warning.message}")

Run specific validations::

    conn_warnings = validation_service.validate_connectivity(map_file)
    exit_warnings = validation_service.validate_exit_consistency(map_file)

See Also
--------
- ``goblin_cartography.md`` Section 1.9: Developer Mapping Tools
- ``goblin_cartography.md`` Section 1.7: The "Upper Landing" Problem
"""

from dataclasses import dataclass
from enum import StrEnum

from pipeworks_mud_mapper.models import Direction, MapFile
from pipeworks_mud_mapper.models.room import OPPOSITE_DIRECTION


[docs] class Severity(StrEnum): """Severity level for validation warnings.""" ERROR = "error" WARNING = "warning" INFO = "info"
@dataclass
class ValidationWarning:
    """A single finding produced by a validation check.

    Attributes
    ----------
    severity : Severity
        How serious the finding is.
    category : str
        Check family that produced it (connectivity, consistency, language).
    room_id : str or None
        Room the finding relates to, if any.
    message : str
        Human-readable description of the issue.
    details : dict or None
        Optional structured data about the issue; defaults to ``None``.
    """

    severity: Severity
    category: str
    room_id: str | None
    message: str
    details: dict | None = None

    def __str__(self) -> str:
        """Render as ``[SEVERITY] [room_id] message``.

        The ``[room_id]`` segment is left out when ``room_id`` is falsy
        (``None`` or an empty string).
        """
        label = self.severity.value.upper()
        if self.room_id:
            return f"[{label}] [{self.room_id}] {self.message}"
        return f"[{label}] {self.message}"
def validate_all(map_file: MapFile) -> list[ValidationWarning]:
    """Run every validation check and return the combined findings.

    The connectivity, exit-consistency and language-direction checks are
    run in that order and their results merged into one list.

    Parameters
    ----------
    map_file : MapFile
        The map file to validate.

    Returns
    -------
    list[ValidationWarning]
        All findings, ordered ERROR, then WARNING, then INFO. The sort is
        stable, so findings of equal severity keep the order in which the
        checks produced them.

    Examples
    --------
    >>> warnings = validate_all(map_file)
    >>> errors = [w for w in warnings if w.severity == Severity.ERROR]
    >>> if errors:
    ...     print("Cannot export: fix errors first")
    """
    findings: list[ValidationWarning] = [
        *validate_connectivity(map_file),
        *validate_exit_consistency(map_file),
        *validate_language_direction(map_file),
    ]

    # Lower rank sorts first, so errors lead the list.
    rank = {Severity.ERROR: 0, Severity.WARNING: 1, Severity.INFO: 2}
    return sorted(findings, key=lambda finding: rank[finding.severity])
def validate_connectivity(map_file: MapFile) -> list[ValidationWarning]:
    """Report connectivity problems in a map.

    Three passes are made, and their findings are returned in this order:

    1. Broken exit references (exit to a nonexistent room) - ERROR
    2. Rooms unreachable from the spawn room - WARNING
    3. Rooms with no exits (dead ends) - INFO

    Parameters
    ----------
    map_file : MapFile
        The map file to validate.

    Returns
    -------
    list[ValidationWarning]
        Connectivity findings.

    Examples
    --------
    >>> warnings = validate_connectivity(map_file)
    >>> for w in warnings:
    ...     if "unreachable" in w.message:
    ...         print(f"Orphan room: {w.room_id}")
    """
    rooms = map_file.rooms
    findings: list[ValidationWarning] = []

    # Pass 1: exits whose target is not a room in this map.
    for room_id, room in rooms.items():
        for direction, target in room.exits.items():
            # Targets containing ":" are cross-zone exits and are not
            # resolved against this map.
            if ":" in target or target in rooms:
                continue
            findings.append(
                ValidationWarning(
                    severity=Severity.ERROR,
                    category="connectivity",
                    room_id=room_id,
                    message=f"Exit '{direction}' leads to nonexistent room '{target}'",
                    details={"direction": direction, "target": target},
                )
            )

    # Pass 2: rooms with no path from the spawn room.
    findings.extend(
        ValidationWarning(
            severity=Severity.WARNING,
            category="connectivity",
            room_id=orphan_id,
            message=f"Room is unreachable from spawn '{map_file.spawn_room}'",
        )
        for orphan_id in _find_unreachable_rooms(map_file)
    )

    # Pass 3: rooms that define no exits at all.
    findings.extend(
        ValidationWarning(
            severity=Severity.INFO,
            category="connectivity",
            room_id=room_id,
            message="Room has no exits (dead end)",
        )
        for room_id, room in rooms.items()
        if not room.exits
    )

    return findings
def _find_unreachable_rooms(map_file: MapFile) -> list[str]:
    """Find rooms that cannot be reached from the spawn room.

    Runs a breadth-first search over same-zone exits, starting at
    ``map_file.spawn_room``.

    Parameters
    ----------
    map_file : MapFile
        The map to search. Only ``rooms``, ``spawn_room`` and each room's
        ``exits`` mapping are read.

    Returns
    -------
    list[str]
        Sorted IDs of rooms never visited by the search. Empty when the
        map has no rooms.
    """
    from collections import deque

    rooms = map_file.rooms
    if not rooms:
        return []

    visited: set[str] = set()
    # A deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
    frontier = deque([map_file.spawn_room])

    while frontier:
        current = frontier.popleft()
        if current in visited:
            continue
        visited.add(current)

        # IDs with no room behind them (a missing spawn room, a broken exit
        # target) are marked visited but have no exits to follow.
        room = rooms.get(current)
        if not room:
            continue

        for target in room.exits.values():
            # Targets containing ":" are cross-zone exits; only same-zone
            # exits are followed.
            if ":" not in target and target not in visited:
                frontier.append(target)

    return sorted(set(rooms) - visited)
def validate_exit_consistency(map_file: MapFile) -> list[ValidationWarning]:
    """Check for exit consistency issues.

    Checks for:

    - Asymmetric exits (A→north→B but B has no south→A)
    - Return exits that lead somewhere other than the originating room
    - Direction/coordinate mismatches (exit says "north" but room is south)

    Parameters
    ----------
    map_file : MapFile
        The map file to validate.

    Returns
    -------
    list[ValidationWarning]
        Exit consistency warnings.

    Notes
    -----
    Asymmetric exits are flagged as INFO, not errors, because some asymmetry
    is intentional (one-way doors, hidden passages, trapdoors).

    Each unordered pair of rooms is examined once, using the first exit
    found that links them.
    """
    warnings: list[ValidationWarning] = []
    checked_pairs: set[tuple[str, str]] = set()

    for room_id, room in map_file.rooms.items():
        for direction, target in room.exits.items():
            # Cross-zone exits (targets containing ":") are not checked.
            if ":" in target:
                continue

            # NOTE(review): the key is the unordered room pair, so any later
            # exit between the same two rooms (including the reverse exit)
            # is skipped by every check below - confirm this is intended.
            pair: tuple[str, str] = (min(room_id, target), max(room_id, target))
            if pair in checked_pairs:
                continue
            checked_pairs.add(pair)

            target_room = map_file.rooms.get(target)
            if not target_room:
                continue  # Already caught by connectivity check

            # Directions without an entry in OPPOSITE_DIRECTION have no
            # expected return exit, so the symmetry check is skipped.
            opposite = OPPOSITE_DIRECTION.get(direction)
            if opposite:
                return_exit = target_room.exits.get(opposite)
                if return_exit is None:
                    warnings.append(
                        ValidationWarning(
                            severity=Severity.INFO,
                            category="consistency",
                            room_id=room_id,
                            message=(
                                f"Asymmetric exit: {room_id}→{direction}→{target}, "
                                f"but {target} has no {opposite} exit back"
                            ),
                            details={
                                "from_room": room_id,
                                "direction": direction,
                                "to_room": target,
                                "expected_return": opposite,
                            },
                        )
                    )
                elif return_exit != room_id:
                    warnings.append(
                        ValidationWarning(
                            severity=Severity.INFO,
                            category="consistency",
                            room_id=room_id,
                            message=(
                                f"Exit mismatch: {room_id}→{direction}→{target}, "
                                f"but {target}→{opposite}→{return_exit} "
                                f"(not back to {room_id})"
                            ),
                            details={
                                "from_room": room_id,
                                "direction": direction,
                                "to_room": target,
                                "return_direction": opposite,
                                "actual_return": return_exit,
                            },
                        )
                    )

            # The coordinate check runs for every examined exit, whether or
            # not the direction has an opposite.
            mismatch = _check_direction_coords_match(room, target_room, direction)
            if mismatch:
                warnings.append(mismatch)

    return warnings
def _check_direction_coords_match(
    from_room,
    to_room,
    direction: Direction,
) -> ValidationWarning | None:
    """Compare an exit's direction with the rooms' relative coordinates.

    The direction implied by the coordinates is only defined when the two
    rooms differ along exactly one axis. Index 0 maps to east/west, index 1
    to north/south and index 2 to up/down, with a positive difference meaning
    east, north or up.

    Returns
    -------
    ValidationWarning or None
        A WARNING when the implied direction differs from ``direction``;
        ``None`` when they agree or when no direction can be implied
        (identical coordinates or a difference on several axes).
    """
    origin = from_room.coords.to_tuple()
    destination = to_room.coords.to_tuple()

    # (positive, negative) direction names for coordinate indexes 0, 1, 2.
    axis_names = (("east", "west"), ("north", "south"), ("up", "down"))
    offsets = tuple(destination[axis] - origin[axis] for axis in range(3))

    implied = None
    # An axis that compares equal to zero has not moved; a direction is
    # implied only when every axis but one is unmoved.
    unmoved = [offset == 0 for offset in offsets]
    if unmoved.count(True) == 2:
        axis = unmoved.index(False)
        positive, negative = axis_names[axis]
        if offsets[axis] > 0:
            implied = positive
        elif offsets[axis] < 0:
            implied = negative

    if not implied or implied == direction:
        return None

    return ValidationWarning(
        severity=Severity.WARNING,
        category="consistency",
        room_id=from_room.id,
        message=(
            f"Direction mismatch: exit says '{direction}' to '{to_room.id}', "
            f"but coordinates suggest '{implied}'"
        ),
        details={
            "from_room": from_room.id,
            "to_room": to_room.id,
            "exit_direction": direction,
            "coordinate_direction": implied,
            "from_coords": list(origin),
            "to_coords": list(destination),
        },
    )


# Lowercase room-name words that suggest a room sits above its surroundings.
VERTICAL_WORDS_UP = {
    "upper",
    "upstairs",
    "attic",
    "top",
    "tower",
    "rooftop",
    "above",
}

# Lowercase room-name words that suggest a room sits below its surroundings.
VERTICAL_WORDS_DOWN = {
    "lower",
    "downstairs",
    "basement",
    "cellar",
    "underground",
    "below",
    "pit",
    "dungeon",
}
def validate_language_direction(map_file: MapFile) -> list[ValidationWarning]:
    """Check for language-direction conflicts in room names.

    Flags rooms whose names contain vertical words (like "Upper", "Basement")
    but are not reached via the corresponding vertical direction (up/down).

    This catches the "Upper Landing" problem from goblin_cartography.md:
    a room named "Upper Landing" reached via "north" creates cognitive
    dissonance for players trying to build a mental map.

    Parameters
    ----------
    map_file : MapFile
        The map file to validate.

    Returns
    -------
    list[ValidationWarning]
        Language-direction conflict warnings. A room can yield up to two:
        one for an "up" word and one for a "down" word, in that order.

    Notes
    -----
    These are flagged as INFO because the author may have intentional
    reasons for the naming, or may be using the word in a non-directional
    sense.

    Names are lowercased and split on whitespace, so a word with attached
    punctuation (``"Cellar,"``) does not match. When a name holds several
    words from the same vocabulary, the first one in the name is reported,
    which keeps the message stable between runs.
    """
    warnings: list[ValidationWarning] = []

    # Each vocabulary is paired with the direction that at least one
    # incoming exit is expected to use.
    checks = (
        (VERTICAL_WORDS_UP, "up"),
        (VERTICAL_WORDS_DOWN, "down"),
    )

    for room_id, room in map_file.rooms.items():
        name_words = room.name.lower().split()

        for vocabulary, expected_direction in checks:
            # A list in name order, not a set intersection: set iteration
            # order for strings can differ between interpreter runs.
            matches = [word for word in name_words if word in vocabulary]
            if not matches:
                continue
            if _is_room_reached_via(map_file, room_id, expected_direction):
                continue

            vertical_word = matches[0]
            warnings.append(
                ValidationWarning(
                    severity=Severity.INFO,
                    category="language",
                    room_id=room_id,
                    message=(
                        f"Room name contains '{vertical_word}' but "
                        f"is not reached via '{expected_direction}' "
                        "from any connected room"
                    ),
                    details={
                        "vertical_word": vertical_word,
                        "expected_direction": expected_direction,
                    },
                )
            )

    return warnings
def _is_room_reached_via(map_file: MapFile, room_id: str, direction: Direction) -> bool:
    """Return True if some room's ``direction`` exit leads to ``room_id``.

    Every room in ``map_file.rooms`` is considered, including ``room_id``
    itself.
    """
    return any(
        candidate.exits.get(direction) == room_id
        for candidate in map_file.rooms.values()
    )
def has_errors(warnings: list[ValidationWarning]) -> bool:
    """Report whether a list of findings contains any error.

    Parameters
    ----------
    warnings : list[ValidationWarning]
        List of validation warnings.

    Returns
    -------
    bool
        True if at least one entry has ERROR severity; False otherwise,
        including for an empty list.

    Examples
    --------
    >>> warnings = validate_all(map_file)
    >>> if has_errors(warnings):
    ...     print("Fix errors before exporting")
    """
    for entry in warnings:
        if entry.severity == Severity.ERROR:
            return True
    return False
def filter_by_severity(
    warnings: list[ValidationWarning],
    severity: Severity,
) -> list[ValidationWarning]:
    """Select the findings that have a given severity.

    Parameters
    ----------
    warnings : list[ValidationWarning]
        List of validation warnings.
    severity : Severity
        Severity level to keep.

    Returns
    -------
    list[ValidationWarning]
        A new list of the matching entries, in their original order.

    Examples
    --------
    >>> errors = filter_by_severity(warnings, Severity.ERROR)
    """
    selected: list[ValidationWarning] = []
    for entry in warnings:
        if entry.severity == severity:
            selected.append(entry)
    return selected
def filter_by_category(
    warnings: list[ValidationWarning],
    category: str,
) -> list[ValidationWarning]:
    """Select the findings that belong to a given category.

    Parameters
    ----------
    warnings : list[ValidationWarning]
        List of validation warnings.
    category : str
        Category to keep (connectivity, consistency, language).

    Returns
    -------
    list[ValidationWarning]
        A new list of the matching entries, in their original order.

    Examples
    --------
    >>> conn_issues = filter_by_category(warnings, "connectivity")
    """
    selected: list[ValidationWarning] = []
    for entry in warnings:
        if entry.category == category:
            selected.append(entry)
    return selected
def create_validation_report(
    map_file_name: str,
    warnings: list[ValidationWarning],
) -> dict:
    """Build a validation report as a plain dictionary.

    The report holds a UTC timestamp, the map file name, a summary with
    counts per severity, and one entry per warning. ``passed`` is true when
    there are no ERROR entries; WARNING and INFO entries do not fail a
    report.

    Parameters
    ----------
    map_file_name : str
        Name of the map file that was validated.
    warnings : list[ValidationWarning]
        List of validation warnings from validate_all().

    Returns
    -------
    dict
        Validation report with the following structure::

            {
                "timestamp": "2026-02-02T17:00:00+00:00",
                "map_file": "ledgerfall_alley.map.json",
                "summary": {
                    "errors": 0,
                    "warnings": 2,
                    "info": 3,
                    "total": 5,
                    "passed": true
                },
                "warnings": [
                    {
                        "severity": "warning",
                        "category": "connectivity",
                        "room_id": "room_1",
                        "message": "Room is unreachable from spawn",
                        "details": null
                    }
                ]
            }

    Examples
    --------
    >>> warnings = validate_all(map_file)
    >>> report = create_validation_report("my_zone.map.json", warnings)
    >>> if report["summary"]["passed"]:
    ...     print("Validation passed!")
    """
    from datetime import UTC, datetime

    def count_of(level: Severity) -> int:
        """Count the warnings whose severity equals ``level``."""
        return sum(1 for entry in warnings if entry.severity == level)

    errors = count_of(Severity.ERROR)
    cautions = count_of(Severity.WARNING)
    notes = count_of(Severity.INFO)

    serialized = []
    for entry in warnings:
        serialized.append(
            {
                "severity": entry.severity.value,
                "category": entry.category,
                "room_id": entry.room_id,
                "message": entry.message,
                "details": entry.details,
            }
        )

    summary = {
        "errors": errors,
        "warnings": cautions,
        "info": notes,
        "total": len(warnings),
        # Only errors fail a report.
        "passed": errors == 0,
    }

    return {
        "timestamp": datetime.now(UTC).isoformat(),
        "map_file": map_file_name,
        "summary": summary,
        "warnings": serialized,
    }
def write_validation_report(
    map_file_name: str,
    warnings: list[ValidationWarning],
    output_dir: str | None = None,
) -> str:
    """Write a validation report to a JSON file.

    Creates a validation report and writes it to the output directory. The
    file is named after the map file with a ``.validation.json`` extension.

    Parameters
    ----------
    map_file_name : str
        Name of the map file that was validated (e.g., "my_zone.map.json").
        If a path is given, only its final component names the report file;
        the value is recorded in the report unchanged.
    warnings : list[ValidationWarning]
        List of validation warnings from validate_all().
    output_dir : str, optional
        Directory to write the report to. Defaults to a ``data/validation``
        directory located relative to this module's file.

    Returns
    -------
    str
        Absolute path to the written validation report file.

    Raises
    ------
    OSError
        If the output directory cannot be created or the file cannot be
        written.

    Examples
    --------
    >>> warnings = validate_all(map_file)
    >>> report_path = write_validation_report("my_zone.map.json", warnings)
    >>> print(f"Report written to: {report_path}")

    Notes
    -----
    The output directory is created if it doesn't exist. Previous validation
    reports for the same map file are overwritten.

    The report file uses the naming convention ``{zone_id}.validation.json``,
    where zone_id is the map file name without a trailing ``.map.json`` or
    ``.json`` (e.g., "my_zone.map.json" -> "my_zone.validation.json").
    """
    import json
    from pathlib import Path

    output_path_dir: Path
    if output_dir is None:
        # Find the project data directory relative to the package by taking
        # four ``.parent`` steps from this module's path.
        package_dir = Path(__file__).parent.parent.parent.parent
        output_path_dir = package_dir / "data" / "validation"
    else:
        output_path_dir = Path(output_dir)

    output_path_dir.mkdir(parents=True, exist_ok=True)

    report = create_validation_report(map_file_name, warnings)

    # Keep only the final path component. Joining a name that contains
    # directories (or an absolute path) would place the report outside
    # output_path_dir.
    base_name = Path(map_file_name).name

    # Try the longer suffix first so "x.map.json" becomes "x", not "x.map".
    for suffix in (".map.json", ".json"):
        if base_name.endswith(suffix):
            base_name = base_name.removesuffix(suffix)
            break

    output_path = output_path_dir / f"{base_name}.validation.json"

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    return str(output_path.resolve())