Source code for dccd.daemon.stream_manager

#!/usr/bin/env python3
# coding: utf-8

""" Real-time stream manager and periodic sync service for the dccd daemon.

:class:`SyncService` runs a background thread that pushes the entire local
data directory to all configured remotes at a fixed interval.

:class:`StreamManager` starts one background thread per ``(exchange, pair)``
combination (or per ``(exchange, pair, channel)`` for Bitfinex/Bitmex) and
restarts them automatically on failure.

"""

from __future__ import annotations

import asyncio
import json
import logging
import threading
import time
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any

from dccd.continuous_dl.binance import DownloadBinanceData
from dccd.continuous_dl.bitfinex import DownloadBitfinexData
from dccd.continuous_dl.bitmex import DownloadBitmexData
from dccd.continuous_dl.bybit import DownloadBybitData
from dccd.continuous_dl.exchange import ContinuousDownloader
from dccd.continuous_dl.kraken import DownloadKrakenData
from dccd.continuous_dl.okx import DownloadOKXData
from dccd.daemon.storage import RemoteStorage
from dccd.process_data import set_marketdepth, set_orders, set_trades
from dccd.storage import DataStore

if TYPE_CHECKING:
    from dccd.daemon.config import CollectorConfig, StorageConfig, StreamJob
    from dccd.daemon.health import HealthMonitor

__all__ = ['StreamManager', 'SyncService']

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Exchange class registry
# ---------------------------------------------------------------------------

# Lowercase exchange name -> continuous downloader class that
# StreamManager._run_once() instantiates for a stream task.
_STREAM_CLASSES: dict[str, Any] = dict(
    binance=DownloadBinanceData,
    bybit=DownloadBybitData,
    kraken=DownloadKrakenData,
    okx=DownloadOKXData,
    bitfinex=DownloadBitfinexData,
    bitmex=DownloadBitmexData,
)

# Exchanges opened with one WebSocket connection per channel, so every
# (pair, channel) combination gets a dedicated thread.  All remaining
# exchanges carry every channel of a pair over one connection / thread.
_PER_CHANNEL_EXCHANGES = frozenset(('bitfinex', 'bitmex'))

# Generic channel name -> exchange-native channel name, for the
# per-channel exchanges listed above.
_BITFINEX_CHANNEL: dict[str, str] = dict(trades='trades', book='book')
_BITMEX_CHANNEL: dict[str, str] = dict(trades='trade', book='orderBookL2_25')

# Pause, in seconds, before a stream thread restarts its stream.
_RESTART_DELAY = 30


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _format_pair(exchange: str, pair: str) -> str:
    """ Convert ``'BTC/USDT'`` to the exchange-specific pair format.

    Parameters
    ----------
    exchange : str
        Exchange name (lowercase).
    pair : str
        Trading pair in ``'CRYPTO/FIAT'`` format.

    Returns
    -------
    str
        Exchange-specific pair string. For example, ``'BTC/USDT'`` becomes
        ``'BTCUSDT'`` (binance, bybit), ``'BTC/USDT'`` (kraken, unchanged),
        ``'BTC-USDT'`` (okx), ``'tBTCUST'`` (bitfinex) or ``'XBTUSDT'``
        (bitmex).

    Raises
    ------
    ValueError
        If *pair* is not ``'CRYPTO/FIAT'`` with two non-empty parts, or if
        *exchange* is not supported for streaming.

    """
    crypto, sep, fiat = pair.partition('/')
    # Reject malformed pairs up front with an explicit message instead of a
    # cryptic unpacking error ('BTCUSDT') or a silently broken symbol
    # ('BTC/' -> 'BTC').
    if not (sep and crypto and fiat):
        raise ValueError(f"pair {pair!r} must be in 'CRYPTO/FIAT' format")

    if exchange in ('binance', 'bybit'):
        return crypto + fiat
    if exchange == 'kraken':
        return pair
    if exchange == 'okx':
        return f'{crypto}-{fiat}'
    if exchange == 'bitfinex':
        # Bitfinex abbreviates USDT as UST and prefixes trading pairs with 't'.
        fiat_bf = 'UST' if fiat == 'USDT' else fiat
        return f't{crypto}{fiat_bf}'
    if exchange == 'bitmex':
        # BitMEX names bitcoin XBT.
        xbt = 'XBT' if crypto == 'BTC' else crypto
        return xbt + fiat
    raise ValueError(f'exchange {exchange!r} is not supported for streaming')


def _process_fn(channels: list[str]) -> Any:
    """ Pick the ``process_data`` callable matching the requested channels.

    Parameters
    ----------
    channels : list of str
        Channel names (``'trades'``, ``'book'``, …).

    Returns
    -------
    callable
        :func:`~dccd.process_data.set_trades` for trades only,
        :func:`~dccd.process_data.set_marketdepth` for book only, and
        :func:`~dccd.process_data.set_orders` otherwise (both channels, or
        neither of them).

    """
    wants_trades = 'trades' in channels
    wants_book = 'book' in channels

    if wants_book and not wants_trades:
        return set_marketdepth
    if wants_trades and not wants_book:
        return set_trades
    # Both channels requested, or none of the known ones: combined processor.
    return set_orders


def _connect_kwargs(exchange: str, pair: str, channels: list[str]) -> dict[str, Any]:
    """ Return keyword arguments to pass to ``downloader._connect()``.

    Parameters
    ----------
    exchange : str
        Exchange name.
    pair : str
        Trading pair in ``'CRYPTO/FIAT'`` format.
    channels : list of str
        Channel(s) for this thread (singleton for Bitfinex/Bitmex). Only
        read for Bitfinex/Bitmex; may be empty for other exchanges.

    Returns
    -------
    dict
        Empty for exchanges that embed subscription in ``__init__``;
        channel/symbol kwargs for Bitfinex; ``args`` string for Bitmex.

    Raises
    ------
    IndexError
        If *channels* is empty for Bitfinex/Bitmex.

    """
    if exchange == 'bitfinex':
        ch = channels[0]
        return {'channel': _BITFINEX_CHANNEL.get(ch, ch),
                'symbol':  _format_pair('bitfinex', pair)}
    if exchange == 'bitmex':
        ch = channels[0]
        bch = _BITMEX_CHANNEL.get(ch, ch)
        return {'args': f'{bch}:{_format_pair("bitmex", pair)}'}
    # Other exchanges take their subscription in __init__, so the channel
    # list is not consulted here (and may legitimately be empty).
    return {}


def _iter_tasks(job: StreamJob) -> Iterator[tuple[str, list[str]]]:
    """ Yield one ``(pair, channels)`` tuple per stream thread to create.

    For exchanges in ``_PER_CHANNEL_EXCHANGES`` (Bitfinex, Bitmex) each
    channel of each pair is yielded separately as a one-element list, since
    those exchanges use one WebSocket connection per channel.  For every
    other exchange each pair is yielded once together with ``job.channels``
    itself (the same list object, not a copy).

    Parameters
    ----------
    job : StreamJob
        Stream job configuration; ``exchange``, ``pairs`` and ``channels``
        are read.

    Yields
    ------
    pair : str
    channels : list of str

    """
    one_thread_per_channel = job.exchange in _PER_CHANNEL_EXCHANGES
    for pair in job.pairs:
        if not one_thread_per_channel:
            yield pair, job.channels
            continue
        for channel in job.channels:
            yield pair, [channel]


# ---------------------------------------------------------------------------
# SyncService
# ---------------------------------------------------------------------------

class SyncService:
    """ Periodically push the entire local data directory to all remotes.

    This is the single point of truth for remote synchronisation. Neither
    histo jobs nor stream threads push data themselves — they save locally
    and rely on this service to replicate to remote destinations.

    Parameters
    ----------
    config : StorageConfig
        Storage configuration (``remotes`` list + ``sync_interval``).

    Notes
    -----
    If ``config.remotes`` is empty or ``config.sync_interval`` is not
    positive, the service is a no-op and no background thread is started.
    The first automatic push happens one full interval after :meth:`start`.

    """

    def __init__(self, config: StorageConfig) -> None:
        self.config = config
        self._storage = RemoteStorage(config)
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        """ Start the background sync thread (idempotent).

        Does nothing when no remote is configured, when ``sync_interval`` is
        not positive, or when the sync thread is already running.

        """
        if not self.config.remotes or self.config.sync_interval <= 0:
            logger.info(
                'SyncService disabled (remotes=%d, sync_interval=%d)',
                len(self.config.remotes or ()), self.config.sync_interval,
            )
            return
        # Without this guard a second start() would spawn a duplicate thread
        # and every interval would push the same directory twice.
        if self._thread is not None and self._thread.is_alive():
            return
        self._thread = threading.Thread(
            target=self._loop, daemon=True, name='sync-service',
        )
        self._thread.start()
        logger.info('SyncService started (interval=%ds)', self.config.sync_interval)

    def stop(self) -> None:
        """ Signal the sync thread to stop.

        The thread exits as soon as it is waiting for the next interval; a
        push already in progress is allowed to finish first. This call does
        not wait for the thread.

        """
        self._stop.set()

    def sync_now(self) -> None:
        """ Push ``local_path`` to all remotes immediately (blocking).

        On success, the timestamp and remote list are recorded in
        ``{local_path}/.dccd/last_sync.json`` so the web UI can display the
        last sync time without inspecting rclone output. Exceptions raised
        by the push propagate to the caller.

        """
        self._storage.push(self.config.local_path)
        self._record_last_sync()

    def _record_last_sync(self) -> None:
        """ Write ``last_sync.json`` after a successful push (best effort).

        The JSON is first written to a temporary sibling file and then
        renamed over the target (``Path.replace``, atomic on POSIX), so a
        concurrent reader never sees a half-written file. Any failure is
        logged, never raised.

        """
        try:
            d = Path(self.config.local_path) / '.dccd'
            d.mkdir(parents=True, exist_ok=True)
            payload = {
                'last_sync_at': time.time(),
                'remotes': [r.remote for r in self.config.remotes],
            }
            # Per-thread temp name: concurrent sync_now() calls (background
            # loop vs. a manual trigger) must not rename each other's file.
            tmp = d / f'last_sync.json.{threading.get_ident()}.tmp'
            tmp.write_text(json.dumps(payload))
            tmp.replace(d / 'last_sync.json')
        except Exception:
            logger.exception('SyncService: failed to write last_sync.json')

    def _loop(self) -> None:
        """ Thread body: push every ``sync_interval`` seconds until stopped. """
        # Event.wait() returns True once stop() has been called, ending the loop.
        while not self._stop.wait(timeout=self.config.sync_interval):
            try:
                self.sync_now()
            except Exception:
                logger.exception('SyncService: sync failed')
# ---------------------------------------------------------------------------
# StreamManager
# ---------------------------------------------------------------------------
class StreamManager:
    """ Manage real-time WebSocket collection jobs.

    Starts one background thread per ``(exchange, pair)`` (or per
    ``(exchange, pair, channel)`` for Bitfinex/Bitmex). Each thread runs
    until stopped: whenever its stream session ends or crashes, it waits
    ``_RESTART_DELAY`` seconds and starts a new session. A
    :class:`SyncService` instance pushes data to remotes periodically.

    Parameters
    ----------
    config : CollectorConfig
        Daemon configuration (``stream_jobs`` + ``storage``).
    health : HealthMonitor, optional
        If given, ``record_success`` / ``record_failure`` is called each time
        a stream session returns normally / raises.

    """

    def __init__(self, config: CollectorConfig,
                 health: HealthMonitor | None = None) -> None:
        self.config = config
        self._health = health
        # Guards _threads, _downloaders and _stops.
        self._lock = threading.RLock()
        self._threads: dict[str, threading.Thread] = {}
        self._downloaders: dict[str, ContinuousDownloader] = {}
        self._stops: dict[str, threading.Event] = {}
        # Global stop flag set by stop(); per-thread flags live in _stops.
        self._stop_event = threading.Event()
        self._sync = SyncService(config.storage)

    @staticmethod
    def _key(exchange: str, pair: str, channels: list[str]) -> str:
        """ Build the thread key for an ``(exchange, pair, channels)`` task.

        For example ``('binance', 'BTC/USDT', ['trades', 'book'])`` gives
        ``'binance_BTC_USDT_trades_book'``.

        """
        ch_tag = '_'.join(channels)
        return f'{exchange}_{pair.replace("/", "_")}_{ch_tag}'

    def start(self) -> None:
        """ Start the sync service and all stream threads. """
        self._sync.start()
        for job in self.config.stream_jobs:
            for pair, channels in _iter_tasks(job):
                self._start_thread(job, pair, channels)

    def start_one(self, job: StreamJob, pair: str, channels: list[str]) -> str:
        """ Start a single stream thread for *(pair, channels)* if not running.

        Parameters
        ----------
        job : StreamJob
            Stream job configuration (only ``exchange``/``time_step`` are used).
        pair : str
            Trading pair in ``'CRYPTO/FIAT'`` format.
        channels : list of str
            Channels for this thread.

        Returns
        -------
        str
            The thread key (running or freshly started).

        """
        return self._start_thread(job, pair, channels)

    def _start_thread(self, job: StreamJob, pair: str, channels: list[str]) -> str:
        """ Spawn the thread for a task unless one is already alive. """
        key = self._key(job.exchange, pair, channels)
        with self._lock:
            existing = self._threads.get(key)
            if existing is not None and existing.is_alive():
                return key
            stop = threading.Event()
            self._stops[key] = stop
            t = threading.Thread(
                target=self._run_forever,
                args=(job, pair, channels, stop),
                name=key,
                daemon=True,
            )
            self._threads[key] = t
            t.start()
        logger.info('stream started: %s %s channels=%s', job.exchange, pair, channels)
        return key

    def stop(self) -> None:
        """ Signal all streams and the sync service to stop (non-blocking). """
        # Set before taking the lock: any _run_once() that registers after
        # the snapshot below sees this flag under the lock and bails out.
        self._stop_event.set()
        self._sync.stop()
        with self._lock:
            for ev in self._stops.values():
                ev.set()
            downloaders = list(self._downloaders.values())
        for dl in downloaders:
            self._signal_downloader(dl)

    def stop_one(self, key: str) -> bool:
        """ Signal a single stream thread (by key) to stop.

        The thread and stop-event slots are freed immediately so a
        subsequent :meth:`start_one` for the same key always spawns a fresh
        thread instead of reusing one that is still winding down. The
        downloader entry is left in place; the dying thread removes its own
        (identity-guarded) in :meth:`_run_once`.

        Parameters
        ----------
        key : str
            Thread key as returned by :meth:`start_one` / :meth:`status`.

        Returns
        -------
        bool
            ``True`` if a thread or stop event was registered under *key*
            (even if the thread had already exited) and has been signalled.

        """
        with self._lock:
            ev = self._stops.pop(key, None)
            thread = self._threads.pop(key, None)
            if ev is None and thread is None:
                return False
            # Signal while holding the lock. _run_once() checks the flag
            # under the same lock before registering its downloader, so a
            # stream that is still starting up is either signalled here or
            # sees the flag and never connects.
            if ev is not None:
                ev.set()
            dl = self._downloaders.get(key)
            if dl is not None:
                self._signal_downloader(dl)
        return True

    def running_keys(self) -> list[str]:
        """ Return the keys of all currently alive stream threads. """
        with self._lock:
            return [k for k, t in self._threads.items() if t.is_alive()]

    def status(self, config: CollectorConfig | None = None) -> list[dict[str, Any]]:
        """ Return the configured stream tasks with their running state.

        Parameters
        ----------
        config : CollectorConfig, optional
            Configuration to read ``stream_jobs`` from. Defaults to the config
            captured at construction; callers (e.g. the web UI) pass a
            freshly-loaded config so stream jobs added at runtime appear here
            without restarting the process.

        Returns
        -------
        list of dict
            One entry per ``(pair, channels)`` task across all configured
            ``stream_jobs``, with ``key``, ``exchange``, ``pair``,
            ``channels``, ``time_step`` and ``running`` fields.

        """
        cfg = config if config is not None else self.config
        running = set(self.running_keys())
        out: list[dict[str, Any]] = []
        for job in cfg.stream_jobs:
            for pair, channels in _iter_tasks(job):
                key = self._key(job.exchange, pair, channels)
                out.append({
                    'key': key,
                    'exchange': job.exchange,
                    'pair': pair,
                    'channels': list(channels),
                    'time_step': job.time_step,
                    'running': key in running,
                })
        return out

    # ------------------------------------------------------------------
    # Thread body
    # ------------------------------------------------------------------

    def _stopping(self, stop: threading.Event | None) -> bool:
        """ Return True if a global or per-thread stop was requested. """
        return self._stop_event.is_set() or (stop is not None and stop.is_set())

    @staticmethod
    def _signal_downloader(dl: ContinuousDownloader) -> None:
        """ Ask a downloader to finish by setting ``until``/``is_connect``.

        NOTE(review): assumes the downloader's loop exits once ``until`` has
        passed or ``is_connect`` is False — confirm in ContinuousDownloader.

        """
        dl.until = time.time()
        dl.is_connect = False

    def _run_forever(self, job: StreamJob, pair: str, channels: list[str],
                     stop: threading.Event) -> None:
        """ Thread body: run stream sessions back to back until stopped. """
        while not self._stopping(stop):
            try:
                self._run_once(job, pair, channels, stop)
                if self._health:
                    self._health.record_success(job.exchange, pair)
            except Exception:
                logger.exception('stream crashed: %s %s', job.exchange, pair)
                if self._health:
                    self._health.record_failure(job.exchange, pair)
            if not self._stopping(stop):
                # Back off whether the session crashed or ended cleanly, so
                # a connection that closes immediately cannot spin this
                # loop. stop.wait() returns early when stop is requested
                # (stop() also sets every per-thread event).
                stop.wait(timeout=_RESTART_DELAY)

    @staticmethod
    def _build_downloader(job: StreamJob, pair: str,
                          channels: list[str]) -> ContinuousDownloader:
        """ Instantiate the exchange downloader for one stream task. """
        cls = _STREAM_CLASSES[job.exchange]
        if job.exchange in _PER_CHANNEL_EXCHANGES:
            # Bitfinex/Bitmex do not take pair in __init__; they receive it
            # via _connect() kwargs.
            downloader: ContinuousDownloader = cls(
                time_step=job.time_step,
                until=0,
            )
            # self.parser must be initialised before _loop() / on_message()
            ch_map = _BITFINEX_CHANNEL if job.exchange == 'bitfinex' else _BITMEX_CHANNEL
            ch_key = ch_map.get(channels[0], channels[0])
            downloader.parser = downloader.get_parser(ch_key)  # type: ignore[attr-defined]
            return downloader
        # All other exchanges set the pair in __init__.
        return cls(
            pair=_format_pair(job.exchange, pair),
            time_step=job.time_step,
            until=0,
        )

    @staticmethod
    def _run_downloader(downloader: ContinuousDownloader,
                        conn_kw: dict[str, Any]) -> None:
        """ Drive ``_connect()`` and ``_loop()`` on a private event loop.

        Blocks until both coroutines finish or one raises; the first
        exception propagates. Tasks still pending at that point (e.g.
        ``_loop()`` after ``_connect()`` raised) are cancelled and awaited
        before the loop is closed, so they are not destroyed while pending.

        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(asyncio.gather(
                downloader._connect(**conn_kw),
                downloader._loop(),
            ))
        finally:
            try:
                pending = asyncio.all_tasks(loop)
                for task in pending:
                    task.cancel()
                if pending:
                    loop.run_until_complete(
                        asyncio.gather(*pending, return_exceptions=True))
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop installed for this thread.
                asyncio.set_event_loop(None)
                loop.close()

    def _run_once(self, job: StreamJob, pair: str, channels: list[str],
                  stop: threading.Event | None = None) -> None:
        """ Run one stream session for a task until it ends or is stopped.

        Parameters
        ----------
        job : StreamJob
            Stream job configuration.
        pair : str
            Trading pair in ``'CRYPTO/FIAT'`` format.
        channels : list of str
            Channels for this thread.
        stop : threading.Event, optional
            Per-thread stop flag. If it (or the global stop flag) is set by
            the time the downloader would be registered, the session is
            skipped without connecting.

        """
        downloader = self._build_downloader(job, pair, channels)

        # A combined trades+book stream is stored as 'trades'; any stream
        # without trades is stored as 'orderbook'.
        data_type = 'trades' if 'trades' in channels else 'orderbook'
        store = DataStore(
            self.config.storage.local_path,
            job.exchange,
            pair,
            None,
            data_type,
        )
        downloader.set_process_data(_process_fn(channels))
        downloader.set_saver(store.save)
        conn_kw = _connect_kwargs(job.exchange, pair, channels)

        key = self._key(job.exchange, pair, channels)
        with self._lock:
            # Checked under the lock that stop()/stop_one() use, so a stop
            # request can never slip between this check and registration.
            if self._stopping(stop):
                logger.info('stream not started (stop requested): %s %s',
                            job.exchange, pair)
                return
            self._downloaders[key] = downloader

        try:
            self._run_downloader(downloader, conn_kw)
        finally:
            with self._lock:
                # Only remove our own downloader: a stop+restart may have
                # already registered a fresh one under the same key.
                if self._downloaders.get(key) is downloader:
                    del self._downloaders[key]
        logger.info('stream ended: %s %s', job.exchange, pair)