#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-07-26 11:54:55
# @Last modified by: ArthurBernard
""" Tools and object to load, append and save different kind of database. """
# Built-in packages
import os.path
import time
from collections.abc import Callable
from os import makedirs
from pickle import Pickler, Unpickler
from typing import Any, Literal
# Third-party packages
import polars as pl
from sqlalchemy import URL, create_engine
__all__ = ['IODataBase', 'get_df', 'save_df']
class IODataBase:
    """ Object to save a pl.DataFrame into different kinds/formats of database.

    Parameters
    ----------
    path : str, optional
        Path of the database, default is './' (current directory). The
        directory is created if it does not exist yet.
    method : str, optional
        Format of database (case-insensitive), one of 'DataFrame', 'SQLite',
        'CSV', 'Excel', 'parquet', 'polars', 'PostgreSQL', 'Oracle', 'MSSQL'
        or 'MySQL'. Default is CSV.

    Raises
    ------
    NotImplementedError
        If `method` is not one of the supported formats.

    Attributes
    ----------
    path : str
        Path of the database, always ending with '/'.
    method : str
        Kind/format of the database, lower-cased.
    parser : dict
        Maps each supported `method` to the bound method that saves the data.

    Methods
    -------
    save_as_dataframe
    get_from_dataframe
    save_as_sqlite
    get_from_sqlite
    save_as_sql
    save_as_csv
    save_as_parquet
    save_as_excel
    __call__

    """

    def __init__(self, path: str = './', method: str = 'csv') -> None:
        self.method = method.lower()
        self.parser: dict[str, Callable[..., None]] = {
            'dataframe': self.save_as_dataframe,
            'sqlite': self.save_as_sqlite,
            'csv': self.save_as_csv,
            'excel': self.save_as_excel,
            'parquet': self.save_as_parquet,
            'polars': self.save_as_parquet,
            'postgresql': self.save_as_sql,
            'mysql': self.save_as_sql,
            'oracle': self.save_as_sql,
            'mssql': self.save_as_sql,
        }
        # Validate before touching the file system, so that a wrong `method`
        # does not leave a freshly created directory behind.
        if self.method not in self.parser:
            raise NotImplementedError(
                f"`method` must be one of {list(self.parser)}, got {method!r}"
            )
        # An empty path means the current directory; without this guard it
        # would become '/' once the trailing slash is appended below.
        path = path or './'
        makedirs(path, exist_ok=True)
        self.path = path if path.endswith('/') else path + '/'

    def __call__(self, new_data: pl.DataFrame, **kwargs: Any) -> None:
        """ Append and save *new_data* in the configured format.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append.
        **kwargs
            Forwarded to the ``save_as_*`` method selected by `method`.

        """
        return self.parser[self.method](new_data, **kwargs)

    @staticmethod
    def _utc_stamp(fmt: str) -> str:
        """ Return the current UTC time formatted with *fmt*. """
        return time.strftime(fmt, time.gmtime())

    @staticmethod
    def _quote_identifier(identifier: str) -> str:
        """ Return *identifier* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so that the identifier cannot
        terminate the quoting early.

        """
        return '"' + identifier.replace('"', '""') + '"'

    def save_as_dataframe(self, new_data: pl.DataFrame, name: str | None = None, ext: str = '.dat') -> None:
        """ Append and save *new_data* as a pickle binary file.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append to the already stored data, if any.
        name : str, optional
            File name, default is the current UTC date as 'YY-MM-DD'.
        ext : str, optional
            File extension, default is '.dat'.

        """
        if name is None:
            name = self._utc_stamp('%y-%m-%d')
        # NOTE: the existing file is loaded with pickle, which can execute
        # arbitrary code; only use this on files from a trusted source.
        existing = get_df(self.path, name, ext=ext)
        combined = pl.concat([existing, new_data]) if len(existing) > 0 else new_data
        save_df(combined, self.path, name, ext=ext)

    def get_from_dataframe(self, name: str, ext: str = '.dat') -> pl.DataFrame:
        """ Load data from pickle binary file.

        Parameters
        ----------
        name : str
            File name.
        ext : str, optional
            File extension, default is '.dat'.

        Returns
        -------
        pl.DataFrame
            Whatever `get_df` returns for this file.

        """
        return get_df(self.path, name, ext=ext)

    def save_as_sqlite(self, new_data: pl.DataFrame, table: str = 'main_table', name: str | None = None, ext: str = '.db') -> None:
        """ Append *new_data* into a SQLite table (stdlib sqlite3, no extra deps).

        The table is created on first use; integer, unsigned integer and
        boolean columns are declared INTEGER, float columns REAL and every
        other dtype TEXT. Nothing is written if *new_data* has no column.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to insert.
        table : str, optional
            Name of the table, default is 'main_table'.
        name : str, optional
            Database file name, default is the current UTC year as 'YY'.
        ext : str, optional
            File extension, default is '.db'.

        """
        import sqlite3
        from contextlib import closing

        _PL_TO_SQLITE: dict[type, str] = {
            pl.Int8: 'INTEGER', pl.Int16: 'INTEGER', pl.Int32: 'INTEGER', pl.Int64: 'INTEGER',
            pl.UInt8: 'INTEGER', pl.UInt16: 'INTEGER', pl.UInt32: 'INTEGER', pl.UInt64: 'INTEGER',
            pl.Float32: 'REAL', pl.Float64: 'REAL',
            pl.Boolean: 'INTEGER',
        }
        # A table cannot be created without any column.
        if not new_data.columns:
            return
        if name is None:
            name = self._utc_stamp('%y')
        path = self.path + name + ext

        def sqlite_type(dtype: Any) -> str:
            # `dtypes` may hold dtype classes or dtype instances; look up
            # the class in both cases.
            key = dtype if isinstance(dtype, type) else type(dtype)
            return _PL_TO_SQLITE.get(key, 'TEXT')

        quoted_table = self._quote_identifier(table)
        cols = ', '.join(
            f'{self._quote_identifier(c)} {sqlite_type(t)}'
            for c, t in zip(new_data.columns, new_data.dtypes)
        )
        placeholders = ', '.join(['?'] * len(new_data.columns))
        # `closing` guarantees the connection is released even on error; the
        # inner `with conn` commits on success and rolls back on exception.
        with closing(sqlite3.connect(path)) as conn:
            with conn:
                conn.execute(f'CREATE TABLE IF NOT EXISTS {quoted_table} ({cols})')
                conn.executemany(
                    f'INSERT INTO {quoted_table} VALUES ({placeholders})',
                    new_data.iter_rows(),
                )

    def get_from_sqlite(self, name: str, table: str = 'main_table', ext: str = '.db') -> pl.DataFrame:
        """ Load data from a SQLite table.

        Parameters
        ----------
        name : str
            Database file name.
        table : str, optional
            Name of the table, default is 'main_table'.
        ext : str, optional
            File extension, default is '.db'.

        Returns
        -------
        pl.DataFrame
            Every row and column of *table*.

        """
        import sqlite3
        from contextlib import closing

        path = self.path + name + ext
        query = f'SELECT * FROM {self._quote_identifier(table)}'
        # Close the connection even if the query fails.
        with closing(sqlite3.connect(path)) as conn:
            return pl.read_database(query, connection=conn)

    def save_as_sql(self, new_data: pl.DataFrame, table: str = 'main_table', name: str | None = None, ext: str = '', driver: str | None = None, username: str | None = None, password: str | None = None, host: str | None = None, port: str | int | None = None, **kwargs: Any) -> None:
        """ Append *new_data* into a SQL database via ADBC (PostgreSQL, MySQL, …).

        Requires the appropriate ADBC driver, e.g. ``adbc_driver_postgresql``.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append.
        table : str, optional
            Name of the table, default is 'main_table'.
        name : str, optional
            Database name, default is the current UTC year as 'YY'. The
            database passed to the URL is ``self.path + name + ext``.
        ext : str, optional
            Suffix appended to the database name, default is ''.
        driver : str, optional
            If set, the URL driver name is ``'<method>+<driver>'``, otherwise
            it is `method` alone.
        username, password, host : str, optional
            Connection credentials and host.
        port : str or int, optional
            Connection port.
        **kwargs
            Passed as the query part of the connection URL.

        """
        if name is None:
            name = self._utc_stamp('%y')
        drivername = self.method if driver is None else f"{self.method}+{driver}"
        url = URL.create(
            drivername, username=username, password=password, host=host,
            port=port, database=self.path + name + ext, query=kwargs,
        )
        engine = create_engine(url)
        try:
            # NOTE(review): a SQLAlchemy engine is passed together with
            # engine='adbc'; confirm that polars accepts this combination.
            new_data.write_database(table, connection=engine, if_table_exists='append', engine='adbc')
        finally:
            # Release the pooled connections held by the engine.
            engine.dispose()

    def save_as_csv(self, new_data: pl.DataFrame, name: str | None = None, ext: str = '.csv') -> None:
        """ Append *new_data* to a CSV file (header written once).

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append.
        name : str, optional
            File name, default is the current UTC year as 'YY'.
        ext : str, optional
            File extension, default is '.csv'.

        """
        if name is None:
            name = self._utc_stamp('%y')
        path = self.path + name + ext
        # An existing but empty file has no header yet, so it is handled
        # like a missing file.
        if os.path.exists(path) and os.path.getsize(path) > 0:
            with open(path, 'ab') as f:
                new_data.write_csv(f, include_header=False)
        else:
            new_data.write_csv(path)

    def save_as_parquet(self, new_data: pl.DataFrame, name: str | None = None, ext: str = '.parquet', compression: Literal['snappy', 'gzip', 'brotli', 'lz4', 'zstd'] = 'snappy') -> None:
        """ Append *new_data* to a Parquet file.

        The existing file, if any, is read entirely, concatenated with
        *new_data* and written back.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append.
        name : str, optional
            File name, default is the current UTC year as 'YY'.
        ext : str, optional
            File extension, default is '.parquet'.
        compression : {'snappy', 'gzip', 'brotli', 'lz4', 'zstd'}, optional
            Compression passed to ``write_parquet``, default is 'snappy'.

        """
        if name is None:
            name = self._utc_stamp('%y')
        path = self.path + name + ext
        if os.path.exists(path):
            existing = pl.read_parquet(path)
            new_data = pl.concat([existing, new_data])
        new_data.write_parquet(path, compression=compression)

    def save_as_excel(self, new_data: pl.DataFrame, name: str | None = None, sheet_name: str = 'Sheet1', ext: str = '.xlsx') -> None:
        """ Append *new_data* to an Excel file (requires openpyxl).

        The column names are written only when the sheet is created.

        Parameters
        ----------
        new_data : pl.DataFrame
            Data to append.
        name : str, optional
            File name, default is the current UTC date as 'YY-MM-DD'.
        sheet_name : str, optional
            Name of the sheet, default is 'Sheet1'.
        ext : str, optional
            File extension, default is '.xlsx'.

        Warnings
        --------
        Slow method, not recommended for large datasets.

        """
        import openpyxl

        if name is None:
            name = self._utc_stamp('%y-%m-%d')
        path = self.path + name + ext
        if os.path.exists(path):
            wb = openpyxl.load_workbook(path)
            write_header = sheet_name not in wb.sheetnames
            ws = wb.create_sheet(sheet_name) if write_header else wb[sheet_name]
        else:
            wb = openpyxl.Workbook()
            ws = wb.active  # type: ignore[assignment]
            ws.title = sheet_name
            write_header = True
        if write_header:
            ws.append(new_data.columns)
        for row in new_data.iter_rows(named=False):
            ws.append(list(row))
        wb.save(path)

def get_df(path: str, name: str, ext: str = '') -> pl.DataFrame:
    """ Load a DataFrame from a pickle binary file.

    Parameters
    ----------
    path : str
        Directory path. A '/' separator is appended when `path` is not empty,
        does not end with '/' and `name` does not start with '/'.
    name : str
        File name.
    ext : str, optional
        File extension, with or without the leading dot. Default is ''.

    Returns
    -------
    pl.DataFrame
        Loaded DataFrame, or an empty DataFrame if the file is not found.

    Warnings
    --------
    The file is read with pickle, which can execute arbitrary code while
    loading. Only load files coming from a trusted source.

    """
    # `endswith`/`startswith` rather than indexing, so that an empty `path`
    # or `name` does not raise an IndexError.
    if path and not path.endswith('/') and not name.startswith('/'):
        path += '/'
    if ext and not ext.startswith('.'):
        ext = '.' + ext
    try:
        with open(path + name + ext, 'rb') as f:
            return Unpickler(f).load()
    except FileNotFoundError:
        return pl.DataFrame()

def save_df(df: pl.DataFrame, path: str, name: str, ext: str = '') -> None:
    """ Save a DataFrame as a pickle binary file.

    Parameters
    ----------
    df : pl.DataFrame
        DataFrame to persist.
    path : str
        Directory path. A '/' separator is appended when `path` is not empty,
        does not end with '/' and `name` does not start with '/'.
    name : str
        File name.
    ext : str, optional
        File extension, with or without the leading dot. Default is ''.

    """
    # `endswith`/`startswith` rather than indexing, so that an empty `path`
    # or `name` does not raise an IndexError.
    if path and not path.endswith('/') and not name.startswith('/'):
        path += '/'
    if ext and not ext.startswith('.'):
        ext = '.' + ext
    full_path = path + name + ext
    # Create the directory that will actually hold the file: `name` may carry
    # its own directory part and `path` may be empty (current directory).
    directory = os.path.dirname(full_path)
    if directory:
        makedirs(directory, exist_ok=True)
    with open(full_path, 'wb') as f:
        Pickler(f).dump(df)