"""Bitfinex source adapter — OHLC (10000/req), trades, order book."""
from __future__ import annotations
import json
import logging
import time
from collections.abc import AsyncIterator
from typing import Any
from dccd.domain.capability import Capability
from dccd.domain.records import OHLCBar, OrderBookLevel, OrderBookSnapshot, Trade
from dccd.domain.symbol import Symbol
from dccd.domain.timeutils import s_to_ns
from dccd.domain.types import DataType
from dccd.sources.base import (
OHLCHistory,
OHLCLive,
OrderBookLive,
OrderBookSnapshotREST,
TradesHistory,
TradesLive,
)
from dccd.transport.http import AsyncHTTPClient
from dccd.transport.ws import WebSocketBase
# Public API of this module: only the adapter class is exported; the symbol
# renderer and the WebSocket helper defined below are private.
__all__ = ["BitfinexSource"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Endpoints used by the adapter: REST base URL and WebSocket URL.
_BASE = "https://api-pub.bitfinex.com/v2"
_WS_URL = "wss://api-pub.bitfinex.com/ws/2"

# Bar length in seconds -> Bitfinex timeframe label used in candle keys.
_BITFINEX_SPANS = {
    # minutes
    60: "1m",
    300: "5m",
    900: "15m",
    1_800: "30m",
    # hours
    3_600: "1h",
    10_800: "3h",
    21_600: "6h",
    43_200: "12h",
    # days and longer
    86_400: "1D",
    604_800: "1W",
    1_209_600: "14D",
    2_592_000: "1M",
}
def _bfx_symbol(s: Symbol) -> str:
    """Render *s* in Bitfinex trading-pair format, e.g. ``tBTCUSD``.

    Bitfinex labels Tether as ``UST`` (not ``USDT``), so ``BTC/USDT`` must map to
    ``tBTCUST`` — ``tBTCUSDT`` returns an empty list (HTTP 200), which would
    otherwise look like "0 rows" with no error. The alias is applied to *both*
    legs, so ``USDT/USD`` renders as ``tUSTUSD`` rather than ``tUSDT:USD``.

    Symbols whose (aliased) base or quote is longer than 3 characters use the
    ``tBASE:QUOTE`` form.

    Parameters
    ----------
    s
        Canonical symbol; only its ``base`` and ``quote`` attributes are read.

    Returns
    -------
    str
        The Bitfinex pair string, always prefixed with ``t``.
    """
    # Canonical currency code -> Bitfinex code. Only Tether is remapped here.
    aliases = {"USDT": "UST"}
    base = aliases.get(s.base, s.base)
    quote = aliases.get(s.quote, s.quote)
    # The length test runs on the aliased codes: "USDT" (4 chars) becomes
    # "UST" (3 chars) and therefore must not trigger the colon form by itself.
    if len(base) > 3 or len(quote) > 3:
        return f"t{base}:{quote}"
    return f"t{base}{quote}"
class BitfinexSource(OHLCHistory, TradesHistory, OrderBookSnapshotREST, OHLCLive, TradesLive, OrderBookLive):
    """Bitfinex source adapter.

    - **Backfill**: OHLC, trades, order-book snapshot — up to **10 000 items per
      request** (the largest limit of any adapter).
    - **Stream**: OHLC (``candles``), trades. Order-book streaming is not
      implemented and not declared.

    Notes
    -----
    Bitfinex labels Tether ``UST``, so ``BTC/USDT`` is rendered as ``tBTCUST``
    (``tBTCUSDT`` returns an empty list). Symbols with a part longer than three
    characters use the ``tBASE:QUOTE`` form.

    See Also
    --------
    dccd.Client : the public facade.

    Examples
    --------
    >>> from dccd.domain.symbol import Symbol
    >>> from dccd.sources.bitfinex import _bfx_symbol
    >>> _bfx_symbol(Symbol(base='BTC', quote='USDT'))
    'tBTCUST'
    """

    exchange = "bitfinex"

    def __init__(self, http: AsyncHTTPClient | None = None) -> None:
        """Create the adapter.

        Parameters
        ----------
        http
            HTTP client to use for REST calls; a new ``AsyncHTTPClient`` is
            created when omitted (or when a falsy value is passed).
        """
        self._http = http or AsyncHTTPClient()

    def capabilities(self) -> list[Capability]:
        """Declared capabilities, one per (data type × transport × mode)."""
        return [
            Capability(
                data_type=DataType.OHLC, transport="rest", mode="historical",
                history="full", max_per_request=10000, page_direction="forward",
                spans=list(_BITFINEX_SPANS.keys()),
            ),
            # NOTE(review): ``fetch_trades_page`` requests ``sort=1`` and moves
            # its cursor forward in time, yet this declares "backward" —
            # confirm how the engine interprets ``page_direction`` for trades.
            Capability(
                data_type=DataType.TRADES, transport="rest", mode="historical",
                history="full", max_per_request=10000, page_direction="backward",
            ),
            Capability(
                data_type=DataType.ORDERBOOK, transport="rest", mode="historical",
                max_depth=250,
            ),
            Capability(data_type=DataType.OHLC, transport="ws", mode="live"),
            Capability(data_type=DataType.TRADES, transport="ws", mode="live"),
            # Order-book WS is not parsed yet (the book channel is unhandled) —
            # not declared so the engine rejects it rather than streaming empty.
        ]

    def render_symbol(self, s: Symbol) -> str:
        """Render a canonical :class:`~dccd.domain.symbol.Symbol` to this exchange's string."""
        return _bfx_symbol(s)

    @staticmethod
    def _bar_from_row(row: list[Any]) -> OHLCBar:
        """Build an :class:`OHLCBar` from a candle row (timestamp in ms first).

        The row is read positionally as ``[ts_ms, open, close, high, low,
        volume]`` — note that *close* comes before *high*/*low*.
        """
        return OHLCBar(
            ts=int(row[0]) * 1_000_000,  # ms -> ns
            open=float(row[1]),
            close=float(row[2]),
            high=float(row[3]),
            low=float(row[4]),
            volume=float(row[5]),
        )

    @staticmethod
    def _trade_from_row(row: list[Any]) -> Trade:
        """Build a :class:`Trade` from a trade row ``[id, ts_ms, amount, price]``.

        The sign of ``amount`` encodes the side: positive is a buy, anything
        else a sell. The stored amount is always non-negative.
        """
        signed_amount = float(row[2])
        return Trade(
            ts=int(row[1]) * 1_000_000,  # ms -> ns
            price=float(row[3]),
            amount=abs(signed_amount),
            side="buy" if signed_amount > 0 else "sell",
            tid=str(row[0]),
        )

    async def fetch_ohlc_page(
        self,
        symbol: Symbol,
        span: int,
        start_ns: int,
        end_ns: int,
        limit: int,
    ) -> list[OHLCBar]:
        """Fetch one page of OHLC bars (see :meth:`~dccd.sources.base.OHLCHistory.fetch_ohlc_page`).

        Parameters
        ----------
        symbol
            Canonical symbol, rendered with ``_bfx_symbol``.
        span
            Bar length in seconds; must be a key of ``_BITFINEX_SPANS``.
        start_ns, end_ns
            Window bounds in nanoseconds (sent to the API in milliseconds).
        limit
            Maximum number of bars requested, capped at 10 000.

        Returns
        -------
        list[OHLCBar]
            Parsed bars, requested oldest-first (``sort=1``). Empty when
            *span* is unsupported or the response is not a list.
        """
        tf = _BITFINEX_SPANS.get(span)
        if not tf:
            # Keep the best-effort "no rows" result, but make it visible: an
            # unsupported span is otherwise indistinguishable from no data.
            logger.warning("Bitfinex: unsupported OHLC span %r s; returning no bars", span)
            return []
        sym = _bfx_symbol(symbol)
        params: dict[str, Any] = {
            "start": start_ns // 1_000_000,
            "end": end_ns // 1_000_000,
            "limit": min(limit, 10000),
            "sort": 1,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/candles/trade:{tf}:{sym}/hist", params)
        # Anything that is not a list of >=6-element lists is skipped.
        return [
            self._bar_from_row(e)
            for e in (data if isinstance(data, list) else [])
            if isinstance(e, list) and len(e) >= 6
        ]

    async def fetch_trades_page(
        self,
        symbol: Symbol,
        start_ns: int,
        end_ns: int,
        limit: int,
        cursor: str | None = None,
    ) -> tuple[list[Trade], str | None]:
        """Fetch one page of trades (cursor = ``start`` ts in ms, forward).

        Up to ``min(limit, 10000)`` trades are requested in ascending order
        (``sort=1``) for ``[start, end]``. A short page ends the walk
        (cursor ``None``). A full page means the window may hold more trades.

        Several trades can share the last millisecond of a full page, and the
        page limit may have cut that group in half. To lose none of them, the
        trades of that final millisecond are held back and the next page
        restarts *at* that millisecond, so the group is fetched again in full.
        A full page can therefore return fewer than *limit* trades together
        with a non-``None`` cursor.

        If every row of a full page carries the same millisecond the group
        cannot be split; the page is returned as is and the cursor moves to
        the next millisecond so that paging always progresses.

        Parameters
        ----------
        symbol
            Canonical symbol, rendered with ``_bfx_symbol``.
        start_ns, end_ns
            Window bounds in nanoseconds (sent to the API in milliseconds).
        limit
            Maximum number of trades requested, capped at 10 000.
        cursor
            Start timestamp in milliseconds as returned by the previous call;
            ``None`` starts at *start_ns*.

        Returns
        -------
        tuple[list[Trade], str | None]
            The parsed trades and the cursor for the next page, or ``None``
            when the window is exhausted.
        """
        sym = _bfx_symbol(symbol)
        page_limit = min(limit, 10000)
        start_ms = int(cursor) if cursor is not None else start_ns // 1_000_000
        params: dict[str, Any] = {
            "start": start_ms,
            "end": end_ns // 1_000_000,
            "limit": page_limit,
            "sort": 1,
        }
        async with self._http as client:
            data = await client.get(f"{_BASE}/trades/{sym}/hist", params)
        rows = [
            e for e in (data if isinstance(data, list) else [])
            if isinstance(e, list) and len(e) >= 4
        ]
        # Short (or empty) page: nothing left in the window. The emptiness
        # check also covers a non-positive page_limit, where indexing the
        # last row below would fail.
        if not rows or len(rows) < page_limit:
            return [self._trade_from_row(e) for e in rows], None

        last_ms = int(rows[-1][1])
        # Rows strictly before the final millisecond are certainly complete.
        complete = [e for e in rows if int(e[1]) < last_ms]
        if complete:
            # last_ms > start_ms here, so the cursor strictly advances.
            return [self._trade_from_row(e) for e in complete], str(last_ms)
        # The whole page sits in one millisecond: step past it to guarantee
        # progress (trades of that millisecond beyond the limit are not reachable
        # with a timestamp-only cursor).
        return [self._trade_from_row(e) for e in rows], str(last_ms + 1)

    async def fetch_orderbook(self, symbol: Symbol, depth: int) -> OrderBookSnapshot:
        """Fetch a current order-book snapshot up to *depth* levels.

        Each response row is read as ``[price, count, amount]``; a positive
        amount is a bid, anything else an ask (stored with its absolute
        amount). Bids are sorted by descending price, asks ascending. The
        snapshot timestamp is the local wall clock at parse time.
        """
        sym = _bfx_symbol(symbol)
        # NOTE(review): Bitfinex's REST book documentation lists discrete
        # ``len`` values — confirm that arbitrary depths up to 250 are accepted.
        params = {"len": min(depth, 250)}
        async with self._http as client:
            data = await client.get(f"{_BASE}/book/{sym}/P0", params)
        bids: list[OrderBookLevel] = []
        asks: list[OrderBookLevel] = []
        for e in (data if isinstance(data, list) else []):
            if not isinstance(e, list) or len(e) < 3:
                continue
            price, count, amount = float(e[0]), int(e[1]), float(e[2])
            if amount > 0:
                bids.append(OrderBookLevel(price=price, amount=amount, count=count))
            else:
                asks.append(OrderBookLevel(price=price, amount=abs(amount), count=count))
        return OrderBookSnapshot(
            ts=s_to_ns(time.time()),
            bids=sorted(bids, key=lambda x: -x.price),
            asks=sorted(asks, key=lambda x: x.price),
        )

    def stream_ohlc(self, symbol: Symbol, span: int) -> AsyncIterator[OHLCBar]:
        """Stream live OHLC bars over WebSocket.

        A *span* that is not a key of ``_BITFINEX_SPANS`` falls back to the
        ``1m`` timeframe; the fallback is logged because the bars then do not
        match the requested span.
        """
        tf = _BITFINEX_SPANS.get(span)
        if tf is None:
            logger.warning(
                "Bitfinex: unsupported OHLC span %r s; streaming 1m bars instead", span
            )
            tf = "1m"
        ws = _BitfinexWS(_bfx_symbol(symbol), "candles", tf)
        return ws.stream()

    def stream_trades(self, symbol: Symbol) -> AsyncIterator[Trade]:
        """Stream live trades over WebSocket."""
        ws = _BitfinexWS(_bfx_symbol(symbol), "trades")
        return ws.stream()

    def stream_orderbook(self, symbol: Symbol, depth: int) -> AsyncIterator[OrderBookSnapshot]:
        """Stream live order-book snapshots/deltas over WebSocket.

        Raises
        ------
        NotImplementedError
            Always — the order-book channel is not implemented for Bitfinex.
        """
        raise NotImplementedError("Bitfinex live order book stream is not implemented")
class _BitfinexWS(WebSocketBase):
    """Single-channel Bitfinex WebSocket stream (``trades`` or ``candles``).

    One instance subscribes to exactly one channel for one symbol. The channel
    id announced by the ``subscribed`` event is remembered and every data
    frame carrying a different id is ignored.
    """

    def __init__(self, symbol: str, channel: str, tf: str = "") -> None:
        """Store the subscription parameters.

        Parameters
        ----------
        symbol
            Exchange-formatted pair, e.g. ``tBTCUSD``.
        channel
            ``"trades"`` or ``"candles"``.
        tf
            Timeframe label; only used to build the candle subscription key.
        """
        super().__init__(_WS_URL)
        self._symbol = symbol
        self._channel = channel
        self._tf = tf
        # Set from the "subscribed" event; None until then.
        self._chan_id: int | None = None

    async def on_connect(self, ws: Any) -> None:
        """Send the subscription message after each (re)connect."""
        # The id learned on a previous connection is stale: forget it so no
        # frame is accepted until the new "subscribed" event arrives.
        self._chan_id = None
        sub: dict[str, Any] = {"event": "subscribe", "channel": self._channel, "symbol": self._symbol}
        if self._channel == "candles":
            sub["key"] = f"trade:{self._tf}:{self._symbol}"
        await ws.send(json.dumps(sub))

    @staticmethod
    def _trade_row(frame: list[Any]) -> list[Any] | None:
        """Return the ``[id, ts_ms, amount, price]`` row of a trade frame, if any.

        Handled shapes (per the Bitfinex WS v2 trades channel):

        - ``[chanId, "te", row]`` — trade executed: the row is returned.
        - ``[chanId, "tu", row]`` — follow-up for the same trade: ignored so
          each trade is emitted once.
        - ``[chanId, [row, row, ...]]`` — initial snapshot of past trades:
          ignored, the stream only emits trades seen live.
        - ``[chanId, row]`` — flat row, kept for backward compatibility.
        """
        payload = frame[1]
        if payload == "te":
            row = frame[2] if len(frame) >= 3 else None
        elif isinstance(payload, list) and payload and not isinstance(payload[0], list):
            row = payload
        else:
            row = None
        if isinstance(row, list) and len(row) >= 4:
            return row
        return None

    async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]:
        """Parse a raw WebSocket frame into domain records.

        Event objects (dicts) never yield records: ``subscribed`` stores the
        channel id, ``error`` is logged, everything else is ignored. Data
        frames (lists) are dropped unless their first element equals the
        stored channel id; heartbeats (``"hb"``) are dropped too.

        Yields
        ------
        Trade | OHLCBar
            One :class:`Trade` per executed trade on the ``trades`` channel,
            or one :class:`OHLCBar` per candle (snapshot or update) on the
            ``candles`` channel.
        """
        data = json.loads(raw)
        if isinstance(data, dict):
            event = data.get("event")
            if event == "subscribed":
                self._chan_id = data.get("chanId")
            elif event == "error":
                # A rejected subscription would otherwise look like a stream
                # that simply never produces anything.
                logger.warning(
                    "Bitfinex WS error for %s %s: %s", self._channel, self._symbol, data
                )
            return
        if not isinstance(data, list) or len(data) < 2:
            return
        if data[0] != self._chan_id:
            return
        payload = data[1]
        if payload == "hb":
            return

        if self._channel == "trades":
            row = self._trade_row(data)
            if row is None:
                return
            signed_amount = float(row[2])
            yield Trade(
                ts=int(row[1]) * 1_000_000,  # ms -> ns
                price=float(row[3]),
                amount=abs(signed_amount),
                side="buy" if signed_amount > 0 else "sell",
                tid=str(row[0]),
            )
        elif self._channel == "candles" and isinstance(payload, list) and payload:
            # A snapshot is a list of candle rows, an update is a single row.
            # The emptiness check above protects the ``payload[0]`` probe.
            items = payload if isinstance(payload[0], list) else [payload]
            for e in items:
                if isinstance(e, list) and len(e) >= 6:
                    # Row order is [ts_ms, open, close, high, low, volume].
                    yield OHLCBar(
                        ts=int(e[0]) * 1_000_000,  # ms -> ns
                        open=float(e[1]),
                        close=float(e[2]),
                        high=float(e[3]),
                        low=float(e[4]),
                        volume=float(e[5]),
                    )