Source code for dccd.application.events

"""Event bus — Progress, Log, Status events."""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from typing import Any, Literal

from pydantic import BaseModel

__all__ = ["Event", "ProgressEvent", "LogEvent", "StatusEvent", "EventBus"]

logger = logging.getLogger(__name__)


class ProgressEvent(BaseModel):
    kind: Literal["progress"] = "progress"
    run_id: str
    done: int
    total: int
    unit: str = "windows"


class LogEvent(BaseModel):
    kind: Literal["log"] = "log"
    run_id: str
    level: str = "info"
    message: str


class StatusEvent(BaseModel):
    kind: Literal["status"] = "status"
    run_id: str
    state: str


Event = ProgressEvent | LogEvent | StatusEvent

Handler = Callable[[Event], Any]


[docs] class EventBus: """Simple pub-sub event bus for operation progress/log/status events.""" def __init__(self) -> None: self._handlers: list[Handler] = [] self._queue: asyncio.Queue[Event] | None = None def subscribe(self, handler: Handler) -> None: self._handlers.append(handler) def unsubscribe(self, handler: Handler) -> None: self._handlers = [h for h in self._handlers if h != handler] def emit(self, event: Event) -> None: for handler in self._handlers: try: handler(event) except Exception: logger.exception("EventBus handler error") if self._queue is not None: try: self._queue.put_nowait(event) except asyncio.QueueFull: pass def enable_queue(self, maxsize: int = 1000) -> asyncio.Queue[Event]: self._queue = asyncio.Queue(maxsize=maxsize) return self._queue def for_run(self, run_id: str) -> "RunEvents": return RunEvents(self, run_id)
class RunEvents: """Scoped event emitter for a single run.""" def __init__(self, bus: EventBus, run_id: str) -> None: self._bus = bus self._run_id = run_id def progress(self, done: int, total: int, unit: str = "windows") -> None: self._bus.emit(ProgressEvent(run_id=self._run_id, done=done, total=total, unit=unit)) def log(self, msg: str, level: str = "info") -> None: self._bus.emit(LogEvent(run_id=self._run_id, level=level, message=msg)) def status(self, state: str) -> None: self._bus.emit(StatusEvent(run_id=self._run_id, state=state))