Source code for data_engine.helpers.polars

"""Polars namespace helpers for Data Engine flow authoring."""

from __future__ import annotations

from collections.abc import Callable, Iterable
import os
from pathlib import Path
from uuid import uuid4

import polars as pl

from data_engine.helpers.duckdb import attach_dimension as _attach_dimension
from data_engine.helpers.duckdb import build_dimension as _build_dimension
from data_engine.helpers.duckdb import denormalize_columns as _denormalize_columns
from data_engine.helpers.duckdb import normalize_columns as _normalize_columns
from data_engine.helpers.duckdb import replace_rows_by_file as _replace_rows_by_file
from data_engine.helpers.duckdb import replace_rows_by_values as _replace_rows_by_values
from data_engine.helpers.duckdb import replace_table as _replace_table
from data_engine.helpers.schema import normalize_column_names as _normalize_column_names

# Anything accepted as a filesystem path: a plain string or an ``os.PathLike``.
PathLike = str | os.PathLike[str]
# A single column name, or several names given as a list or tuple.
ColumnNames = str | list[str] | tuple[str, ...]
# Return selector for ``normalize_columns``. The namespace docstrings describe
# ``"df"``, ``"map"`` and ``None`` as the accepted values; nothing here checks
# that (presumably the DuckDB helper validates it — confirm).
ReturnMode = str | None


def write_parquet_atomic(df: pl.DataFrame, path: PathLike, **write_options: object) -> Path:
    """Persist an eager Polars dataframe as parquet, swapping the target in atomically.

    The frame is first written to a uniquely named temporary file that lives in
    the same directory as the destination. That file is then moved over the
    destination with ``os.replace``, so readers never observe a half-written
    parquet file. Missing parent directories are created. Every extra keyword
    option is handed unchanged to ``pl.DataFrame.write_parquet``.

    Parameters
    ----------
    df : pl.DataFrame
        Eager dataframe to persist.
    path : PathLike
        Destination parquet path; ``~`` is expanded and the path is resolved.
    **write_options : object
        Extra keyword options for ``pl.DataFrame.write_parquet``.

    Returns
    -------
    Path
        Resolved absolute path of the file that was replaced.

    Examples
    --------
    .. code-block:: python

        import polars as pl
        from data_engine.helpers import write_parquet_atomic

        target = write_parquet_atomic(
            pl.DataFrame({"claim_id": [1, 2]}),
            "workspaces/example/output/claims.parquet",
        )

        df = pl.DataFrame({"claim_id": [3]})
        df.de.write_parquet_atomic(target)
    """

    def _write_to(temporary_path: Path) -> None:
        # Polars writes the complete file here before the atomic swap happens.
        df.write_parquet(temporary_path, **write_options)

    return _write_atomic(path, _write_to)
def write_excel_atomic(
    df: pl.DataFrame,
    path: PathLike,
    worksheet: str | None = None,
    **write_options: object,
) -> Path:
    """Persist an eager Polars dataframe as an Excel workbook, replacing the target atomically.

    The workbook is produced in a uniquely named temporary file beside the
    destination, then moved into place with ``os.replace``. Missing parent
    directories are created. ``worksheet`` and every extra keyword option are
    forwarded to ``pl.DataFrame.write_excel``.

    Parameters
    ----------
    df : pl.DataFrame
        Eager dataframe to persist.
    path : PathLike
        Destination workbook path; ``~`` is expanded and the path is resolved.
    worksheet : str | None
        Worksheet name passed through to ``pl.DataFrame.write_excel``.
    **write_options : object
        Extra keyword options for ``pl.DataFrame.write_excel``.

    Returns
    -------
    Path
        Resolved absolute path of the workbook that was replaced.

    Examples
    --------
    .. code-block:: python

        import polars as pl
        from data_engine.helpers import write_excel_atomic

        target = write_excel_atomic(
            pl.DataFrame({"claim_id": [1, 2]}),
            "workspaces/example/output/claims.xlsx",
            worksheet="Claims",
            table_name="claims",
            autofit=True,
        )

        df = pl.DataFrame({"claim_id": [3]})
        df.de.write_excel_atomic(target, worksheet="Claims")
    """

    def _write_to(temporary_path: Path) -> None:
        # The whole workbook is written to the temporary file before the swap.
        df.write_excel(temporary_path, worksheet=worksheet, **write_options)

    return _write_atomic(path, _write_to)
def sink_parquet_atomic(lf: pl.LazyFrame, path: PathLike, **sink_options: object) -> Path:
    """Sink a Polars lazy frame to parquet with same-directory atomic replacement.

    The lazy query is executed into a unique temporary file beside the target,
    then moved into place with ``os.replace``. Use the default eager sink mode
    so the helper can complete the replacement in the same call.

    Parameters
    ----------
    lf : pl.LazyFrame
        Lazy Polars frame to execute and write.
    path : PathLike
        Target parquet path; ``~`` is expanded and the path is resolved.
    **sink_options : object
        Keyword options forwarded to ``pl.LazyFrame.sink_parquet``.

    Returns
    -------
    Path
        Absolute target path that was replaced.

    Raises
    ------
    ValueError
        If a truthy ``lazy`` option is passed (for example ``lazy=True``).
        A lazy sink defers execution, so no file would exist to move into place.

    Examples
    --------
    .. code-block:: python

        import polars as pl
        import data_engine.helpers

        lf = pl.DataFrame({"claim_id": [1, 2]}).lazy()
        lf.de.sink_parquet_atomic("workspaces/example/output/claims.parquet")
    """
    # Reject any truthy value, not only the ``True`` singleton. Values such as
    # ``1`` or ``numpy.True_`` would otherwise slip through to a deferred sink,
    # leaving no temporary file for ``os.replace`` and failing less clearly.
    if sink_options.get("lazy"):
        raise ValueError("Atomic LazyFrame parquet writes require eager sink execution; pass lazy=False or omit lazy.")

    def _sink_to(temporary_path: Path) -> None:
        lf.sink_parquet(temporary_path, **sink_options)

    return _write_atomic(path, _sink_to)
def _write_atomic(path: PathLike, write: Callable[[Path], object]) -> Path:
    """Run ``write`` against a sibling temporary file, then move it over ``path``.

    Parameters
    ----------
    path : PathLike
        Destination path. ``~`` is expanded, the path is resolved, and missing
        parent directories are created.
    write : Callable[[Path], object]
        Callback that must fully write the temporary file it receives; its
        return value is ignored.

    Returns
    -------
    Path
        Resolved absolute destination path.

    Raises
    ------
    BaseException
        Whatever ``write`` or ``os.replace`` raises is re-raised after the
        temporary file has been removed (best effort).
    """
    target_path = Path(path).expanduser().resolve()
    target_path.parent.mkdir(parents=True, exist_ok=True)
    # Keep the temp file in the target's directory so ``os.replace`` is a
    # same-filesystem rename. The uuid avoids collisions between concurrent
    # writers, and the leading dot keeps it hidden in POSIX listings.
    temporary_path = target_path.with_name(f".{target_path.name}.{uuid4().hex}.tmp")
    try:
        write(temporary_path)
        os.replace(temporary_path, target_path)
    except BaseException:
        # BaseException (not Exception) so Ctrl-C / SystemExit also clean up.
        _remove_temporary_file(temporary_path)
        raise
    return target_path


def _remove_temporary_file(path: Path) -> None:
    """Best-effort removal of a leftover temporary file; never raises ``OSError``."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        # Deliberately swallowed: this runs while another exception is being
        # re-raised, and a cleanup failure must not mask that original error.
        pass
@pl.api.register_dataframe_namespace("de")
class DataEngineDataFrameNamespace:
    """Data Engine helpers reachable from eager frames as ``pl.DataFrame.de``.

    Each method hands the wrapped dataframe to the matching helper in this
    module, ``data_engine.helpers.schema`` or ``data_engine.helpers.duckdb``.
    """

    def __init__(self, df: pl.DataFrame) -> None:
        # The dataframe that ``.de`` was accessed on.
        self._df = df

    def normalize_column_names(self, columns: Iterable[object] | None = None) -> pl.DataFrame:
        """Return this dataframe with its column names normalized.

        Parameters
        ----------
        columns : Iterable[object] | None
            Names to normalize. Leave as ``None`` to normalize every column.

        Returns
        -------
        pl.DataFrame
            Dataframe carrying the normalized column names.
        """
        return _normalize_column_names(
            self._df,
            columns,
        )

    def write_parquet_atomic(self, path: PathLike, **write_options: object) -> Path:
        """Persist this dataframe as parquet, replacing ``path`` atomically.

        Parameters
        ----------
        path : PathLike
            Destination parquet path.
        **write_options : object
            Extra keyword options for ``pl.DataFrame.write_parquet``.

        Returns
        -------
        Path
            Resolved absolute path of the replaced file.
        """
        return write_parquet_atomic(
            self._df,
            path,
            **write_options,
        )

    def write_excel_atomic(
        self,
        path: PathLike,
        worksheet: str | None = None,
        **write_options: object,
    ) -> Path:
        """Persist this dataframe as an Excel workbook, replacing ``path`` atomically.

        Parameters
        ----------
        path : PathLike
            Destination workbook path.
        worksheet : str | None
            Worksheet name passed through to ``pl.DataFrame.write_excel``.
        **write_options : object
            Extra keyword options for ``pl.DataFrame.write_excel``.

        Returns
        -------
        Path
            Resolved absolute path of the replaced workbook.
        """
        return write_excel_atomic(
            self._df,
            path,
            worksheet=worksheet,
            **write_options,
        )

    def build_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Create or extend a DuckDB dimension table from this dataframe's rows.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        key_column : str
            Name of the surrogate key column in the dimension table.
        return_df : bool
            When true, return the key mapping for this dataframe's natural keys.

        Returns
        -------
        pl.DataFrame | None
            The mapping dataframe if ``return_df`` is true, else ``None``.
        """
        return _build_dimension(
            db_path,
            table,
            df=self._df,
            key_column=key_column,
            return_df=return_df,
        )

    def attach_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Join an existing DuckDB dimension's surrogate key onto this dataframe.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        on : ColumnNames
            Natural key column(s) used for the join.
        key_column : str
            Surrogate key column to bring across.
        drop_key : bool
            When true, drop the natural key columns once the surrogate key is attached.

        Returns
        -------
        pl.DataFrame
            This dataframe with the surrogate key column added.
        """
        return _attach_dimension(
            db_path,
            table,
            df=self._df,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
        )

    def denormalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        select: ColumnNames = "*",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Bring natural columns from an existing DuckDB dimension back onto this dataframe.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        key_column : str
            Surrogate key column used for the join.
        select : ColumnNames
            Natural columns to attach; ``"*"`` means every non-key column.
        drop_key : bool
            When true, drop ``key_column`` after the natural columns are attached.

        Returns
        -------
        pl.DataFrame
            This dataframe with the selected dimension columns added.
        """
        return _denormalize_columns(
            db_path,
            table,
            df=self._df,
            key_column=key_column,
            select=select,
            drop_key=drop_key,
        )

    def normalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = True,
        returns: ReturnMode = "df",
    ) -> pl.DataFrame | None:
        """Build dimension keys for this dataframe and attach them back onto it.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        on : ColumnNames
            Natural key column(s) that define the dimension.
        key_column : str
            Surrogate key column to create and attach.
        drop_key : bool
            When true, drop the natural key columns after attaching the surrogate key.
        returns : ReturnMode
            ``"df"`` for the normalized rows, ``"map"`` for just the key mapping,
            or ``None`` to only persist the dimension rows.

        Returns
        -------
        pl.DataFrame | None
            Normalized rows, the key mapping, or ``None``, as selected by ``returns``.
        """
        return _normalize_columns(
            db_path,
            table,
            df=self._df,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
            returns=returns,
        )

    def replace_rows_by_file(
        self,
        db_path: PathLike,
        table: str,
        *,
        file_hash: str,
        file_hash_column: str = "file_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Swap out one source file's rows in a DuckDB table for this dataframe.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        file_hash : str
            Stable identifier of the source file whose previous batch is deleted.
        file_hash_column : str
            Destination column that stores ``file_hash``.
        return_df : bool
            When true, return this dataframe with the file hash column attached.

        Returns
        -------
        pl.DataFrame | None
            Inserted rows including ``file_hash_column`` if ``return_df`` is true, else ``None``.
        """
        return _replace_rows_by_file(
            db_path,
            table,
            df=self._df,
            file_hash=file_hash,
            file_hash_column=file_hash_column,
            return_df=return_df,
        )

    def replace_rows_by_values(
        self,
        db_path: PathLike,
        table: str,
        *,
        column: str,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Replace DuckDB rows whose ``column`` value appears in this dataframe.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        column : str
            Column whose incoming values select the rows to replace.
        return_df : bool
            When true, return the inserted dataframe.

        Returns
        -------
        pl.DataFrame | None
            The inserted dataframe if ``return_df`` is true, else ``None``.
        """
        return _replace_rows_by_values(
            db_path,
            table,
            df=self._df,
            column=column,
            return_df=return_df,
        )

    def replace_table(
        self,
        db_path: PathLike,
        table: str,
        *,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Overwrite an entire DuckDB table with this dataframe's rows.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        return_df : bool
            When true, return the inserted dataframe.

        Returns
        -------
        pl.DataFrame | None
            The inserted dataframe if ``return_df`` is true, else ``None``.
        """
        return _replace_table(
            db_path,
            table,
            df=self._df,
            return_df=return_df,
        )
@pl.api.register_lazyframe_namespace("de")
class DataEngineLazyFrameNamespace:
    """Data Engine helpers reachable from lazy frames as ``pl.LazyFrame.de``.

    Each method hands the wrapped lazy frame to the matching helper in this
    module, ``data_engine.helpers.schema`` or ``data_engine.helpers.duckdb``.
    """

    def __init__(self, lf: pl.LazyFrame) -> None:
        # The lazy frame that ``.de`` was accessed on.
        self._lf = lf

    def normalize_column_names(self, columns: Iterable[object] | None = None) -> pl.LazyFrame:
        """Return this lazy frame with its column names normalized.

        Parameters
        ----------
        columns : Iterable[object] | None
            Names to normalize. Leave as ``None`` to normalize every column.

        Returns
        -------
        pl.LazyFrame
            Lazy frame carrying the normalized column names.
        """
        return _normalize_column_names(
            self._lf,
            columns,
        )

    def sink_parquet_atomic(self, path: PathLike, **sink_options: object) -> Path:
        """Execute this lazy frame into a parquet file, replacing ``path`` atomically.

        Parameters
        ----------
        path : PathLike
            Destination parquet path.
        **sink_options : object
            Extra keyword options for ``pl.LazyFrame.sink_parquet``.

        Returns
        -------
        Path
            Resolved absolute path of the replaced file.

        Raises
        ------
        ValueError
            If ``lazy=True`` is passed; the atomic swap needs eager sink execution.
        """
        return sink_parquet_atomic(
            self._lf,
            path,
            **sink_options,
        )

    def build_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Create or extend a DuckDB dimension table from this lazy frame's rows.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        key_column : str
            Name of the surrogate key column in the dimension table.
        return_df : bool
            When true, return the key mapping for this lazy frame's natural keys.

        Returns
        -------
        pl.DataFrame | None
            The mapping dataframe if ``return_df`` is true, else ``None``.
        """
        return _build_dimension(
            db_path,
            table,
            df=self._lf,
            key_column=key_column,
            return_df=return_df,
        )

    def attach_dimension(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Join an existing DuckDB dimension's surrogate key onto this lazy frame.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        on : ColumnNames
            Natural key column(s) used for the join.
        key_column : str
            Surrogate key column to bring across.
        drop_key : bool
            When true, drop the natural key columns once the surrogate key is attached.

        Returns
        -------
        pl.DataFrame
            Collected (eager) dataframe with the surrogate key column added.
        """
        return _attach_dimension(
            db_path,
            table,
            df=self._lf,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
        )

    def denormalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        key_column: str = "dimension_key",
        select: ColumnNames = "*",
        drop_key: bool = False,
    ) -> pl.DataFrame:
        """Bring natural columns from an existing DuckDB dimension back onto this lazy frame.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        key_column : str
            Surrogate key column used for the join.
        select : ColumnNames
            Natural columns to attach; ``"*"`` means every non-key column.
        drop_key : bool
            When true, drop ``key_column`` after the natural columns are attached.

        Returns
        -------
        pl.DataFrame
            Collected (eager) dataframe with the selected dimension columns added.
        """
        return _denormalize_columns(
            db_path,
            table,
            df=self._lf,
            key_column=key_column,
            select=select,
            drop_key=drop_key,
        )

    def normalize_columns(
        self,
        db_path: PathLike,
        table: str,
        *,
        on: ColumnNames,
        key_column: str = "dimension_key",
        drop_key: bool = True,
        returns: ReturnMode = "df",
    ) -> pl.DataFrame | None:
        """Build dimension keys for this lazy frame and attach them back onto it.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Dimension table name; may be schema-qualified.
        on : ColumnNames
            Natural key column(s) that define the dimension.
        key_column : str
            Surrogate key column to create and attach.
        drop_key : bool
            When true, drop the natural key columns after attaching the surrogate key.
        returns : ReturnMode
            ``"df"`` for the normalized rows, ``"map"`` for just the key mapping,
            or ``None`` to only persist the dimension rows.

        Returns
        -------
        pl.DataFrame | None
            Normalized rows, the key mapping, or ``None``, as selected by ``returns``.
        """
        return _normalize_columns(
            db_path,
            table,
            df=self._lf,
            on=on,
            key_column=key_column,
            drop_key=drop_key,
            returns=returns,
        )

    def replace_rows_by_file(
        self,
        db_path: PathLike,
        table: str,
        *,
        file_hash: str,
        file_hash_column: str = "file_key",
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Swap out one source file's rows in a DuckDB table for this lazy frame.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        file_hash : str
            Stable identifier of the source file whose previous batch is deleted.
        file_hash_column : str
            Destination column that stores ``file_hash``.
        return_df : bool
            When true, return the collected frame with the file hash column attached.

        Returns
        -------
        pl.DataFrame | None
            Inserted rows including ``file_hash_column`` if ``return_df`` is true, else ``None``.
        """
        return _replace_rows_by_file(
            db_path,
            table,
            df=self._lf,
            file_hash=file_hash,
            file_hash_column=file_hash_column,
            return_df=return_df,
        )

    def replace_rows_by_values(
        self,
        db_path: PathLike,
        table: str,
        *,
        column: str,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Replace DuckDB rows whose ``column`` value appears in this lazy frame.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        column : str
            Column whose incoming values select the rows to replace.
        return_df : bool
            When true, return the inserted dataframe.

        Returns
        -------
        pl.DataFrame | None
            The inserted dataframe if ``return_df`` is true, else ``None``.
        """
        return _replace_rows_by_values(
            db_path,
            table,
            df=self._lf,
            column=column,
            return_df=return_df,
        )

    def replace_table(
        self,
        db_path: PathLike,
        table: str,
        *,
        return_df: bool = True,
    ) -> pl.DataFrame | None:
        """Overwrite an entire DuckDB table with this lazy frame's rows.

        Parameters
        ----------
        db_path : PathLike
            Path of the DuckDB database file.
        table : str
            Destination table name; may be schema-qualified.
        return_df : bool
            When true, return the inserted dataframe.

        Returns
        -------
        pl.DataFrame | None
            The inserted dataframe if ``return_df`` is true, else ``None``.
        """
        return _replace_table(
            db_path,
            table,
            df=self._lf,
            return_df=return_df,
        )