DuckDB Helpers

data_engine.helpers.duckdb is the first public helper layer for common warehouse-style authoring patterns.

The function-by-function reference lives in the helper docstrings and is rendered in the API reference. Keep signature details and copyable examples beside the functions in src/data_engine/helpers/duckdb/; that keeps editor hover help and the packaged docs aligned. This page explains the shared design shape, the current helper coverage, and when the helper family is a good fit.

These helpers are intentionally:

  • one-shot

  • explicit about the database path

  • explicit about the target table

  • responsible for their own connection lifecycle

That means each helper:

  • opens DuckDB

  • does one job

  • commits or rolls back

  • closes the connection

They are designed for flow code that wants less repeated SQL plumbing without hiding too much behavior.

Import style

from data_engine.helpers.duckdb import build_dimension
from data_engine.helpers.duckdb import attach_dimension
from data_engine.helpers.duckdb import compact_database
from data_engine.helpers.duckdb import denormalize_columns
from data_engine.helpers.duckdb import ensure_index
from data_engine.helpers.duckdb import normalize_columns
from data_engine.helpers.duckdb import read_rows_by_values
from data_engine.helpers.duckdb import read_sql
from data_engine.helpers.duckdb import read_table
from data_engine.helpers.duckdb import replace_rows_by_file
from data_engine.helpers.duckdb import replace_rows_by_values
from data_engine.helpers.duckdb import replace_table

The expected path pattern is:

db_path = context.database("docs/analytics.duckdb")

You can also use a mirrored output path or any other DuckDB file path you control. The helpers work with any DuckDB path you provide.

Shared conventions

The current helper family uses a shared shape:

helper_name(
    db_path,
    table,
    *,
    ...
)

Notes:

  • db_path is positional and required

  • table is positional and required

  • df is the incoming Polars dataframe when the helper works on one

  • return_df=True means “return the dataframe result for this helper”

  • identifiers such as table names and column names are quoted safely, including reserved words such as group

  • schema-qualified tables such as "mart.fact_claim" are supported

  • parent directories for the DuckDB path are created before connecting

  • Polars LazyFrame inputs are collected before writing or joining

  • select="*" means “all persisted table columns” for helpers that accept it

build_dimension(...)

Use this helper when you already have a dataframe trimmed down to only the natural-key columns and want to persist or extend a surrogate-key table.

Signature:

build_dimension(
    db_path,
    table,
    *,
    df,
    key_column="dimension_key",
    return_df=True,
)

Behavior:

  • treats every column in df as part of the natural key

  • requires at least one incoming column

  • rejects a key_column that already exists in df

  • creates the target schema when table is schema-qualified

  • creates the table if it does not exist

  • creates a unique index over the natural-key columns

  • inserts only missing unique combinations

  • assigns integer surrogate keys after the current maximum key, ordered by natural-key values

  • returns the natural-key-to-surrogate-key mapping when return_df=True

Example:

mapping = build_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=member_keys_df,
    key_column="member_key",
)

Returned mapping:

member_id | lob | member_key

attach_dimension(...)

Use this helper when the surrogate-key table already exists and you only want to join the key back onto a dataframe.

Signature:

attach_dimension(
    db_path,
    table,
    *,
    df,
    on,
    key_column="dimension_key",
    drop_key=False,
)

Key arguments:

  • on can be one column name or a list of column names

  • every on column must exist in df

  • drop_key=False keeps the natural-key columns by default

  • set drop_key=True when you want the attached surrogate key without the original key columns

Example:

attached = attach_dimension(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=docs_df,
    on=["member_id", "lob"],
    key_column="member_key",
)

normalize_columns(...)

Use this helper when you want to build missing surrogate keys and immediately attach them back onto the full dataframe.

Signature:

normalize_columns(
    db_path,
    table,
    *,
    df,
    on,
    key_column="dimension_key",
    drop_key=True,
    returns="df",
)

Key arguments:

  • on can be one column name or a list of column names

  • every on column is treated as part of the natural key

  • drop_key=True removes the natural-key columns after the surrogate key is joined back

  • returns="df" returns the normalized dataframe

  • returns="map" returns only the persisted mapping

  • returns=None performs side effects only

Example:

normalized = normalize_columns(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=docs_df,
    on=["member_id", "lob"],
    key_column="member_key",
)

If docs_df starts with:

member_id | lob | amount

Then normalized becomes:

amount | member_key

This helper uses build_dimension(...) and attach_dimension(...) internally.

denormalize_columns(...)

Use this helper when your dataframe already has a surrogate key and you want to attach the natural columns back from the persisted dimension table.

Signature:

denormalize_columns(
    db_path,
    table,
    *,
    df,
    key_column="dimension_key",
    select="*",
    drop_key=False,
)

Key arguments:

  • key_column is the surrogate key used to join from df into the dimension table

  • select="*" attaches every non-key column from the dimension table; this is the default

  • select=[...] lets you attach only a subset of natural columns

  • select must not include key_column

  • drop_key=False keeps the surrogate key by default

Example:

denormalized = denormalize_columns(
    context.database("warehouse.duckdb"),
    "mart.dim_member",
    df=fact_df,
    key_column="member_key",
)

replace_rows_by_file(...)

Use this helper when one incoming dataframe represents the full current contents for one source file.

Signature:

replace_rows_by_file(
    db_path,
    table,
    *,
    df,
    file_hash,
    file_hash_column="file_key",
    return_df=True,
)

Behavior:

  • adds a constant file-hash column to df

  • rejects a blank file_hash or a file_hash_column that already exists in df

  • creates the target schema when table is schema-qualified

  • creates the table if it does not exist

  • expands the table schema when new columns appear

  • deletes existing rows for that file hash

  • appends the current batch

  • returns the incoming dataframe with the added file-hash column when return_df=True

Example:

updated = replace_rows_by_file(
    context.database("warehouse.duckdb"),
    "canon.claim_rows",
    df=docs_df,
    file_hash=context.metadata["file_hash"],
)

This is the usual pattern for canon-style “replace one file slice” loading.

ensure_index(...)

Use this helper when repeated slice replacements or lookups need to find a small set of rows inside a large table.

Signature:

ensure_index(
    db_path,
    table,
    *,
    columns,
    name=None,
)

Behavior:

  • validates that the table and columns exist

  • creates a DuckDB index only when it does not already exist

  • generates a stable index name when name is omitted

  • returns the index name

Examples:

ensure_index(
    context.database("warehouse.duckdb"),
    "canon.claim_rows",
    columns="file_key",
)

ensure_index(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    columns="claim_id",
    name="idx_fact_claim_claim_id",
)

Indexes are most useful when the indexed column is used repeatedly by helpers such as replace_rows_by_file(...), replace_rows_by_values(...), or read_rows_by_values(...) against a large table. They use extra disk and make writes maintain the index, so create them intentionally for hot lookup paths.

When name is omitted, Data Engine generates an idx_de_... name from the table and column list. If you pass name, it is used as the DuckDB index name and must be non-empty.

replace_rows_by_values(...)

Use this helper when one incoming dataframe represents the full current contents for one logical value slice.

Signature:

replace_rows_by_values(
    db_path,
    table,
    *,
    df,
    column,
    return_df=True,
)

Behavior:

  • requires df to contain at least one row

  • takes the distinct values from df[column]

  • deletes existing rows in the target table where column matches any of those values

  • matches null values with DuckDB’s IS NOT DISTINCT FROM semantics

  • appends the current batch

  • creates the target schema when table is schema-qualified

  • creates and expands the table as needed

  • returns the incoming dataframe when return_df=True

Example:

updated = replace_rows_by_values(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    df=docs_for_open_status,
    column="status",
)

That says: “replace every persisted status slice represented by this batch, then insert this batch.”

compact_database(...)

Use this helper for explicit maintenance flows when you want to clean one DuckDB file after a period of ingestion.

Signature:

compact_database(
    db_path,
    *,
    tables=None,
    drop_all_null_columns=True,
    vacuum=True,
)

Behavior:

  • inspects one or more user tables in the database

  • drops columns whose persisted values are entirely null

  • preserves at least one column per table

  • drops indexes before column removal and recreates indexes that still apply

  • reports indexes that could not be recreated because compaction removed one of their columns

  • optionally runs VACUUM after schema cleanup

  • returns a Polars summary dataframe with dropped-column, index, and file-size metadata

Example:

summary = compact_database(
    context.database("warehouse.duckdb"),
    tables=["mart.fact_claim", "mart.fact_member"],
    vacuum=True,
)

This is a good fit for manual maintenance flows where you want a simple one-liner per database.

Compaction targets all user tables when tables=None. Pass one table name, a list, or a tuple to compact a smaller set. Missing requested tables raise a ValueError.

The summary dataframe includes:

  • db_path

  • table

  • dropped_column_count

  • dropped_columns

  • indexes_dropped

  • indexes_recreated

  • indexes_skipped

  • vacuum_requested

  • vacuumed

  • size_before_bytes

  • size_after_bytes

read_rows_by_values(...)

Use this helper when you want a small filtered lookup out of DuckDB as a Polars dataframe.

Signature:

read_rows_by_values(
    db_path,
    table,
    *,
    column,
    is_in,
    select,
)

Behavior:

  • returns rows where column matches one of the provided values

  • returns only the selected columns, or all columns when select="*"

  • uses a temporary lookup table internally, which works better than manually assembling long SQL IN (...) strings

  • preserves the first-seen order of distinct lookup values

  • matches None lookup values to persisted nulls

  • returns an empty dataframe with the selected schema when is_in is empty

Example:

existing = read_rows_by_values(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    column="claim_id",
    is_in=[1001, 1002, 1003],
    select=["claim_id", "member_key", "amount"],
)

read_sql(...)

Use this helper when you already have the exact DuckDB query you want and just need the result as a Polars dataframe.

Signature:

read_sql(
    db_path,
    *,
    sql,
)

Example:

result = read_sql(
    context.database("warehouse.duckdb"),
    sql="""
        SELECT claim_id, amount
        FROM mart.fact_claim
        WHERE amount >= 100
    """,
)

This is the most direct read helper. If you already know the SQL you want, use this.

read_table(...)

Use this helper when you want a lightweight table reader without writing the whole SQL statement.

Signature:

read_table(
    db_path,
    table,
    *,
    select="*",
    where=None,
    limit=None,
)

Example:

result = read_table(
    context.database("warehouse.duckdb"),
    "mart.fact_claim",
    select=["claim_id", "amount"],
    where='"amount" >= 100',
    limit=100,
)

This helper is intentionally small:

  • select can be "*" or a list of column names; "*" is the default

  • where is passed through as SQL

  • limit is optional and must be non-negative

replace_table(...)

Use this helper when you want to replace the entire contents of one table with the current dataframe.

Signature:

replace_table(
    db_path,
    table,
    *,
    df,
    return_df=True,
)

Behavior:

  • requires df to include at least one column

  • creates the target schema when table is schema-qualified

  • creates the table if it does not exist

  • expands the table schema when new columns appear

  • deletes all existing rows

  • inserts the current dataframe

  • returns the incoming dataframe when return_df=True

Example:

replace_table(
    context.database("warehouse.duckdb"),
    "mart.current_snapshot",
    df=snapshot_df,
)

This is the simplest full-refresh write helper in the current set.

Design guidance

These helpers are best when:

  • the database path is stable

  • table ownership is clear

  • the dataframe shape is already mostly what you want

  • you want predictable transactional behavior

These helpers support common repeated patterns in flow code. Steps that need custom joins, custom window logic, or highly specific query behavior can use plain DuckDB directly.

When to use direct DuckDB instead

Prefer direct DuckDB code when:

  • the operation is highly custom

  • you want several SQL statements in one step

  • you want full manual control over relation registration, temp tables, or query flow

The helpers remove repeated boilerplate and keep common warehouse-style operations concise.