Application

The application layer wires the domain, sources and storage layers into four operations, and provides the orchestration that schedules them and the events they emit.

Operations

backfill / stream / read / inventory are the verbs; everything else is plumbing. They take their infrastructure (registry, store, events) as keyword arguments so call sites stay explicit and testable.

Core operations — backfill, stream, read, inventory.

All public functions are async and accept keyword-only infrastructure arguments (registry, store, events, …) to keep call sites explicit and testable.

Key contract for paginators: each fetch_page passed to paginate_ohlc / paginate_trades must be a closure with symbol (and span for OHLC) already bound:

    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
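The closure contract above can be tried out in isolation. The sketch below is only an illustration: FakeAdapter and make_fetch_page are hypothetical names introduced here, and the symbol and span values are placeholders. It shows that once symbol and span are bound, the caller only supplies the window and the page limit.

```python
import asyncio


class FakeAdapter:
    """Hypothetical stand-in for an exchange adapter (not part of the library)."""

    async def fetch_ohlc_page(self, symbol, span, start_ns, end_ns, limit):
        # Return one synthetic row so the bound arguments are observable.
        return [{"symbol": symbol, "span": span, "ts": start_ns}][:limit]


def make_fetch_page(adapter, symbol, span):
    """Bind symbol and span; the returned closure takes only the window."""

    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)

    return _fetch


# Placeholder values; a paginator would call fetch_page once per window.
fetch_page = make_fetch_page(FakeAdapter(), "BTC/USD", 60)
page = asyncio.run(fetch_page(0, 1_000_000_000, 10))
```

Using a factory function rather than defining _fetch inside a loop avoids the late-binding pitfall where every closure ends up seeing the last symbol of the loop.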

async backfill(spec, *, registry, store, runs_store=None, events=None, stop_event=None, run_id=None)

Backfill historical data from a source to the Parquet store.

Parameters:
spec : JobSpec

Job specification (operation='backfill'). spec.params.start controls the start point:

  • 'last': resume from the last stored timestamp. If no data exists yet, defaults to 30 days ago to avoid a multi-decade paginator run from epoch 0.

  • 'origin': start from the exchange’s earliest available data (timestamp 0 — many pages will be empty before the exchange’s launch date).

  • An ISO-8601 date string ('2024-01-01') or a nanosecond integer: explicit start timestamp.

registry : SourceRegistry
store : ParquetStore
runs_store : RunsStore or None
events : RunEvents or None
stop_event : asyncio.Event or None

Set externally to cancel mid-run cleanly.

run_id : str or None

Override the auto-generated run ID (used by the API endpoint so the polling URL matches what is stored in RunsStore).

Returns:
dict

{'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.
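The three forms of spec.params.start described above can be sketched as a small resolver. This is not the library's implementation: resolve_start_ns and its arguments are hypothetical, and treating a date without a timezone as UTC is an assumption. Whether the real resume point is the last stored timestamp itself or the one after it is not stated in this reference, so the sketch returns it unchanged.

```python
from datetime import datetime, timedelta, timezone

NS_PER_SECOND = 1_000_000_000


def resolve_start_ns(start, last_stored_ns=None, now=None):
    """Hypothetical helper: turn a start parameter into a nanosecond timestamp."""
    now = now or datetime.now(timezone.utc)
    if isinstance(start, int):
        return start  # explicit nanosecond integer
    if start == "origin":
        return 0  # earliest possible timestamp
    if start == "last":
        if last_stored_ns is not None:
            return last_stored_ns  # resume from stored data
        # No data yet: fall back to 30 days ago instead of epoch 0.
        fallback = now - timedelta(days=30)
        return int(fallback.timestamp()) * NS_PER_SECOND
    # Otherwise expect an ISO-8601 string such as '2024-01-01'.
    parsed = datetime.fromisoformat(start)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return int(parsed.timestamp()) * NS_PER_SECOND
```

For example, resolve_start_ns('2024-01-01') gives 1704067200 seconds scaled to nanoseconds, and resolve_start_ns('last') with no stored data gives a point 30 days before now.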

inventory(*, store)

Return a list of dataset descriptors for all stored data.

read(target, *, store, start_ns=None, end_ns=None)

Read stored data for target in the given nanosecond range.

async stream(spec, *, registry, store, runs_store=None, events=None, stop_event=None)

Stream live data continuously until stop_event is set.
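The "run until stop_event is set" pattern can be illustrated with plain asyncio. The sketch below does not call the library; run_until_stopped and poll are hypothetical names, and the polling interval is arbitrary. It shows a loop that checks the event between iterations and wakes immediately when the event is set rather than finishing a full sleep.

```python
import asyncio


async def run_until_stopped(poll, stop_event, interval=0.01):
    """Call poll() repeatedly until stop_event is set; return the call count."""
    calls = 0
    while not stop_event.is_set():
        await poll()
        calls += 1
        try:
            # Wait for the interval, but return at once if stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request
    return calls


async def demo():
    stop_event = asyncio.Event()
    received = []

    async def poll():
        received.append(len(received))
        if len(received) == 3:
            stop_event.set()  # simulate an external cancel after three polls

    return await run_until_stopped(poll, stop_event), received


calls, received = asyncio.run(demo())
```

Waiting on the event with a timeout, instead of calling asyncio.sleep, keeps shutdown latency independent of the polling interval.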

Jobs

A JobSpec is a declarative unit of work (operation + target + trigger + params); the config expands one JobConfig into one spec per pair.

Job model — JobSpec, JobRun, Trigger, JobParams.

class JobParams(*, start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None)

Operation parameters (all optional, per-operation).

class JobRun(*, run_id, spec_id, operation, target, state=RunState.PENDING, started_at=None, ended_at=None, rows_written=0, error=None, progress=None, log_tail=[])

One execution of a JobSpec.

class JobSpec(*, id, operation, target, trigger, params=JobParams(start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None), enabled=True, origin='config')

Declarative job definition.

A histo job is: backfill + interval trigger (+ start='last'). A stream job is: stream + supervised trigger.

classmethod make_id(operation, target)

Build a stable job id from the operation and target.
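To make the "one spec per pair" expansion and the idea of a stable id concrete, here is a sketch using stand-in dataclasses. Target, Spec, expand and this make_id are hypothetical and only mirror a subset of the documented fields; the colon-separated id format, the exchange name and the data_type value are assumptions, and the real make_id may build ids differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Target:
    """Stand-in for JobTarget."""
    exchange: str
    symbol: str
    data_type: str
    span: Optional[int] = None


@dataclass(frozen=True)
class Spec:
    """Stand-in for JobSpec (subset of fields)."""
    id: str
    operation: str
    target: Target
    trigger_kind: str


def make_id(operation, target):
    # Hypothetical id format: deterministic, so re-expansion yields the same id.
    parts = [operation, target.exchange, target.symbol, target.data_type]
    if target.span is not None:
        parts.append(str(target.span))
    return ":".join(parts)


def expand(operation, trigger_kind, exchange, pairs, data_type, span=None):
    """Build one spec per pair, each with a deterministic id."""
    specs = []
    for symbol in pairs:
        target = Target(exchange, symbol, data_type, span)
        specs.append(Spec(make_id(operation, target), operation, target, trigger_kind))
    return specs


# Placeholder config values for illustration only.
specs = expand("backfill", "interval", "kraken", ["BTC/USD", "ETH/USD"], "ohlc", span=60)
```

Deriving the id only from the operation and target means a restart or config reload maps each pair back to the same job, which is what makes run history attributable to a spec.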

class JobTarget(*, exchange, symbol, data_type, span=None)

What to collect — exchange + symbol + data type.

class RunState(*values)

Lifecycle state of a job run.

class Trigger(*, kind, at=None, every=None, cron=None)

Job trigger — when/how to execute.

Scheduler

Routes each spec by trigger kind — supervised → stream worker (auto-reconnect), interval/cron → periodic backfill, once → one-shot.
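The routing rule above amounts to a lookup on the trigger kind. The sketch below is a minimal illustration, not the Scheduler's code: route is a hypothetical function and the returned labels are descriptive strings chosen here, while the four kind values come from the description above.

```python
def route(kind):
    """Map a trigger kind to a label for the worker type that would handle it."""
    routes = {
        "supervised": "stream_worker",      # long-lived, auto-reconnecting
        "interval": "periodic_backfill",    # repeated on a fixed period
        "cron": "periodic_backfill",        # repeated on a cron schedule
        "once": "one_shot",                 # run a single time
    }
    try:
        return routes[kind]
    except KeyError:
        # Failing loudly on an unknown kind is a design choice of this sketch.
        raise ValueError(f"unknown trigger kind: {kind!r}") from None
```

A table-driven dispatch keeps the mapping visible in one place, so adding a trigger kind is a one-line change.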

class Scheduler(registry, store, runs_store=None, events=None)

Orchestrates JobSpecs (intervals → backfill; supervised → streams).

Parameters:
registry : SourceRegistry
store : ParquetStore
runs_store : RunsStore or None
events : EventBus

register_streams(specs)

Register stream workers from config without starting them.

Called by the app lifespan so stream jobs appear in /api/streams and can be started/stopped from the UI even in dccd ui mode.

async run_now(spec)

Trigger a one-shot backfill for spec immediately.

async start(specs)

Start all enabled specs (full daemon mode).

start_stream(spec_id)

Start one registered stream by spec id; return whether it existed.

async stop()

Stop all running jobs.

async stop_stream(spec_id)

Stop one registered stream by spec id; return whether it existed.

stream_status()

Map of stream spec id to running state.

Events

Event bus — Progress, Log, Status events.

class EventBus

Pub-sub bus carrying operation events to interested subscribers.

Operations emit ProgressEvent, LogEvent and StatusEvent (tagged by run_id). Subscribers — the HTTP API’s SSE endpoint, the health monitor — receive them either by registering a handler or by draining an internal queue (enable_queue). Use for_run to get a small emitter bound to one run_id.

Examples

>>> bus = EventBus()
>>> seen = []
>>> bus.subscribe(seen.append)
>>> bus.for_run('r1').log('started')
>>> seen[0].message
'started'
emit(event)

Publish an event to all handlers and the queue (if enabled).

enable_queue(maxsize=1000)

Create and return an asyncio queue that receives every event (for SSE).

for_run(run_id)

Return a small emitter bound to run_id (.progress/.log/.status).

subscribe(handler)

Register a handler called for every published event.

unsubscribe(handler)

Remove a previously registered handler.
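The two delivery paths, handlers and the queue, can be illustrated with a small stand-in. MiniBus below is not the library's EventBus; it only mimics the method names documented above, events are plain dicts for brevity, and dropping events when the queue is full is an assumption about how a slow consumer might be handled.

```python
import asyncio


class MiniBus:
    """Stand-in bus showing fan-out to handlers and an optional queue."""

    def __init__(self):
        self._handlers = []
        self._queue = None

    def subscribe(self, handler):
        self._handlers.append(handler)

    def enable_queue(self, maxsize=1000):
        self._queue = asyncio.Queue(maxsize=maxsize)
        return self._queue

    def emit(self, event):
        # Handlers are called synchronously, in registration order.
        for handler in list(self._handlers):
            handler(event)
        if self._queue is not None:
            try:
                self._queue.put_nowait(event)
            except asyncio.QueueFull:
                pass  # assumption: drop the event if the consumer lags


async def demo():
    bus = MiniBus()
    seen = []
    bus.subscribe(seen.append)
    queue = bus.enable_queue(maxsize=2)
    for i in range(3):
        bus.emit({"kind": "log", "run_id": "r1", "message": f"line {i}"})
    drained = []
    while not queue.empty():
        drained.append(queue.get_nowait()["message"])
    return len(seen), drained


handler_count, drained = asyncio.run(demo())
```

In this sketch the handler sees all three events while the bounded queue keeps only the first two, which shows why a consumer such as an SSE endpoint should drain the queue promptly.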

class LogEvent(*, kind='log', run_id, level='info', message)

A log line emitted by a run.

class ProgressEvent(*, kind='progress', run_id, done, total, unit='windows')

Progress of a run (windows or time covered).

class StatusEvent(*, kind='status', run_id, state)

A run state change (running, succeeded, failed, …).

Configuration

The Pydantic config models — AppConfig and its sections — are documented in the Configuration Reference.

Service factory

The single place that wires every adapter — edit it to add an exchange.

Service-object factory.

Central place that wires together all exchange adapters so that both the CLI and the API import from one location. Adding a new exchange means editing only this file.

build_registry()

Return a SourceRegistry with all adapters registered.

Returns:
SourceRegistry

build_runs_store(data_path)

Return a RunsStore inside data_path.

The database lives at {data_path}/.dccd/runs.db.

Parameters:
data_path : str or Path
Returns:
RunsStore
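The database location stated above, {data_path}/.dccd/runs.db, can be computed with pathlib for either a str or a Path argument. The helper name runs_db_path is hypothetical and is not part of the factory module.

```python
from pathlib import Path


def runs_db_path(data_path):
    """Hypothetical helper: where the runs database would live under data_path."""
    return Path(data_path) / ".dccd" / "runs.db"
```

This is handy when backing up or inspecting the run history outside the application.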
build_store(data_path)

Return a ParquetStore for data_path.

Parameters:
data_path : str or Path
Returns:
ParquetStore