Application
The application layer wires domain, sources and storage into the four operations, plus the orchestration that schedules them and the events they emit.
Operations
The core operations are backfill, stream, read and inventory; everything else is plumbing.
All public functions are async and take their infrastructure (registry, store, events, …) as keyword-only arguments so call sites stay explicit and testable.
Key contract for paginators: each fetch_page passed to paginate_ohlc / paginate_trades must be a closure with symbol (and span, for OHLC) already bound, for example:

    async def _fetch(start_ns, end_ns, limit):
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
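A self-contained sketch of why the closure matters: the paginator only ever calls fetch_page(start_ns, end_ns, limit), so symbol and span have to be captured beforehand. FakeAdapter, make_ohlc_fetch and toy_paginate are hypothetical stand-ins written for illustration; the real adapters and paginate_ohlc may differ. Binding through a factory function (rather than a def inside the loop body) also avoids Python's late binding, where closures created in a loop can all end up seeing the loop variable's final value.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Shape the paginator expects: (start_ns, end_ns, limit) -> list of rows.
FetchPage = Callable[[int, int, int], Awaitable[List[dict]]]


class FakeAdapter:
    """Hypothetical adapter: one fake row per span-aligned timestamp."""

    async def fetch_ohlc_page(self, symbol: str, span: int, start_ns: int,
                              end_ns: int, limit: int) -> List[dict]:
        step = span * 1_000_000_000  # assumption: span is in seconds
        ts = -(-start_ns // step) * step  # round start up to the span grid
        rows = []
        while ts < end_ns and len(rows) < limit:
            rows.append({"symbol": symbol, "ts": ts})
            ts += step
        return rows


def make_ohlc_fetch(adapter: FakeAdapter, symbol: str, span: int) -> FetchPage:
    """Bind symbol and span so the paginator only supplies the window."""
    async def _fetch(start_ns: int, end_ns: int, limit: int) -> List[dict]:
        return await adapter.fetch_ohlc_page(symbol, span, start_ns, end_ns, limit)
    return _fetch


async def toy_paginate(fetch_page: FetchPage, start_ns: int, end_ns: int,
                       limit: int) -> List[dict]:
    """Hypothetical stand-in for paginate_ohlc: advance past the last row."""
    out: List[dict] = []
    cursor = start_ns
    while cursor < end_ns:
        page = await fetch_page(cursor, end_ns, limit)
        if not page:
            break
        out.extend(page)
        cursor = page[-1]["ts"] + 1
    return out


async def main() -> Dict[str, List[dict]]:
    adapter = FakeAdapter()
    results = {}
    for symbol in ("XBTUSD", "ETHUSD"):
        # Each closure captures its own symbol, so pages never mix pairs.
        fetch = make_ohlc_fetch(adapter, symbol, span=60)
        results[symbol] = await toy_paginate(fetch, 0, 10 * 60 * 10**9, limit=4)
    return results


if __name__ == "__main__":
    print({k: len(v) for k, v in asyncio.run(main()).items()})
```

With limit=4 the ten one-minute rows arrive in three pages (4, 4, 2) before an empty page ends the loop.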
- async backfill(spec, *, registry, store, runs_store=None, events=None, stop_event=None, run_id=None)
Backfill historical data from a source to the Parquet store.
- Parameters:
- spec : JobSpec
Job specification (operation='backfill'). spec.params.start controls the start point:
  - 'last': resume from the last stored timestamp. If no data exists yet, defaults to 30 days ago to avoid a multi-decade paginator run from epoch 0.
  - 'origin': start from the exchange's earliest available data (timestamp 0; many pages will be empty before the exchange's launch date).
  - An ISO-8601 date string ('2024-01-01') or a nanosecond integer: explicit start timestamp.
- registry : SourceRegistry
- store : ParquetStore
- runs_store : RunsStore or None
- events : RunEvents or None
- stop_event : asyncio.Event or None
Set externally to cancel mid-run cleanly.
- run_id : str or None
Override the auto-generated run ID (used by the API endpoint so the polling URL matches what is stored in RunsStore).
- Returns:
- dict
Keys {'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.
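The spec.params.start rules above can be sketched as one resolver. resolve_start, its last_ns / now_ns arguments and the "naive dates are UTC" rule are assumptions for illustration, not the actual backfill internals; in particular, this sketch returns the last stored timestamp unchanged, whereas the real code may skip past it to avoid re-fetching that row.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
NS_PER_US = 1_000
DEFAULT_LOOKBACK = timedelta(days=30)  # fallback when nothing is stored yet


def to_ns(dt: datetime) -> int:
    """Exact integer nanoseconds since the epoch (no float rounding)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return (dt - EPOCH) // timedelta(microseconds=1) * NS_PER_US


def resolve_start(start: str | int, *, last_ns: int | None, now_ns: int) -> int:
    """Map a start parameter to a start timestamp in nanoseconds (sketch)."""
    if start == "last":
        if last_ns is not None:
            return last_ns  # resume from the last stored timestamp
        # No data yet: look back 30 days instead of paginating from epoch 0.
        return now_ns - DEFAULT_LOOKBACK // timedelta(microseconds=1) * NS_PER_US
    if start == "origin":
        return 0  # exchange's earliest data; early pages may be empty
    if isinstance(start, int):
        return start  # already a nanosecond timestamp
    return to_ns(datetime.fromisoformat(start))  # e.g. '2024-01-01'
```

In a real run now_ns would typically come from time.time_ns(); passing it in keeps the resolver deterministic and easy to test.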
Jobs
A JobSpec is a declarative unit of work
(operation + target + trigger + params); the config expands one
JobConfig into one spec per pair.
Job model — JobSpec, JobRun, Trigger, JobParams.
- class JobParams(*, start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None)
Operation parameters (all optional, per-operation).
- class JobRun(*, run_id, spec_id, operation, target, state=RunState.PENDING, started_at=None, ended_at=None, rows_written=0, error=None, progress=None, log_tail=[])
One execution of a JobSpec.
- class JobSpec(*, id, operation, target, trigger, params=JobParams(start='last', depth=None, snapshot_interval=None, derive_from=None, transport=None), enabled=True, origin='config')
Declarative job definition.
A histo job is backfill + interval trigger (+ start='last'). A stream job is stream + supervised trigger.
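As a sketch of how one config entry could expand into one spec per pair, the dataclasses below mirror a subset of the keyword signatures above. Trigger's fields, the "source/pair" target string, and the expand_histo / expand_stream helpers are hypothetical; the real Trigger and target types are not documented here.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Trigger:
    kind: str                   # 'supervised' | 'interval' | 'cron' | 'once'
    every_s: int | None = None  # hypothetical field for interval triggers


@dataclass(frozen=True)
class JobParams:
    start: str | int = "last"


@dataclass(frozen=True)
class JobSpec:
    id: str
    operation: str
    target: str                 # hypothetical "source/pair" string
    trigger: Trigger
    params: JobParams = field(default_factory=JobParams)
    enabled: bool = True
    origin: str = "config"


def expand_histo(source: str, pairs: List[str], every_s: int) -> List[JobSpec]:
    """Histo job per pair: backfill + interval trigger + start='last'."""
    return [
        JobSpec(
            id=f"histo:{source}:{pair}",
            operation="backfill",
            target=f"{source}/{pair}",
            trigger=Trigger("interval", every_s),
            params=JobParams(start="last"),
        )
        for pair in pairs
    ]


def expand_stream(source: str, pairs: List[str]) -> List[JobSpec]:
    """Stream job per pair: stream + supervised trigger."""
    return [
        JobSpec(
            id=f"stream:{source}:{pair}",
            operation="stream",
            target=f"{source}/{pair}",
            trigger=Trigger("supervised"),
        )
        for pair in pairs
    ]
```

Making the specs frozen keeps them declarative: the scheduler reads them, and per-execution state lives in a separate run record (JobRun in this module).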
Scheduler
Routes each spec by trigger kind:
- supervised → stream worker (auto-reconnect)
- interval/cron → periodic backfill
- once → one-shot
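The routing table above amounts to a lookup from trigger kind to execution strategy. The Route enum and route() function are hypothetical illustrations, not Scheduler methods; rejecting unknown kinds loudly is a design choice of this sketch.

```python
from enum import Enum


class Route(Enum):
    STREAM_WORKER = "stream worker (auto-reconnect)"
    PERIODIC_BACKFILL = "periodic backfill"
    ONE_SHOT = "one-shot"


# Hypothetical mapping mirroring the routing rules; interval and cron share a path.
_ROUTES = {
    "supervised": Route.STREAM_WORKER,
    "interval": Route.PERIODIC_BACKFILL,
    "cron": Route.PERIODIC_BACKFILL,
    "once": Route.ONE_SHOT,
}


def route(trigger_kind: str) -> Route:
    """Pick an execution strategy for a spec's trigger kind."""
    try:
        return _ROUTES[trigger_kind]
    except KeyError:
        raise ValueError(f"unknown trigger kind: {trigger_kind!r}") from None
```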
- class Scheduler(registry, store, runs_store=None, events=None)
Orchestrates JobSpecs (intervals → backfill; supervised → streams).
- Parameters:
- registry : SourceRegistry
- store : ParquetStore
- runs_store : RunsStore or None
- events : EventBus or None
- register_streams(specs)
Register stream workers from config without starting them.
Called by the app lifespan so stream jobs appear in /api/streams and can be started/stopped from the UI even in dccd ui mode.
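A minimal sketch of the "registered but not started" idea combined with a supervised, auto-reconnecting worker. SupervisedWorker, register_idle_workers, the exponential backoff policy and the connect callable are all assumptions made for illustration; the real stream workers and Scheduler.register_streams may behave differently.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Dict, Iterable


class SupervisedWorker:
    """Hypothetical stream worker: created idle, reconnects on failure."""

    def __init__(self, name: str, connect: Callable[[], Awaitable[None]],
                 backoff_s: float = 1.0, max_backoff_s: float = 30.0) -> None:
        self.name = name
        self._connect = connect  # runs one streaming session until it ends
        self._backoff_s = backoff_s
        self._max_backoff_s = max_backoff_s
        self._task: asyncio.Task | None = None
        self.failures = 0

    @property
    def running(self) -> bool:
        return self._task is not None and not self._task.done()

    def start(self) -> None:
        if not self.running:
            self._task = asyncio.create_task(self._supervise())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _supervise(self) -> None:
        delay = self._backoff_s
        while True:
            try:
                await self._connect()
            except Exception:  # CancelledError is not an Exception, so stop() works
                self.failures += 1
                await asyncio.sleep(delay)
                delay = min(delay * 2, self._max_backoff_s)  # exponential backoff
                continue
            delay = self._backoff_s  # clean disconnect: reset backoff, reconnect
            await asyncio.sleep(delay)


def register_idle_workers(names: Iterable[str], make_connect, backoff_s: float = 1.0
                          ) -> Dict[str, SupervisedWorker]:
    """Create workers without starting them, so a UI can start them later."""
    return {n: SupervisedWorker(n, make_connect(n), backoff_s) for n in names}


async def demo() -> tuple:
    attempts = 0

    async def flaky_session() -> None:
        nonlocal attempts
        attempts += 1
        if attempts <= 2:
            raise ConnectionError("socket dropped")  # simulated disconnect
        await asyncio.Event().wait()  # third attempt stays connected

    worker = register_idle_workers(["demo/XBTUSD"], lambda n: flaky_session,
                                   backoff_s=0.01)["demo/XBTUSD"]
    idle_before_start = not worker.running
    worker.start()
    await asyncio.sleep(0.2)  # enough time for two failures plus backoff
    running = worker.running
    await worker.stop()
    return idle_before_start, worker.failures, running, worker.running
```

Separating registration from start() is what lets stream jobs be listed (and toggled) before any connection is opened.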
Events
Event bus — Progress, Log, Status events.
- class EventBus
Pub-sub bus carrying operation events to interested subscribers.
Operations emit ProgressEvent, LogEvent and StatusEvent (tagged by run_id). Subscribers (the HTTP API's SSE endpoint, the health monitor) receive them either by registering a handler or by draining an internal queue (enable_queue). Use for_run to get a small emitter bound to one run_id.
Examples
>>> bus = EventBus()
>>> seen = []
>>> bus.subscribe(seen.append)
>>> bus.for_run('r1').log('started')
>>> seen[0].message
'started'
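To make the handler-versus-queue distinction concrete, here is a minimal pub-sub sketch that reproduces the doctest. MiniBus, RunEmitter, publish() and the event fields (done, total, state) are hypothetical; only subscribe, enable_queue, for_run and log come from the description, and their real semantics may differ.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Callable, List, Optional, Union


@dataclass(frozen=True)
class LogEvent:
    run_id: str
    message: str


@dataclass(frozen=True)
class ProgressEvent:
    run_id: str
    done: int
    total: Optional[int] = None  # hypothetical fields


@dataclass(frozen=True)
class StatusEvent:
    run_id: str
    state: str


BusEvent = Union[LogEvent, ProgressEvent, StatusEvent]


class MiniBus:
    """Hypothetical stand-in: push to handlers, optionally also to a queue."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[BusEvent], None]] = []
        self._queue: Optional[asyncio.Queue] = None

    def subscribe(self, handler: Callable[[BusEvent], None]) -> None:
        self._handlers.append(handler)

    def enable_queue(self) -> asyncio.Queue:
        # Pull-style consumers (e.g. an SSE endpoint) drain this queue.
        if self._queue is None:
            self._queue = asyncio.Queue()
        return self._queue

    def publish(self, event: BusEvent) -> None:
        for handler in list(self._handlers):
            handler(event)
        if self._queue is not None:
            self._queue.put_nowait(event)

    def for_run(self, run_id: str) -> RunEmitter:
        return RunEmitter(self, run_id)


class RunEmitter:
    """Small emitter that tags every event with one run_id."""

    def __init__(self, bus: MiniBus, run_id: str) -> None:
        self._bus = bus
        self.run_id = run_id

    def log(self, message: str) -> None:
        self._bus.publish(LogEvent(self.run_id, message))

    def progress(self, done: int, total: Optional[int] = None) -> None:
        self._bus.publish(ProgressEvent(self.run_id, done, total))

    def status(self, state: str) -> None:
        self._bus.publish(StatusEvent(self.run_id, state))
```

Handlers run synchronously inside publish(), so they should be cheap; slow consumers are better served by draining the queue.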
Configuration
The Pydantic config models (AppConfig and its sections) are documented in the Configuration Reference.
Service factory
The single place that wires every adapter — edit it to add an exchange.
Service-object factory.
Central place that wires together all exchange adapters so that both the CLI and the API import from one location. Adding a new exchange means editing only this file.
- build_registry()
Return a SourceRegistry with all adapters registered.
- Returns:
- SourceRegistry
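A sketch of the "edit one file to add an exchange" pattern. This SourceRegistry is a minimal hypothetical stand-in (register / get / names are assumed methods, not the real class's API), and FooExchangeAdapter / BarExchangeAdapter are placeholder adapters.

```python
from __future__ import annotations

from typing import Dict, List, Protocol


class Adapter(Protocol):
    name: str


class SourceRegistry:
    """Hypothetical minimal registry keyed by adapter name."""

    def __init__(self) -> None:
        self._adapters: Dict[str, Adapter] = {}

    def register(self, adapter: Adapter) -> None:
        if adapter.name in self._adapters:
            raise ValueError(f"adapter {adapter.name!r} already registered")
        self._adapters[adapter.name] = adapter

    def get(self, name: str) -> Adapter:
        try:
            return self._adapters[name]
        except KeyError:
            raise KeyError(f"no adapter registered for {name!r}") from None

    def names(self) -> List[str]:
        return sorted(self._adapters)


class FooExchangeAdapter:  # placeholder adapter
    name = "foo"


class BarExchangeAdapter:  # placeholder adapter
    name = "bar"


def build_registry() -> SourceRegistry:
    registry = SourceRegistry()
    # Adding an exchange means adding one adapter to this tuple.
    for adapter in (FooExchangeAdapter(), BarExchangeAdapter()):
        registry.register(adapter)
    return registry
```

Because the CLI and the API both call the same factory, they cannot drift apart in which exchanges they know about.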
- build_runs_store(data_path)
Return a RunsStore inside data_path.
The database lives at {data_path}/.dccd/runs.db.
- Parameters:
- data_path : str or Path
- Returns:
- RunsStore
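Accepting "str or Path" usually means normalising once with pathlib. The helpers below are hypothetical and only illustrate the documented location; whether build_runs_store creates the .dccd directory itself is an assumption.

```python
from __future__ import annotations

from pathlib import Path
from typing import Union


def runs_db_path(data_path: Union[str, Path]) -> Path:
    """Documented location of the runs database: {data_path}/.dccd/runs.db."""
    return Path(data_path) / ".dccd" / "runs.db"


def prepare_runs_db(data_path: Union[str, Path]) -> Path:
    """Ensure the hidden .dccd directory exists (assumed behaviour)."""
    db = runs_db_path(data_path)
    db.parent.mkdir(parents=True, exist_ok=True)
    return db
```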
- build_store(data_path)
Return a ParquetStore for data_path.
- Parameters:
- data_path : str or Path
- Returns:
- ParquetStore