Source code for dccd.transport.http

"""Async HTTP client with retry/backoff."""

from __future__ import annotations

import logging
from typing import Any

import httpx

__all__ = ["AsyncHTTPClient", "HTTPError"]

logger = logging.getLogger(__name__)

_DEFAULT_TIMEOUT = 30.0
_DEFAULT_RETRIES = 5
_DEFAULT_BACKOFF_BASE = 2.0


class HTTPError(Exception):
    """Raised when an HTTP request fails after all retries."""

    def __init__(self, status: int, url: str, body: str = "") -> None:
        self.status = status
        self.url = url
        super().__init__(f"HTTP {status} from {url}: {body[:200]}")


[docs] class AsyncHTTPClient: """Thin wrapper around httpx.AsyncClient with retry/backoff. Parameters ---------- max_retries : int Number of retry attempts on transient errors (5xx, network errors). backoff_base : float Exponential backoff base in seconds. timeout : float Request timeout in seconds. """ def __init__( self, max_retries: int = _DEFAULT_RETRIES, backoff_base: float = _DEFAULT_BACKOFF_BASE, timeout: float = _DEFAULT_TIMEOUT, headers: dict[str, str] | None = None, ) -> None: self._max_retries = max_retries self._backoff_base = backoff_base self._timeout = timeout self._headers = headers or {} self._client: httpx.AsyncClient | None = None # Adapters share one AsyncHTTPClient and wrap each call in # ``async with self._http``. With two concurrent operations on the same # exchange (e.g. "run all jobs", or a scheduled job overlapping a manual # one) the first to finish would otherwise close the shared httpx client # mid-flight for the other ("Cannot send a request, as the client has # been closed"). Reference-count the context so the client is created on # first entry and closed only when the last concurrent user exits. Safe # under asyncio: the counter is mutated without intervening awaits. self._depth = 0 async def __aenter__(self) -> "AsyncHTTPClient": if self._client is None: self._client = httpx.AsyncClient( timeout=self._timeout, headers=self._headers, follow_redirects=True, ) self._depth += 1 return self async def __aexit__(self, *args: Any) -> None: self._depth -= 1 if self._depth <= 0 and self._client: self._depth = 0 await self._client.aclose() self._client = None
[docs] async def get(self, url: str, params: dict[str, Any] | None = None) -> Any: """Perform a GET request with retry/backoff. Returns parsed JSON.""" import asyncio client = self._client if client is None: raise RuntimeError("AsyncHTTPClient must be used as async context manager") last_exc: Exception | None = None for attempt in range(self._max_retries): try: resp = await client.get(url, params=params) if resp.status_code == 429: retry_after = float(resp.headers.get("Retry-After", self._backoff_base)) logger.warning("Rate-limited by %s, sleeping %.1fs", url, retry_after) await asyncio.sleep(retry_after) continue if resp.status_code >= 500: wait = self._backoff_base ** attempt logger.warning("HTTP %d from %s, retry in %.1fs", resp.status_code, url, wait) await asyncio.sleep(wait) last_exc = HTTPError(resp.status_code, url, resp.text) continue if resp.status_code >= 400: raise HTTPError(resp.status_code, url, resp.text) return resp.json() except (httpx.NetworkError, httpx.TimeoutException) as exc: wait = self._backoff_base ** attempt logger.warning("Network error %s (attempt %d), retry in %.1fs", exc, attempt + 1, wait) await asyncio.sleep(wait) last_exc = exc if last_exc: raise last_exc raise RuntimeError(f"GET {url} failed after {self._max_retries} retries")