Application

The application layer wires domain, sources and storage into the four operations, plus the orchestration that schedules them and the events they emit.

Operations

backfill / stream / read / inventory are the verbs; everything else is plumbing. They take their infrastructure (registry, store, events) as keyword arguments so call sites stay explicit and testable.

Core operations — backfill, stream, read, inventory.

All public functions are async and accept keyword-only infrastructure arguments (registry, store, events, …) to keep call sites explicit and testable.

Key contract for paginators: each fetch_page passed to paginate_ohlc / paginate_trades must be a closure with symbol (and span for OHLC) already bound:

    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
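Here is a minimal sketch of that contract using hypothetical stand-ins. FakeAdapter, make_fetch_page and toy_paginate are not part of dccd, and the real paginate_ohlc signature may differ. For the toy, span is assumed to be a bar width in nanoseconds. The sketch shows the division of labour: the paginator only ever passes (start_ns, end_ns, limit), while symbol and span travel inside the closure.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical fetcher type: the paginator only supplies the window and page size.
FetchPage = Callable[[int, int, int], Awaitable[List[int]]]


class FakeAdapter:
    """Stand-in exchange adapter that returns bar timestamps (ns) for a window."""

    def __init__(self) -> None:
        self.calls = []  # records which (symbol, span) each request used

    async def fetch_ohlc_page(self, symbol, span, start_ns, end_ns, limit):
        self.calls.append((symbol, span))
        first = -(-start_ns // span) * span  # first bar boundary at or after start_ns
        return list(range(first, end_ns, span))[:limit]


def make_fetch_page(adapter, symbol, span) -> FetchPage:
    # symbol and span are captured here, so the paginator never needs to know them.
    async def _fetch(start_ns: int, end_ns: int, limit: int) -> List[int]:
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    return _fetch


async def toy_paginate(fetch_page: FetchPage, start_ns: int, end_ns: int, limit: int) -> List[int]:
    """Hypothetical paginator: walk forward page by page until the window is exhausted."""
    rows: List[int] = []
    cursor = start_ns
    while cursor < end_ns:
        page = await fetch_page(cursor, end_ns, limit)
        if not page:
            break  # nothing left in the window
        rows.extend(page)
        cursor = page[-1] + 1  # resume just after the last row received
    return rows
```

Binding with functools.partial(adapter.fetch_ohlc_page, symbol, span) would produce the same (start_ns, end_ns, limit) callable. A named closure simply keeps the argument order explicit at the call site.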

async backfill(spec, *, registry, store, runs_store=None, coverage_store=None, events=None, stop_event=None, run_id=None)

Backfill historical data from a source to the Parquet store.

Parameters:
spec : JobSpec

Job specification (operation='backfill'). spec.params.start controls the start point:

  • 'last': resume from the last stored timestamp. If no data exists yet, defaults to 30 days ago to avoid a multi-decade paginator run from epoch 0.

  • 'origin': start from the Unix epoch (timestamp 0) so the run captures the exchange’s earliest available data. Many pages before the exchange’s launch date will be empty.

  • An ISO-8601 date string ('2024-01-01') or a nanosecond integer: explicit start timestamp.

registry : SourceRegistry
store : ParquetStore
runs_store : RunsStore or None
coverage_store : CoverageStore or None

When set, start="last" falls back to the manifest’s recorded max_ts if no local file exists (so a dropped store doesn’t trigger a re-download), and the dataset’s extent is recorded on success.

events : RunEvents or None
stop_event : asyncio.Event or None

Set externally to cancel mid-run cleanly.

run_id : str or None

Override the auto-generated run ID (used by the API endpoint so the polling URL matches what is stored in RunsStore).

Returns:
dict

{'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.
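The following sketch shows how spec.params.start might resolve to a start timestamp, combining the three start forms with the coverage_store fallback. It rests on several assumptions. resolve_start and its keyword arguments are hypothetical. The fallback order for 'last' (local data, then the manifest’s max_ts, then 30 days ago) is inferred from the descriptions above. Naive ISO strings are assumed to be UTC. The text does not specify whether 'last' resumes at the stored timestamp or just after it.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
DEFAULT_LOOKBACK = timedelta(days=30)  # documented fallback when nothing is stored yet


def to_ns(dt: datetime) -> int:
    """Convert an aware datetime to integer nanoseconds since the Unix epoch."""
    return (dt - EPOCH) // timedelta(microseconds=1) * 1_000


def resolve_start(start, *, last_local_ns=None, manifest_max_ns=None, now=None) -> int:
    """Hypothetical helper: map spec.params.start to a start timestamp in ns."""
    now = now or datetime.now(timezone.utc)
    if isinstance(start, int):
        return start  # explicit nanosecond timestamp
    if start == "origin":
        return 0  # Unix epoch; early pages may be empty
    if start == "last":
        if last_local_ns is not None:
            return last_local_ns  # resume from the newest local row
        if manifest_max_ns is not None:
            return manifest_max_ns  # local file dropped, but the coverage manifest remembers
        return to_ns(now - DEFAULT_LOOKBACK)  # nothing known: fetch the last 30 days only
    # Anything else is treated as an ISO-8601 date/datetime (assumed UTC when naive).
    dt = datetime.fromisoformat(start)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return to_ns(dt)
```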

inventory(*, store)

Return a list of dataset descriptors for all stored data.

read(target, *, store, start_ns=None, end_ns=None, remote=None)

Read stored data for target in the given nanosecond range.

Read-through restore: when remote is set and the dataset has no local Parquet (e.g. it was purged to free disk), the dataset directory is pulled back from the remote (rclone copy) before loading, so a purge is transparent to readers.
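The read-through check can be sketched as follows. ensure_local and the pull callback are hypothetical: pull stands in for whatever wraps rclone copy in the real RemoteStorage. The remote is consulted only when no local Parquet exists, so reads of data that is still on disk never pay for a network round-trip.

```python
from pathlib import Path
from typing import Callable, Optional


def ensure_local(dataset_dir: Path, pull: Optional[Callable[[Path], None]]) -> bool:
    """Hypothetical read-through restore: return True if Parquet is available locally.

    `pull` stands in for the remote (e.g. a wrapper around `rclone copy`); it is
    only called when the dataset has no local Parquet files.
    """
    if any(dataset_dir.glob("*.parquet")):
        return True  # local data present: no remote round-trip
    if pull is None:
        return False  # no remote configured: nothing to restore from
    pull(dataset_dir)  # copy the dataset directory back from the remote
    return any(dataset_dir.glob("*.parquet"))
```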

async stream(spec, *, registry, store, runs_store=None, events=None, stop_event=None)

Stream live data continuously until stop_event is set.
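Here is a polling sketch of "run until stop_event is set". toy_stream, source and sink are hypothetical, and the real transport and reconnect logic are not documented here. The detail worth copying is waiting on stop_event.wait() with a timeout instead of a bare sleep: a stop request interrupts the pause immediately, and the loop exits at a clean record boundary.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


async def toy_stream(
    source: Callable[[], Awaitable[Optional[object]]],
    sink: List[object],
    stop_event: asyncio.Event,
    poll_s: float = 0.01,
) -> int:
    """Hypothetical stream loop: forward records until stop_event is set."""
    written = 0
    while not stop_event.is_set():
        record = await source()
        if record is not None:
            sink.append(record)
            written += 1
        try:
            # Pause between polls, but wake immediately if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=poll_s)
        except asyncio.TimeoutError:
            pass
    return written
```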

async sync_remote(remote, *, runs_store=None, events=None, run_id=None)

Run one remote-sync cycle: mirror the local store to all rclone remotes.

Records the cycle as a sync run in runs_store (so the Storage UI can show “last sync”) and emits status/log on events. Shared by the scheduler’s periodic loop and the manual “Sync now” endpoint, so the run-recording lives in exactly one place.

Parameters:
remote : RemoteStorage
runs_store : RunsStore or None
events : RunEvents or None
run_id : str or None

Override the auto-generated run id.

Returns:
dict

{'run_id', 'results', 'ok'}, where results maps remote → success and ok is True only when every configured remote synced.
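A sketch can illustrate the shape of the returned dict. toy_sync_cycle is hypothetical, and each remote is a plain callable rather than an rclone invocation. Continuing past a failed remote is an assumption, not documented behaviour. ok reduces to all(results.values()).

```python
import uuid
from typing import Callable, Dict, Mapping, Optional


def toy_sync_cycle(remotes: Mapping[str, Callable[[], None]], run_id: Optional[str] = None) -> dict:
    """Hypothetical sync cycle: mirror to every remote and record per-remote success."""
    run_id = run_id or uuid.uuid4().hex  # auto-generated unless the caller overrides it
    results: Dict[str, bool] = {}
    for name, mirror in remotes.items():
        try:
            mirror()  # stand-in for mirroring the local store to this rclone remote
            results[name] = True
        except Exception:
            results[name] = False  # assumption: one failing remote does not stop the rest
    # ok is True only when every configured remote synced.
    return {"run_id": run_id, "results": results, "ok": all(results.values())}
```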

Jobs

A JobSpec is a declarative unit of work (operation + target + trigger + params); the config expands one JobConfig into one spec per pair.

Job model — JobSpec, JobRun, Trigger, JobParams.

class JobParams(*, start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None)

Operation parameters (all optional, per-operation).

class JobRun(*, run_id, spec_id, operation, target, state=RunState.PENDING, started_at=None, ended_at=None, rows_written=0, error=None, progress=None, log_tail=[])

One execution of a JobSpec.

class JobSpec(*, id, operation, target, trigger, params=JobParams(start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None), enabled=True, origin='config')

Declarative job definition.

A histo job is backfill + interval trigger (+ start='last'). A stream job is stream + supervised trigger.

classmethod make_id(operation, target)

Build a stable job id from the operation and target.

class JobTarget(*, exchange, symbol, data_type, span=None)

What to collect — exchange + symbol + data type.

class RunState(*values)

Lifecycle state of a job run.

class Trigger(*, kind, at=None, every=None, cron=None)

Job trigger — when/how to execute.

manual jobs are never auto-run by the scheduler; they exist only to be triggered on demand (the UI Run button / POST /api/jobs/run).
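To make the JobConfig-to-JobSpec expansion concrete, here is a sketch with simplified stand-ins for JobTarget and JobSpec (Target, Spec). Several things are assumptions: treating "pair" as the symbol, the expand signature, and the colon-joined id format. The real JobSpec.make_id only promises a stable id built from the operation and target.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Target:  # stand-in for JobTarget
    exchange: str
    symbol: str
    data_type: str
    span: Optional[str] = None


@dataclass(frozen=True)
class Spec:  # stand-in for JobSpec; trigger and params reduced to two fields
    id: str
    operation: str
    target: Target
    trigger_kind: str  # e.g. 'interval', 'cron', 'supervised', 'once', 'manual'
    start: str = "last"


def make_id(operation: str, target: Target) -> str:
    # Hypothetical id format; the real JobSpec.make_id may differ. It only has to
    # be stable: the same operation + target always yield the same id.
    parts = [operation, target.exchange, target.symbol, target.data_type]
    if target.span:
        parts.append(target.span)
    return ":".join(parts)


def expand(operation: str, exchange: str, pairs: List[str], data_type: str,
           trigger_kind: str, span: Optional[str] = None) -> List[Spec]:
    """Expand one config entry into one spec per pair (hypothetical JobConfig shape)."""
    specs = []
    for symbol in pairs:
        target = Target(exchange, symbol, data_type, span)
        specs.append(Spec(make_id(operation, target), operation, target, trigger_kind))
    return specs
```

A histo job then corresponds to expand("backfill", ..., "interval", ...) with the default start='last', and a stream job to expand("stream", ..., "supervised").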

Scheduler

Routes each spec by trigger kind — supervised → stream worker (auto-reconnect), interval/cron → periodic backfill, once → one-shot.
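The routing rule in the paragraph above, plus the manual case from Trigger, fits in a small dispatch function. route and its return labels are hypothetical names for illustration. The trigger-kind strings come from the text but may not match the real Trigger.kind values exactly.

```python
from typing import Optional


def route(trigger_kind: str) -> Optional[str]:
    """Hypothetical dispatch mirroring the scheduler's routing by trigger kind."""
    if trigger_kind == "manual":
        return None  # never auto-run; only via the UI Run button or POST /api/jobs/run
    if trigger_kind == "supervised":
        return "stream_worker"  # long-lived, auto-reconnecting
    if trigger_kind in ("interval", "cron"):
        return "periodic_backfill"
    if trigger_kind == "once":
        return "one_shot"
    raise ValueError(f"unknown trigger kind: {trigger_kind!r}")
```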

class Scheduler(registry, store, runs_store=None, events=None, remote=None, sync_interval=3600, coverage_store=None, data_path=None, min_free_gb=0.0)

Orchestrates JobSpecs (intervals → backfill; supervised → streams).

Parameters:
registry : SourceRegistry
store : ParquetStore
runs_store : RunsStore or None
events : EventBus
remote : RemoteStorage or None

When set (rclone remotes configured), start launches a periodic loop that mirrors the local store off-box every sync_interval seconds.

sync_interval : int

Seconds between remote-sync cycles (default 3600).

register_streams(specs)

Register stream workers from config without starting them.

Called by the app lifespan so stream jobs appear in /api/streams and can be started/stopped from the UI even in dccd ui mode.

async run_now(spec)

Trigger a one-shot backfill for spec immediately.

async start(specs)

Start all enabled specs (full daemon mode).

start_stream(spec_id)

Start one registered stream by spec id; return whether it existed.

async stop()

Stop all running jobs.

async stop_stream(spec_id)

Stop one registered stream by spec id; return whether it existed.

stream_status()

Map of stream spec id to running state.

async sync_intervals(specs)

Reconcile recurring backfill loops with specs.

Starts a loop for each newly-scheduled interval/cron backfill job, cancels loops whose schedule was removed (now manual/deleted), and restarts a loop whose cadence changed. No-op unless the scheduler is running (dccd start); in dccd ui mode schedules just persist to config and take effect on the next daemon start.
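The reconciliation in sync_intervals reduces to a set difference over spec ids plus a cadence comparison. plan_interval_sync and the (kind, value) cadence tuple are hypothetical. The real method also cancels and spawns the loops themselves, and skips all of this unless the daemon is running.

```python
from typing import Dict, List, Tuple

Cadence = Tuple[str, str]  # hypothetical, e.g. ("every", "1h") or ("cron", "0 * * * *")


def plan_interval_sync(running: Dict[str, Cadence], wanted: Dict[str, Cadence]) -> Dict[str, List[str]]:
    """Hypothetical diff: which backfill loops to start, cancel or restart.

    running: spec id -> cadence of loops currently active.
    wanted:  spec id -> cadence of every spec still scheduled as interval/cron.
    """
    start = sorted(wanted.keys() - running.keys())  # newly scheduled jobs
    cancel = sorted(running.keys() - wanted.keys())  # now manual, or deleted
    restart = sorted(k for k in wanted.keys() & running.keys() if wanted[k] != running[k])
    return {"start": start, "cancel": cancel, "restart": restart}
```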

async sync_streams(specs)

Reconcile registered stream workers with specs.

Registers workers for new stream specs and stops + drops workers whose spec is no longer present (e.g. a job deleted from the UI). Without the drop, a deleted stream would keep running and stay controllable via /api/streams (its config entry is already gone).
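The stream lifecycle described by register_streams, start_stream, stop_stream, stream_status and sync_streams can be sketched with one dict of worker factories and one dict of running tasks. ToyStreams is hypothetical and omits auto-reconnect. It shows why sync_streams must drop the registration and not just stop the task: otherwise a deleted job would still be startable by id.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Worker = Callable[[], Awaitable[None]]  # hypothetical: a factory for the stream coroutine


class ToyStreams:
    """Hypothetical stream registry: register without starting, start/stop by id."""

    def __init__(self) -> None:
        self._factories: Dict[str, Worker] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def register(self, spec_id: str, worker: Worker) -> None:
        self._factories.setdefault(spec_id, worker)  # registered, but not running

    def start(self, spec_id: str) -> bool:
        """Start a registered stream (needs a running event loop); False if unknown."""
        if spec_id not in self._factories:
            return False
        task = self._tasks.get(spec_id)
        if task is None or task.done():
            self._tasks[spec_id] = asyncio.create_task(self._factories[spec_id]())
        return True

    async def stop(self, spec_id: str) -> bool:
        """Stop a registered stream; False if the id was never registered."""
        if spec_id not in self._factories:
            return False
        task = self._tasks.pop(spec_id, None)
        if task is not None:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)  # wait for cleanup
        return True

    def status(self) -> Dict[str, bool]:
        return {sid: sid in self._tasks and not self._tasks[sid].done() for sid in self._factories}

    async def sync(self, wanted: Dict[str, Worker]) -> None:
        for spec_id, worker in wanted.items():
            self.register(spec_id, worker)  # new specs appear, still stopped
        for spec_id in list(self._factories.keys() - wanted.keys()):
            await self.stop(spec_id)  # stop the deleted job's worker...
            del self._factories[spec_id]  # ...and drop it so it is no longer controllable
```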

Events

Event bus — Progress, Log, Status events.

class EventBus

Pub-sub bus carrying operation events to interested subscribers.

Operations emit ProgressEvent, LogEvent and StatusEvent (tagged by run_id). Subscribers — the HTTP API’s SSE endpoint, the health monitor — receive them either by registering a handler or by draining an internal queue (enable_queue). Use for_run to get a small emitter bound to one run_id.

Examples

>>> bus = EventBus()
>>> seen = []
>>> bus.subscribe(seen.append)
>>> bus.for_run('r1').log('started')
>>> seen[0].message
'started'

add_queue(maxsize=1000)

Register and return a new queue that receives every event (for SSE).

emit(event)

Publish an event to all handlers and every registered queue.

enable_queue(maxsize=1000)

Backwards-compatible alias for add_queue.

for_run(run_id)

Return a small emitter bound to run_id (.progress/.log/.status).

remove_queue(queue)

Unregister a queue (call on SSE disconnect).

subscribe(handler)

Register a handler called for every published event.

unsubscribe(handler)

Remove a previously registered handler.
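Below is a minimal fan-out sketch of emit, add_queue and remove_queue as an SSE endpoint would use them: add a queue on connect, drain it, and remove it on disconnect. ToyBus and Log are simplified stand-ins for EventBus and LogEvent. Dropping events when a queue is full is an assumption about what maxsize implies, not documented behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Log:  # stand-in for LogEvent
    run_id: str
    message: str
    kind: str = "log"


class ToyBus:
    """Hypothetical bus: every event goes to all handlers and all queues."""

    def __init__(self) -> None:
        self._handlers: List[Callable] = []
        self._queues: List[asyncio.Queue] = []

    def subscribe(self, handler: Callable) -> None:
        self._handlers.append(handler)

    def add_queue(self, maxsize: int = 1000) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._queues.append(q)
        return q

    def remove_queue(self, q: asyncio.Queue) -> None:
        if q in self._queues:
            self._queues.remove(q)

    def emit(self, event) -> None:
        for handler in list(self._handlers):
            handler(event)  # handlers run synchronously, as in the doctest above
        for q in list(self._queues):
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                pass  # assumption: a slow SSE client loses events rather than blocking emitters
```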

class LogEvent(*, kind='log', run_id, level='info', message)

A log line emitted by a run.

class ProgressEvent(*, kind='progress', run_id, done, total, unit='windows')

Progress of a run (windows or time covered).

class StatusEvent(*, kind='status', run_id, state)

A run state change (running, succeeded, failed, …).

class StreamSampleEvent(*, kind='sample', run_id, ts, value=None, bid=None, ask=None)

A liveness sample from a running stream (last trade/price/quote).

Emitted (throttled) by dccd.application.operations.stream so the Live UI can show that a stream is actually receiving data, without persisting the sample. ts is the record timestamp (nanoseconds UTC). The values are raw numbers, which the client formats (thousands separators, etc.): value carries the last price for trades and the close for OHLC; bid/ask carry the order book’s top of book.
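The throttling and the per-data-type fields could look like the sketch below. Throttle, sample_fields, the data-type names ('trades', 'ohlc') and the record keys are hypothetical. The sketch only illustrates "at most one sample per interval" and the value/bid/ask mapping described above.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Hypothetical throttle: let at most one sample through per `interval_s` seconds."""

    def __init__(self, interval_s: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.interval_s = interval_s
        self._clock = clock
        self._last: Optional[float] = None

    def ready(self) -> bool:
        now = self._clock()
        if self._last is None or now - self._last >= self.interval_s:
            self._last = now  # this sample goes out; restart the window
            return True
        return False


def sample_fields(data_type: str, record: dict) -> dict:
    """Pick the raw numbers a sample carries (hypothetical record keys)."""
    if data_type == "trades":
        return {"value": record["price"]}  # last trade price
    if data_type == "ohlc":
        return {"value": record["close"]}  # bar close
    return {"bid": record["bid"], "ask": record["ask"]}  # anything else: top of book
```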

Configuration

The Pydantic config models (AppConfig and its sections) are documented in the Configuration Reference.

Service factory

The single place that wires every adapter — edit it to add an exchange.

Service-object factory.

Central place that wires together all exchange adapters so that both the CLI and the API import from one location. Adding a new exchange means editing only this file.

build_coverage_store(data_path)

Return a CoverageStore.

The database lives at {data_path}/.dccd/coverage.db — the manifest that lets local data be dropped without forcing a re-download on the next backfill.

Parameters:
data_path : str or Path
Returns:
CoverageStore

build_registry()

Return a SourceRegistry with all adapters registered.

Returns:
SourceRegistry

build_remote(cfg)

Return a RemoteStorage, or None.

Returns None when no rclone remotes are configured (storage.remotes empty) — there is nothing to sync, so the daemon skips the sync loop. The local root is settings.data_path (the canonical store root used by build_store).

Parameters:
cfg : AppConfig
Returns:
RemoteStorage or None

build_runs_store(data_path)

Return a RunsStore inside data_path.

The database lives at {data_path}/.dccd/runs.db.

Parameters:
data_path : str or Path
Returns:
RunsStore

build_store(data_path)

Return a ParquetStore for data_path.

Parameters:
data_path : str or Path
Returns:
ParquetStore
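The on-disk layout and the build_remote rule can be summarised in a few lines. dccd_paths and remote_or_none are hypothetical helpers, and the dict returned by remote_or_none stands in for a RemoteStorage. Only the .dccd/runs.db and .dccd/coverage.db paths and the None-when-no-remotes rule come from the text.

```python
from pathlib import Path
from typing import Dict, List, Optional, Union


def dccd_paths(data_path: Union[str, Path]) -> Dict[str, Path]:
    """Where the runs and coverage databases live under the store root."""
    meta = Path(data_path) / ".dccd"
    return {"runs": meta / "runs.db", "coverage": meta / "coverage.db"}


def remote_or_none(remotes: List[str], data_path: Union[str, Path]) -> Optional[dict]:
    """Mirror build_remote's rule: no configured remotes means no remote at all."""
    if not remotes:
        return None  # nothing to sync, so the daemon skips the sync loop
    return {"local_root": Path(data_path), "remotes": list(remotes)}
```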