Source code for dccd.tools.websocket
#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-07-31 10:38:29
# @Last modified by: ArthurBernard
# @Last modified time: 2019-08-30 09:59:54
""" Connector objects to WebSockets API client to download data. """
# Built-in packages
import asyncio
import json
import logging
import time
from typing import Any
# Third party packages
import websockets
# Local packages
__all__ = ['BasisWebSocket']
# =========================================================================== #
# Basis objects #
# =========================================================================== #
[docs]
class BasisWebSocket:
""" Basis object to connect at a specified stream to websocket client API.
Parameters
----------
host : str
Adress of host to connect.
conn : dict
Parameters to connection setting.
subs : dict
Data to subscribe to a stream.
Attributes
----------
host : str
Adress of host to connect.
conn_para : dict
Parameters of websocket connection.
ws : websockets.client.WebSocketClientProtocol
Connection with the websocket client.
is_connect : bool
- True if connected.
- False`otherwise.
Methods
-------
on_open
"""
ws = False
is_connect = False
def __init__(self, host: str, conn: dict[str, Any] | None = None, subs: dict[str, Any] | None = None, max_retries: int = 5, retry_delay: int = 5) -> None:
""" Initialize object. """
# Set websocket variables
self.host = host
self.conn_para = conn if conn is not None else {}
self.subs_data = subs if subs is not None else {}
self.max_retries = max_retries
self.retry_delay = retry_delay
# Set logger
self.logger = logging.getLogger(__name__)
self.logger.info('Init websocket object.')
async def _connect(self, **kwargs: Any) -> None:
""" Connect to websocket. """
# Connect to host websocket
async with websockets.connect(self.host, **self.conn_para) as self.ws:
self.logger.info('Websocket connected to {}.'.format(self.host))
# Subscribe to a stream
await self._subscribe(**kwargs)
await self.wait_that('is_connect')
# Loop on received message
try:
async for msg in self.ws:
message = json.loads(msg)
await self.on_message(message)
# Stop if disconnect
if not self.is_connect:
return
# Exit due to closed connection
except websockets.exceptions.ConnectionClosed:
await self.on_error(
'ConnectionClosed',
"Code is {}\n".format(self.ws.close_code),
"Reason is '{}'".format(self.ws.close_reason)
)
async def _subscribe(self, **kwargs: Any) -> None:
""" Connect to a stream. """
# data = {"event": "subscribe", **kwargs}
data = {**self.subs_data, **kwargs}
self.logger.info('Subscription data: {}'.format(data))
# Wait the connection
await self.wait_that('ws')
# Send data to subscribe
await self.ws.send(json.dumps(data))
self.is_connect = True
return
[docs]
async def on_error(self, error: str, *args: Any) -> None:
""" On websocket error print and fire event. """
self.logger.error(error + ': ' + ''.join(args))
self.on_close()
[docs]
def on_close(self) -> None:
""" On websocket close print and fire event. """
self.logger.info("Websocket closed.")
self.is_connect = False
self.ws.close()
[docs]
def on_open(self, **kwargs: Any) -> None:
""" On websocket open.
Parameters
----------
**kwargs
Any relevant keyword arguments to set connection.
"""
self.logger.info("Websocket open.")
for attempt in range(self.max_retries):
try:
asyncio.run(self._connect(**kwargs))
break
except Exception as exc:
self.logger.warning(
f"Reconnect attempt {attempt + 1}/{self.max_retries}: {exc}"
)
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
else:
self.logger.error("Max retries reached, giving up.")
raise
[docs]
async def on_message(self, message: dict[str, Any] | list[Any]) -> None:
""" On websocket display message. """
self.logger.info('Message: {}'.format(message))
[docs]
async def wait_that(self, is_true: str) -> None:
""" Wait before running. """
while not self.__getattribute__(is_true):
self.logger.debug('Please wait that "{}".'.format(is_true))
await asyncio.sleep(1)