Source code for pipeworks_mud_mapper.services.io_queue

"""Background I/O queue for long-running file operations.

This module provides a tiny thread pool and job registry so callbacks
can offload disk-heavy operations without blocking the UI thread.
"""

from __future__ import annotations

import threading
import uuid
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


[docs] class IOJobRegistry: """Thread-safe registry for background I/O jobs.""" def __init__(self, max_workers: int = 2) -> None: self._executor = ThreadPoolExecutor(max_workers=max_workers) self._lock = threading.Lock() self._jobs: dict[str, Future] = {}
[docs] def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> str: """Submit a job and return the job id.""" job_id = uuid.uuid4().hex future = self._executor.submit(func, *args, **kwargs) with self._lock: self._jobs[job_id] = future return job_id
[docs] def status(self, job_id: str) -> dict[str, Any] | None: """Return status info for a job id.""" with self._lock: future = self._jobs.get(job_id) if future is None: return None if not future.done(): return {"status": "pending"} try: result = future.result() return {"status": "done", "result": result} except Exception as exc: # pragma: no cover - error path surfaced in callback return {"status": "error", "error": str(exc)}
[docs] def forget(self, job_id: str) -> None: """Remove a job id from the registry.""" with self._lock: self._jobs.pop(job_id, None)
_IO_REGISTRY = IOJobRegistry()
[docs] def submit_io_job(func: Callable[..., Any], *args: Any, **kwargs: Any) -> str: """Submit a background I/O job and return its id.""" return _IO_REGISTRY.submit(func, *args, **kwargs)
[docs] def get_io_job_status(job_id: str) -> dict[str, Any] | None: """Return the status dict for a job id.""" return _IO_REGISTRY.status(job_id)
[docs] def forget_io_job(job_id: str) -> None: """Remove a job id from the registry.""" _IO_REGISTRY.forget(job_id)