Client
The async dccd.Client is the one-stop entry point: it wires every
exchange adapter and the local store and exposes the four operations as methods.
Use it as an async context manager.
import asyncio
from dccd import Client

async def main():
    async with Client() as c:
        await c.backfill("binance", "BTC/USDT", "ohlc", span=3600, start="2024-01-01")
        df = c.read("binance", "BTC/USDT", "ohlc", span=3600)
        print(df.tail())

asyncio.run(main())
- class Client(config_path=None)
Async facade for dccd — the one-stop entry point.
Wires every exchange adapter and the local Parquet store, and exposes the four operations as methods:
backfill (download history), stream (collect live), read (load stored data) and inventory (list datasets). Use it as an async context manager so the shared HTTP client is opened and closed cleanly.
- Parameters:
- config_path : str or None
Path to config.yml. Resolved via the XDG fallback when None; only settings.data_path is needed for direct use.
See also
dccd.application.operations.backfill : the underlying operation.
Examples
>>> import asyncio
>>> async def main():
...     async with Client() as c:
...         await c.backfill('binance', 'BTC/USDT', 'ohlc', span=3600,
...                          start='2024-01-01')
...         return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).height
>>> asyncio.run(main())
168
- async backfill(exchange, symbol, data_type='ohlc', span=None, start='last')
Download historical data for one dataset into the local store.
Resumes and deduplicates: running it again only adds what is missing. Trades are cursor-paginated and drain the whole requested window.
- Parameters:
- exchange : str
Exchange name, e.g. 'binance'. See Exchanges.
- symbol : str
Trading pair, 'BTC/USDT' or 'BTC-USD'.
- data_type : str, default 'ohlc'
'ohlc', 'trades' or 'orderbook'.
- span : int or None
Candle size in seconds; required for 'ohlc'.
- start : str, default 'last'
'last' (resume from the last stored row), 'origin' (full history) or an ISO date such as '2024-01-01'.
- Returns:
- dict
{'run_id', 'rows_written', 'start_ns', 'end_ns'} on success; {'run_id', 'rows_written', 'error'} on failure.
See also
read : load the result.
stream : live collection instead of history.
Examples
>>> async def main():
...     async with Client() as c:
...         r = await c.backfill('binance', 'BTC/USDT', 'ohlc',
...                              span=3600, start='2024-01-01')
...         return r['rows_written']
>>> asyncio.run(main())
168
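According to the Returns description, a failed run is reported through an 'error' key in the returned dict, so a caller may want to branch on that key. The following is a minimal sketch under that reading: summarize_backfill is a hypothetical helper that is not part of dccd, and the sample dicts hold made-up values that only follow the documented key sets.

```python
def summarize_backfill(result: dict) -> str:
    """Turn a backfill result dict into a one-line status string.

    Hypothetical helper, not part of dccd. It relies only on the key
    sets documented under Returns.
    """
    if "error" in result:
        # Failure shape: {'run_id', 'rows_written', 'error'}
        return (
            f"run {result['run_id']} failed after "
            f"{result['rows_written']} rows: {result['error']}"
        )
    # Success shape: {'run_id', 'rows_written', 'start_ns', 'end_ns'}
    return (
        f"run {result['run_id']} wrote {result['rows_written']} rows "
        f"covering [{result['start_ns']}, {result['end_ns']}] ns"
    )


# Made-up sample values, for illustration only.
ok = {"run_id": "r1", "rows_written": 168, "start_ns": 0, "end_ns": 10}
failed = {"run_id": "r2", "rows_written": 0, "error": "timeout"}

print(summarize_backfill(ok))
print(summarize_backfill(failed))
```

In real use the argument would be the value returned by `await c.backfill(...)`.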
- inventory()
List every stored dataset with its coverage.
- Returns:
- list of dict
One entry per dataset with exchange, pair, data_type, span, rows, min_ts and max_ts (nanoseconds UTC).
Examples
>>> async def main():
...     async with Client() as c:
...         return [d['pair'] for d in c.inventory()]
>>> asyncio.run(main())
['BTC-USDT']
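The min_ts and max_ts fields are raw nanosecond integers, which are hard to read at a glance. The sketch below converts them to timezone-aware datetimes, assuming the values count nanoseconds since the Unix epoch (the page only says "nanoseconds UTC"). ns_to_utc is a hypothetical helper and the entry dict is a made-up, partial inventory row.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ns_to_utc(ts_ns: int) -> datetime:
    """Convert nanoseconds since the Unix epoch to an aware UTC datetime.

    Integer arithmetic avoids float rounding; datetime only resolves
    microseconds, so the last three digits are dropped.
    """
    return EPOCH + timedelta(microseconds=ts_ns // 1_000)


# Made-up partial inventory entry, assuming epoch-based nanoseconds.
entry = {
    "pair": "BTC-USDT",
    "min_ts": 1_704_067_200_000_000_000,
    "max_ts": 1_704_153_600_000_000_000,
}

first = ns_to_utc(entry["min_ts"])
last = ns_to_utc(entry["max_ts"])
print(entry["pair"], first.isoformat(), last.isoformat())
```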
- read(exchange, symbol, data_type='ohlc', span=None, start_ns=None, end_ns=None)
Read stored data for a dataset as a Polars DataFrame.
- Parameters:
- exchange, symbol : str
- data_type : str, default 'ohlc'
- span : int or None
Required for 'ohlc'.
- start_ns, end_ns : int or None
Optional inclusive nanosecond bounds.
- Returns:
- polars.DataFrame
Sorted by TS (nanoseconds UTC), deduplicated. Empty if no data.
See also
backfill : populate the dataset.
inventory : list what is stored.
Examples
>>> async def main():
...     async with Client() as c:
...         return c.read('binance', 'BTC/USDT', 'ohlc', span=3600).columns
>>> asyncio.run(main())
['TS', 'open', 'high', 'low', 'close', 'volume', 'quote_volume', 'trades']
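Unlike backfill, read takes its window as nanosecond integers rather than ISO dates. The sketch below shows one way to derive start_ns and end_ns from ISO strings, assuming the bounds are nanoseconds since the Unix epoch and that naive dates should be treated as UTC. iso_to_ns is a hypothetical helper, not part of dccd.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_ns(iso: str) -> int:
    """Convert an ISO date or datetime string to epoch nanoseconds."""
    dt = datetime.fromisoformat(iso)
    if dt.tzinfo is None:
        # Assumption: naive inputs are meant as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    delta = dt - EPOCH
    # Integer arithmetic keeps full precision (no float rounding).
    seconds = delta.days * 86_400 + delta.seconds
    return seconds * 1_000_000_000 + delta.microseconds * 1_000


# One full UTC day. Both bounds are inclusive, so stop 1 ns before
# midnight to leave out the first row of the following day.
start_ns = iso_to_ns("2024-01-01")
end_ns = iso_to_ns("2024-01-02") - 1
print(start_ns, end_ns)
```

The two values can then be passed as `c.read('binance', 'BTC/USDT', 'ohlc', span=3600, start_ns=start_ns, end_ns=end_ns)`.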
- async stream(exchange, symbol, data_type='trades', span=None, depth=None, snapshot_interval=None, stop_event=None)
Collect live data over WebSocket until stop_event is set.
Parameters mirror backfill. depth and snapshot_interval apply to order-book streams. For long-running collection prefer a supervised stream job in the config and dccd start (auto-reconnect).
- Parameters:
- exchange, symbol : str
- data_type : str, default 'trades'
'ohlc', 'trades' or 'orderbook'.
- span, depth, snapshot_interval : int or None
- stop_event : asyncio.Event or None
Set it to stop the stream cleanly.
See also
backfill : download history instead of live data.
Examples
>>> async def main():
...     async with Client() as c:
...         stop = asyncio.Event()
...         asyncio.get_running_loop().call_later(10, stop.set)
...         await c.stream('binance', 'BTC/USDT', 'trades', stop_event=stop)
>>> asyncio.run(main())
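Because stream runs until stop_event is set, it can also be started as a background task while other work proceeds, with the event set in a finally block so the stream is always asked to stop. The sketch below illustrates only that asyncio pattern: fake_stream is a hypothetical stand-in for `c.stream(...)` so the example runs without dccd or a network connection, and the sleep durations are arbitrary.

```python
import asyncio


async def fake_stream(stop_event: asyncio.Event) -> int:
    """Hypothetical stand-in for Client.stream: loops until stop_event is set."""
    received = 0
    while not stop_event.is_set():
        received += 1  # pretend one message arrived
        await asyncio.sleep(0.01)
    return received


async def collect_while_working() -> int:
    stop = asyncio.Event()
    # Start collection in the background.
    task = asyncio.create_task(fake_stream(stop))
    try:
        await asyncio.sleep(0.05)  # placeholder for other work
    finally:
        stop.set()  # always request a clean stop, even on error
    # Wait for the stream coroutine to finish its shutdown.
    return await task


received = asyncio.run(collect_while_working())
print(f"collected {received} messages")
```

With the real client, the task would wrap `c.stream('binance', 'BTC/USDT', 'trades', stop_event=stop)` inside the `async with Client() as c:` block.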