"""Event bus — Progress, Log, Status events."""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Callable
from typing import Any, Literal
from pydantic import BaseModel
__all__ = [
"Event",
"ProgressEvent",
"LogEvent",
"StatusEvent",
"StreamSampleEvent",
"EventBus",
]
logger = logging.getLogger(__name__)
[docs]
class ProgressEvent(BaseModel):
"""Progress of a run (windows or time covered)."""
kind: Literal["progress"] = "progress"
run_id: str
done: int
total: int
unit: str = "windows"
[docs]
class LogEvent(BaseModel):
"""A log line emitted by a run."""
kind: Literal["log"] = "log"
run_id: str
level: str = "info"
message: str
[docs]
class StatusEvent(BaseModel):
"""A run state change (running, succeeded, failed, …)."""
kind: Literal["status"] = "status"
run_id: str
state: str
[docs]
class StreamSampleEvent(BaseModel):
"""A liveness sample from a running stream (last trade/price/quote).
Emitted (throttled) by :func:`dccd.application.operations.stream` so the
Live UI can prove a stream is actually receiving data, without persisting
the sample. ``ts`` is the record timestamp (nanoseconds UTC). The values are
raw numbers — the client formats them (thousands separators, etc.):
``value`` for trades (last price) and OHLC (close); ``bid``/``ask`` for the
order book top of book.
"""
kind: Literal["sample"] = "sample"
run_id: str
ts: int
value: float | None = None
bid: float | None = None
ask: float | None = None
Event = ProgressEvent | LogEvent | StatusEvent | StreamSampleEvent
Handler = Callable[[Event], Any]
[docs]
class EventBus:
"""Pub-sub bus carrying operation events to interested subscribers.
Operations emit :class:`ProgressEvent`, :class:`LogEvent` and
:class:`StatusEvent` (tagged by ``run_id``). Subscribers — the HTTP API's
SSE endpoint, the health monitor — receive them either by registering a
handler or by draining an internal queue (:meth:`enable_queue`). Use
:meth:`for_run` to get a small emitter bound to one ``run_id``.
Examples
--------
>>> bus = EventBus()
>>> seen = []
>>> bus.subscribe(seen.append)
>>> bus.for_run('r1').log('started')
>>> seen[0].message
'started'
"""
def __init__(self) -> None:
self._handlers: list[Handler] = []
# A *set* of queues so several SSE consumers (Live + Logs + Dashboard
# in separate tabs) each receive every event. A single shared queue
# would let the last connection steal events from the others.
self._queues: set[asyncio.Queue[Event]] = set()
[docs]
def subscribe(self, handler: Handler) -> None:
"""Register a handler called for every published event."""
self._handlers.append(handler)
[docs]
def unsubscribe(self, handler: Handler) -> None:
"""Remove a previously registered handler."""
self._handlers = [h for h in self._handlers if h != handler]
[docs]
def emit(self, event: Event) -> None:
"""Publish an event to all handlers and every registered queue."""
for handler in self._handlers:
try:
handler(event)
except Exception:
logger.exception("EventBus handler error")
for queue in self._queues:
try:
queue.put_nowait(event)
except asyncio.QueueFull:
pass
[docs]
def add_queue(self, maxsize: int = 1000) -> asyncio.Queue[Event]:
"""Register and return a new queue that receives every event (for SSE)."""
queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=maxsize)
self._queues.add(queue)
return queue
[docs]
def remove_queue(self, queue: asyncio.Queue[Event]) -> None:
"""Unregister a queue (call on SSE disconnect)."""
self._queues.discard(queue)
[docs]
def enable_queue(self, maxsize: int = 1000) -> asyncio.Queue[Event]:
"""Backwards-compatible alias for :meth:`add_queue`."""
return self.add_queue(maxsize)
[docs]
def for_run(self, run_id: str) -> "RunEvents":
"""Return a small emitter bound to *run_id* (``.progress/.log/.status``)."""
return RunEvents(self, run_id)
class RunEvents:
"""Scoped event emitter for a single run."""
def __init__(self, bus: EventBus, run_id: str) -> None:
self._bus = bus
self._run_id = run_id
def progress(self, done: int, total: int, unit: str = "windows") -> None:
"""Emit a :class:`ProgressEvent` for this run."""
self._bus.emit(ProgressEvent(run_id=self._run_id, done=done, total=total, unit=unit))
def log(self, msg: str, level: str = "info") -> None:
"""Emit a :class:`LogEvent` for this run."""
self._bus.emit(LogEvent(run_id=self._run_id, level=level, message=msg))
def status(self, state: str) -> None:
"""Emit a :class:`StatusEvent` for this run."""
self._bus.emit(StatusEvent(run_id=self._run_id, state=state))
def sample(
self,
ts: int,
*,
value: float | None = None,
bid: float | None = None,
ask: float | None = None,
) -> None:
"""Emit a :class:`StreamSampleEvent` (stream liveness) for this run."""
self._bus.emit(
StreamSampleEvent(run_id=self._run_id, ts=ts, value=value, bid=bid, ask=ask)
)