Source code for data_engine.helpers.excel

"""Excel workbook composition helpers for Data Engine flow authoring."""

from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass, field
import os
from pathlib import Path
import re
import shutil
import time
from typing import Any
from uuid import uuid4

import polars as pl
import xlsxwriter

PathLike = str | os.PathLike[str]
ExcelFrame = pl.DataFrame | pl.LazyFrame

_INVALID_SHEET_CHARS = set("[]:*?/\\")
_CELL_REFERENCE_RE = re.compile(r"^[A-Za-z]{1,3}[1-9][0-9]*$")
_TABLE_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


[docs] @dataclass(frozen=True) class ExcelSheet: """Specification for one dataframe-backed worksheet in an Excel workbook. Attributes ---------- name : str Worksheet name. Names must follow Excel's normal sheet-name limits. df : ExcelFrame Dataframe to write. Lazy frames are collected when the workbook is composed. table_name : str | None Optional Excel table name for the written dataframe. position : str | tuple[int, int] Top-left cell where the dataframe table should be written. table_style : str | dict[str, Any] | None Optional table style forwarded to ``pl.DataFrame.write_excel``. autofit : bool Whether Polars should autofit columns after writing. autofilter : bool Whether the generated table should include filter controls. freeze_panes : object | None Optional freeze-pane setting forwarded to ``pl.DataFrame.write_excel``. write_options : dict[str, Any] Additional keyword options forwarded to ``pl.DataFrame.write_excel``. """ name: str df: ExcelFrame table_name: str | None = None position: str | tuple[int, int] = "A1" table_style: str | dict[str, Any] | None = None autofit: bool = True autofilter: bool = True freeze_panes: object | None = None write_options: dict[str, Any] = field(default_factory=dict)
[docs] def compose_excel(path: PathLike, sheets: Sequence[ExcelSheet], *, template: PathLike | None = None) -> Path: """Compose one Excel workbook from dataframe-backed sheet specifications. The workbook is written to a same-directory temporary file and then moved into place with ``os.replace``. Each sheet is written through Polars' Excel writer, so sheet-level options map directly to ``pl.DataFrame.write_excel`` behavior. Parameters ---------- path : PathLike Target Excel workbook path. sheets : Sequence[ExcelSheet] One or more worksheet specifications to write. template : PathLike | None Optional template workbook path. When provided, the template workbook is copied to a temporary file, the requested sheets are updated, and the composed copy atomically replaces ``path``. Returns ------- Path Absolute target path that was replaced. Raises ------ Exception If workbook validation, dataframe collection, Excel writing, or atomic replacement fails. Examples -------- .. code-block:: python import polars as pl from data_engine.helpers import ExcelSheet, compose_excel compose_excel( "workspaces/example/output/report.xlsx", sheets=[ ExcelSheet( name="Claims", df=pl.DataFrame({"claim_id": [1, 2]}), table_name="claims", freeze_panes="A2", ), ], ) """ normalized_sheets = tuple(sheets) _validate_sheets(normalized_sheets) target_path = Path(path).expanduser().resolve() target_path.parent.mkdir(parents=True, exist_ok=True) temporary_path = target_path.with_name(f".{target_path.name}.{uuid4().hex}.tmp.xlsx") if template is not None: return _compose_template_excel( target_path, temporary_path, sheets=normalized_sheets, template=template, ) workbook = xlsxwriter.Workbook(temporary_path) try: for sheet in normalized_sheets: frame = _collect_frame(sheet.df) options = dict(sheet.write_options) frame.write_excel( workbook, worksheet=sheet.name, position=sheet.position, table_name=sheet.table_name, table_style=sheet.table_style, autofit=sheet.autofit, autofilter=sheet.autofilter, freeze_panes=sheet.freeze_panes, **options, ) workbook.close() _replace_atomic(temporary_path, target_path) except Exception: try: workbook.close() except Exception: pass _remove_temporary_file(temporary_path) raise return target_path
def _compose_template_excel( target_path: Path, temporary_path: Path, *, sheets: tuple[ExcelSheet, ...], template: PathLike, ) -> Path: try: from openpyxl import load_workbook except ImportError as exc: raise ImportError("compose_excel(template=...) requires openpyxl.") from exc template_path = Path(template).expanduser().resolve() if not template_path.is_file(): raise ValueError(f"template workbook does not exist: {template_path}") try: shutil.copy2(template_path, temporary_path) workbook = load_workbook(temporary_path) for sheet in sheets: frame = _collect_frame(sheet.df) _replace_template_sheet(workbook, sheet, frame) workbook.save(temporary_path) _replace_atomic(temporary_path, target_path) except Exception: _remove_temporary_file(temporary_path) raise return target_path def _replace_template_sheet(workbook: Any, sheet: ExcelSheet, frame: pl.DataFrame) -> None: if sheet.name in workbook.sheetnames: worksheet = workbook[sheet.name] else: worksheet = workbook.create_sheet(sheet.name) _clear_worksheet_data(worksheet) start_row, start_column = _position_to_one_based(sheet.position) for column_offset, column_name in enumerate(frame.columns): worksheet.cell(row=start_row, column=start_column + column_offset, value=column_name) for row_offset, values in enumerate(frame.iter_rows(), start=1): for column_offset, value in enumerate(values): worksheet.cell(row=start_row + row_offset, column=start_column + column_offset, value=value) if sheet.table_name is not None and frame.columns: _add_openpyxl_table( worksheet, table_name=sheet.table_name, start_row=start_row, start_column=start_column, column_count=len(frame.columns), row_count=frame.height, table_style=sheet.table_style, ) if sheet.freeze_panes is not None: worksheet.freeze_panes = sheet.freeze_panes def _clear_worksheet_data(worksheet: Any) -> None: for table_name in list(worksheet.tables): del worksheet.tables[table_name] if worksheet.max_row and worksheet.max_column: for row in worksheet.iter_rows(): for cell in row: cell.value = None def _add_openpyxl_table( worksheet: Any, *, table_name: str, start_row: int, start_column: int, column_count: int, row_count: int, table_style: str | dict[str, Any] | None, ) -> None: from openpyxl.utils import get_column_letter from openpyxl.worksheet.table import Table, TableStyleInfo end_row = start_row + max(row_count, 1) end_column = start_column + column_count - 1 ref = ( f"{get_column_letter(start_column)}{start_row}:" f"{get_column_letter(end_column)}{end_row}" ) table = Table(displayName=table_name, ref=ref) style_name = table_style if isinstance(table_style, str) else "TableStyleMedium2" table.tableStyleInfo = TableStyleInfo( name=style_name, showFirstColumn=False, showLastColumn=False, showRowStripes=True, showColumnStripes=False, ) worksheet.add_table(table) def _position_to_one_based(position: str | tuple[int, int]) -> tuple[int, int]: if isinstance(position, str): from openpyxl.utils.cell import coordinate_to_tuple try: return coordinate_to_tuple(position) except ValueError as exc: raise ValueError(f"position must be an Excel cell reference or a row/column tuple: {position!r}") from exc if len(position) != 2: raise ValueError("position tuple must contain row and column.") row, column = position if row < 0 or column < 0: raise ValueError("position tuple values must be zero or greater.") return row + 1, column + 1 def _collect_frame(df: ExcelFrame) -> pl.DataFrame: if isinstance(df, pl.DataFrame): return df if isinstance(df, pl.LazyFrame): return df.collect() raise ValueError("ExcelSheet.df must be a Polars DataFrame or LazyFrame.") def _validate_sheets(sheets: tuple[ExcelSheet, ...]) -> None: if not sheets: raise ValueError("sheets must contain at least one ExcelSheet.") seen_sheet_names: set[str] = set() seen_table_names: set[str] = set() for sheet in sheets: if not isinstance(sheet, ExcelSheet): raise ValueError("sheets must contain only ExcelSheet values.") normalized_sheet_name = _validate_sheet_name(sheet.name) if normalized_sheet_name in seen_sheet_names: raise ValueError(f"sheet names must be unique: {sheet.name!r}") seen_sheet_names.add(normalized_sheet_name) if sheet.table_name is not None: normalized_table_name = _validate_table_name(sheet.table_name) if normalized_table_name in seen_table_names: raise ValueError(f"table names must be unique: {sheet.table_name!r}") seen_table_names.add(normalized_table_name) def _validate_sheet_name(name: str) -> str: normalized = name.strip() if not normalized: raise ValueError("sheet names must not be blank.") if len(normalized) > 31: raise ValueError(f"sheet name {name!r} must be 31 characters or fewer.") if any(char in _INVALID_SHEET_CHARS for char in normalized): raise ValueError(f"sheet name {name!r} contains an invalid Excel sheet-name character.") return normalized.casefold() def _validate_table_name(name: str) -> str: normalized = name.strip() if not normalized: raise ValueError("table names must not be blank.") if len(normalized) > 255: raise ValueError(f"table name {name!r} must be 255 characters or fewer.") if not _TABLE_NAME_RE.fullmatch(normalized): raise ValueError( f"table name {name!r} must start with a letter or underscore and contain only letters, numbers, and underscores." ) if _CELL_REFERENCE_RE.fullmatch(normalized): raise ValueError(f"table name {name!r} must not look like an Excel cell reference.") return normalized.casefold() def _replace_atomic(source_path: Path, target_path: Path) -> None: backoff_seconds = (0.0, 0.02, 0.05, 0.1, 0.2) last_error: BaseException | None = None for delay_seconds in backoff_seconds: if delay_seconds > 0.0: time.sleep(delay_seconds) try: os.replace(source_path, target_path) return except PermissionError as exc: if os.name != "nt" or getattr(exc, "winerror", None) != 5: raise last_error = exc continue if last_error is not None: raise last_error os.replace(source_path, target_path) def _remove_temporary_file(path: Path) -> None: try: path.unlink() except FileNotFoundError: pass except OSError: pass __all__ = ["ExcelSheet", "compose_excel"]