# Source code for data_engine.helpers.duckdb

"""Public one-shot DuckDB helpers for flow authoring."""

from __future__ import annotations

import hashlib
from pathlib import Path

import duckdb
import polars as pl

# Any Polars frame accepted by the public helpers below; LazyFrames are
# collected by ``_materialize_frame`` before any DuckDB work happens.
FrameLike = pl.DataFrame | pl.LazyFrame


def _quote_identifier(value: str) -> str:
    """Return ``value`` as a double-quoted SQL identifier.

    Surrounding whitespace is stripped and embedded double quotes are doubled,
    so the result can be spliced into DuckDB SQL text as one identifier.

    Raises
    ------
    ValueError
        If ``value`` is blank after stripping.
    """
    identifier = str(value).strip()
    if identifier == "":
        raise ValueError("Identifier must be non-empty.")
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'


def _quote_table_ref(value: str) -> str:
    """Quote a possibly schema-qualified table reference part by part.

    ``"main.events"`` becomes ``'"main"."events"'``: the value is split on
    dots, each part is stripped, and each part is quoted individually.

    Raises
    ------
    ValueError
        If any dot-separated part is blank.
    """
    segments = [segment.strip() for segment in str(value).split(".")]
    if not segments or not all(segments):
        raise ValueError("Table name must be non-empty.")
    return ".".join(map(_quote_identifier, segments))


def _schema_ref(value: str) -> str | None:
    """Return the quoted schema prefix of a table reference, or ``None``.

    For ``"analytics.dim_customer"`` this is ``'"analytics"'``; an unqualified
    name has no prefix. Every part except the final table name is stripped and
    quoted, so a blank schema part raises ``ValueError``.
    """
    *schema_parts, _table_part = (part.strip() for part in str(value).split("."))
    if not schema_parts:
        return None
    return ".".join(_quote_identifier(part) for part in schema_parts)


def _join_predicate(*, left_alias: str, right_alias: str, columns: tuple[str, ...]) -> str:
    """Build a NULL-safe equality predicate between two aliases over ``columns``.

    Each column contributes ``left."col" IS NOT DISTINCT FROM right."col"`` so
    that NULL values compare equal; the terms are combined with ``AND``.
    """
    clauses = []
    for column in columns:
        quoted = _quote_identifier(column)
        clauses.append(f"{left_alias}.{quoted} IS NOT DISTINCT FROM {right_alias}.{quoted}")
    return " AND ".join(clauses)


def _ordered_columns(columns: tuple[str, ...]) -> str:
    """Render ``columns`` as a comma-separated list of quoted identifiers, in order."""
    return ", ".join(map(_quote_identifier, columns))


def _qualified_columns(alias: str, columns: tuple[str, ...]) -> str:
    """Render ``columns`` as quoted identifiers each prefixed with ``alias.``."""
    prefix = f"{alias}."
    return ", ".join(prefix + _quote_identifier(column) for column in columns)


def _index_name(*, table: str, columns: tuple[str, ...]) -> str:
    """Derive a deterministic unique-index name for ``table`` over ``columns``.

    The name is ``uq_dim_`` followed by the first 10 hex digits of the SHA-1 of
    ``"<table>|<col1>|<col2>..."``, so identical arguments give identical names.
    """
    fingerprint_source = f"{table}|" + "|".join(columns)
    fingerprint = hashlib.sha1(fingerprint_source.encode("utf-8")).hexdigest()
    return "uq_dim_" + fingerprint[:10]


def _existing_table_columns(connection, table: str) -> list[tuple[int, str, str, bool, object, bool]]:
    """Return the ``PRAGMA table_info`` rows DuckDB reports for ``table``.

    Unqualified names are quoted as a single identifier; schema-qualified names
    are quoted part by part.

    Parameters
    ----------
    connection
        Open DuckDB connection (anything exposing ``execute(...).fetchall()``).
    table : str
        Table name, optionally schema-qualified.
    """
    if _schema_ref(table) is None:
        target = _quote_identifier(str(table).split(".")[-1].strip())
    else:
        target = _quote_table_ref(table)
    return connection.execute(f"PRAGMA table_info({target})").fetchall()


def _table_column_names(connection, table: str) -> tuple[str, ...]:
    """Return the column names of ``table`` in the order ``PRAGMA table_info`` lists them."""
    rows = _existing_table_columns(connection, table)
    return tuple(row[1] for row in rows)


def _normalize_selected_columns(select: str | list[str] | tuple[str, ...]) -> tuple[str, ...]:
    """Coerce a column selection into a tuple of stripped column names.

    A single string becomes a one-element tuple; any other iterable is
    converted element-wise with ``str`` and stripped.

    Raises
    ------
    ValueError
        If the selection is empty or any name is blank.
    """
    raw_values = (select,) if isinstance(select, str) else select
    names = tuple(str(raw).strip() for raw in raw_values)
    if len(names) == 0 or "" in names:
        raise ValueError("select must include at least one non-empty column name.")
    return names


def _normalize_key_columns(on: str | list[str] | tuple[str, ...]) -> tuple[str, ...]:
    """Coerce join-key column name(s) into a tuple of stripped names.

    Raises
    ------
    ValueError
        If no names are given or any name is blank.
    """
    if isinstance(on, str):
        candidates = [on]
    else:
        candidates = list(on)
    keys = tuple(str(item).strip() for item in candidates)
    if not keys or not all(keys):
        raise ValueError("on must include at least one non-empty column name.")
    return keys


def _normalize_optional_limit(limit: int | None) -> int | None:
    """Validate an optional row limit, returning it as ``int`` or ``None``.

    Raises
    ------
    ValueError
        If the limit converts to a negative integer.
    """
    if limit is not None:
        value = int(limit)
        if value >= 0:
            return value
        raise ValueError("limit must be non-negative.")
    return None


def _materialize_frame(df: FrameLike) -> pl.DataFrame:
    """Return ``df`` as an eager Polars DataFrame.

    DataFrames pass through unchanged; LazyFrames are collected.

    Raises
    ------
    TypeError
        If ``df`` is neither a Polars DataFrame nor a LazyFrame.
    """
    if isinstance(df, pl.LazyFrame):
        return df.collect()
    if not isinstance(df, pl.DataFrame):
        raise TypeError("df must be a Polars DataFrame or LazyFrame.")
    return df


def build_dimension(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    key_column: str = "dimension_key",
    return_df: bool = True,
) -> pl.DataFrame | None:
    """Build or extend one dimension table from unique incoming row combinations.

    The incoming dataframe is treated as the natural key definition: every
    incoming column participates in uniqueness. The helper ensures the
    dimension table exists, inserts only new combinations, assigns
    deterministic surrogate keys, and optionally returns the
    natural-key-to-surrogate-key mapping.

    Natural keys are matched NULL-safely (``IS NOT DISTINCT FROM``). New
    combinations receive keys continuing from the table's current maximum
    key, numbered in ascending order of the natural columns. A unique index
    over the natural columns is created if it does not already exist.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Dimension table name, optionally schema-qualified.
    df : FrameLike
        Polars dataframe or lazy dataframe containing the natural key columns
        to persist. LazyFrames are collected before DuckDB operations run.
    key_column : str
        Surrogate key column to create in the dimension table.
    return_df : bool
        Whether to return the mapping dataframe for the incoming natural keys.

    Returns
    -------
    pl.DataFrame | None
        Mapping dataframe with natural key columns plus ``key_column`` when
        ``return_df`` is true; otherwise ``None``.

    Raises
    ------
    ValueError
        If identifiers or dataframe columns are invalid. A ``TypeError`` is
        raised by shared frame normalization when ``df`` is not a Polars
        DataFrame or LazyFrame.
    Exception
        Re-raises DuckDB transaction failures after rollback.
    """
    df = _materialize_frame(df)
    natural_columns = tuple(df.columns)
    if not natural_columns:
        raise ValueError("df must include at least one column.")
    normalized_key_column = str(key_column).strip()
    if not normalized_key_column:
        raise ValueError("key_column must be non-empty.")
    if normalized_key_column in natural_columns:
        raise ValueError(f"key_column {normalized_key_column!r} must not already exist in df columns.")

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_key_column = _quote_identifier(normalized_key_column)
    quoted_natural_columns = _ordered_columns(natural_columns)
    qualified_mapping_columns = _qualified_columns("mapping", natural_columns)
    # ORDER BY columns are qualified with their source alias. Unqualified
    # names would be ambiguous in the key-assignment window whenever a natural
    # column is itself called ``max_existing_key`` (a column of current_keys).
    new_rows_order = _qualified_columns("new_rows", natural_columns)
    natural_join = _join_predicate(left_alias="candidate", right_alias="existing", columns=natural_columns)
    mapping_join = _join_predicate(left_alias="mapping", right_alias="incoming_distinct", columns=natural_columns)
    temp_view = "__data_engine_dimension_incoming"
    temp_distinct = "__data_engine_dimension_incoming_distinct"
    temp_new_rows = "__data_engine_dimension_new_rows"
    unique_index_name = _quote_identifier(_index_name(table=table, columns=natural_columns))

    resolved_db_path = Path(db_path).expanduser().resolve()
    resolved_db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(resolved_db_path)
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, df)
        connection.execute(
            f"CREATE OR REPLACE TEMP TABLE {temp_distinct} AS SELECT DISTINCT * FROM {temp_view}"
        )
        # Empty CTAS: creates the table with key column first, then the
        # natural columns typed exactly like the incoming data.
        connection.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {quoted_table} AS
            SELECT CAST(NULL AS BIGINT) AS {quoted_key_column}, *
            FROM {temp_distinct}
            WHERE 1 = 0
            """
        )
        connection.execute(
            f"CREATE UNIQUE INDEX IF NOT EXISTS {unique_index_name} ON {quoted_table} ({quoted_natural_columns})"
        )
        # Anti-join: keep only incoming combinations with no existing row.
        connection.execute(
            f"""
            CREATE OR REPLACE TEMP TABLE {temp_new_rows} AS
            SELECT candidate.*
            FROM {temp_distinct} AS candidate
            LEFT JOIN {quoted_table} AS existing
              ON {natural_join}
            WHERE existing.{quoted_key_column} IS NULL
            """
        )
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_key_column}, {quoted_natural_columns})
            SELECT
                current_keys.max_existing_key + ROW_NUMBER() OVER (ORDER BY {new_rows_order})
                    AS {quoted_key_column},
                new_rows.*
            FROM {temp_new_rows} AS new_rows
            CROSS JOIN (
                SELECT COALESCE(MAX({quoted_key_column}), 0) AS max_existing_key
                FROM {quoted_table}
            ) AS current_keys
            """
        )
        if not return_df:
            connection.execute("COMMIT")
            return None
        mapping = connection.execute(
            f"""
            SELECT {qualified_mapping_columns}, mapping.{quoted_key_column}
            FROM {quoted_table} AS mapping
            INNER JOIN {temp_distinct} AS incoming_distinct
              ON {mapping_join}
            ORDER BY {qualified_mapping_columns}
            """
        ).pl()
        connection.execute("COMMIT")
        return mapping
    except Exception:
        try:
            connection.execute("ROLLBACK")
        except Exception:
            # Best effort: surface the original failure, not a rollback error.
            pass
        raise
    finally:
        connection.close()
def replace_rows_by_file(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    file_hash: str,
    file_hash_column: str = "file_key",
    return_df: bool = True,
) -> pl.DataFrame | None:
    """Atomically replace one file's fact rows and append the current batch.

    Inside one transaction the destination table (and schema) is created if
    missing, columns present in ``df`` but not in the table are added, every
    row whose ``file_hash_column`` equals ``file_hash`` is deleted, and the
    incoming rows tagged with ``file_hash`` are inserted.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Destination table name, optionally schema-qualified.
    df : FrameLike
        Incoming fact rows. LazyFrames are collected before DuckDB operations
        run.
    file_hash : str
        Stable source-file identifier used to delete the previous batch.
    file_hash_column : str
        Column name used to store ``file_hash`` in the destination table.
    return_df : bool
        Whether to return ``df`` with the file hash column attached.

    Returns
    -------
    pl.DataFrame | None
        The inserted rows with ``file_hash_column`` when ``return_df`` is
        true; otherwise ``None``.

    Raises
    ------
    ValueError
        If identifiers, file hash values, or dataframe columns are invalid. A
        ``TypeError`` is raised by shared frame normalization when ``df`` is
        not a Polars DataFrame or LazyFrame.
    Exception
        Re-raises DuckDB transaction failures after rollback.
    """
    df = _materialize_frame(df)
    normalized_file_hash = str(file_hash).strip()
    if not normalized_file_hash:
        raise ValueError("file_hash must be non-empty.")
    normalized_file_hash_column = str(file_hash_column).strip()
    if not normalized_file_hash_column:
        raise ValueError("file_hash_column must be non-empty.")
    if normalized_file_hash_column in df.columns:
        raise ValueError(
            f"file_hash_column {normalized_file_hash_column!r} must not already exist in df columns."
        )
    # Validate the caller's frame *before* tagging it: once the hash column is
    # added the frame always has at least one column, so checking afterwards
    # could never fire.
    if not df.columns:
        raise ValueError("df must include at least one column.")
    incoming_with_hash = df.with_columns(pl.lit(normalized_file_hash).alias(normalized_file_hash_column))
    incoming_columns = tuple(incoming_with_hash.columns)

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_file_hash_column = _quote_identifier(normalized_file_hash_column)
    quoted_incoming_columns = _ordered_columns(incoming_columns)
    temp_view = "__data_engine_incremental_incoming"
    temp_table = "__data_engine_incremental_incoming_table"

    resolved_db_path = Path(db_path).expanduser().resolve()
    resolved_db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(resolved_db_path)
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, incoming_with_hash)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {temp_table} AS SELECT * FROM {temp_view}")
        connection.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} AS SELECT * FROM {temp_table} WHERE 1 = 0")
        # Schema evolution: add any incoming column the destination lacks,
        # using the type DuckDB inferred for the staged batch.
        existing_columns = {name for _, name, *_ in _existing_table_columns(connection, table)}
        incoming_info = connection.execute(f"PRAGMA table_info({temp_table})").fetchall()
        for _, name, dtype, *_ in incoming_info:
            if name not in existing_columns:
                connection.execute(f"ALTER TABLE {quoted_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
        connection.execute(
            f"DELETE FROM {quoted_table} WHERE {quoted_file_hash_column} = ?",
            [normalized_file_hash],
        )
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_incoming_columns})
            SELECT {quoted_incoming_columns}
            FROM {temp_table}
            """
        )
        connection.execute("COMMIT")
        return incoming_with_hash if return_df else None
    except Exception:
        try:
            connection.execute("ROLLBACK")
        except Exception:
            # Best effort: surface the original failure, not a rollback error.
            pass
        raise
    finally:
        connection.close()
def replace_rows_by_values(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    column: str,
    return_df: bool = True,
) -> pl.DataFrame | None:
    """Atomically replace one value-slice of rows and append the current batch.

    Every destination row whose ``column`` value appears among the incoming
    values of ``column`` is deleted, then ``df`` is inserted, all in one
    transaction. A NULL among the incoming values also replaces existing rows
    whose ``column`` is NULL, so re-running the same batch is idempotent.
    Missing tables/schemas are created and new incoming columns are added.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Destination table name, optionally schema-qualified.
    df : FrameLike
        Incoming replacement rows. LazyFrames are collected before DuckDB
        operations run.
    column : str
        Column whose incoming values define the rows to replace.
    return_df : bool
        Whether to return the inserted dataframe.

    Returns
    -------
    pl.DataFrame | None
        The inserted dataframe when ``return_df`` is true; otherwise ``None``.

    Raises
    ------
    ValueError
        If ``df`` has no rows, or ``column`` is invalid or missing from
        ``df``. A ``TypeError`` is raised by shared frame normalization when
        ``df`` is not a Polars DataFrame or LazyFrame.
    Exception
        Re-raises DuckDB transaction failures after rollback.
    """
    df = _materialize_frame(df)
    if df.is_empty():
        raise ValueError("df must include at least one row.")
    normalized_column = str(column).strip()
    if not normalized_column:
        raise ValueError("column must be non-empty.")
    if normalized_column not in df.columns:
        raise ValueError(f"column {normalized_column!r} must exist in df columns.")
    # ``df`` has at least one row here, so the lookup is never empty.
    lookup = df.select(pl.col(normalized_column)).unique(maintain_order=True)

    quoted_table = _quote_table_ref(table)
    quoted_schema = _schema_ref(table)
    quoted_column = _quote_identifier(normalized_column)
    quoted_df_columns = _ordered_columns(tuple(df.columns))
    temp_view = "__data_engine_replace_values_df"
    temp_table = "__data_engine_replace_values_df_table"
    temp_lookup_view = "__data_engine_replace_values_lookup"
    temp_lookup_table = "__data_engine_replace_values_lookup_table"

    resolved_db_path = Path(db_path).expanduser().resolve()
    resolved_db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(resolved_db_path)
    try:
        connection.execute("BEGIN TRANSACTION")
        if quoted_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {quoted_schema}")
        connection.register(temp_view, df)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {temp_table} AS SELECT * FROM {temp_view}")
        connection.execute(f"CREATE TABLE IF NOT EXISTS {quoted_table} AS SELECT * FROM {temp_table} WHERE 1 = 0")
        existing_columns = {name for _, name, *_ in _existing_table_columns(connection, table)}
        incoming_info = connection.execute(f"PRAGMA table_info({temp_table})").fetchall()
        for _, name, dtype, *_ in incoming_info:
            if name not in existing_columns:
                connection.execute(f"ALTER TABLE {quoted_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
        connection.register(temp_lookup_view, lookup)
        connection.execute(
            f"""
            CREATE OR REPLACE TEMP TABLE {temp_lookup_table} AS
            SELECT {quoted_column} AS lookup_value
            FROM {temp_lookup_view}
            """
        )
        # ``col IN (...)`` never matches NULL, so NULL rows need their own
        # branch; without it a NULL slice would accumulate duplicates.
        connection.execute(
            f"""
            DELETE FROM {quoted_table}
            WHERE {quoted_column} IN (SELECT lookup_value FROM {temp_lookup_table})
               OR (
                   {quoted_column} IS NULL
                   AND EXISTS (SELECT 1 FROM {temp_lookup_table} WHERE lookup_value IS NULL)
               )
            """
        )
        connection.execute(
            f"""
            INSERT INTO {quoted_table} ({quoted_df_columns})
            SELECT {quoted_df_columns}
            FROM {temp_table}
            """
        )
        connection.execute("COMMIT")
        return df if return_df else None
    except Exception:
        try:
            connection.execute("ROLLBACK")
        except Exception:
            # Best effort: surface the original failure, not a rollback error.
            pass
        raise
    finally:
        connection.close()
def attach_dimension(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    on: str | list[str] | tuple[str, ...],
    key_column: str = "dimension_key",
    drop_key: bool = False,
) -> pl.DataFrame:
    """Attach an existing surrogate key mapping table to an input dataframe.

    Dimension rows are fetched by the distinct values of the first ``on``
    column, deduplicated on all ``on`` columns, and left-joined onto ``df``;
    rows with no matching dimension entry get a null ``key_column``.

    NOTE(review): the join uses Polars' default null handling, which does not
    match NULL keys, so rows with NULL natural keys presumably receive a null
    surrogate key even if the dimension holds a NULL row — confirm against the
    installed Polars version.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Dimension table name, optionally schema-qualified.
    df : FrameLike
        Input dataframe containing the natural key columns. LazyFrames are
        collected before DuckDB operations run.
    on : str | list[str] | tuple[str, ...]
        Natural key column or columns used to join to the dimension table.
    key_column : str
        Surrogate key column to attach.
    drop_key : bool
        Whether to drop the natural key columns after attaching the surrogate
        key.

    Returns
    -------
    pl.DataFrame
        Input dataframe with the surrogate key column attached.

    Raises
    ------
    ValueError
        If join columns are invalid or missing, or ``key_column`` is blank or
        one of the join columns. A ``TypeError`` is raised by shared frame
        normalization when ``df`` is not a Polars DataFrame or LazyFrame.
    """
    df = _materialize_frame(df)
    join_columns = _normalize_key_columns(on)
    missing_columns = [column for column in join_columns if column not in df.columns]
    if missing_columns:
        raise ValueError(f"on columns must exist in df: {missing_columns!r}")
    normalized_key_column = str(key_column).strip()
    if not normalized_key_column:
        raise ValueError("key_column must be non-empty.")
    if normalized_key_column in join_columns:
        # Selecting the same column twice would yield duplicate output names.
        raise ValueError(f"key_column {normalized_key_column!r} must not be one of the on columns.")
    mapping = read_rows_by_values(
        db_path,
        table,
        column=join_columns[0],
        is_in=df.get_column(join_columns[0]).unique().to_list(),
        select=[*join_columns, normalized_key_column],
    ).unique(subset=list(join_columns), maintain_order=True)
    normalized = df.join(mapping, on=list(join_columns), how="left", validate="m:1")
    if drop_key:
        normalized = normalized.drop(list(join_columns))
    return normalized
def denormalize_columns(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    key_column: str = "dimension_key",
    select: str | list[str] | tuple[str, ...] = "*",
    drop_key: bool = False,
) -> pl.DataFrame:
    """Attach natural columns from an existing dimension table onto a keyed dataframe.

    The dimension table's column list is read first to validate the request;
    the matching dimension rows are then fetched by the distinct values of
    ``key_column`` in ``df`` and left-joined back onto ``df``.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Dimension table name, optionally schema-qualified.
    df : FrameLike
        Input dataframe containing ``key_column``. LazyFrames are collected
        before DuckDB operations run.
    key_column : str
        Surrogate key column used to join to the dimension table.
    select : str | list[str] | tuple[str, ...]
        Natural columns to attach, or ``"*"`` for all non-key columns.
    drop_key : bool
        Whether to drop ``key_column`` after attaching the natural columns.

    Returns
    -------
    pl.DataFrame
        Input dataframe with selected dimension columns attached.

    Raises
    ------
    ValueError
        If the key column, selected columns, or dimension table are invalid. A
        ``TypeError`` is raised by shared frame normalization when ``df`` is
        not a Polars DataFrame or LazyFrame.
    """
    df = _materialize_frame(df)
    key_name = str(key_column).strip()
    if not key_name:
        raise ValueError("key_column must be non-empty.")
    if key_name not in df.columns:
        raise ValueError(f"key_column {key_name!r} must exist in df.")

    database_file = Path(db_path).expanduser().resolve()
    database_file.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(database_file)
    try:
        table_columns = _table_column_names(connection, table)
    finally:
        connection.close()

    if not table_columns:
        raise ValueError(f"Table {table!r} does not exist or has no columns.")
    if key_name not in table_columns:
        raise ValueError(f"key_column {key_name!r} must exist in table {table!r}.")

    if select == "*":
        attached = tuple(name for name in table_columns if name != key_name)
    else:
        attached = _normalize_selected_columns(select)
    unknown = [name for name in attached if name not in table_columns]
    if unknown:
        raise ValueError(f"select columns must exist in table {table!r}: {unknown!r}")
    if key_name in attached:
        raise ValueError(f"select must not include key_column {key_name!r}.")
    if not attached:
        raise ValueError("select must include at least one non-key column.")

    lookup_keys = df.get_column(key_name).unique().to_list()
    mapping = read_rows_by_values(
        db_path,
        table,
        column=key_name,
        is_in=lookup_keys,
        select=[key_name, *attached],
    ).unique(subset=[key_name], maintain_order=True)
    result = df.join(mapping, on=[key_name], how="left", validate="m:1")
    return result.drop([key_name]) if drop_key else result
def normalize_columns(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    on: str | list[str] | tuple[str, ...],
    key_column: str = "dimension_key",
    drop_key: bool = True,
    returns: str | None = "df",
) -> pl.DataFrame | None:
    """Build missing surrogate keys and attach them back onto the input dataframe.

    The distinct ``on`` combinations of ``df`` are persisted with
    :func:`build_dimension`; depending on ``returns`` the keys are then
    attached to ``df`` via :func:`attach_dimension`, or the mapping itself is
    returned. All ``on`` columns are validated before anything is written.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Dimension table name, optionally schema-qualified.
    df : FrameLike
        Input dataframe containing natural key columns. LazyFrames are
        collected before DuckDB operations run.
    on : str | list[str] | tuple[str, ...]
        Natural key column or columns used to build the dimension.
    key_column : str
        Surrogate key column to create and attach.
    drop_key : bool
        Whether to drop natural key columns after attaching the surrogate key.
    returns : str | None
        ``"df"`` for normalized input rows, ``"map"`` for only the key
        mapping, or ``None`` to only persist dimension rows.

    Returns
    -------
    pl.DataFrame | None
        Normalized dataframe, mapping dataframe, or ``None`` according to
        ``returns``.

    Raises
    ------
    RuntimeError
        If dimension mapping creation unexpectedly returns no mapping.
    ValueError
        If ``returns`` or join columns are invalid, or any ``on`` column is
        missing from ``df``.
    """
    if returns not in {"df", "map", None}:
        raise ValueError('returns must be "df", "map", or None.')
    df = _materialize_frame(df)
    join_columns = _normalize_key_columns(on)
    # Fail with the documented ValueError (and before persisting anything)
    # instead of letting ``df.select`` raise a Polars column error.
    missing_columns = [column for column in join_columns if column not in df.columns]
    if missing_columns:
        raise ValueError(f"on columns must exist in df: {missing_columns!r}")
    natural_key_df = df.select(list(join_columns)).unique(maintain_order=True)
    mapping = build_dimension(
        db_path,
        table,
        df=natural_key_df,
        key_column=key_column,
        return_df=True,
    )
    if mapping is None:
        raise RuntimeError("build_dimension() unexpectedly returned no mapping.")
    if returns == "map":
        return mapping
    if returns is None:
        return None
    return attach_dimension(
        db_path,
        table,
        df=df,
        on=join_columns,
        key_column=key_column,
        drop_key=drop_key,
    )
def read_rows_by_values(
    db_path: str | Path,
    table: str,
    *,
    column: str,
    is_in: list[object] | tuple[object, ...],
    select: str | list[str] | tuple[str, ...],
) -> pl.DataFrame:
    """Return selected columns for rows whose one column matches provided values.

    Matching is NULL-safe (``IS NOT DISTINCT FROM``), so a ``None`` in
    ``is_in`` selects rows whose ``column`` is NULL. Duplicate values in
    ``is_in`` are removed before matching. An empty ``is_in`` returns an empty
    frame with the selected columns.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Source table name, optionally schema-qualified.
    column : str
        Column matched against ``is_in``.
    is_in : list[object] | tuple[object, ...]
        Values to include.
    select : str | list[str] | tuple[str, ...]
        Columns to return, or ``"*"`` for all columns of ``table``.

    Returns
    -------
    pl.DataFrame
        Matching rows with the selected columns.

    Raises
    ------
    ValueError
        If ``column`` or ``select`` is empty.
    """
    normalized_column = str(column).strip()
    if not normalized_column:
        raise ValueError("column must be non-empty.")
    if select == "*":
        # Match read_table/denormalize_columns: "*" means every column. Quoting
        # it as an identifier would look up a column literally named "*".
        selected_sql = "source_rows.*"
    else:
        selected_sql = _qualified_columns("source_rows", _normalize_selected_columns(select))
    quoted_table = _quote_table_ref(table)
    quoted_column = _quote_identifier(normalized_column)

    resolved_db_path = Path(db_path).expanduser().resolve()
    resolved_db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(resolved_db_path)
    try:
        if not is_in:
            return connection.execute(
                f"""
                SELECT {selected_sql}
                FROM {quoted_table} AS source_rows
                WHERE 1 = 0
                """
            ).pl()
        lookup = pl.DataFrame({"lookup_value": list(is_in)}).unique(maintain_order=True)
        connection.register("__data_engine_lookup_values", lookup)
        connection.execute(
            "CREATE OR REPLACE TEMP TABLE __data_engine_lookup_values_table AS SELECT * FROM __data_engine_lookup_values"
        )
        return connection.execute(
            f"""
            SELECT {selected_sql}
            FROM {quoted_table} AS source_rows
            INNER JOIN __data_engine_lookup_values_table AS lookup
              ON source_rows.{quoted_column} IS NOT DISTINCT FROM lookup.lookup_value
            """
        ).pl()
    finally:
        connection.close()
def read_sql(
    db_path: str | Path,
    *,
    sql: str,
) -> pl.DataFrame:
    """Run one SQL query and return the result as a Polars DataFrame.

    The query text is stripped and executed as-is, so it must come from a
    trusted source. The database file's parent directory is created if it is
    missing.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    sql : str
        SQL query text.

    Returns
    -------
    pl.DataFrame
        Query result.

    Raises
    ------
    ValueError
        If ``sql`` is empty.
    """
    query = str(sql).strip()
    if query == "":
        raise ValueError("sql must be non-empty.")
    database_file = Path(db_path).expanduser().resolve()
    database_file.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(database_file)
    try:
        result = connection.execute(query)
        return result.pl()
    finally:
        connection.close()
def read_table(
    db_path: str | Path,
    table: str,
    *,
    select: str | list[str] | tuple[str, ...] = "*",
    where: str | None = None,
    limit: int | None = None,
) -> pl.DataFrame:
    """Read rows from one table with optional column selection, filter, and limit.

    The query is assembled as ``SELECT ... FROM ... [WHERE ...] [LIMIT ...]``
    and run through :func:`read_sql`. ``where`` is spliced into the SQL
    verbatim, so it must come from a trusted source.

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Source table name, optionally schema-qualified.
    select : str | list[str] | tuple[str, ...]
        Columns to return, or ``"*"`` for all columns.
    where : str | None
        Optional SQL ``WHERE`` expression without the ``WHERE`` keyword. A
        blank expression is ignored.
    limit : int | None
        Optional maximum number of rows.

    Returns
    -------
    pl.DataFrame
        Table rows matching the requested projection and filter.

    Raises
    ------
    ValueError
        If the table name, selected columns, or limit are invalid.
    """
    quoted_table = _quote_table_ref(table)
    where_clause = None if where is None else str(where).strip()
    row_limit = _normalize_optional_limit(limit)
    if select == "*":
        projection = "*"
    else:
        projection = _ordered_columns(_normalize_selected_columns(select))
    lines = [f"SELECT {projection}", f"FROM {quoted_table}"]
    if where_clause:
        lines.append(f"WHERE {where_clause}")
    if row_limit is not None:
        lines.append(f"LIMIT {row_limit}")
    return read_sql(db_path, sql="\n".join(lines))
def replace_table(
    db_path: str | Path,
    table: str,
    *,
    df: FrameLike,
    return_df: bool = True,
) -> pl.DataFrame | None:
    """Replace one table wholesale from a dataframe.

    Inside one transaction the table (and schema) is created if missing, any
    ``df`` column the table lacks is added, every existing row is deleted, and
    ``df`` is inserted. Table columns absent from ``df`` are kept, so the new
    rows leave them NULL (or at their column default).

    Parameters
    ----------
    db_path : str | Path
        DuckDB database file path.
    table : str
        Destination table name, optionally schema-qualified.
    df : FrameLike
        Replacement rows. New columns are added to the destination table.
        LazyFrames are collected before DuckDB operations run.
    return_df : bool
        Whether to return the inserted dataframe.

    Returns
    -------
    pl.DataFrame | None
        The inserted dataframe when ``return_df`` is true; otherwise ``None``.

    Raises
    ------
    ValueError
        If identifiers or dataframe columns are invalid. A ``TypeError`` is
        raised by shared frame normalization when ``df`` is not a Polars
        DataFrame or LazyFrame.
    Exception
        Re-raises DuckDB transaction failures after rollback.
    """
    frame = _materialize_frame(df)
    frame_columns = tuple(frame.columns)
    if len(frame_columns) == 0:
        raise ValueError("df must include at least one column.")

    target_table = _quote_table_ref(table)
    target_schema = _schema_ref(table)
    column_list = _ordered_columns(frame_columns)
    staging_view = "__data_engine_replace_table_df"
    staging_table = "__data_engine_replace_table_df_table"

    database_file = Path(db_path).expanduser().resolve()
    database_file.parent.mkdir(parents=True, exist_ok=True)
    connection = duckdb.connect(database_file)
    try:
        connection.execute("BEGIN TRANSACTION")
        if target_schema is not None:
            connection.execute(f"CREATE SCHEMA IF NOT EXISTS {target_schema}")
        connection.register(staging_view, frame)
        connection.execute(f"CREATE OR REPLACE TEMP TABLE {staging_table} AS SELECT * FROM {staging_view}")
        connection.execute(f"CREATE TABLE IF NOT EXISTS {target_table} AS SELECT * FROM {staging_table} WHERE 1 = 0")
        known_columns = {row[1] for row in _existing_table_columns(connection, table)}
        staged_info = connection.execute(f"PRAGMA table_info({staging_table})").fetchall()
        for _, name, dtype, *_ in staged_info:
            if name not in known_columns:
                connection.execute(f"ALTER TABLE {target_table} ADD COLUMN {_quote_identifier(name)} {dtype}")
        connection.execute(f"DELETE FROM {target_table}")
        connection.execute(
            f"""
            INSERT INTO {target_table} ({column_list})
            SELECT {column_list}
            FROM {staging_table}
            """
        )
        connection.execute("COMMIT")
        return frame if return_df else None
    except Exception:
        try:
            connection.execute("ROLLBACK")
        except Exception:
            # Best effort: surface the original failure, not a rollback error.
            pass
        raise
    finally:
        connection.close()
# Public helpers exported by ``from data_engine.helpers.duckdb import *``
# (kept in alphabetical order).
__all__ = [
    "attach_dimension",
    "build_dimension",
    "denormalize_columns",
    "normalize_columns",
    "read_rows_by_values",
    "read_sql",
    "read_table",
    "replace_rows_by_file",
    "replace_rows_by_values",
    "replace_table",
]