Client

The async dccd.Client is the one-stop entry point: it wires every exchange adapter and the local store and exposes the four operations as methods. Use it as an async context manager.

import asyncio
from dccd import Client

async def main():
    async with Client() as c:
        await c.backfill("binance", "BTC/USDT", "ohlc", span=3600, start="2024-01-01")
        df = c.read("binance", "BTC/USDT", "ohlc", span=3600)
        print(df.tail())

asyncio.run(main())

class Client(config_path=None)

Async facade for dccd — the one-stop entry point.

Wires every exchange adapter and the local Parquet store, and exposes the four operations as methods: backfill (download history), stream (collect live), read (load stored data) and inventory (list datasets). Use it as an async context manager so the shared HTTP client is opened and closed cleanly.

Parameters:
config_path : str or None

Path to config.yml. Resolved via the XDG fallback when None; only settings.data_path is needed for direct use.

See also

dccd.application.operations.backfill

the underlying operation.

Examples

>>> import asyncio
>>> from dccd import Client
>>> async def main():
...     async with Client() as c:
...         await c.backfill('binance', 'BTC/USDT', 'ohlc', span=3600,
...                          start='2024-01-01')
...         return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).height
>>> asyncio.run(main())
168
async backfill(exchange, symbol, data_type='ohlc', span=None, start='last')

Download historical data for one dataset into the local store.

Resumes and deduplicates: running it again only adds what is missing. Trades are cursor-paginated and drain the whole requested window.

Parameters:
exchange : str

Exchange name, e.g. 'binance'. See Exchanges.

symbol : str

Trading pair, e.g. 'BTC/USDT' or 'BTC-USD'.

data_type : str, default 'ohlc'

'ohlc', 'trades' or 'orderbook'.

span : int or None

Candle size in seconds; required for 'ohlc'.

start : str, default 'last'

'last' (resume from the last stored row), 'origin' (full history) or an ISO date such as '2024-01-01'.

Returns:
dict

{'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.

See also

read

load the result.

stream

live collection instead of history.

Examples

>>> import asyncio
>>> from dccd import Client
>>> async def main():
...     async with Client() as c:
...         r = await c.backfill('binance', 'BTC/USDT', 'ohlc',
...                              span=3600, start='2024-01-01')
...         return r['rows_written']
>>> asyncio.run(main())
168
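The Returns entry above distinguishes success from failure by the presence of an 'error' key. The following is a minimal sketch of how a caller might run several backfills in turn and sort the results, assuming only the documented dict shapes. `backfill_many`, `ns_to_utc` and `FakeClient` are hypothetical names introduced for illustration; `FakeClient` is a stand-in so the snippet runs without network access, and the way the real library reports a missing span is not stated here.

```python
import asyncio
from datetime import datetime, timezone


def ns_to_utc(ts_ns):
    """Convert integer nanoseconds since the Unix epoch to an aware UTC datetime."""
    seconds, nanos = divmod(ts_ns, 1_000_000_000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return dt.replace(microsecond=nanos // 1000)


async def backfill_many(client, jobs):
    """Run backfills one after another; return (succeeded, failed) result lists."""
    succeeded, failed = [], []
    for job in jobs:
        result = await client.backfill(**job)
        # Per the documented return shapes, a failed run carries an 'error' key.
        (failed if "error" in result else succeeded).append(result)
    return succeeded, failed


class FakeClient:
    """Hypothetical stand-in that imitates the documented return dicts."""

    async def backfill(self, exchange, symbol, data_type="ohlc", span=None, start="last"):
        if data_type == "ohlc" and span is None:
            # Invented failure, used only to exercise the error branch.
            return {"run_id": "r2", "rows_written": 0, "error": "span is required for ohlc"}
        return {
            "run_id": "r1",
            "rows_written": 168,
            "start_ns": 1_704_067_200_000_000_000,
            "end_ns": 1_704_668_400_000_000_000,
        }


jobs = [
    {"exchange": "binance", "symbol": "BTC/USDT", "data_type": "ohlc",
     "span": 3600, "start": "2024-01-01"},
    {"exchange": "binance", "symbol": "ETH/USDT", "data_type": "ohlc"},
]
ok, bad = asyncio.run(backfill_many(FakeClient(), jobs))
for r in ok:
    print(r["rows_written"], ns_to_utc(r["start_ns"]), ns_to_utc(r["end_ns"]))
for r in bad:
    print("failed:", r["error"])
```

With the real library, the `Client` instance opened by `async with Client() as c:` would be passed in place of `FakeClient()`. The jobs run sequentially here because this page does not say whether concurrent backfills on one client are supported.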
inventory()

List every stored dataset with its coverage.

Returns:
list of dict

One entry per dataset with exchange, pair, data_type, span, rows, min_ts and max_ts (nanoseconds UTC).

Examples

>>> import asyncio
>>> from dccd import Client
>>> async def main():
...     async with Client() as c:
...         return [d['pair'] for d in c.inventory()]
>>> asyncio.run(main())
['BTC-USDT']
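Because min_ts and max_ts are raw nanosecond integers, a small formatter can make the inventory easier to scan. This is a sketch under the assumption that each entry carries exactly the keys listed under Returns; `describe_coverage` is a hypothetical helper and the sample entry is hand-written, not real output.

```python
from datetime import datetime, timezone


def describe_coverage(entries):
    """Return one human-readable line per inventory entry."""
    lines = []
    for e in entries:
        # Whole seconds are enough for a coverage summary.
        first = datetime.fromtimestamp(e["min_ts"] // 1_000_000_000, tz=timezone.utc)
        last = datetime.fromtimestamp(e["max_ts"] // 1_000_000_000, tz=timezone.utc)
        lines.append(
            f"{e['exchange']} {e['pair']} {e['data_type']} span={e['span']}: "
            f"{e['rows']} rows, {first:%Y-%m-%d %H:%M} to {last:%Y-%m-%d %H:%M} UTC"
        )
    return lines


# Hand-written sample entry in the documented shape (not real output).
sample = [{
    "exchange": "binance", "pair": "BTC-USDT", "data_type": "ohlc", "span": 3600,
    "rows": 168,
    "min_ts": 1_704_067_200_000_000_000,
    "max_ts": 1_704_668_400_000_000_000,
}]
summary = describe_coverage(sample)
print("\n".join(summary))
```

In practice the list returned by `c.inventory()` would take the place of `sample`.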
read(exchange, symbol, data_type='ohlc', span=None, start_ns=None, end_ns=None)

Read stored data for a dataset as a Polars DataFrame.

Parameters:
exchange, symbol : str
data_type : str, default 'ohlc'
span : int or None

Required for 'ohlc'.

start_ns, end_ns : int or None

Optional inclusive nanosecond bounds.

Returns:
polars.DataFrame

Sorted by TS (nanoseconds UTC), deduplicated. Empty if no data.

See also

backfill

populate the dataset.

inventory

list what is stored.

Examples

>>> import asyncio
>>> from dccd import Client
>>> async def main():
...     async with Client() as c:
...         return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).columns
>>> asyncio.run(main())
['TS', 'open', 'high', 'low', 'close', 'volume', 'quote_volume', 'trades']
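read takes its bounds as integer nanoseconds, while backfill accepts ISO dates, so a conversion step is often needed. A minimal sketch using only the standard library follows; `iso_to_ns` is a hypothetical helper, and treating a date without a timezone as UTC is an assumption made here to match the "nanoseconds UTC" wording.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_ns(date_str):
    """Parse an ISO date or datetime and return nanoseconds since the Unix epoch."""
    dt = datetime.fromisoformat(date_str)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive input means UTC
    delta = dt - EPOCH
    # Integer arithmetic avoids float rounding at nanosecond scale.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000


start_ns = iso_to_ns("2024-01-01")
# The bounds are inclusive, so stop one nanosecond before the following week starts.
end_ns = iso_to_ns("2024-01-08") - 1
print(start_ns, end_ns)
```

The two values can then be passed as `c.read('binance', 'BTC/USDT', 'ohlc', span=3600, start_ns=start_ns, end_ns=end_ns)`.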
async stream(exchange, symbol, data_type='trades', span=None, depth=None, snapshot_interval=None, stop_event=None)

Collect live data over WebSocket until stop_event is set.

Parameters mirror backfill; depth and snapshot_interval apply to order-book streams. For long-running collection, prefer a supervised stream job defined in the config and started with dccd start, which reconnects automatically.

Parameters:
exchange, symbol : str
data_type : str, default 'trades'

'ohlc', 'trades' or 'orderbook'.

span, depth, snapshot_interval : int or None
stop_event : asyncio.Event or None

Set it to stop the stream cleanly.

See also

backfill

download history instead of live data.

Examples

>>> import asyncio
>>> from dccd import Client
>>> async def main():
...     async with Client() as c:
...         stop = asyncio.Event()
...         asyncio.get_running_loop().call_later(10, stop.set)
...         await c.stream('binance', 'BTC/USDT', 'trades', stop_event=stop)
>>> asyncio.run(main())
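The timer pattern in the example above can be wrapped in a small reusable helper. The sketch below is illustrative only: `run_until` and `fake_stream` are hypothetical names, and `fake_stream` merely imitates a coroutine that runs until its stop_event is set, so the snippet works without a network connection.

```python
import asyncio


async def run_until(stream_fn, seconds, **kwargs):
    """Call stream_fn(stop_event=..., **kwargs) and set the event after `seconds`."""
    stop = asyncio.Event()
    timer = asyncio.get_running_loop().call_later(seconds, stop.set)
    try:
        return await stream_fn(stop_event=stop, **kwargs)
    finally:
        timer.cancel()  # harmless if the timer has already fired


async def fake_stream(exchange, symbol, data_type="trades", stop_event=None):
    """Hypothetical stand-in for a live stream: counts ticks until stopped."""
    ticks = 0
    while not stop_event.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


ticks = asyncio.run(run_until(fake_stream, 0.1, exchange="binance", symbol="BTC/USDT"))
print("stopped after", ticks, "ticks")
```

Inside `async with Client() as c:`, the same helper could be called as `await run_until(c.stream, 10, exchange='binance', symbol='BTC/USDT', data_type='trades')`, since stream accepts stop_event as a keyword argument.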