# Source code for dccd.daemon.scheduler

#!/usr/bin/env python3
# coding: utf-8

""" Historical data scheduler for the dccd daemon.

Wraps APScheduler 3.x BackgroundScheduler to run periodic REST API
collection jobs defined in a :class:`~dccd.daemon.config.CollectorConfig`.

"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from apscheduler.schedulers.background import BackgroundScheduler

from dccd.histo_dl.binance import FromBinance
from dccd.histo_dl.bybit import FromBybit
from dccd.histo_dl.coinbase import FromCoinbase
from dccd.histo_dl.exchange import ImportDataCryptoCurrencies
from dccd.histo_dl.kraken import FromKraken
from dccd.histo_dl.okx import FromOKX

if TYPE_CHECKING:
    from dccd.daemon.config import CollectorConfig, HistoJob
    from dccd.daemon.health import HealthMonitor

# Public API of this module.
__all__ = [
    'build_histo_scheduler',
    'run_histo_job',
    'run_once',
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps the exchange name stored in ``HistoJob.exchange`` to the REST
# downloader class that ``run_histo_job`` instantiates for that exchange.
_HISTO_CLASSES: dict[str, type[ImportDataCryptoCurrencies]] = dict(
    binance=FromBinance,
    kraken=FromKraken,
    bybit=FromBybit,
    okx=FromOKX,
    coinbase=FromCoinbase,
)


def run_histo_job(job: HistoJob, pair: str, base_path: str, tz: str = 'local',
                  health: HealthMonitor | None = None) -> None:
    """ Download and save one (exchange, pair) candle job locally.

    Data is saved to ``base_path`` on the daemon host. Remote sync is handled
    separately by :class:`~dccd.daemon.stream_manager.SyncService`.

    Every failure is reported to ``health`` (when given) before the exception
    is re-raised. This includes a malformed ``pair`` and an unsupported
    ``job.exchange``, so configuration mistakes show up in the health metrics
    just like download errors. A success is recorded exactly once, and only
    after the data has been saved.

    Parameters
    ----------
    job : HistoJob
        Job configuration (exchange, span, format).
    pair : str
        Trading pair in ``'CRYPTO/FIAT'`` format (e.g. ``'BTC/USDT'``).
    base_path : str
        Root directory for local storage (``CollectorConfig.storage.local_path``).
    tz : str, optional
        Timezone for file labelling (``CollectorConfig.settings.timezone``).
    health : HealthMonitor or None, optional
        Health monitor to record success/failure metrics.

    Raises
    ------
    ValueError
        If ``pair`` is not of the form ``'CRYPTO/FIAT'`` with both parts
        non-empty.
    KeyError
        If ``job.exchange`` has no registered downloader class.
    Exception
        Any error raised while downloading or saving is propagated unchanged.

    """
    try:
        crypto, sep, fiat = pair.partition('/')
        # Reject 'BTCUSDT', 'BTC/' and '/USDT' up front instead of handing an
        # empty symbol to the exchange client.
        if not (sep and crypto and fiat):
            raise ValueError(
                f"invalid pair {pair!r}: expected 'CRYPTO/FIAT' format"
            )

        try:
            downloader_cls = _HISTO_CLASSES[job.exchange]
        except KeyError:
            raise KeyError(
                f'unsupported exchange {job.exchange!r}; expected one of '
                f'{sorted(_HISTO_CLASSES)}'
            ) from None

        obj = downloader_cls(base_path, crypto, job.span, fiat,
                             form=job.format, tz=tz)
        obj.import_data('last', 'now').save()

        # Row count is informational only; a downloader without a ``data``
        # attribute is reported as 0 rows.
        _data = getattr(obj, 'data', None)
        rows = len(_data) if _data is not None else 0
    except Exception:
        # Explicit ``is not None``: a monitor object could be falsy (e.g. via
        # ``__len__``) and must still receive the failure.
        if health is not None:
            health.record_failure(job.exchange, pair)
        raise
    else:
        # Kept out of the ``try`` so an error inside ``record_success`` is not
        # also counted as a job failure.
        logger.info('histo job done: %s %s span=%s rows=%d',
                    job.exchange, pair, job.span, rows)
        if health is not None:
            health.record_success(job.exchange, pair, rows)


def build_histo_scheduler(config: CollectorConfig,
                          health: HealthMonitor | None = None,
                          ) -> BackgroundScheduler:
    """ Create a BackgroundScheduler holding one interval job per pair.

    Every ``(exchange, pair)`` combination found in ``config.histo_jobs``
    becomes an APScheduler interval job that fires every ``job.span`` seconds
    and calls :func:`run_histo_job`. Jobs are added with ``coalesce=True`` and
    ``max_instances=1`` so executions of the same job do not overlap.

    Job ids have the form ``<exchange>_<CRYPTO>_<FIAT>_<span>``, i.e. the pair
    with its ``'/'`` replaced by ``'_'``.

    Parameters
    ----------
    config : CollectorConfig
        Daemon configuration providing the jobs, the local storage path and
        the timezone.
    health : HealthMonitor or None, optional
        Health monitor passed through to every scheduled job.

    Returns
    -------
    apscheduler.schedulers.background.BackgroundScheduler
        The scheduler with all jobs registered; the caller must start it.

    Examples
    --------
    >>> from dccd.daemon.config import load_config
    >>> from dccd.daemon.scheduler import build_histo_scheduler
    >>> # config = load_config('config.yml')
    >>> # scheduler = build_histo_scheduler(config)
    >>> # scheduler.start()

    """
    scheduler = BackgroundScheduler()

    # Flatten the (job, pair) fan-out into one stream of schedule entries.
    schedule_entries = (
        (histo_job, trading_pair)
        for histo_job in config.histo_jobs
        for trading_pair in histo_job.pairs
    )

    for histo_job, trading_pair in schedule_entries:
        pair_slug = trading_pair.replace("/", "_")
        # NOTE(review): two histo_jobs sharing exchange, pair and span yield
        # the same id, which APScheduler presumably rejects -- confirm.
        job_id = f'{histo_job.exchange}_{pair_slug}_{histo_job.span}'
        job_kwargs = {
            'job': histo_job,
            'pair': trading_pair,
            'base_path': config.storage.local_path,
            'tz': config.settings.timezone,
            'health': health,
        }
        scheduler.add_job(
            run_histo_job,
            trigger='interval',
            seconds=histo_job.span,
            kwargs=job_kwargs,
            id=job_id,
            name=f'{histo_job.exchange} {trading_pair} {histo_job.span}s',
            coalesce=True,
            max_instances=1,
        )
        logger.debug('registered job %s', job_id)

    return scheduler


def run_once(config: CollectorConfig, health: HealthMonitor | None = None) -> None:
    """ Run every configured histo job a single time, then return.

    ``(exchange, pair)`` combinations are processed one after another in
    configuration order. A combination that raises an ``Exception`` is logged
    together with its traceback and skipped, so one bad job never stops the
    remaining ones.

    Parameters
    ----------
    config : CollectorConfig
        Daemon configuration providing the jobs, the local storage path and
        the timezone.
    health : HealthMonitor or None, optional
        Health monitor passed through to every job call.

    """
    work_items = (
        (histo_job, trading_pair)
        for histo_job in config.histo_jobs
        for trading_pair in histo_job.pairs
    )

    for histo_job, trading_pair in work_items:
        try:
            run_histo_job(
                histo_job,
                trading_pair,
                config.storage.local_path,
                tz=config.settings.timezone,
                health=health,
            )
        except Exception:
            # Best-effort batch: record the traceback and continue.
            logger.exception(
                'histo job failed: %s %s', histo_job.exchange, trading_pair
            )