Source code for dccd.transport.ws

"""Async WebSocket base with auto-reconnect and checkpointing.

Exchange-specific WS inner classes only need to override :meth:`on_connect`
(send the subscription JSON) and either :meth:`parse_message` (yields domain
records from raw frames) or consume :meth:`stream_raw` directly when the
parse logic is too complex for a single async generator.
"""

from __future__ import annotations

import asyncio
import logging
from collections.abc import AsyncIterator
from typing import Any

__all__ = ["WebSocketBase"]

logger = logging.getLogger(__name__)

_INITIAL_DELAY = 1.0
_MAX_DELAY = 60.0
_BACKOFF_FACTOR = 2.0


[docs] class WebSocketBase: """Base async WebSocket client with exponential reconnect. Subclasses override :meth:`on_connect` to send subscription messages and :meth:`parse_message` to yield domain records from raw frames. For adapters that need a raw-frame async generator (e.g. to maintain local order-book state across messages), use :meth:`stream_raw` instead. Parameters ---------- url : str WebSocket endpoint URL. """ def __init__(self, url: str) -> None: self.url = url self._stop = asyncio.Event()
[docs] def stop(self) -> None: """Request graceful shutdown.""" self._stop.set()
[docs] async def on_connect(self, ws: Any) -> None: """Called once after each (re)connect. Override to send subscriptions."""
[docs] async def parse_message(self, raw: str | bytes) -> AsyncIterator[Any]: """Parse a raw frame and yield domain records. Override in subclass.""" return yield # make it an async generator
[docs] async def stream(self) -> AsyncIterator[Any]: """Yield parsed domain records, reconnecting on errors. Delegates to :meth:`parse_message` for frame parsing. """ async for raw in self.stream_raw(): async for record in self.parse_message(raw): yield record
[docs] async def stream_raw(self) -> AsyncIterator[str | bytes]: """Yield raw WebSocket frames, reconnecting with exponential backoff. Use this in adapters where :meth:`parse_message` is not convenient (e.g. stateful order-book reconstruction that spans multiple frames). """ import websockets delay = _INITIAL_DELAY while not self._stop.is_set(): try: # close_timeout keeps shutdown snappy: without it the closing # handshake blocks ~10s on stop/cancel (a "Stop" button that # appears frozen). 1s is plenty for a clean close. async with websockets.connect(self.url, close_timeout=1) as ws: delay = _INITIAL_DELAY await self.on_connect(ws) async for raw in ws: if self._stop.is_set(): return yield raw except asyncio.CancelledError: return except Exception as exc: if self._stop.is_set(): return logger.warning( "WS %s disconnected: %s — reconnect in %.1fs", self.url, exc, delay ) await asyncio.sleep(delay) delay = min(delay * _BACKOFF_FACTOR, _MAX_DELAY)