Application
The application layer wires domain, sources and storage into the four operations, plus the orchestration that schedules them and the events they emit.
Operations
backfill / stream / read / inventory are the verbs; everything
else is plumbing. They take their infrastructure (registry, store, events) as
keyword arguments so call sites stay explicit and testable.
Core operations — backfill, stream, read, inventory.
All public functions are async and accept keyword-only infrastructure arguments (registry, store, events, …) to keep call sites explicit and testable.
Key contract for paginators: each fetch_page passed to
paginate_ohlc / paginate_trades must be a closure with symbol
(and span for OHLC) already bound:
    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
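The closure contract above can be met with a small factory, which also avoids Python's late-binding pitfall when closures are created in a loop over symbols. This is a minimal sketch: `FakeAdapter` and `make_ohlc_fetcher` are hypothetical names introduced for illustration, and only the `fetch_ohlc_page` argument order is taken from the snippet above.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# Shape the paginator expects: (start_ns, end_ns, limit) -> awaitable page.
FetchPage = Callable[[int, int, int], Awaitable[List[Any]]]


class FakeAdapter:
    """Hypothetical stand-in for a source adapter; records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []

    async def fetch_ohlc_page(self, symbol, span, start_ns, end_ns, limit):
        self.calls.append((symbol, span, start_ns, end_ns, limit))
        return []  # an empty page is enough for the illustration


def make_ohlc_fetcher(adapter, symbol: str, span: int) -> FetchPage:
    """Bind symbol and span once so the paginator only supplies the window."""

    async def _fetch(start_ns: int, end_ns: int, limit: int):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    return _fetch


adapter = FakeAdapter()
fetch_page = make_ohlc_fetcher(adapter, "BTC/USD", 60)
asyncio.run(fetch_page(0, 1_000, 500))
```

After the call, `adapter.calls` holds one tuple with the bound symbol and span followed by the window arguments.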
- async backfill(spec, *, registry, store, runs_store=None, coverage_store=None, events=None, stop_event=None, run_id=None)
Backfill historical data from a source to the Parquet store.
- Parameters:
- spec : JobSpec
Job specification (operation='backfill'). spec.params.start controls the start point:
  - 'last': resume from the last stored timestamp. If no data exists yet, it defaults to 30 days ago to avoid a multi-decade paginator run from epoch 0.
  - 'origin': start from the exchange’s earliest available data (timestamp 0; many pages will be empty before the exchange’s launch date).
  - An ISO-8601 date string ('2024-01-01') or a nanosecond integer: explicit start timestamp.
- registry : SourceRegistry
- store : ParquetStore
- runs_store : RunsStore or None
- coverage_store : CoverageStore or None
When set, start="last" falls back to the manifest’s recorded max_ts if no local file exists (so a dropped store doesn’t trigger a re-download), and the dataset’s extent is recorded on success.
- events : RunEvents or None
- stop_event : asyncio.Event or None
Set externally to cancel mid-run cleanly.
- run_id : str or None
Override the auto-generated run ID (used by the API endpoint so the polling URL matches what is stored in RunsStore).
- Returns:
- dict
{'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.
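The start-point rules for `spec.params.start` can be sketched as a single resolver. This is an illustration under stated assumptions, not the library's code: `resolve_start_ns` and `to_ns` are hypothetical helpers, naive date strings are assumed to be UTC, and whether the real resume point is the last stored timestamp or the one after it is not stated in the text.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
DEFAULT_LOOKBACK = timedelta(days=30)  # fallback named in the 'last' rule


def to_ns(moment: datetime) -> int:
    """Convert an aware datetime to integer nanoseconds since the Unix epoch."""
    delta = moment - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


def resolve_start_ns(
    start: Union[str, int], last_stored_ns: Optional[int], now: datetime
) -> int:
    """Hypothetical resolver mirroring the documented start rules."""
    if isinstance(start, int):
        return start  # explicit nanosecond timestamp
    if start == "origin":
        return 0  # earliest possible data
    if start == "last":
        if last_stored_ns is not None:
            return last_stored_ns  # resume where the store ends
        return to_ns(now - DEFAULT_LOOKBACK)  # nothing stored yet
    parsed = datetime.fromisoformat(start)  # e.g. '2024-01-01'
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return to_ns(parsed)


now = datetime(2024, 1, 31, tzinfo=timezone.utc)
explicit = resolve_start_ns("2024-01-01", None, now)
fallback = resolve_start_ns("last", None, now)
```

With `now` fixed at 2024-01-31 UTC, the 30-day fallback and the explicit date `'2024-01-01'` resolve to the same instant.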
- read(target, *, store, start_ns=None, end_ns=None, remote=None)
Read stored data for target in the given nanosecond range.
Read-through restore: when remote is set and the dataset has no local Parquet (e.g. it was purged to free disk), the dataset directory is pulled back from the remote (rclone copy) before loading, so a purge is transparent to readers.
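The read-through restore decision can be sketched as a check that runs before loading. This is a hedged illustration: `restore_if_missing` and `fake_pull` are hypothetical, the `*.parquet` glob is an assumption about the on-disk layout, and the real transfer is the `rclone copy` step mentioned above rather than the placeholder used here.

```python
import tempfile
from pathlib import Path
from typing import Callable, Optional


def restore_if_missing(
    dataset_dir: Path, pull: Optional[Callable[[Path], None]]
) -> bool:
    """Pull the dataset back when no local Parquet exists; return True if a pull ran."""
    has_local = dataset_dir.is_dir() and any(dataset_dir.rglob("*.parquet"))
    if has_local or pull is None:
        return False  # data already present, or no remote configured
    pull(dataset_dir)  # hypothetical hook standing in for the remote copy
    return True


def fake_pull(dataset_dir: Path) -> None:
    """Stand-in for the remote copy: creates an empty placeholder file."""
    dataset_dir.mkdir(parents=True, exist_ok=True)
    (dataset_dir / "part-0.parquet").touch()


with tempfile.TemporaryDirectory() as tmp:
    dataset = Path(tmp) / "ohlc" / "BTC-USD"
    first = restore_if_missing(dataset, fake_pull)   # nothing local: pull runs
    second = restore_if_missing(dataset, fake_pull)  # file now present: no pull
    offline = restore_if_missing(Path(tmp) / "missing", None)  # no remote given
```

The second call is a no-op, which is what makes a purge invisible to readers after the first restore.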
- async stream(spec, *, registry, store, runs_store=None, events=None, stop_event=None)
Stream live data continuously until stop_event is set.
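The stop_event contract is ordinary cooperative cancellation with `asyncio.Event`. The sketch below shows the pattern only; `run_until_stopped` is a hypothetical worker that appends counters instead of consuming a real feed.

```python
import asyncio
from typing import List


async def run_until_stopped(stop_event: asyncio.Event, received: List[int]) -> None:
    """Hypothetical worker: loops until the event is set, then returns cleanly."""
    tick = 0
    while not stop_event.is_set():
        received.append(tick)  # stands in for handling one live message
        tick += 1
        await asyncio.sleep(0)  # yield so the caller can set the event


async def main() -> List[int]:
    stop_event = asyncio.Event()
    received: List[int] = []
    worker = asyncio.create_task(run_until_stopped(stop_event, received))
    await asyncio.sleep(0.01)  # let the worker run briefly
    stop_event.set()           # request a clean stop from outside
    await worker               # the worker exits on its next loop check
    return received


samples = asyncio.run(main())
```

Because the worker checks the event between messages, it finishes its current iteration before exiting instead of being interrupted mid-write.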
- async sync_remote(remote, *, runs_store=None, events=None, run_id=None)
Run one remote-sync cycle: mirror the local store to all rclone remotes.
Records the cycle as a sync run in runs_store (so the Storage UI can show “last sync”) and emits status/log on events. Shared by the scheduler’s periodic loop and the manual “Sync now” endpoint, so the run-recording lives in exactly one place.
- Parameters:
- remote : RemoteStorage
- runs_store : RunsStore or None
- events : RunEvents or None
- run_id : str or None
Override the auto-generated run ID.
- Returns:
- dict
{'run_id', 'results', 'ok'}: results maps remote → success; ok is True only when every configured remote synced.
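The relationship between results and ok in the returned dict can be sketched as follows. `summarize_sync` is a hypothetical helper, and the remote names are made up for the example.

```python
from typing import Dict


def summarize_sync(run_id: str, results: Dict[str, bool]) -> dict:
    """Build a result dict in the documented shape; ok requires every remote to succeed."""
    return {
        "run_id": run_id,
        "results": dict(results),     # remote name -> success flag
        "ok": all(results.values()),  # a single failure makes the cycle not ok
    }


partial = summarize_sync("r1", {"s3": True, "b2": False})
complete = summarize_sync("r2", {"s3": True, "b2": True})
```

Note that `all()` over an empty mapping is True, so a caller using this shape would need to decide separately how to report a cycle with no remotes.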
Jobs
A JobSpec is a declarative unit of work
(operation + target + trigger + params); the config expands one
JobConfig into one spec per pair.
Job model — JobSpec, JobRun, Trigger, JobParams.
- class JobParams(*, start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None)
Operation parameters (all optional, per-operation).
- class JobRun(*, run_id, spec_id, operation, target, state=RunState.PENDING, started_at=None, ended_at=None, rows_written=0, error=None, progress=None, log_tail=[])
One execution of a JobSpec.
- class JobSpec(*, id, operation, target, trigger, params=JobParams(start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None), enabled=True, origin='config')
Declarative job definition.
A histo job is: backfill + interval trigger (+ start="last"). A stream job is: stream + supervised trigger.
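The "one spec per pair" expansion can be sketched with plain dataclasses. Everything here is a simplified stand-in: `Params`, `Spec` and `expand` are hypothetical, the target and trigger are reduced to strings, and the id format is an assumption rather than the scheme the real config uses.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Params:
    """Simplified stand-in for JobParams; only the start field is modelled."""
    start: str = "last"


@dataclass(frozen=True)
class Spec:
    """Simplified stand-in for JobSpec, with string target and trigger."""
    id: str
    operation: str
    target: str
    trigger: str
    params: Params = field(default_factory=Params)
    enabled: bool = True
    origin: str = "config"


def expand(job_name: str, operation: str, trigger: str, pairs: List[str]) -> List[Spec]:
    """Produce one spec per pair; the id format is an assumption."""
    return [
        Spec(id=f"{job_name}:{pair}", operation=operation, target=pair, trigger=trigger)
        for pair in pairs
    ]


histo_specs = expand("histo", "backfill", "interval", ["BTC/USD", "ETH/USD"])
```

Each resulting spec is independent, so one pair can be disabled or rescheduled without touching the others.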
Scheduler
Routes each spec by trigger kind — supervised → stream worker (auto-reconnect), interval/cron → periodic backfill, once → one-shot.
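The routing rule reads naturally as a lookup table. In this sketch the handler names are hypothetical labels, not the scheduler's real method names, and any trigger kind not listed in the sentence above raises.

```python
# Trigger kind -> handler label, following the routing sentence above.
ROUTES = {
    "supervised": "stream_worker",    # long-lived, auto-reconnecting
    "interval": "periodic_backfill",  # recurring backfill
    "cron": "periodic_backfill",      # recurring backfill on a cron schedule
    "once": "one_shot",               # single run
}


def route(trigger_kind: str) -> str:
    """Return the handler label for a trigger kind; unknown kinds raise ValueError."""
    try:
        return ROUTES[trigger_kind]
    except KeyError:
        raise ValueError(f"unknown trigger kind: {trigger_kind!r}") from None


handlers = [route(kind) for kind in ("supervised", "interval", "cron", "once")]
```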
- class Scheduler(registry, store, runs_store=None, events=None, remote=None, sync_interval=3600, coverage_store=None, data_path=None, min_free_gb=0.0)
Orchestrates JobSpecs (intervals → backfill; supervised → streams).
- Parameters:
- registry : SourceRegistry
- store : ParquetStore
- runs_store : RunsStore or None
- events : EventBus
- remote : RemoteStorage or None
When set (rclone remotes configured), start launches a periodic loop that mirrors the local store off-box every sync_interval seconds.
- sync_interval : int
Seconds between remote-sync cycles (default 3600).
- register_streams(specs)
Register stream workers from config without starting them.
Called by the app lifespan so stream jobs appear in /api/streams and can be started/stopped from the UI even in dccd ui mode.
- async stop_stream(spec_id)
Stop one registered stream by spec id; return whether it existed.
- async sync_intervals(specs)
Reconcile recurring backfill loops with specs.
Starts a loop for each newly-scheduled interval/cron backfill job, cancels loops whose schedule was removed (now manual/deleted), and restarts a loop whose cadence changed. No-op unless the scheduler is running (dccd start); in dccd ui mode schedules just persist to config and take effect on the next daemon start.
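The three outcomes described for sync_intervals (start, cancel, restart) amount to a diff between what is running and what the specs now ask for. The sketch below computes only that plan; `plan_reconcile` is a hypothetical helper, and representing each schedule as a cadence in seconds keyed by spec id is an assumption.

```python
from typing import Dict, List, Tuple


def plan_reconcile(
    running: Dict[str, int], desired: Dict[str, int]
) -> Tuple[List[str], List[str], List[str]]:
    """Diff running loops against desired schedules (spec id -> cadence seconds)."""
    to_start = sorted(desired.keys() - running.keys())   # newly scheduled
    to_cancel = sorted(running.keys() - desired.keys())  # schedule removed
    to_restart = sorted(
        spec_id
        for spec_id in desired.keys() & running.keys()
        if desired[spec_id] != running[spec_id]          # cadence changed
    )
    return to_start, to_cancel, to_restart


running = {"a": 60, "b": 300, "c": 3600}
desired = {"b": 300, "c": 60, "d": 900}
to_start, to_cancel, to_restart = plan_reconcile(running, desired)
```

Here `d` is started, `a` is cancelled, `c` is restarted with its new cadence, and `b` is left alone.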
- async sync_streams(specs)
Reconcile registered stream workers with specs.
Registers workers for new stream specs and stops + drops workers whose spec is no longer present (e.g. a job deleted from the UI). Without the drop, a deleted stream would keep running and stay controllable via /api/streams (its config entry is already gone).
Events
Event bus — Progress, Log, Status events.
- class EventBus
Pub-sub bus carrying operation events to interested subscribers.
Operations emit ProgressEvent, LogEvent and StatusEvent (tagged by run_id). Subscribers (the HTTP API’s SSE endpoint, the health monitor) receive them either by registering a handler or by draining an internal queue (enable_queue). Use for_run to get a small emitter bound to one run_id.
Examples
>>> bus = EventBus()
>>> seen = []
>>> bus.subscribe(seen.append)
>>> bus.for_run('r1').log('started')
>>> seen[0].message
'started'
- class ProgressEvent(*, kind='progress', run_id, done, total, unit='windows')
Progress of a run (windows or time covered).
- class StatusEvent(*, kind='status', run_id, state)
A run state change (running, succeeded, failed, …).
- class StreamSampleEvent(*, kind='sample', run_id, ts, value=None, bid=None, ask=None)
A liveness sample from a running stream (last trade/price/quote).
Emitted (throttled) by dccd.application.operations.stream so the Live UI can prove a stream is actually receiving data, without persisting the sample. ts is the record timestamp (nanoseconds UTC). The values are raw numbers that the client formats (thousands separators, etc.): value for trades (last price) and OHLC (close); bid/ask for the order book top of book.
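The handler-based half of the EventBus description (subscribe, then emit through a run-bound emitter) can be illustrated with a toy bus. This is not the library's implementation: `ToyBus`, `ToyRunEmitter` and `ToyLogEvent` are hypothetical, and the queue-draining path (`enable_queue`) is left out because its signature is not shown in the text.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class ToyLogEvent:
    """Hypothetical log event tagged with the run that produced it."""
    run_id: str
    message: str
    kind: str = "log"


class ToyBus:
    """Toy pub-sub bus: every published event goes to every subscribed handler."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[ToyLogEvent], None]] = []

    def subscribe(self, handler: Callable[[ToyLogEvent], None]) -> None:
        self._handlers.append(handler)

    def publish(self, event: ToyLogEvent) -> None:
        for handler in list(self._handlers):  # copy: handlers may subscribe others
            handler(event)

    def for_run(self, run_id: str) -> "ToyRunEmitter":
        return ToyRunEmitter(self, run_id)


class ToyRunEmitter:
    """Small emitter bound to one run_id, so call sites never repeat the id."""

    def __init__(self, bus: ToyBus, run_id: str) -> None:
        self._bus = bus
        self._run_id = run_id

    def log(self, message: str) -> None:
        self._bus.publish(ToyLogEvent(run_id=self._run_id, message=message))


bus = ToyBus()
seen: List[ToyLogEvent] = []
bus.subscribe(seen.append)
bus.for_run("r1").log("started")
```

Binding the run id in the emitter is what lets a subscriber such as an SSE endpoint filter events per run without the operations passing the id on every call.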
Configuration
The Pydantic config models (AppConfig and its sections) are documented in the Configuration Reference.
Service factory
The single place that wires every adapter — edit it to add an exchange.
Service-object factory.
Central place that wires together all exchange adapters so that both the CLI and the API import from one location. Adding a new exchange means editing only this file.
- build_coverage_store(data_path)
Return a CoverageStore.
The database lives at {data_path}/.dccd/coverage.db. It is the manifest that lets local data be dropped without forcing a re-download on the next backfill.
- Parameters:
- data_path : str or Path
- Returns:
- CoverageStore
- build_registry()
Return a SourceRegistry with all adapters registered.
- Returns:
- SourceRegistry
- build_remote(cfg)
Return a RemoteStorage, or None.
Returns None when no rclone remotes are configured (storage.remotes empty): there is nothing to sync, so the daemon skips the sync loop. The local root is settings.data_path (the canonical store root used by build_store).
- Parameters:
- cfg : AppConfig
- Returns:
- RemoteStorage or None
- build_runs_store(data_path)
Return a RunsStore inside data_path.
The database lives at {data_path}/.dccd/runs.db.
- Parameters:
- data_path : str or Path
- Returns:
- RunsStore
- build_store(data_path)
Return a ParquetStore for data_path.
- Parameters:
- data_path : str or Path
- Returns:
- ParquetStore
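The database locations named for build_coverage_store and build_runs_store follow one layout under the data path, sketched below. `metadata_paths` is a hypothetical helper and `/var/lib/dccd` is an example root, not a documented default; `PurePosixPath` is used only to keep the output platform-independent.

```python
from pathlib import PurePosixPath
from typing import Dict


def metadata_paths(data_path: str) -> Dict[str, PurePosixPath]:
    """Compute the two metadata database paths under {data_path}/.dccd/."""
    meta_dir = PurePosixPath(data_path) / ".dccd"
    return {
        "coverage": meta_dir / "coverage.db",  # manifest of downloaded extents
        "runs": meta_dir / "runs.db",          # run history
    }


paths = metadata_paths("/var/lib/dccd")
```

Both databases sit in a hidden directory alongside the data they describe, so moving the data path moves them with it.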