# Source code for pipeworks_mud_mapper.services.map_db_service

"""SQLite storage service for mapper authoring data.

This module replaces map JSON files as the authoring source of truth.
Maps are stored in a single shared SQLite database, while zone exports
remain JSON files for the game server.

Design goals:
- Keep SQLite as the canonical authoring store.
- Preserve existing MapFile/MapRoom models in memory.
- Use clear, explicit SQL (no ORM) to keep dependencies light.
"""

from __future__ import annotations

import json
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from pipeworks_mud_mapper.models import MapFile
from pipeworks_mud_mapper.models.metadata import MapMetadata
from pipeworks_mud_mapper.services.app_config import get_path_settings

# ---------------------------------------------------------------------------
# Database connection helpers
# ---------------------------------------------------------------------------


def _resolve_db_path(db_path: Path | None) -> Path:
    """Return the configured DB path unless an override is provided."""
    if db_path is not None:
        return db_path
    return get_path_settings()["db_path"]


@contextmanager
def _connect(db_path: Path | None = None) -> Iterator[sqlite3.Connection]:
    """Yield a ready-to-use SQLite connection for a single operation.

    The parent directory of the database file is created if missing, rows
    are returned as ``sqlite3.Row``, foreign keys are enforced, WAL
    journaling is requested, and the schema is ensured before the
    connection is handed to the caller.

    On normal exit the transaction is committed. If the body (or the
    commit itself) raises an ``Exception`` the transaction is rolled back
    and the error re-raised. The connection is closed in every case.

    Each call opens its own connection instead of sharing one, because by
    default a sqlite3 connection may only be used from the thread that
    created it.
    """
    target = _resolve_db_path(db_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(target)
    try:
        connection.row_factory = sqlite3.Row
        # Per-connection settings; foreign_keys must be ON for cascades to fire.
        for pragma in ("PRAGMA foreign_keys = ON", "PRAGMA journal_mode = WAL"):
            connection.execute(pragma)
        _ensure_schema(connection)
        yield connection
        # Commit stays inside the try so a failing commit is rolled back too.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()


def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Make sure the ``maps`` and ``rooms`` tables (and room index) exist.

    Every statement uses ``IF NOT EXISTS``, so running this against an
    already-initialised database is a no-op. Rooms are keyed by
    ``(map_id, id)`` and are removed automatically when their parent map
    row is deleted (``ON DELETE CASCADE``).
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS maps (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            spawn_room TEXT NOT NULL,
            schema_version TEXT NOT NULL,
            map_version TEXT NOT NULL,
            map_revision INTEGER NOT NULL DEFAULT 0,
            items_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS rooms (
            map_id TEXT NOT NULL,
            id TEXT NOT NULL,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            x INTEGER NOT NULL,
            y INTEGER NOT NULL,
            z INTEGER NOT NULL,
            exits_json TEXT NOT NULL DEFAULT '{}',
            items_json TEXT NOT NULL DEFAULT '[]',
            llm_generation_json TEXT,
            description_validation_json TEXT,
            PRIMARY KEY (map_id, id),
            FOREIGN KEY (map_id) REFERENCES maps(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_rooms_map_id ON rooms(map_id);
        """
    conn.executescript(schema_sql)


# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------


def list_maps(db_path: Path | None = None) -> list[str]:
    """List the IDs of every stored map, sorted by ID."""
    with _connect(db_path) as conn:
        cursor = conn.execute("SELECT id FROM maps ORDER BY id")
        return [record["id"] for record in cursor]
def map_exists(map_id: str, db_path: Path | None = None) -> bool:
    """Report whether a ``maps`` row with the given ID is present."""
    with _connect(db_path) as conn:
        cursor = conn.execute("SELECT 1 FROM maps WHERE id = ?", (map_id,))
        found = cursor.fetchone()
    return found is not None
def load_map(map_id: str, db_path: Path | None = None) -> MapFile:
    """Read one map and all of its rooms from SQLite and build a MapFile.

    Room rows are fetched ordered by room ID. JSON columns are decoded
    back into Python values; the two nullable room columns
    (``llm_generation_json`` and ``description_validation_json``) become
    ``None`` when they hold no value.

    Raises:
        KeyError: If no map with ``map_id`` is stored.
    """

    def _decode_optional(raw: str | None) -> Any:
        # NULL or empty text means "no value"; only real payloads are parsed.
        return json.loads(raw) if raw else None

    with _connect(db_path) as conn:
        map_row = conn.execute(
            """
            SELECT id, name, description, spawn_room, schema_version,
                   map_version, map_revision, items_json
            FROM maps
            WHERE id = ?
            """,
            (map_id,),
        ).fetchone()
        if map_row is None:
            raise KeyError(f"Map not found: {map_id}")

        room_records = conn.execute(
            """
            SELECT id, name, description, x, y, z, exits_json, items_json,
                   llm_generation_json, description_validation_json
            FROM rooms
            WHERE map_id = ?
            ORDER BY id
            """,
            (map_id,),
        ).fetchall()

    metadata = MapMetadata(
        schema_version=map_row["schema_version"],
        map_version=map_row["map_version"],
        map_revision=map_row["map_revision"],
    )

    rooms: dict[str, dict[str, Any]] = {
        record["id"]: {
            "id": record["id"],
            "name": record["name"],
            "description": record["description"],
            # Stored as three integer columns, exposed as one [x, y, z] list.
            "coords": [record["x"], record["y"], record["z"]],
            "exits": json.loads(record["exits_json"] or "{}"),
            "items": json.loads(record["items_json"] or "[]"),
            "llm_generation": _decode_optional(record["llm_generation_json"]),
            "description_validation": _decode_optional(
                record["description_validation_json"]
            ),
        }
        for record in room_records
    }

    return MapFile.from_dict(
        {
            "id": map_row["id"],
            "name": map_row["name"],
            "description": map_row["description"],
            "spawn_room": map_row["spawn_room"],
            "items": json.loads(map_row["items_json"] or "{}"),
            "metadata": metadata.model_dump(),
            "rooms": rooms,
        }
    )
def save_map(map_file: MapFile, db_path: Path | None = None) -> None:
    """Write a MapFile to SQLite, replacing whatever was stored for its ID.

    The map row is upserted: a new row gets the current UTC time as both
    ``created_at`` and ``updated_at``, while an existing row keeps its
    ``created_at`` and only has ``updated_at`` refreshed. All rooms of the
    map are then deleted and re-inserted from the in-memory model. Both
    steps happen on the same connection, so they are committed or rolled
    back together.
    """

    def _dump_optional(value: Any) -> str | None:
        # Absent values are stored as SQL NULL, not as the JSON text "null".
        return None if value is None else json.dumps(value)

    def _room_params(map_id: str, room: dict[str, Any]) -> tuple[Any, ...]:
        # Rooms with missing/empty coords are stored at the origin.
        coords = room.get("coords") or [0, 0, 0]
        return (
            map_id,
            room["id"],
            room["name"],
            room.get("description", ""),
            coords[0],
            coords[1],
            coords[2],
            json.dumps(room.get("exits", {})),
            json.dumps(room.get("items", [])),
            _dump_optional(room.get("llm_generation")),
            _dump_optional(room.get("description_validation")),
        )

    timestamp = datetime.now(UTC).isoformat()
    data = map_file.to_dict_with_list_coords()

    with _connect(db_path) as conn:
        # Insert the map row, or update it in place when the ID already exists.
        conn.execute(
            """
            INSERT INTO maps (
                id, name, description, spawn_room, schema_version,
                map_version, map_revision, items_json, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                name = excluded.name,
                description = excluded.description,
                spawn_room = excluded.spawn_room,
                schema_version = excluded.schema_version,
                map_version = excluded.map_version,
                map_revision = excluded.map_revision,
                items_json = excluded.items_json,
                updated_at = excluded.updated_at,
                created_at = maps.created_at
            """,
            (
                data["id"],
                data["name"],
                data.get("description", ""),
                data["spawn_room"],
                data["metadata"]["schema_version"],
                data["metadata"]["map_version"],
                data["metadata"]["map_revision"],
                json.dumps(data.get("items", {})),
                timestamp,
                timestamp,
            ),
        )

        # Full refresh of the room set: drop everything, then re-insert.
        conn.execute("DELETE FROM rooms WHERE map_id = ?", (data["id"],))
        room_params = [
            _room_params(data["id"], room)
            for room in data.get("rooms", {}).values()
        ]
        if room_params:
            conn.executemany(
                """
                INSERT INTO rooms (
                    map_id, id, name, description, x, y, z,
                    exits_json, items_json, llm_generation_json,
                    description_validation_json
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                room_params,
            )
def delete_map(map_id: str, db_path: Path | None = None) -> None:
    """Remove a map row from SQLite.

    Only the ``maps`` row is deleted explicitly; its rooms go with it via
    the ``ON DELETE CASCADE`` foreign key on ``rooms.map_id``. Deleting an
    ID that is not stored is a silent no-op.
    """
    params = (map_id,)
    with _connect(db_path) as conn:
        conn.execute("DELETE FROM maps WHERE id = ?", params)
def get_db_stats(db_path: Path | None = None) -> dict[str, Any]:
    """Collect database-wide figures for UI health panels.

    The returned dict holds the resolved ``path``, the file's
    ``size_bytes``, the ``map_count``, ``room_count`` and
    ``llm_generation_count`` (rooms with a non-NULL
    ``llm_generation_json``), and ``last_updated`` (the greatest
    ``maps.updated_at``, or ``None`` when there are no maps).

    The file size is read before the connection is opened, so a database
    file that does not exist yet is reported as 0 bytes.
    """
    db_file = _resolve_db_path(db_path)
    if db_file.exists():
        size_bytes = db_file.stat().st_size
    else:
        size_bytes = 0

    with _connect(db_path) as conn:
        counts = conn.execute(
            """
            SELECT
                (SELECT COUNT(*) FROM maps) AS map_count,
                (SELECT COUNT(*) FROM rooms) AS room_count,
                (SELECT COUNT(*) FROM rooms
                 WHERE llm_generation_json IS NOT NULL) AS llm_generation_count,
                (SELECT MAX(updated_at) FROM maps) AS last_updated
            """
        ).fetchone()

    stats: dict[str, Any] = {"path": db_file, "size_bytes": size_bytes}
    for key in ("map_count", "room_count", "llm_generation_count"):
        stats[key] = int(counts[key] or 0)
    stats["last_updated"] = counts["last_updated"]
    return stats
def get_map_overview(db_path: Path | None = None) -> list[dict[str, Any]]:
    """Summarise every stored map as one dict per map, sorted by map ID.

    Each dict carries ``map_id``, ``name``, ``map_version``,
    ``map_revision``, ``updated_at`` and ``room_count``. Maps without any
    rooms are still listed (LEFT JOIN) with a ``room_count`` of 0.
    """
    with _connect(db_path) as conn:
        records = conn.execute(
            """
            SELECT
                maps.id AS map_id,
                maps.name AS name,
                maps.map_version AS map_version,
                maps.map_revision AS map_revision,
                maps.updated_at AS updated_at,
                COUNT(rooms.id) AS room_count
            FROM maps
            LEFT JOIN rooms ON rooms.map_id = maps.id
            GROUP BY maps.id
            ORDER BY maps.id
            """
        ).fetchall()

    # Columns copied through unchanged; room_count is coerced to int below.
    plain_columns = ("map_id", "name", "map_version", "map_revision", "updated_at")
    overview: list[dict[str, Any]] = []
    for record in records:
        summary: dict[str, Any] = {column: record[column] for column in plain_columns}
        summary["room_count"] = int(record["room_count"] or 0)
        overview.append(summary)
    return overview