# Source code for dccd.application.service_factory
"""Service-object factory.

Central place that wires together all exchange adapters so that both the CLI
and the API import from one location. Adding a new exchange means editing only
this file.
"""
from __future__ import annotations
import pathlib
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from dccd.sources.registry import SourceRegistry
from dccd.storage.parquet import ParquetStore
from dccd.storage.runs_sqlite import RunsStore
# Public API of this module: the names exported by ``from ... import *``.
__all__ = ["build_registry", "build_store", "build_runs_store"]
def build_registry() -> "SourceRegistry":
    """Create a :class:`~dccd.sources.registry.SourceRegistry` holding every exchange adapter.

    One instance of each adapter is created and registered under its
    lowercase exchange name.

    Returns
    -------
    SourceRegistry
        A newly constructed registry with all adapters registered.
    """
    # The adapter modules are imported inside the function, so they are only
    # loaded when a registry is actually built.
    from dccd.sources.binance import BinanceSource
    from dccd.sources.bitfinex import BitfinexSource
    from dccd.sources.bitmex import BitMEXSource
    from dccd.sources.bybit import BybitSource
    from dccd.sources.coinbase import CoinbaseSource
    from dccd.sources.kraken import KrakenSource
    from dccd.sources.okx import OKXSource
    from dccd.sources.registry import SourceRegistry

    # (registry key, adapter class) pairs, listed in registration order.
    adapters = (
        ("binance", BinanceSource),
        ("coinbase", CoinbaseSource),
        ("kraken", KrakenSource),
        ("bybit", BybitSource),
        ("okx", OKXSource),
        ("bitfinex", BitfinexSource),
        ("bitmex", BitMEXSource),
    )

    registry = SourceRegistry()
    for exchange, adapter_cls in adapters:
        registry.register(exchange, adapter_cls())
    return registry
def build_store(data_path: str | pathlib.Path) -> "ParquetStore":
    """Create the :class:`~dccd.storage.parquet.ParquetStore` for *data_path*.

    Parameters
    ----------
    data_path : str or Path
        Passed unchanged to the ``ParquetStore`` constructor.

    Returns
    -------
    ParquetStore
    """
    # Imported inside the function so the storage module is only loaded when
    # a store is requested.
    from dccd.storage.parquet import ParquetStore

    store = ParquetStore(data_path)
    return store
def build_runs_store(data_path: str | pathlib.Path) -> "RunsStore":
    """Create the :class:`~dccd.storage.runs_sqlite.RunsStore` located under *data_path*.

    The store is pointed at the database file ``{data_path}/.dccd/runs.db``.

    Parameters
    ----------
    data_path : str or Path
        Directory under which the ``.dccd/runs.db`` path is built.

    Returns
    -------
    RunsStore
    """
    # Imported inside the function so the SQLite-backed module is only loaded
    # when a runs store is requested.
    from dccd.storage.runs_sqlite import RunsStore

    db_file = pathlib.Path(data_path).joinpath(".dccd", "runs.db")
    return RunsStore(db_file)