API Reference
This reference renders the docstrings for the public authoring surface and the
helper modules intended for flow code. For task-oriented examples, start with
the author guides.
The sections below document the package entrypoints most flow authors import.
Top-Level Package
Top-level package for the Data Engine workbook runtime.
Flow Authoring
Flow DSL and public authoring entrypoints.
-
class Flow(group, name=None, label=None, trigger=None, mirror_spec=None, steps=(), _workspace_root=None)[source]
Bases: Flow
Public authoring flow with execution conveniences layered over core definitions.
- Parameters:
group (str)
name (str | None)
label (str | None)
trigger (WatchSpec | None)
mirror_spec (MirrorSpec | None)
steps (tuple[StepSpec, ...])
_workspace_root (Path | None)
-
run_once(*, authoring_services=None, runtime_execution_service=None)[source]
Run this flow once and return completed runtime contexts.
- Parameters:
authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.
runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.
- Returns:
One context per executed source.
- Return type:
list[FlowContext]
-
preview(*, use=None, authoring_services=None, runtime_execution_service=None)[source]
Run this flow in preview mode and return one preview value.
- Parameters:
use (str | None) – Optional named object slot to preview instead of the final current
value.
authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.
runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.
- Returns:
Preview value returned by the runtime execution service.
- Return type:
object
- Raises:
FlowValidationError – If preview is requested from inside a compiled flow module.
-
run(*, authoring_services=None, runtime_execution_service=None)[source]
Run this flow continuously according to its trigger.
- Parameters:
authoring_services (AuthoringServices | None) – Optional service bundle used by tests or embedded hosts.
runtime_execution_service (RuntimeExecutionService | None) – Optional runtime execution service override.
- Returns:
Completed contexts collected before the runtime exits.
- Return type:
list[FlowContext]
-
discover_flows(*, data_root=None, authoring_services=None, flow_execution_service=None)[source]
Discover and build all code-defined flows from compiled flow modules.
- Return type:
tuple[Flow, ...]
-
load_flow(name, *, data_root=None, authoring_services=None, flow_execution_service=None)[source]
Load one code-defined flow by flow-module name.
- Return type:
Flow
-
run(*flows, authoring_services=None, runtime_execution_service=None)[source]
Run multiple flows with sequential execution per group and parallel groups.
- Return type:
list[FlowContext]
Core Flow Model
Pure flow definitions and fluent builder methods.
-
class Flow(group, name=None, label=None, trigger=None, mirror_spec=None, steps=(), _workspace_root=None)[source]
Bases: object
Immutable fluent builder for generic runtime flows.
Flow is the authoring object most flow modules return. Builder methods
never mutate the existing instance; each method returns a new flow so chained
definitions stay predictable and easy to review.
Examples
from data_engine import Flow
flow = (
    Flow(group="Docs")
    .watch(mode="poll", source="incoming", interval="30s", extensions=[".xlsx"])
    .mirror(root="processed")
)
assert flow.mode == "poll"
- Variables:
group (str) – Display group used by the GUI, TUI, and runtime summaries.
name (str | None) – Stable flow identifier. When omitted in a flow module, the module loader
can derive it from the module name.
label (str | None) – Optional human-readable display label.
trigger (WatchSpec | None) – Runtime trigger configuration.
mirror_spec (MirrorSpec | None) – Mirrored output path configuration.
steps (tuple[StepSpec, ...]) – Ordered callable steps to run.
- Parameters:
group (str)
name (str | None)
label (str | None)
trigger (WatchSpec | None)
mirror_spec (MirrorSpec | None)
steps (tuple[StepSpec, ...])
_workspace_root (Path | None)
Notes
Builder methods return a new Flow instance, so calls can be chained.
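The immutable builder behavior described above can be illustrated with a minimal sketch. MiniFlow is a hypothetical stand-in written for this example, not the library class; it only assumes that each builder call copies the instance rather than mutating it.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniFlow:
    """Hypothetical stand-in used only to illustrate the builder pattern."""

    group: str
    steps: tuple = ()

    def step(self, fn):
        # Build a copy with one more step; the receiver is not modified.
        return replace(self, steps=self.steps + (fn,))


base = MiniFlow(group="Docs")
extended = base.step(len).step(sorted)
```

Because base is never modified, it can be reused as the starting point for several independent chains.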
-
group: str
-
name: str | None = None
-
label: str | None = None
-
trigger: WatchSpec | None = None
-
mirror_spec: MirrorSpec | None = None
-
steps: tuple[StepSpec, ...] = ()
-
watch(*, mode, run_as='individual', max_parallel=1, source=None, interval=None, time=None, extensions=None, settle=1)[source]
Configure how this flow is triggered.
Use "manual" for explicit operator-driven runs, "poll" when source
file changes should trigger work, and "schedule" when time should
trigger work. run_as="individual" runs once per concrete source
file; run_as="batch" runs once for the watched source root.
Examples
from data_engine import Flow
manual_flow = Flow(group="Docs").watch(mode="manual")
poll_flow = Flow(group="Docs").watch(mode="poll", source="incoming", interval="5s")
schedule_flow = Flow(group="Docs").watch(mode="schedule", interval="15m")
assert manual_flow.mode == "manual"
assert poll_flow.mode == "poll"
assert schedule_flow.mode == "schedule"
- Parameters:
mode (str) – Trigger mode: "manual", "poll", or "schedule".
run_as (str) – "individual" to run once per source file, or "batch" to run
once for the full source set.
max_parallel (int) – Maximum number of concurrent source-file runs for one flow when
run_as="individual". Defaults to 1.
source (str | Path | None) – File or directory watched by poll/schedule triggers.
interval (str | None) – Duration string such as "10s" or "5m" for poll intervals or
recurring schedules.
time (str | tuple[str, ...] | list[str] | set[str] | None) – One or more HH:MM daily schedule times.
extensions (tuple[str, ...] | list[str] | set[str] | None) – Optional file extensions used when discovering source files.
settle (int) – Polling settle window in seconds before a changed file is queued.
- Returns:
A new flow with the trigger configuration attached.
- Return type:
Flow
- Raises:
FlowValidationError – If the trigger mode, source, schedule, or polling options are
inconsistent.
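As a rough illustration of how a duration string such as "10s" or "5m" could be turned into seconds, here is a small sketch. parse_interval is a hypothetical helper, and the set of accepted units (s, m, h) is an assumption; the reference only shows second and minute examples.

```python
import re

# Assumed unit table; only "s" and "m" appear in the reference examples.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_interval(text):
    """Convert a duration string like "30s" or "15m" into seconds."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smh])", text.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    value, unit = match.groups()
    return float(value) * _UNIT_SECONDS[unit]


poll_seconds = parse_interval("30s")
schedule_seconds = parse_interval("15m")
```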
-
mirror(*, root)[source]
Bind a mirrored output namespace rooted at one directory.
mirror only configures paths. It does not write files by itself; the
runtime later exposes write-ready paths through context.mirror.
Examples
from data_engine import Flow
flow = Flow(group="Docs").mirror(root="processed")
assert str(flow.mirror_spec.root).endswith("processed")
- Parameters:
root (str | Path) – Directory used by context.mirror helpers when writing outputs.
- Returns:
A new flow with mirror output helpers enabled.
- Return type:
Flow
-
step(fn, *, use=None, save_as=None, label=None)[source]
Append one callable step to the flow.
step is the default workhorse for flow authoring. The callable
receives one FlowContext and its return value becomes the next
context.current. Use save_as to keep an intermediate result under
context.objects and use to load a saved object before a later
step runs.
Examples
from data_engine import Flow
def read_docs(context):
    return context.current
flow = Flow(group="Docs").step(read_docs, save_as="raw_df")
assert flow.steps[0].save_as == "raw_df"
- Parameters:
fn (Callable[[FlowContext], object]) – Callable that accepts a single FlowContext and returns the next
value for context.current.
use (str | None) – Optional named object slot to load into context.current before
the step runs.
save_as (str | None) – Optional named object slot to store the step result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the step appended.
- Return type:
Flow
- Raises:
FlowValidationError – If fn is not callable or does not accept exactly one argument.
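The interplay of use and save_as can be sketched as follows. MiniContext and run_step are hypothetical names, and the ordering shown (load use, call the step, store save_as) is an assumption drawn from the parameter descriptions rather than from the runtime source.

```python
from dataclasses import dataclass, field


@dataclass
class MiniContext:
    """Hypothetical stand-in for the current/objects part of FlowContext."""

    current: object = None
    objects: dict = field(default_factory=dict)


def run_step(context, fn, *, use=None, save_as=None):
    if use is not None:
        # Load a previously saved object before the step runs.
        context.current = context.objects[use]
    # The return value of the step becomes the next current value.
    context.current = fn(context)
    if save_as is not None:
        # Keep the result under a name for later steps.
        context.objects[save_as] = context.current
    return context


ctx = MiniContext(current=[3, 1, 2])
run_step(ctx, lambda c: sorted(c.current), save_as="sorted")
run_step(ctx, lambda c: len(c.current))
run_step(ctx, lambda c: c.current[0], use="sorted")
```

The third step still sees the sorted list even though the second step replaced the current value with a count.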
-
map(fn, *, use=None, save_as=None, label=None)[source]
Append a step that maps a callable over the current iterable value.
map is best when the same callable should run once per collected
item. The callable may accept either (item) or (context, item); the
mapped values are returned as a Batch.
Examples
from data_engine import Flow
def summarize(file_ref):
    return file_ref.name
flow = Flow(group="Docs").collect(extensions=[".xlsx"]).map(fn=summarize)
assert len(flow.steps) == 2
- Parameters:
fn (Callable[..., object]) – Callable accepting either (item) or (context, item).
use (str | None) – Optional named object slot to map instead of the current value.
save_as (str | None) – Optional named object slot to store the mapped Batch result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the mapping step appended.
- Return type:
Flow
- Raises:
FlowValidationError – If fn is not callable, has an unsupported signature, or the
mapped current value is not iterable.
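One way to support both callable shapes is to inspect the signature before calling. The sketch below is an assumption about how such dispatch could work; call_mapped and map_items are hypothetical helpers, and a plain tuple stands in for Batch.

```python
import inspect


def call_mapped(fn, context, item):
    """Call fn(item) or fn(context, item) depending on its positional arity."""
    positional = [
        p
        for p in inspect.signature(fn).parameters.values()
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    if len(positional) == 1:
        return fn(item)
    if len(positional) == 2:
        return fn(context, item)
    raise ValueError("fn must accept (item) or (context, item)")


def map_items(fn, context, items):
    # A tuple stands in for the Batch container in this sketch.
    return tuple(call_mapped(fn, context, item) for item in items)


def shout(item):
    return item.upper()


def prefixed(context, item):
    return f"{context['prefix']}{item}"


upper = map_items(shout, {}, ["a", "b"])
tagged = map_items(prefixed, {"prefix": "x-"}, ["a"])
```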
-
collect(extensions, *, root=None, recursive=False, use=None, save_as=None, label=None)[source]
Append a step that collects source files into a Batch.
If root is omitted, collection uses the active source root from the
runtime context. This keeps scheduled or poll-driven batch flows concise
because the watched source directory can also be the collection root.
Examples
from data_engine import Flow
flow = Flow(group="Docs").collect(extensions=[".pdf"], recursive=True)
assert flow.steps[0].label == "Collect Files"
- Parameters:
extensions (tuple[str, ...] | list[str] | set[str]) – File extensions to include, such as (".xlsx", ".csv").
root (str | Path | None) – Optional search root. Defaults to the active source root.
recursive (bool) – Whether to search child directories.
use (str | None) – Optional named object slot to load before collecting.
save_as (str | None) – Optional named object slot to store the collected batch in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the collection step appended.
- Return type:
Flow
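File collection by extension can be approximated with pathlib alone. collect_paths is a hypothetical helper; case-insensitive suffix matching and sorted output are assumptions of this sketch, not documented behavior.

```python
import tempfile
from pathlib import Path


def collect_paths(root, extensions, *, recursive=False):
    """Return matching files under root, optionally searching child directories."""
    wanted = {ext.lower() for ext in extensions}
    pattern = "**/*" if recursive else "*"
    return tuple(
        sorted(
            path
            for path in Path(root).glob(pattern)
            if path.is_file() and path.suffix.lower() in wanted
        )
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.xlsx").write_text("")
    (root / "notes.txt").write_text("")
    (root / "nested").mkdir()
    (root / "nested" / "b.xlsx").write_text("")
    flat = [p.name for p in collect_paths(root, [".xlsx"])]
    deep = [p.name for p in collect_paths(root, [".xlsx"], recursive=True)]
```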
-
step_each(fn, *, use=None, save_as=None, label=None)[source]
Alias for map that reads naturally in step chains.
- Parameters:
fn (Callable[..., object]) – Callable accepting either (item) or (context, item).
use (str | None) – Optional named object slot to map instead of the current value.
save_as (str | None) – Optional named object slot to store the mapped Batch result in.
label (str | None) – Optional display label for logs and UI step summaries.
- Returns:
A new flow with the per-item step appended.
- Return type:
Flow
-
property mode: str
Return the trigger mode, or "manual" for unconfigured flows.
Core Primitives
Core flow specs, contexts, and small containers.
-
class Batch(items)[source]
Bases: Generic[T]
Small iterable runtime container used instead of exposing raw lists by default.
- Parameters:
items (tuple[T, ...])
-
items: tuple[T, ...]
-
names()[source]
Return each item name when all items expose a string name.
- Return type:
tuple[str, ...]
-
paths()[source]
Return each item path when all items expose a Path-valued path.
- Return type:
tuple[Path, ...]
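A minimal sketch of a Batch-like container is shown below. MiniBatch is hypothetical, and raising TypeError when an item lacks a string name is an assumption; the reference does not state what names() does in that case.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class MiniBatch:
    """Hypothetical immutable container over a tuple of items."""

    items: tuple

    def __iter__(self):
        return iter(self.items)

    def __len__(self):
        return len(self.items)

    def names(self):
        names = tuple(getattr(item, "name", None) for item in self.items)
        if not all(isinstance(name, str) for name in names):
            # Assumed failure mode for items without a string name.
            raise TypeError("every item must expose a string name")
        return names


batch = MiniBatch((Path("a.csv"), Path("dir/b.csv")))
batch_names = batch.names()
```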
-
class FileRef(path)[source]
Bases: object
Thin runtime wrapper for one filesystem path in a batch-oriented flow.
- Parameters:
path (Path)
-
path: Path
-
property name: str
Return the file name including extension.
-
property stem: str
Return the file name without extension.
-
property suffix: str
Return the file extension.
-
property parent: Path
Return the parent directory.
-
exists()[source]
Return whether the referenced path currently exists.
- Return type:
bool
-
class FlowContext(flow_name, group, source=None, mirror=None, current=None, objects=<factory>, metadata=<factory>, config=<factory>, debug=None)[source]
Bases: object
Mutable runtime state shared across steps during one flow execution.
Steps receive a FlowContext object. current is the active value,
objects stores named intermediate values created with save_as,
metadata holds runtime annotations, and source/mirror expose
source and output path helpers when the flow configuration provides them.
- Variables:
flow_name (str) – Stable flow name for the current execution.
group (str) – Flow group used by operator surfaces.
source (SourceContext | None) – Source path helper for source-backed executions.
mirror (MirrorContext | None) – Write-ready mirrored output helper when the flow configured a mirror.
current (object | None) – Active value passed between steps.
objects (dict[str, object]) – Named intermediate values saved by save_as.
metadata (dict[str, object]) – Runtime metadata attached to the execution.
config (WorkspaceConfigContext) – Lazy workspace config reader.
debug (FlowDebugContext | None) – Optional debug artifact writer for the active execution.
Examples
from data_engine.core.primitives import FlowContext
context = FlowContext(flow_name="docs", group="Docs", current=1)
context.objects["raw"] = context.current
assert context.current == 1
assert context.objects["raw"] == 1
-
flow_name: str
-
group: str
-
source: SourceContext | None = None
-
mirror: MirrorContext | None = None
-
current: object | None = None
-
objects: dict[str, object]
-
metadata: dict[str, object]
-
config: WorkspaceConfigContext
-
debug: FlowDebugContext | None = None
-
source_metadata()[source]
Return filesystem metadata for the current source file when available.
- Return type:
SourceMetadata | None
-
database(name)[source]
Return a write-ready path beneath the workspace databases directory.
Use this for workspace-owned DuckDB files and other durable database
artifacts. The returned path is rooted under
<workspace>/databases/ and parent directories are created for you.
- Parameters:
name (str | Path) – Relative database file name, such as "analytics.duckdb" or
"docs/analytics.duckdb".
- Returns:
Absolute write-ready database path.
- Return type:
Path
- Raises:
FlowValidationError – If the flow is not running from an authored workspace, or if
name is absolute or empty.
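The path rules described for database can be sketched with pathlib. database_path is a hypothetical free function, and it raises ValueError where the library raises FlowValidationError (documented in this reference as a ValueError subclass).

```python
import tempfile
from pathlib import Path


def database_path(workspace_root, name):
    """Return a write-ready path under <workspace>/databases/."""
    relative = Path(name)
    # Path("") normalizes to Path("."), which covers the empty-name case.
    if relative == Path(".") or relative.is_absolute():
        raise ValueError("name must be a non-empty relative path")
    target = Path(workspace_root) / "databases" / relative
    target.parent.mkdir(parents=True, exist_ok=True)
    return target.resolve()


with tempfile.TemporaryDirectory() as tmp:
    db_path = database_path(tmp, "docs/analytics.duckdb")
    parent_created = db_path.parent.is_dir()
    tail = db_path.parts[-3:]
    try:
        database_path(tmp, Path(tmp) / "x.duckdb")
        absolute_rejected = False
    except ValueError:
        absolute_rejected = True
```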
-
class FlowDebugContext(root, workspace_id, flow_name, run_id, source_path, step_name=None)[source]
Bases: object
Author-facing debug artifact helpers for one concrete flow run.
-
root: Path
-
workspace_id: str | None
-
flow_name: str
-
run_id: str | None
-
source_path: str | None
-
step_name: str | None = None
-
set_step(step_name)[source]
Update the active step label used for subsequent debug artifact saves.
- Parameters:
step_name (str | None)
- Return type:
None
-
save_frame(frame, *, name=None, info=None)[source]
Save one dataframe-like value plus linked metadata for in-app debug viewing.
- Return type:
Path
-
save_json(value, *, name=None, info=None)[source]
Save one JSON artifact for in-app debug viewing.
- Return type:
Path
-
class MirrorContext(root, source_path=None, relative_path=None)[source]
Bases: object
Write-ready mirrored output namespace for one runtime source.
context.mirror is available when a flow was configured with
Flow.mirror(root=...). The helpers return paths and create parent
directories as needed, but they do not write file contents.
-
root: Path
-
source_path: Path | None = None
-
relative_path: Path | None = None
-
property dir: Path
Return a write-ready namespace directory for derived files.
-
property folder: Path
Return the mirrored parent folder for the current source file.
-
with_suffix(suffix)[source]
Return the canonical mirrored source path with a replaced suffix.
- Parameters:
suffix (str)
- Return type:
Path
-
with_extension(suffix)[source]
Return the canonical mirrored source path with a replaced extension.
- Parameters:
suffix (str)
- Return type:
Path
-
file(name)[source]
Return a write-ready file path in the mirrored source folder.
- Parameters:
name (str | Path)
- Return type:
Path
-
namespaced_file(name)[source]
Return a write-ready derived file path inside the mirrored source namespace.
- Parameters:
name (str | Path)
- Return type:
Path
-
root_file(name)[source]
Return a write-ready file path directly beneath the mirror root.
- Parameters:
name (str | Path)
- Return type:
Path
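The idea of a write-ready mirrored path can be sketched as follows. mirrored_with_suffix is a hypothetical helper, and the layout (mirror root joined with the source's relative path) is an assumption; the point is that parent directories are created while the file itself is not written.

```python
import tempfile
from pathlib import Path


def mirrored_with_suffix(mirror_root, relative_path, suffix):
    """Map a source-relative path into the mirror root with a new suffix."""
    target = (Path(mirror_root) / relative_path).with_suffix(suffix)
    # Create parent directories only; writing the file is left to the step.
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


with tempfile.TemporaryDirectory() as tmp:
    out = mirrored_with_suffix(
        Path(tmp) / "processed", Path("2024/report.xlsx"), ".parquet"
    )
    relative = out.relative_to(tmp).as_posix()
    parent_exists = out.parent.is_dir()
    file_exists = out.exists()
```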
-
class MirrorSpec(root)[source]
Bases: object
Static flow-level mirror binding.
- Parameters:
root (Path)
-
root: Path
-
class SourceContext(root, path=None, relative_path=None)[source]
Bases: object
Resolved source namespace for one runtime source.
context.source points at the watched source root and, for individual
file runs, the concrete source file. Its helpers are read-oriented path
conveniences; unlike MirrorContext they do not create directories.
-
root: Path
-
path: Path | None = None
-
relative_path: Path | None = None
-
property dir: Path
Return the namespace directory for files derived from the active source.
-
property folder: Path
Return the parent folder for the active source file.
-
with_suffix(suffix)[source]
Return the source path with a replaced suffix.
- Parameters:
suffix (str)
- Return type:
Path
-
with_extension(suffix)[source]
Return the source path with a replaced extension.
- Parameters:
suffix (str)
- Return type:
Path
-
file(name)[source]
Return a derived file path in the active source folder.
- Parameters:
name (str | Path)
- Return type:
Path
-
namespaced_file(name)[source]
Return a derived file path inside the active source namespace.
- Parameters:
name (str | Path)
- Return type:
Path
-
root_file(name)[source]
Return a file path directly beneath the source root.
- Parameters:
name (str | Path)
- Return type:
Path
-
class SourceMetadata(path, name, size_bytes, modified_at_utc)[source]
Bases: object
Resolved filesystem metadata for the current source file.
-
path: Path
-
name: str
-
size_bytes: int
-
modified_at_utc: datetime
-
class StepSpec(fn, use, save_as, label, function_name)[source]
Bases: object
One generic callable step in a flow.
-
fn: Callable[[...], object]
-
use: str | None
-
save_as: str | None
-
label: str
-
function_name: str
-
class WatchSpec(mode, run_as, max_parallel=1, source=None, interval=None, interval_seconds=None, time=None, times=(), time_slots=(), extensions=None, settle=1)[source]
Bases: object
Normalized runtime watch configuration.
- Parameters:
mode (str)
run_as (str)
max_parallel (int)
source (Path | None)
interval (str | None)
interval_seconds (float | None)
time (str | tuple[str, ...] | None)
times (tuple[str, ...])
time_slots (tuple[tuple[int, int], ...])
extensions (tuple[str, ...] | None)
settle (int)
-
mode: str
-
run_as: str
-
max_parallel: int = 1
-
source: Path | None = None
-
interval: str | None = None
-
interval_seconds: float | None = None
-
time: str | tuple[str, ...] | None = None
-
times: tuple[str, ...] = ()
-
time_slots: tuple[tuple[int, int], ...] = ()
-
extensions: tuple[str, ...] | None = None
-
settle: int = 1
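WatchSpec carries both the raw time value and a time_slots tuple of integer pairs. The sketch below shows one plausible normalization from HH:MM strings to (hour, minute) pairs; to_time_slots is hypothetical, and de-duplicating and sorting the slots are assumptions.

```python
def to_time_slots(time):
    """Normalize one or more HH:MM strings into sorted (hour, minute) pairs."""
    values = (time,) if isinstance(time, str) else tuple(time)
    slots = set()
    for value in values:
        hour_text, minute_text = value.split(":")
        hour, minute = int(hour_text), int(minute_text)
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            raise ValueError(f"invalid daily time: {value!r}")
        slots.add((hour, minute))
    return tuple(sorted(slots))


single = to_time_slots("09:30")
several = to_time_slots(["17:00", "09:30", "17:00"])
```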
-
class WorkspaceConfigContext(workspace_root=None, _cache=<factory>, _names=None)[source]
Bases: object
Lazy read-only access to workspace-local TOML config files.
context.config reads files from <workspace>/config/*.toml on demand.
It returns dictionaries so flows can keep environment-specific settings out
of Python modules without introducing a larger configuration framework.
- Variables:
workspace_root (Path | None) – Authored workspace root. When omitted, config lookup is unavailable and
returns no names.
- Parameters:
workspace_root (Path | None)
_cache (dict[str, dict[str, object]])
_names (tuple[str, ...] | None)
Examples
from data_engine.core.primitives import WorkspaceConfigContext
config = WorkspaceConfigContext()
assert config.names() == ()
-
workspace_root: Path | None = None
-
property config_dir: Path | None
Return the conventional config directory for the authored workspace.
-
names()[source]
Return available config file stems beneath config/.
- Return type:
tuple[str, ...]
-
get(name)[source]
Return one parsed config mapping when available.
- Parameters:
name (str)
- Return type:
dict[str, object] | None
-
require(name)[source]
Return one parsed config mapping or fail loudly when missing.
- Parameters:
name (str)
- Return type:
dict[str, object]
-
all()[source]
Return all parsed config mappings keyed by file stem.
- Return type:
dict[str, dict[str, object]]
-
collect_files(extensions, *, root=None, recursive=False)[source]
Return a step callable that collects matching files into a Batch of FileRef items.
- Return type:
Callable[[FlowContext], Batch[FileRef]]
Core Runtime Models
Core errors for Data Engine flow definitions and execution.
-
exception FlowExecutionError(*, flow_name, phase, detail, step_label=None, function_name=None, source_path=None)[source]
Bases: FlowValidationError
Raised when a flow module fails during import, build, or runtime execution.
- Return type:
None
-
exception FlowStoppedError[source]
Bases: RuntimeError
Raised when a running flow is stopped by an external control.
-
exception FlowValidationError[source]
Bases: ValueError
Raised when a flow configuration or runtime input cannot be validated.
Runtime Engine
Command-shaped runtime engine for executing core flows.
-
class RuntimeEngine(*, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, status_callback=None, workspace_id=None, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, run_stop_controller=None)[source]
Bases: object
Execute flows through explicit runtime commands.
The engine does not know about GUI, TUI, CLI, local settings, or daemon
wiring. Hosts pass state/event collaborators in explicitly; the current
implementation adapts the existing sequential and grouped runtimes while
giving higher layers a command-shaped seam to target.
- Parameters:
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
status_callback (Callable[[str], None] | None)
workspace_id (str | None)
flow_runtime_type (type[FlowRuntime])
grouped_runtime_type (type[GroupedFlowRuntime])
run_stop_controller (RuntimeStopController | None)
-
run_once(flow)[source]
Run one flow once using its configured startup sources.
- Parameters:
flow (CoreFlow)
- Return type:
list[FlowContext]
-
run_once_and_discard(flow)[source]
Run one flow once while releasing completed contexts eagerly.
- Parameters:
flow (CoreFlow)
- Return type:
None
-
static release_completed_results(result)[source]
- Parameters:
result (object)
- Return type:
object
-
run_source(flow, source_path)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str | Path)
- Return type:
FlowContext
-
run_batch(flow)[source]
Run one flow once in batch mode using the configured source root.
- Parameters:
flow (CoreFlow)
- Return type:
FlowContext
-
preview(flow, *, use=None)[source]
Preview one flow through the one-shot runtime path.
- Parameters:
flow (CoreFlow)
use (str | None)
- Return type:
object
-
run_continuous(flow)[source]
Run one flow continuously according to its trigger.
- Parameters:
flow (CoreFlow)
- Return type:
list[FlowContext]
-
run_grouped(flows, *, continuous=True)[source]
Run flows grouped by flow group with sequential execution inside each group.
- Return type:
list[FlowContext]
-
stop(run_id)[source]
Request that the active runtime stop a run by id.
- Parameters:
run_id (str)
- Return type:
None
-
release_completed_results(result)[source]
Drop bulky references from completed flow results when callers do not need them.
- Parameters:
result (object)
- Return type:
object
Runtime execution internals for authored flows.
-
class FlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None, workspace_id=None)[source]
Bases: object
Sequential runtime that executes one or more configured flows.
- Parameters:
flows (tuple['CoreFlow', ...])
continuous (bool)
runtime_stop_event (threading.Event | None)
flow_stop_event (threading.Event | None)
status_callback (Callable[[str], None] | None)
runtime_ledger (RuntimeCacheStore | None)
runtime_ledger_service (RuntimeCacheLedgerService | None)
runtime_ledger_factory (Callable[[], RuntimeCacheStore] | None)
run_stop_controller (RuntimeStopController | None)
workspace_id (str | None)
-
run()[source]
- Return type:
list[FlowContext]
-
run_and_discard()[source]
Run one-shot flows while releasing completed contexts immediately.
- Return type:
None
-
preview(*, use=None)[source]
Run exactly one flow for notebook-style inspection and return one object.
- Parameters:
use (str | None)
-
run_source(flow, source_path)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str | Path)
- Return type:
FlowContext
-
run_batch(flow)[source]
Run one flow once in batch mode using the configured source root.
- Parameters:
flow (CoreFlow)
- Return type:
FlowContext
-
max_parallel_for_flow(flow)[source]
Return the allowed per-flow source concurrency for one flow.
- Parameters:
flow (CoreFlow)
- Return type:
int
-
dispatch_queued_jobs(queue, queued_keys, pending_futures, executor, *, results)[source]
Submit queued source jobs up to each flow’s bounded concurrency and drain completions.
- Parameters:
queued_keys (set[tuple[str, str | None]])
pending_futures (dict[Future[FlowContext], tuple[QueuedRunJob, int]])
executor (ThreadPoolExecutor)
results (list[FlowContext] | None)
- Return type:
None
-
wait_for_dispatched_jobs(pending_futures, *, results)[source]
Wait for all pending queued jobs to complete.
- Return type:
None
-
register_run(run_id)[source]
Mark one run id as active.
- Parameters:
run_id (str)
- Return type:
None
-
unregister_run(run_id)[source]
Clear active and requested state for one completed run id.
- Parameters:
run_id (str)
- Return type:
None
-
check_run(run_id)[source]
Raise when runtime-wide or run-id stop has been requested.
- Parameters:
run_id (str | None)
- Return type:
None
-
class GroupedFlowRuntime(flows, *, continuous, runtime_stop_event=None, flow_stop_event=None, status_callback=None, runtime_ledger=None, runtime_ledger_service=None, runtime_ledger_factory=None, run_stop_controller=None, workspace_id=None)[source]
Bases: object
Grouped orchestrator: sequential within a group, parallel across groups.
- Parameters:
flows (tuple['Flow', ...])
continuous (bool)
runtime_stop_event (threading.Event | None)
flow_stop_event (threading.Event | None)
status_callback (Callable[[str], None] | None)
runtime_ledger (RuntimeCacheStore | None)
runtime_ledger_service (RuntimeCacheLedgerService | None)
runtime_ledger_factory (Callable[[], RuntimeCacheStore] | None)
run_stop_controller (RuntimeStopController | None)
workspace_id (str | None)
-
run()[source]
- Return type:
list[FlowContext]
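The "sequential within a group, parallel across groups" policy can be sketched with a thread pool. run_grouped_jobs is a hypothetical helper that takes (group, callable) pairs; it is not the GroupedFlowRuntime implementation and omits stop events, ledgers, and continuous mode.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def run_grouped_jobs(jobs):
    """Run jobs in order inside each group, with one worker per group."""
    grouped = defaultdict(list)
    for group, job in jobs:
        grouped[group].append(job)

    def run_group(group_jobs):
        # Jobs that share a group never overlap with each other.
        return [job() for job in group_jobs]

    with ThreadPoolExecutor(max_workers=max(1, len(grouped))) as pool:
        futures = {
            group: pool.submit(run_group, group_jobs)
            for group, group_jobs in grouped.items()
        }
        return {group: future.result() for group, future in futures.items()}


results = run_grouped_jobs(
    [("a", lambda: 1), ("b", lambda: 10), ("a", lambda: 2)]
)
```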
Run-id-aware runtime stop control.
-
class RuntimeStopController[source]
Bases: object
Track stop requests for specific active runtime run ids.
-
request_stop(run_id)[source]
Request that one active or future run id stop.
- Parameters:
run_id (str)
- Return type:
None
-
register_run(run_id)[source]
Mark one run id as active.
- Parameters:
run_id (str)
- Return type:
None
-
unregister_run(run_id)[source]
Clear active and requested state for one completed run id.
- Parameters:
run_id (str)
- Return type:
None
-
check_run(run_id)[source]
Raise when stop has been requested for run_id.
- Parameters:
run_id (str | None)
- Return type:
None
-
active_run_ids()[source]
Return active run ids in stable order.
- Return type:
tuple[str, ...]
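Run-id stop tracking of this shape can be sketched with a lock, a dict, and a set. MiniStopController and StopRequested are hypothetical names; treating "stable order" as registration order is an assumption.

```python
import threading


class StopRequested(RuntimeError):
    """Hypothetical signal raised when a stop was requested for a run id."""


class MiniStopController:
    def __init__(self):
        self._lock = threading.Lock()
        self._active = {}  # dict keys keep registration order
        self._requested = set()

    def request_stop(self, run_id):
        with self._lock:
            self._requested.add(run_id)

    def register_run(self, run_id):
        with self._lock:
            self._active[run_id] = True

    def unregister_run(self, run_id):
        with self._lock:
            self._active.pop(run_id, None)
            self._requested.discard(run_id)

    def check_run(self, run_id):
        with self._lock:
            if run_id is not None and run_id in self._requested:
                raise StopRequested(run_id)

    def active_run_ids(self):
        with self._lock:
            return tuple(self._active)


controller = MiniStopController()
controller.register_run("run-1")
controller.register_run("run-2")
controller.request_stop("run-2")
controller.check_run("run-1")  # no stop requested for run-1
try:
    controller.check_run("run-2")
    stopped = False
except StopRequested:
    stopped = True
controller.unregister_run("run-2")
```

A step loop would call check_run between units of work so that a stop request takes effect at the next safe point.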
File Watching
Filesystem discovery and polling services used by the flow runtime.
-
class IFileWatcher(*args, **kwargs)[source]
Bases: Protocol
Common interface for runtime file watchers.
-
start()[source]
Begin watching for filesystem changes.
- Return type:
None
-
stop()[source]
Stop watching for filesystem changes.
- Return type:
None
-
drain_events()[source]
Return any queued filesystem events observed since the last drain.
- Return type:
list[Path]
-
class PollingWatcher(input_root, *, recursive=True, extensions=None, settle=1)[source]
Bases: object
Filesystem polling watcher for one file or directory root.
-
start()[source]
Capture an initial filesystem snapshot and begin watching.
- Return type:
None
-
stop()[source]
Stop watching for new filesystem events.
- Return type:
None
-
drain_events()[source]
Return newly stable files observed since the last poll.
- Return type:
list[Path]
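A settle window can be sketched as "report a file only after its signature has stayed unchanged for settle seconds". MiniSettleTracker is hypothetical, the signature is assumed to be something like (mtime, size), and the clock is passed in explicitly so the logic is easy to test; the real watcher's snapshot handling may differ.

```python
class MiniSettleTracker:
    """Hypothetical tracker that reports files once they stop changing."""

    def __init__(self, settle=1):
        self.settle = settle
        self._seen = {}  # path -> (signature, first_seen_at, already_reported)

    def observe(self, snapshot, now):
        """snapshot maps path -> signature; returns paths that just settled."""
        ready = []
        for path, signature in snapshot.items():
            previous = self._seen.get(path)
            if previous is None or previous[0] != signature:
                # New or changed file: restart its settle timer.
                self._seen[path] = (signature, now, False)
                continue
            _, since, reported = previous
            if not reported and now - since >= self.settle:
                self._seen[path] = (signature, since, True)
                ready.append(path)
        return ready


tracker = MiniSettleTracker(settle=2)
first = tracker.observe({"a": (1, 10)}, now=0)
second = tracker.observe({"a": (1, 10)}, now=1)
changed = tracker.observe({"a": (1, 20)}, now=2)
settled = tracker.observe({"a": (1, 20)}, now=4)
repeat = tracker.observe({"a": (1, 20)}, now=5)
```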
-
iter_candidate_paths(input_root, *, extensions=None, recursive=True, allow_missing=False)[source]
Yield candidate files from one root using optional extension filters.
- Return type:
Iterable[Path]
-
is_temporary_file_path(path)[source]
Return whether a path looks like a transient temp or download file.
- Parameters:
path (Path)
- Return type:
bool
Authoring Helpers
data_engine.helpers re-exports the commonly used helper functions for
flow modules. Importing from a focused helper module is still encouraged when
it keeps the flow dependency clear.
Public authoring helper modules.
-
class TableSchema(columns=(), dtypes=(), rename=(), drop=())[source]
Column cleanup helper for compact Polars dataframe chains.
TableSchema is intentionally small: it stores an explicit column
projection, a source-column dtype map, a rename map, and a drop list. Each
attribute exposes an apply method so flow code can decide the cleanup
order explicitly instead of relying on a magical all-in-one schema
operation.
- Variables:
columns (Iterable[str] | ColumnSelection) – Explicit projection columns. Use schema.columns.apply(df) wherever
that projection belongs in your chain.
dtypes (ColumnDtypes | ColumnCasts) – Source column names mapped to Polars dtype objects. Use
schema.dtypes.apply(df) to cast them. Remaining frame columns are
cast to pl.String.
rename (ColumnRenames) – Source-to-target column names. Use schema.rename.apply(df) to rename
them.
drop (Iterable[str]) – Source columns to remove. Use schema.drop.apply(df) to drop them.
- Parameters:
columns (Iterable[str] | ColumnSelection)
dtypes (Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts)
rename (Mapping[str, str] | Iterable[tuple[str, str]])
drop (Iterable[str])
Notes
columns is an explicit projection applied at the point you call
schema.columns.apply(df). For example, you might cast all incoming
columns, drop private fields before persistence, write to DuckDB, and then
select the columns to return.
Examples
import polars as pl
from data_engine.helpers import TableSchema
schema = TableSchema(
columns=("step", "time", "workflow"),
dtypes={"step_to": pl.String, "time": pl.Time},
rename={"step_to": "step", "workflow_to": "workflow"},
drop=("workflow_from", "ssn"),
)
df = pl.DataFrame(
{
"step_to": ["review"],
"time": ["09:30:00"],
"workflow_to": ["docs"],
"workflow_from": ["intake"],
"ssn": ["000-00-0000"],
}
).with_columns(pl.col("time").str.to_time())
df = schema.dtypes.apply(df)
df = schema.drop.apply(df)
df = schema.rename.apply(df)
df = schema.columns.apply(df)
assert df.columns == ["step", "time", "workflow"]
assert df.schema["workflow"] == pl.String
Normalize all incoming names first when source files use inconsistent
spacing or capitalization:
df = pl.DataFrame({"Workflow\tTo": ["docs"]})
df = schema.normalize_column_names(df)
assert df.columns == ["workflow_to"]
-
columns: Iterable[str] | ColumnSelection = ()
-
dtypes: Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts = ()
-
rename: Mapping[str, str] | Iterable[tuple[str, str]] = ()
-
drop: Iterable[str] = ()
-
normalize_column_names(df)[source]
Normalize all column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
- Returns:
Frame with normalized column names.
- Return type:
pl.DataFrame | pl.LazyFrame
-
class DataEngineDataFrameNamespace(df)[source]
Data Engine helpers available from pl.DataFrame.de.
- Parameters:
df (pl.DataFrame)
-
normalize_column_names(columns=None)[source]
Normalize column names on this dataframe.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all
dataframe columns are normalized.
- Returns:
Dataframe with normalized column names.
- Return type:
pl.DataFrame
-
remove_null_columns()[source]
Remove columns from this dataframe when every value is null.
- Returns:
Dataframe containing only columns with at least one non-null value.
- Return type:
pl.DataFrame
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this dataframe.
This is a convenience wrapper around data_engine.helpers.networkdays().
The returned value is still a normal pl.Expr, so it can be chained
into cumulative windows and other Polars expressions.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count.
count_first_day (bool) – Whether to force the first day into the count when it would
normally be excluded.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to the signed business-day count.
- Return type:
pl.Expr
Example
from datetime import date
import polars as pl
df = df.with_columns(
business_days=df.de.networkdays(
"start_date",
"end_date",
holidays=[date(2026, 4, 15)],
)
)
df = df.sort(["claim_id", "sequence_number"]).with_columns(
cumulative_business_days=
pl.when(pl.col("use_days"))
.then(df.de.networkdays("start_date", "end_date"))
.otherwise(pl.lit(0))
.cum_sum()
.over("claim_id")
)
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this dataframe.
This is a convenience wrapper around data_engine.helpers.workday().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar
integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result.
count_first_day (bool) – Whether the start date itself can count as day 1.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to a Date result.
- Return type:
pl.Expr
Example
from datetime import date
df = df.with_columns(
due_date=df.de.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
)
)
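To make the offset semantics concrete, here is a plain-Python sketch of an Excel-style WORKDAY calculation for a single date. It is not the library's implementation. The name workday_reference is hypothetical, and the handling of count_first_day (the start date consumes one day of the offset when it is itself a business day) is an assumption based on the parameter description above.

```python
from datetime import date, timedelta

# Monday-first mask: Monday-Friday are business days, Saturday-Sunday are not.
DEFAULT_MASK = (True, True, True, True, True, False, False)


def workday_reference(start, days, holidays=(), count_first_day=False, mask=None):
    """Return the date `days` business days away from `start` (sketch only)."""
    mask = DEFAULT_MASK if mask is None else tuple(mask)
    if len(mask) != 7 or not any(mask):
        raise ValueError("mask needs seven items and at least one business day")
    holiday_set = set(holidays)

    def is_business_day(day):
        return mask[day.weekday()] and day not in holiday_set

    if days == 0:
        return start
    step = 1 if days > 0 else -1
    remaining = abs(days)
    current = start
    # Assumption: with count_first_day, a business-day start counts as day 1.
    if count_first_day and is_business_day(current):
        remaining -= 1
    while remaining > 0:
        current += timedelta(days=step)
        if is_business_day(current):
            remaining -= 1
    return current


due = workday_reference(date(2026, 4, 13), 3, holidays=[date(2026, 4, 15)])
print(due)  # 2026-04-17
```

In this sketch, Monday 2026-04-13 plus three business days skips the Wednesday holiday and lands on Friday 2026-04-17, which is what Excel's WORKDAY returns for the same inputs.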
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the last ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_last_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the first ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_first_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return an expression that numbers each contiguous run (visit) of a value, counted separately per value.
This is a convenience wrapper around
data_engine.helpers.visit_counter().
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window
order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number
for each row’s current value.
- Return type:
pl.Expr
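The sketch below illustrates one reading of the visit number for a single, already-ordered window: each contiguous run of the same value is one visit, and visits are counted separately for each value. This interpretation is an assumption drawn from the description above. The function visit_numbers is hypothetical, and null handling in the real helper is not covered.

```python
def visit_numbers(values):
    """Return the one-based visit number for each row of one ordered window."""
    runs_started = {}  # value -> number of contiguous runs seen so far
    result = []
    previous = None
    for value in values:
        # A new run starts on the first row or whenever the value changes.
        if not result or value != previous:
            runs_started[value] = runs_started.get(value, 0) + 1
        result.append(runs_started[value])
        previous = value
    return result


states = ["Review", "Review", "Hold", "Review", "Hold", "Hold"]
visits = visit_numbers(states)
print(visits)  # [1, 1, 1, 2, 2, 2]
```

The second time a claim returns to "Review" it is on visit 2 for that state, even though other states were visited in between.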
-
write_parquet_atomic(path, **write_options)[source]
Write this dataframe to a Parquet file with atomic target replacement.
- Parameters:
-
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
write_excel_atomic(path, worksheet=None, **write_options)[source]
Write this dataframe to Excel with atomic target replacement.
- Parameters:
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
compose_excel(path, *, sheet_name, table_name=None, template=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, **write_options)[source]
Compose this dataframe into a single-sheet Excel workbook.
This is a convenience wrapper around
data_engine.helpers.compose_excel().
- Parameters:
path (PathLike) – Target Excel workbook path.
sheet_name (str) – Worksheet name to write.
table_name (str | None) – Optional Excel table name for this dataframe.
template (PathLike | None) – Optional template workbook path.
position (str | tuple[int, int]) – Top-left cell where the dataframe table should be written.
table_style (str | dict[str, object] | None) – Optional table style.
autofit (bool) – Whether Polars should autofit columns in fresh-workbook mode.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting.
**write_options (object) – Additional write options forwarded to the sheet specification.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this dataframe’s
natural keys.
- Returns:
Mapping dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the
surrogate key.
- Returns:
Dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
-
denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.
drop_key (bool) – Whether to drop key_column after attaching the natural columns.
- Returns:
Dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
-
normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate
key.
returns (ReturnMode) – "df" for normalized input rows, "map" for only the key
mapping, or None to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or None according to
returns.
- Return type:
pl.DataFrame | None
-
replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store file_hash in the destination table.
return_df (bool) – Whether to return this dataframe with the file hash column attached.
- Returns:
Inserted rows with file_hash_column when return_df is true;
otherwise None.
- Return type:
pl.DataFrame | None
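The delete-then-append pattern described above can be sketched with the standard-library sqlite3 module standing in for DuckDB. This is an illustration of the idea only, not Data Engine's implementation. The function replace_file_batch and the fact_claim table layout are hypothetical.

```python
import sqlite3


def replace_file_batch(conn, rows, file_hash):
    """Delete the previous batch for `file_hash`, then insert the new rows."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM fact_claim WHERE file_key = ?", (file_hash,))
        conn.executemany(
            "INSERT INTO fact_claim (claim_id, amount, file_key) VALUES (?, ?, ?)",
            [(claim_id, amount, file_hash) for claim_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_claim (claim_id INTEGER, amount REAL, file_key TEXT)")

replace_file_batch(conn, [(1, 10.0), (2, 20.0)], "file-a")
replace_file_batch(conn, [(3, 30.0)], "file-b")
replace_file_batch(conn, [(1, 15.0)], "file-a")  # re-processing file-a

stored = conn.execute(
    "SELECT claim_id, amount, file_key FROM fact_claim ORDER BY claim_id"
).fetchall()
print(stored)  # [(1, 15.0, 'file-a'), (3, 30.0, 'file-b')]
```

Re-processing the same source file replaces only that file's rows, which is why a stable file_hash matters: a changing identifier would leave stale batches behind.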
-
replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this dataframe’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
class DataEngineLazyFrameNamespace(lf)[source]
Data Engine helpers available from pl.LazyFrame.de.
- Parameters:
lf (pl.LazyFrame)
-
normalize_column_names(columns=None)[source]
Normalize column names on this lazy frame.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all
lazy-frame columns are normalized.
- Returns:
Lazy frame with normalized column names.
- Return type:
pl.LazyFrame
-
remove_null_columns()[source]
Remove columns from this lazy frame when every value is null.
- Returns:
Lazy frame containing only columns with at least one non-null
value.
- Return type:
pl.LazyFrame
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this lazy frame.
This is a convenience wrapper around data_engine.helpers.networkdays().
The returned value stays lazy and can be chained into window
expressions before collect().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count.
count_first_day (bool) – Whether to force the first day into the count when it would
normally be excluded.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to the signed business-day count.
- Return type:
pl.Expr
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this lazy frame.
This is a convenience wrapper around data_engine.helpers.workday().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar
integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result.
count_first_day (bool) – Whether the start date itself can count as day 1.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to a Date result.
- Return type:
pl.Expr
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the last ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_last_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the first ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_first_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return an expression that numbers each contiguous run (visit) of a value, counted separately per value.
This is a convenience wrapper around
data_engine.helpers.visit_counter().
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window
order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number
for each row’s current value.
- Return type:
pl.Expr
-
sink_parquet_atomic(path, **sink_options)[source]
Execute this lazy frame and write the result to a Parquet file with atomic target replacement.
- Parameters:
-
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
compose_excel(path, *, sheet_name, table_name=None, template=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, **write_options)[source]
Compose this lazy frame into a single-sheet Excel workbook.
This is a convenience wrapper around
data_engine.helpers.compose_excel(). The lazy frame is collected
when the workbook is composed.
- Parameters:
path (PathLike) – Target Excel workbook path.
sheet_name (str) – Worksheet name to write.
table_name (str | None) – Optional Excel table name for this lazy frame.
template (PathLike | None) – Optional template workbook path.
position (str | tuple[int, int]) – Top-left cell where the lazy frame table should be written.
table_style (str | dict[str, object] | None) – Optional table style.
autofit (bool) – Whether Polars should autofit columns in fresh-workbook mode.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting.
**write_options (object) – Additional write options forwarded to the sheet specification.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this lazy frame’s
natural keys.
- Returns:
Mapping dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the
surrogate key.
- Returns:
Collected dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
-
denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.
drop_key (bool) – Whether to drop key_column after attaching the natural columns.
- Returns:
Collected dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
-
normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate
key.
returns (ReturnMode) – "df" for normalized input rows, "map" for only the key
mapping, or None to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or None according to
returns.
- Return type:
pl.DataFrame | None
-
replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store file_hash in the destination table.
return_df (bool) – Whether to return the collected frame with the file hash column
attached.
- Returns:
Inserted rows with file_hash_column when return_df is true;
otherwise None.
- Return type:
pl.DataFrame | None
-
replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this lazy frame’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
class ExcelSheet(name, df, table_name=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, write_options=<factory>)[source]
Specification for one dataframe-backed worksheet in an Excel workbook.
- Variables:
name (str) – Worksheet name. Names must follow Excel’s normal sheet-name limits.
df (ExcelFrame) – Dataframe to write. Lazy frames are collected when the workbook is
composed.
table_name (str | None) – Optional Excel table name for the written dataframe.
position (str | tuple[int, int]) – Top-left cell where the dataframe table should be written.
table_style (str | dict[str, Any] | None) – Optional table style forwarded to pl.DataFrame.write_excel.
autofit (bool) – Whether Polars should autofit columns after writing.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting forwarded to pl.DataFrame.write_excel.
write_options (dict[str, Any]) – Additional keyword options forwarded to pl.DataFrame.write_excel.
- Parameters:
name (str)
df (DataFrame | LazyFrame)
table_name (str | None)
position (str | tuple[int, int])
table_style (str | dict[str, Any] | None)
autofit (bool)
autofilter (bool)
freeze_panes (object | None)
write_options (dict[str, Any])
-
name: str
-
df: DataFrame | LazyFrame
-
table_name: str | None = None
-
position: str | tuple[int, int] = 'A1'
-
table_style: str | dict[str, Any] | None = None
-
autofit: bool = True
-
autofilter: bool = True
-
freeze_panes: object | None = None
-
write_options: dict[str, Any]
-
attach_dimension(db_path, table, *, df, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing surrogate key mapping table to an input dataframe.
- Parameters:
-
-
build_dimension(db_path, table, *, df, key_column='dimension_key', return_df=True)[source]
Build or extend one dimension table from unique incoming row combinations.
- Parameters:
-
-
compact_database(db_path, *, tables=None, drop_all_null_columns=True, vacuum=True)[source]
Compact one DuckDB database by dropping all-null columns and optionally vacuuming.
Existing indexes on compacted tables are dropped before column removal and
recreated afterward when their original CREATE INDEX statement still
applies to the compacted table. Indexes that reference dropped columns are
reported as skipped.
- Parameters:
-
- Return type:
DataFrame
-
compose_excel(path, sheets, *, template=None)[source]
Compose one Excel workbook from dataframe-backed sheet specifications.
The workbook is written to a same-directory temporary file and then moved
into place with os.replace. Each sheet is written through Polars’
Excel writer, so sheet-level options map directly to
pl.DataFrame.write_excel behavior.
- Parameters:
path (PathLike) – Target Excel workbook path.
sheets (Sequence[ExcelSheet]) – One or more worksheet specifications to write.
template (PathLike | None) – Optional template workbook path. When provided, the template workbook is
copied to a temporary file, the requested sheets are updated, and the
composed copy atomically replaces path.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- Raises:
Exception – If workbook validation, dataframe collection, Excel writing, or atomic
replacement fails.
Examples
import polars as pl
from data_engine.helpers import ExcelSheet, compose_excel
compose_excel(
"workspaces/example/output/report.xlsx",
sheets=[
ExcelSheet(
name="Claims",
df=pl.DataFrame({"claim_id": [1, 2]}),
table_name="claims",
freeze_panes="A2",
),
],
)
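The same-directory temporary file plus os.replace pattern mentioned above can be sketched with the standard library alone. This is a generic illustration, not the code Data Engine runs. The helper write_bytes_atomic is hypothetical and writes raw bytes rather than a workbook.

```python
import os
import tempfile
from pathlib import Path


def write_bytes_atomic(path, payload: bytes) -> Path:
    """Write `payload` to a temp file next to `path`, then swap it into place."""
    target = Path(path).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target, so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(
        dir=target.parent, prefix=target.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
        os.replace(tmp_name, target)  # readers see the old or the new file
    except BaseException:
        # Clean up the temporary file if writing or replacing failed.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target


with tempfile.TemporaryDirectory() as workdir:
    target = write_bytes_atomic(Path(workdir) / "report.bin", b"first")
    write_bytes_atomic(target, b"second")
    final_bytes = target.read_bytes()
    leftovers = sorted(p.name for p in target.parent.iterdir())

print(final_bytes, leftovers)
```

Keeping the temporary file in the target's directory is the important design choice: a rename within one filesystem is atomic, so a failed write never leaves a half-written target.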
-
denormalize_columns(db_path, table, *, df, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing dimension table onto a keyed dataframe.
- Parameters:
-
-
ensure_index(db_path, table, *, columns, name=None)[source]
Create one DuckDB index if it does not already exist.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Target table name, optionally schema-qualified.
columns (str | list[str] | tuple[str, ...]) – Column or columns to index.
name (str | None) – Optional index name. When omitted, Data Engine generates a stable name
from the table and columns.
- Returns:
Index name that exists after the call.
- Return type:
str
- Raises:
ValueError – If the table does not exist, selected columns do not exist, or the
provided index name is empty.
Examples
Index a file-slice column before repeated replace_rows_by_file calls:
data_engine.helpers.duckdb.ensure_index(
context.database("warehouse.duckdb"),
"fact_claim",
columns="file_key",
)
Index a lookup column before repeated read_rows_by_values calls:
data_engine.helpers.duckdb.ensure_index(
context.database("warehouse.duckdb"),
"fact_claim",
columns="claim_id",
name="idx_fact_claim_claim_id",
)
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style business-day counts as a Polars expression.
This helper matches Excel NETWORKDAYS semantics by counting both
endpoints when they are business days. Weekends default to Saturday/Sunday,
and optional holidays are excluded from the count.
The one intentional extension is count_first_day. When enabled, the
start date is still counted even if it falls on a masked weekday or one of
the supplied holidays.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count. String
values must use ISO date text such as "2026-04-15".
count_first_day (bool) – Whether to force the first day into the count when it would normally be
excluded by the weekday mask or holiday list.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real
bool. None uses the Excel default: Monday-Friday counted,
Saturday-Sunday excluded.
- Returns:
Expression that evaluates to the signed business-day count. Datetime
inputs are normalized to their calendar date before counting.
- Return type:
pl.Expr
Examples
Add a row-level business-day count:
from datetime import date
import polars as pl
import data_engine.helpers
df = pl.DataFrame(
{
"received_date": [date(2026, 4, 13), date(2026, 4, 14)],
"due_date": [date(2026, 4, 17), date(2026, 4, 21)],
}
).with_columns(
business_days=data_engine.helpers.networkdays(
"received_date",
"due_date",
holidays=[date(2026, 4, 15)],
)
)
Use scalar datetimes and count the first day:
from datetime import datetime
df = df.with_columns(
sla_days=data_engine.helpers.networkdays(
datetime(2026, 4, 13, 8, 30),
pl.col("resolved_at"),
count_first_day=True,
)
)
Chain the expression into a grouped cumulative total:
df = (
df.sort(["claim_id", "sequence_number"])
.with_columns(
cumulative_business_days=
pl.when(pl.col("use_days"))
.then(
data_engine.helpers.networkdays(
"start_date",
"end_date",
holidays=[date(2026, 4, 15)],
)
)
.otherwise(pl.lit(0))
.cum_sum()
.over("claim_id")
)
)
Notes
networkdays(...) returns a normal pl.Expr. You can chain it into
cum_sum(), window expressions, filters, and any other Polars expression
pipeline.
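For checking expectations by hand, the following plain-Python sketch counts business days for one pair of dates under the semantics described above: both endpoints counted, Monday-first mask, holidays excluded, signed result. It is a reference sketch rather than the library's implementation. The name networkdays_reference is hypothetical, and applying count_first_day only to the start date is an assumption.

```python
from datetime import date, timedelta

# Monday-first mask: Monday-Friday counted, Saturday-Sunday excluded.
DEFAULT_MASK = (True, True, True, True, True, False, False)


def networkdays_reference(start, end, holidays=(), count_first_day=False, mask=None):
    """Signed, inclusive business-day count between two dates (sketch only)."""
    mask = DEFAULT_MASK if mask is None else tuple(mask)
    holiday_set = set(holidays)
    sign = 1 if end >= start else -1
    low, high = (start, end) if sign == 1 else (end, start)
    count = 0
    day = low
    while day <= high:
        if mask[day.weekday()] and day not in holiday_set:
            count += 1
        elif count_first_day and day == start:
            # Assumption: force the start date in even when it is masked out.
            count += 1
        day += timedelta(days=1)
    return sign * count


holiday = [date(2026, 4, 15)]
first = networkdays_reference(date(2026, 4, 13), date(2026, 4, 17), holidays=holiday)
second = networkdays_reference(date(2026, 4, 14), date(2026, 4, 21), holidays=holiday)
print(first, second)  # 4 5
```

These two results correspond to the rows in the first example above: Monday through Friday with one holiday gives 4, and Tuesday through the following Tuesday with the same holiday gives 5.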
-
normalize_columns(db_path, table, *, df, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build missing surrogate keys and attach them back onto the input dataframe.
- Parameters:
-
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression that broadcasts the first ordered value per window.
The helper sorts rows inside each by window, optionally filters the
ordered rows with where, takes the first value from that ordered
candidate set, and propagates it to every row in the same window. Null
values are ignored by default.
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
Examples
Propagate the first non-null status to every row for a claim:
import data_engine.helpers
df = df.with_columns(
first_status=data_engine.helpers.propagate_first_value(
"status",
by="claim_id",
sort_by="claim_step_index",
)
)
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression that broadcasts the last ordered value per window.
The helper sorts rows inside each by window, optionally filters the
ordered rows with where, takes the last value from that ordered
candidate set, and propagates it to every row in the same window. Null
values are ignored by default, which matches the common pattern where only
one row in a window contains the value to carry across the group.
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
Examples
Propagate the latest non-null status to every row for a claim:
import polars as pl
import data_engine.helpers
df = df.with_columns(
latest_status=data_engine.helpers.propagate_last_value(
"status",
by="claim_id",
sort_by="claim_step_index",
)
)
Propagate the timestamp from the last Archive row to every row for a
claim. The output column is named by with_columns:
df = df.with_columns(
archived_at=data_engine.helpers.propagate_last_value(
pl.col("archive_date").dt.combine(pl.col("archive_time")),
by="claim_id",
sort_by="claim_step_index",
where=pl.col("status") == "Archive",
)
)
Compose the predicate to use the last row that is not Archive:
df = df.with_columns(
last_active_at=data_engine.helpers.propagate_last_value(
pl.col("event_date").dt.combine(pl.col("event_time")),
by="claim_id",
sort_by="claim_step_index",
where=pl.col("status") != "Archive",
)
)
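The sort, filter, take-last, and broadcast steps described for propagate_last_value can be sketched in plain Python. This illustrates the documented semantics only and is not the library's Polars implementation. The function name propagate_last_value_rows, the list-of-dicts row format, and the callable where predicate are assumptions made for this example. The sketch ignores descending and nulls_last, and it assumes that a window with no candidate rows yields None.

```python
from collections import defaultdict


def propagate_last_value_rows(rows, value, *, by, sort_by, where=None, ignore_nulls=True):
    """Plain-Python sketch of the documented semantics (hypothetical helper)."""
    # Group row positions by their window key.
    windows = defaultdict(list)
    for index, row in enumerate(rows):
        windows[row[by]].append(index)

    result = [None] * len(rows)
    for indices in windows.values():
        # Order the rows inside the window by the sort key.
        ordered = sorted(indices, key=lambda i: rows[i][sort_by])
        # Keep only rows that are allowed to supply the value.
        candidates = [i for i in ordered if where is None or where(rows[i])]
        if ignore_nulls:
            candidates = [i for i in candidates if rows[i][value] is not None]
        # Assumption: a window without candidates propagates None.
        last = rows[candidates[-1]][value] if candidates else None
        # Broadcast to every row in the window, including filtered-out rows.
        for i in indices:
            result[i] = last
    return result


rows = [
    {"claim_id": "A", "claim_step_index": 2, "status": "Archive"},
    {"claim_id": "A", "claim_step_index": 1, "status": "Review"},
    {"claim_id": "A", "claim_step_index": 3, "status": None},
    {"claim_id": "B", "claim_step_index": 1, "status": "Intake"},
]

latest_status = propagate_last_value_rows(
    rows, "status", by="claim_id", sort_by="claim_step_index"
)
last_active = propagate_last_value_rows(
    rows,
    "status",
    by="claim_id",
    sort_by="claim_step_index",
    where=lambda row: row["status"] != "Archive",
)
```

In this sketch the where predicate only limits which rows may supply the value; every row in the window still receives the result.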
-
remove_null_columns(frame)[source]
Return a frame without the columns whose values are all null.
Columns are kept when at least one row contains a non-null value. Columns
containing only nulls are removed. For zero-row dataframes, every column is
considered empty and the returned dataframe has no columns.
- Parameters:
frame (PolarsFrame) – Dataframe or lazy frame to trim.
- Returns:
Frame containing only columns with at least one non-null value. Lazy
inputs return lazy frames.
- Return type:
PolarsFrame
Examples
clean = data_engine.helpers.remove_null_columns(df)
clean = df.de.remove_null_columns()
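The keep-or-drop rule, including the zero-row case, can be illustrated with a plain dictionary of column lists. This is a sketch of the documented behavior, not the library code; remove_null_columns_dict and the dict-of-lists frame are hypothetical stand-ins for a Polars frame.

```python
def remove_null_columns_dict(columns):
    """Keep a column only when at least one of its values is not None."""
    return {
        name: values
        for name, values in columns.items()
        if any(value is not None for value in values)
    }


# "note" contains only nulls, so it is dropped.
trimmed = remove_null_columns_dict({"claim_id": [1, None], "note": [None, None]})

# With zero rows no column has a non-null value, so every column is dropped.
empty = remove_null_columns_dict({"claim_id": [], "note": []})
```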
-
read_rows_by_values(db_path, table, *, column, is_in, select)[source]
Return the selected columns for rows where one column matches any of the provided values.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
column (str) – Column matched against is_in.
is_in (list[object] | tuple[object, ...]) – Values to include.
select (str | list[str] | tuple[str, ...]) – Columns to return, or "*" for all columns.
- Returns:
Matching rows with the selected columns, ordered to follow the input order of the distinct lookup values.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the table, column, or selected columns are invalid.
-
read_sql(db_path, *, sql)[source]
Run one SQL query against DuckDB and return the result as a Polars dataframe.
- Parameters:
db_path (str | Path) – DuckDB database file path.
sql (str) – SQL query to run.
- Returns:
Query result as a Polars dataframe.
- Return type:
pl.DataFrame
- Raises:
ValueError – If sql is empty.
-
read_table(db_path, table, *, select='*', where=None, limit=None)[source]
Read rows from one DuckDB table into a Polars dataframe.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
select (str | list[str] | tuple[str, ...]) – Columns to return, or "*" for all columns.
where (str | None) – Optional raw SQL predicate appended after WHERE.
limit (int | None) – Optional row limit.
- Returns:
Selected table rows.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the table, selected columns, or limit are invalid.
-
replace_rows_by_file(db_path, table, *, df, file_hash, file_hash_column='file_key', return_df=True)[source]
Atomically replace one file’s fact rows and append the current batch.
- Parameters:
-
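One reading of "atomically replace one file's fact rows and append the current batch" is a delete followed by an insert inside a single transaction. The sketch below shows that pattern with the standard-library sqlite3 module, which is swapped in here for DuckDB so the example runs without extra dependencies. The function replace_rows_by_file_sketch, the facts table, and its claim_id column are hypothetical, and the real helper may work differently.

```python
import sqlite3


def replace_rows_by_file_sketch(conn, table, claim_ids, file_hash, file_hash_column="file_key"):
    """Delete one file's rows and insert the new batch in one transaction."""
    # Identifiers are interpolated, so they must come from trusted code.
    delete_sql = f'DELETE FROM "{table}" WHERE "{file_hash_column}" = ?'
    insert_sql = f'INSERT INTO "{table}" ("{file_hash_column}", claim_id) VALUES (?, ?)'
    # The connection context manager commits on success and rolls back on error,
    # so readers never observe the delete without the matching insert.
    with conn:
        conn.execute(delete_sql, (file_hash,))
        conn.executemany(insert_sql, [(file_hash, claim_id) for claim_id in claim_ids])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE facts (file_key TEXT, claim_id INTEGER)")
conn.executemany("INSERT INTO facts VALUES (?, ?)", [("a", 1), ("a", 2), ("b", 3)])
conn.commit()

# Re-ingest file "a": its old rows disappear, rows from file "b" are untouched.
replace_rows_by_file_sketch(conn, "facts", [10, 11], "a")
final_rows = conn.execute(
    "SELECT file_key, claim_id FROM facts ORDER BY claim_id"
).fetchall()
conn.close()
```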
-
replace_rows_by_values(db_path, table, *, df, column, return_df=True)[source]
Atomically replace one value-slice of rows and append the current batch.
- Parameters:
-
-
replace_table(db_path, table, *, df, return_df=True)[source]
Replace one DuckDB table wholesale from the provided dataframe.
- Parameters:
-
-
sink_parquet_atomic(lf, path, **sink_options)[source]
Sink a Polars lazy frame to parquet with same-directory atomic replacement.
The lazy query is executed into a unique temporary file beside the target,
then moved into place with os.replace. Use the default eager sink mode so
the helper can complete the replacement in the same call.
- Parameters:
lf (pl.LazyFrame) – Lazy Polars frame to execute and write.
path (PathLike) – Target parquet path.
**sink_options (object) – Keyword options forwarded to pl.LazyFrame.sink_parquet.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- Raises:
ValueError – If lazy=True is passed.
Examples
import polars as pl
import data_engine.helpers
lf = pl.DataFrame({"claim_id": [1, 2]}).lazy()
lf.de.sink_parquet_atomic("workspaces/example/output/docs.parquet")
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return a per-value contiguous-run visit number inside each window.
Rows are ordered inside each by window, then consecutive rows with the
same value are treated as one visit. When a value leaves and later
returns in the same window, the returned run gets the next visit number for
that value.
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number for
each row’s current value.
- Return type:
pl.Expr
Examples
Count repeated workflow visits for each document:
df = df.with_columns(
workflow_visit=data_engine.helpers.visit_counter(
"workflow",
by="document_id",
sort_by="step_index",
)
)
For a document with workflow runs w1, w1, w1, w2, w2, w1, the result
is 1, 1, 1, 1, 1, 2.
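The visit numbering for a single, already ordered window can be reproduced in a few lines of plain Python. This sketch only illustrates the rule; it is not the library's Polars expression. The name visit_numbers is hypothetical, and handling of null values is not covered.

```python
from collections import defaultdict


def visit_numbers(values):
    """Return the one-based visit number of each value in an ordered window."""
    visits = defaultdict(int)
    previous = object()  # sentinel that never equals a real value
    numbers = []
    for value in values:
        # A new contiguous run starts whenever the value changes.
        if value != previous:
            visits[value] += 1
            previous = value
        numbers.append(visits[value])
    return numbers


workflow_visit = visit_numbers(["w1", "w1", "w1", "w2", "w2", "w1"])
```

The counter is kept per value, which is why the first w2 run is visit 1 and the returning w1 run is visit 2.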
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style workday offsets as a Polars expression.
This helper mirrors Excel WORKDAY by returning the business date that
falls the requested number of working days before or after start.
The one intentional extension is count_first_day. When enabled, the
start date itself can be day 1, even if it falls on a masked weekday or one
of the supplied holidays.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result. String
values must use ISO date text such as "2026-04-15".
count_first_day (bool) – Whether the start date itself can count as day 1 when moving forward or
backward through business days.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real
bool. None uses the Excel default: Monday-Friday counted,
Saturday-Sunday excluded.
- Returns:
Expression that evaluates to a Date result. Datetime inputs are
normalized to their calendar date before offsetting.
- Return type:
pl.Expr
Examples
Add one target business date column:
from datetime import date
import polars as pl
import data_engine.helpers
df = pl.DataFrame(
{
"received_date": [date(2026, 4, 13), date(2026, 4, 14)],
"sla_days": [3, 5],
}
).with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
)
)
Count the start date as day 1:
df = df.with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
count_first_day=True,
)
)
Use a custom weekday mask where Saturday is also a business day:
df = df.with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
mask=(True, True, True, True, True, True, False),
)
)
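To make the offset rules concrete, the following plain-Python sketch steps one day at a time over scalar dates. It is an illustration under stated assumptions, not the library implementation: workday_sketch is a hypothetical name, a zero offset is assumed to return the start date as Excel WORKDAY does, and count_first_day is interpreted as "the start date always consumes one day of the offset".

```python
from datetime import date, timedelta

DEFAULT_MASK = (True, True, True, True, True, False, False)  # Monday first


def workday_sketch(start, days, holidays=(), count_first_day=False, mask=DEFAULT_MASK):
    """Return the date that falls `days` business days from `start`."""
    if not any(mask):
        raise ValueError("mask must contain at least one business day")
    if days == 0:
        return start  # assumption: a zero offset returns the start date
    holiday_set = set(holidays)
    step = 1 if days > 0 else -1
    remaining = abs(days)
    if count_first_day:
        remaining -= 1  # assumed reading: the start date itself is day 1
    current = start
    while remaining > 0:
        current += timedelta(days=step)
        # date.weekday() is Monday-first, matching the documented mask order.
        if mask[current.weekday()] and current not in holiday_set:
            remaining -= 1
    return current


holidays = [date(2026, 4, 15)]
monday = date(2026, 4, 13)

due_default = workday_sketch(monday, 3, holidays)
due_counting_start = workday_sketch(monday, 3, holidays, count_first_day=True)
due_saturday_mask = workday_sketch(
    date(2026, 4, 17), 1, mask=(True, True, True, True, True, True, False)
)
due_backward = workday_sketch(monday, -1)
```

Under these assumptions the default call skips the Wednesday holiday and lands on Friday 2026-04-17, while counting the start date lands one business day earlier.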
-
write_excel_atomic(df, path, worksheet=None, **write_options)[source]
Write a Polars dataframe to Excel with same-directory atomic replacement.
The dataframe is first written to a unique temporary workbook beside the
target, then moved into place with os.replace. All keyword options are
forwarded to pl.DataFrame.write_excel.
- Parameters:
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
import polars as pl
from data_engine.helpers import write_excel_atomic
target = write_excel_atomic(
pl.DataFrame({"claim_id": [1, 2]}),
"workspaces/example/output/docs.xlsx",
worksheet="Docs",
table_name="docs",
autofit=True,
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_excel_atomic(target, worksheet="Docs")
-
write_parquet_atomic(df, path, **write_options)[source]
Write a Polars dataframe to parquet with same-directory atomic replacement.
The dataframe is first written to a unique temporary file beside the target,
then moved into place with os.replace. This keeps readers from seeing a
partially written parquet file while preserving normal Polars write options.
- Parameters:
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target parquet path.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_parquet.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
import polars as pl
from data_engine.helpers import write_parquet_atomic
target = write_parquet_atomic(
pl.DataFrame({"claim_id": [1, 2]}),
"workspaces/example/output/docs.parquet",
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_parquet_atomic(target)
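The temporary-file-then-os.replace pattern described above can be sketched with the standard library alone. This example writes raw bytes instead of parquet so it stays self-contained; write_bytes_atomic is a hypothetical name and the cleanup on failure is an assumption about sensible behavior, not a statement about the helper.

```python
import os
import tempfile
from pathlib import Path


def write_bytes_atomic(data, path):
    """Write bytes to a sibling temporary file, then move it over the target."""
    target = Path(path).resolve()
    # The temporary file lives beside the target so os.replace never crosses
    # filesystems.
    fd, tmp_name = tempfile.mkstemp(
        dir=str(target.parent), prefix=target.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        os.replace(tmp_name, target)
    except BaseException:
        # Assumed cleanup: do not leave a stray temporary file behind.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target


with tempfile.TemporaryDirectory() as workdir:
    target = write_bytes_atomic(b"first", Path(workdir) / "docs.parquet")
    write_bytes_atomic(b"second", target)
    final_bytes = target.read_bytes()
    leftovers = sorted(entry.name for entry in target.parent.iterdir())
```

Because the target name only ever points at a fully written file, a reader that opens it sees either the old content or the new content, never a partial write.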
-
normalize_column_name(name)[source]
Return a normalized column name.
- Parameters:
name (object) – Source column name to normalize.
- Returns:
Lowercase column name with separator-adjacent spaces removed, remaining
whitespace collapsed, and spaces replaced with underscores.
- Return type:
str
-
normalize_column_names(df, columns=None)[source]
Normalize column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all frame
columns are normalized.
- Returns:
Frame with normalized column names.
- Return type:
pl.DataFrame | pl.LazyFrame
-
normalized_column_renames(columns)[source]
Return a Polars rename mapping for normalized column names.
- Parameters:
columns (Iterable[object]) – Source column names to normalize.
- Returns:
Mapping from original column names to normalized names, excluding names
that are already normalized.
- Return type:
dict[str, str]
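The normalization rules and the rename mapping can be approximated as follows. This is a sketch only: the reference does not say which characters count as separators, so the set "_", "-", "/" and the leading and trailing strip are assumptions, and both function names are hypothetical.

```python
import re


def normalize_name_sketch(name):
    """Lowercase a name and convert whitespace runs to underscores."""
    text = str(name).strip().lower()
    # Assumption: "_", "-", and "/" are the separators whose neighboring
    # whitespace is removed.
    text = re.sub(r"\s*([_\-/])\s*", r"\1", text)
    # Collapse remaining whitespace runs into a single underscore.
    return re.sub(r"\s+", "_", text)


def normalized_renames_sketch(columns):
    """Map original names to normalized names, skipping unchanged names."""
    renames = {}
    for column in columns:
        normalized = normalize_name_sketch(column)
        if normalized != column:
            renames[str(column)] = normalized
    return renames


tab_name = normalize_name_sketch("Workflow\tTo")
renames = normalized_renames_sketch(["Claim Id", "status", "Step_ To"])
```

Skipping names that are already normalized keeps the mapping minimal, so it can be passed to a rename call without listing untouched columns.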
Small schema helpers for Polars-oriented flow authoring.
-
class ColumnCasts[source]
Bases: dict[str, object]
Dict-like dtype mapping with a Polars apply helper.
TableSchema.dtypes returns this type. Values are passed to
polars.Expr.cast so callers can use normal Polars dtype objects such as
pl.String, pl.Int64, and pl.Datetime. apply casts remaining
frame columns to pl.String.
-
apply(df)[source]
Cast columns on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.
- Returns:
Frame with configured dtype casts applied and unspecified columns
cast to pl.String.
- Return type:
pl.DataFrame | pl.LazyFrame
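The fallback to strings for unspecified columns is easy to overlook, so here is a plain-Python illustration of that rule. It uses a dict of lists and Python callables in place of a Polars frame and Polars dtypes; apply_casts_sketch is a hypothetical name, and keeping nulls as None is an assumption.

```python
def apply_casts_sketch(columns, casts):
    """Cast configured columns and fall back to str for all other columns."""
    result = {}
    for name, values in columns.items():
        # Columns without a configured cast are converted to strings.
        cast = casts.get(name, str)
        # Assumption: null values stay null instead of becoming "None".
        result[name] = [None if value is None else cast(value) for value in values]
    return result


frame = {"claim_id": ["1", "2"], "amount": [10, None]}
casted = apply_casts_sketch(frame, {"claim_id": int})
```

Here amount has no configured cast, so its numeric values come back as strings. Listing every column whose type matters avoids that surprise.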
-
class ColumnSelection(columns)[source]
Bases: tuple[str, ...]
Tuple-like column projection with Polars convenience methods.
TableSchema.columns returns this type so schema definitions can be used
directly in dataframe chains while still behaving like a normal tuple.
Examples
import polars as pl
from data_engine.helpers import TableSchema
schema = TableSchema(columns=("Claim Id",), dtypes={"Claim Id": pl.Int64})
df = pl.DataFrame({"Claim Id": [1], "SSN": ["123"]})
assert schema.columns.apply(df).columns == ["Claim Id"]
- Parameters:
columns (Iterable[str])
- Return type:
ColumnSelection
-
apply(df)[source]
Select these columns from a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.
- Returns:
Frame containing only these columns.
- Return type:
pl.DataFrame | pl.LazyFrame
-
normalize_column_names(df)[source]
Normalize this selection’s column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
- Returns:
Frame with matching selected column names normalized.
- Return type:
pl.DataFrame | pl.LazyFrame
-
class DropColumns(columns)[source]
Bases: tuple[str, ...]
Tuple-like drop list with a Polars apply helper.
TableSchema.drop returns this type. Empty drop lists are no-ops, which
keeps chained cleanup code simple.
- Parameters:
columns (Iterable[str])
- Return type:
DropColumns
-
apply(df)[source]
Drop these columns from a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.
- Returns:
Frame without these columns.
- Return type:
pl.DataFrame | pl.LazyFrame
-
class RenameColumns[source]
Bases: dict[str, str]
Dict-like rename mapping with a Polars apply helper.
TableSchema.rename returns this type. Empty mappings are no-ops, so the
same cleanup chain can be used whether a schema currently renames columns or
not.
-
apply(df)[source]
Rename columns on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to transform.
- Returns:
Frame with configured columns renamed.
- Return type:
pl.DataFrame | pl.LazyFrame
-
class TableSchema(columns=(), dtypes=(), rename=(), drop=())[source]
Bases: object
Column cleanup helper for compact Polars dataframe chains.
TableSchema is intentionally small: it stores an explicit column
projection, a source-column dtype map, a rename map, and a drop list. Each
attribute exposes an apply method so flow code can decide the cleanup
order explicitly instead of relying on a magical all-in-one schema
operation.
- Variables:
columns (Iterable[str] | ColumnSelection) – Explicit projection columns. Use schema.columns.apply(df) wherever
that projection belongs in your chain.
dtypes (ColumnDtypes | ColumnCasts) – Source column names mapped to Polars dtype objects. Use
schema.dtypes.apply(df) to cast them. Remaining frame columns are
cast to pl.String.
rename (ColumnRenames) – Source-to-target column names. Use schema.rename.apply(df) to rename
them.
drop (Iterable[str]) – Source columns to remove. Use schema.drop.apply(df) to drop them.
- Parameters:
columns (Iterable[str] | ColumnSelection)
dtypes (Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts)
rename (Mapping[str, str] | Iterable[tuple[str, str]])
drop (Iterable[str])
Notes
columns is an explicit projection applied at the point you call
schema.columns.apply(df). For example, you might cast all incoming
columns, drop private fields before persistence, write to DuckDB, and then
select the columns to return.
Examples
import polars as pl
from data_engine.helpers import TableSchema
schema = TableSchema(
columns=("step", "time", "workflow"),
dtypes={"step_to": pl.String, "time": pl.Time},
rename={"step_to": "step", "workflow_to": "workflow"},
drop=("workflow_from", "ssn"),
)
df = pl.DataFrame(
{
"step_to": ["review"],
"time": ["09:30:00"],
"workflow_to": ["docs"],
"workflow_from": ["intake"],
"ssn": ["000-00-0000"],
}
).with_columns(pl.col("time").str.to_time())
df = schema.dtypes.apply(df)
df = schema.drop.apply(df)
df = schema.rename.apply(df)
df = schema.columns.apply(df)
assert df.columns == ["step", "time", "workflow"]
assert df.schema["workflow"] == pl.String
Normalize all incoming names first when source files use inconsistent
spacing or capitalization:
df = pl.DataFrame({"Workflow\tTo": ["docs"]})
df = schema.normalize_column_names(df)
assert df.columns == ["workflow_to"]
-
columns: Iterable[str] | ColumnSelection = ()
-
dtypes: Mapping[str, object] | Iterable[tuple[str, object]] | ColumnCasts = ()
-
rename: Mapping[str, str] | Iterable[tuple[str, str]] = ()
-
drop: Iterable[str] = ()
-
normalize_column_names(df)[source]
Normalize all column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
- Returns:
Frame with normalized column names.
- Return type:
pl.DataFrame | pl.LazyFrame
-
normalize_column_name(name)[source]
Return a normalized column name.
- Parameters:
name (object) – Source column name to normalize.
- Returns:
Lowercase column name with separator-adjacent spaces removed, remaining
whitespace collapsed, and spaces replaced with underscores.
- Return type:
str
-
normalize_column_names(df, columns=None)[source]
Normalize column names on a Polars frame.
- Parameters:
df (pl.DataFrame | pl.LazyFrame) – Eager or lazy Polars frame to rename.
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all frame
columns are normalized.
- Returns:
Frame with normalized column names.
- Return type:
pl.DataFrame | pl.LazyFrame
-
normalized_column_renames(columns)[source]
Return a Polars rename mapping for normalized column names.
- Parameters:
columns (Iterable[object]) – Source column names to normalize.
- Returns:
Mapping from original column names to normalized names, excluding names
that are already normalized.
- Return type:
dict[str, str]
Polars namespace helpers for Data Engine flow authoring.
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style business-day counts as a Polars expression.
This helper matches Excel NETWORKDAYS semantics by counting both
endpoints when they are business days. Weekends default to Saturday/Sunday,
and optional holidays are excluded from the count.
The one intentional extension is count_first_day. When enabled, the
start date is still counted even if it falls on a masked weekday or one of
the supplied holidays.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count. String
values must use ISO date text such as "2026-04-15".
count_first_day (bool) – Whether to force the first day into the count when it would normally be
excluded by the weekday mask or holiday list.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real
bool. None uses the Excel default: Monday-Friday counted,
Saturday-Sunday excluded.
- Returns:
Expression that evaluates to the signed business-day count. Datetime
inputs are normalized to their calendar date before counting.
- Return type:
pl.Expr
Examples
Add a row-level business-day count:
from datetime import date
import polars as pl
import data_engine.helpers
df = pl.DataFrame(
{
"received_date": [date(2026, 4, 13), date(2026, 4, 14)],
"due_date": [date(2026, 4, 17), date(2026, 4, 21)],
}
).with_columns(
business_days=data_engine.helpers.networkdays(
"received_date",
"due_date",
holidays=[date(2026, 4, 15)],
)
)
Use scalar datetimes and count the first day:
from datetime import datetime
df = df.with_columns(
sla_days=data_engine.helpers.networkdays(
datetime(2026, 4, 13, 8, 30),
pl.col("resolved_at"),
count_first_day=True,
)
)
Chain the expression into a grouped cumulative total:
df = (
df.sort(["claim_id", "sequence_number"])
.with_columns(
cumulative_business_days=
pl.when(pl.col("use_days"))
.then(
data_engine.helpers.networkdays(
"start_date",
"end_date",
holidays=[date(2026, 4, 15)],
)
)
.otherwise(pl.lit(0))
.cum_sum()
.over("claim_id")
)
)
Notes
networkdays(...) returns a normal pl.Expr. You can chain it into
cum_sum(), window expressions, filters, and any other Polars expression
pipeline.
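The counting rules can be checked by hand with a plain-Python sketch over scalar dates. It is not the library implementation: networkdays_sketch is a hypothetical name, and the negative result for reversed ranges follows Excel NETWORKDAYS and the "signed" wording above rather than anything verified against the helper.

```python
from datetime import date, timedelta

DEFAULT_MASK = (True, True, True, True, True, False, False)  # Monday first


def networkdays_sketch(start, end, holidays=(), count_first_day=False, mask=DEFAULT_MASK):
    """Count business days between two dates, including both endpoints."""
    holiday_set = set(holidays)
    step = 1 if end >= start else -1
    total = 0
    current = start
    while True:
        is_business = mask[current.weekday()] and current not in holiday_set
        # count_first_day forces the start date into the count.
        if is_business or (count_first_day and current == start):
            total += 1
        if current == end:
            break
        current += timedelta(days=step)
    # Assumption: reversed ranges return a negative count, as in Excel.
    return total * step


holidays = [date(2026, 4, 15)]

week = networkdays_sketch(date(2026, 4, 13), date(2026, 4, 17), holidays)
reversed_week = networkdays_sketch(date(2026, 4, 17), date(2026, 4, 13), holidays)
from_saturday = networkdays_sketch(date(2026, 4, 18), date(2026, 4, 20))
from_saturday_counted = networkdays_sketch(
    date(2026, 4, 18), date(2026, 4, 20), count_first_day=True
)
```

Monday 2026-04-13 through Friday 2026-04-17 contains five weekdays, and the Wednesday holiday reduces the count to four. Starting on Saturday 2026-04-18 shows the count_first_day extension: the Saturday is only counted when the flag is set.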
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return Excel-style workday offsets as a Polars expression.
This helper mirrors Excel WORKDAY by returning the business date that
falls the requested number of working days before or after start.
The one intentional extension is count_first_day. When enabled, the
start date itself can be day 1, even if it falls on a masked weekday or one
of the supplied holidays.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result. String
values must use ISO date text such as "2026-04-15".
count_first_day (bool) – Whether the start date itself can count as day 1 when moving forward or
backward through business days.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask. Every item must be a real
bool. None uses the Excel default: Monday-Friday counted,
Saturday-Sunday excluded.
- Returns:
Expression that evaluates to a Date result. Datetime inputs are
normalized to their calendar date before offsetting.
- Return type:
pl.Expr
Examples
Add one target business date column:
from datetime import date
import polars as pl
import data_engine.helpers
df = pl.DataFrame(
{
"received_date": [date(2026, 4, 13), date(2026, 4, 14)],
"sla_days": [3, 5],
}
).with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
)
)
Count the start date as day 1:
df = df.with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
count_first_day=True,
)
)
Use a custom weekday mask where Saturday is also a business day:
df = df.with_columns(
due_date=data_engine.helpers.workday(
"received_date",
"sla_days",
mask=(True, True, True, True, True, True, False),
)
)
-
remove_null_columns(frame)[source]
Return a frame without the columns whose values are all null.
Columns are kept when at least one row contains a non-null value. Columns
containing only nulls are removed. For zero-row dataframes, every column is
considered empty and the returned dataframe has no columns.
- Parameters:
frame (PolarsFrame) – Dataframe or lazy frame to trim.
- Returns:
Frame containing only columns with at least one non-null value. Lazy
inputs return lazy frames.
- Return type:
PolarsFrame
Examples
clean = data_engine.helpers.remove_null_columns(df)
clean = df.de.remove_null_columns()
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression that broadcasts the last ordered value per window.
The helper sorts rows inside each by window, optionally filters the
ordered rows with where, takes the last value from that ordered
candidate set, and propagates it to every row in the same window. Null
values are ignored by default, which matches the common pattern where only
one row in a window contains the value to carry across the group.
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
Examples
Propagate the latest non-null status to every row for a claim:
df = df.with_columns(
latest_status=data_engine.helpers.propagate_last_value(
"status",
by="claim_id",
sort_by="claim_step_index",
)
)
Propagate the timestamp from the last Archive row to every row for a
claim. The output column is named by with_columns:
df = df.with_columns(
archived_at=data_engine.helpers.propagate_last_value(
pl.col("archive_date").dt.combine(pl.col("archive_time")),
by="claim_id",
sort_by="claim_step_index",
where=pl.col("status") == "Archive",
)
)
Compose the predicate to use the last row that is not Archive:
df = df.with_columns(
last_active_at=data_engine.helpers.propagate_last_value(
pl.col("event_date").dt.combine(pl.col("event_time")),
by="claim_id",
sort_by="claim_step_index",
where=pl.col("status") != "Archive",
)
)
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression that broadcasts the first ordered value per window.
The helper sorts rows inside each by window, optionally filters the
ordered rows with where, takes the first value from that ordered
candidate set, and propagates it to every row in the same window. Null
values are ignored by default.
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
Examples
Propagate the first non-null status to every row for a claim:
df = df.with_columns(
first_status=data_engine.helpers.propagate_first_value(
"status",
by="claim_id",
sort_by="claim_step_index",
)
)
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return a per-value contiguous-run visit number inside each window.
Rows are ordered inside each by window, then consecutive rows with the
same value are treated as one visit. When a value leaves and later
returns in the same window, the returned run gets the next visit number for
that value.
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number for
each row’s current value.
- Return type:
pl.Expr
Examples
Count repeated workflow visits for each document:
df = df.with_columns(
workflow_visit=data_engine.helpers.visit_counter(
"workflow",
by="document_id",
sort_by="step_index",
)
)
For a document with workflow runs w1, w1, w1, w2, w2, w1, the result
is 1, 1, 1, 1, 1, 2.
-
write_parquet_atomic(df, path, **write_options)[source]
Write a Polars dataframe to parquet with same-directory atomic replacement.
The dataframe is first written to a unique temporary file beside the target,
then moved into place with os.replace. This keeps readers from seeing a
partially written parquet file while preserving normal Polars write options.
- Parameters:
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target parquet path.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_parquet.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
import polars as pl
from data_engine.helpers import write_parquet_atomic
target = write_parquet_atomic(
pl.DataFrame({"claim_id": [1, 2]}),
"workspaces/example/output/docs.parquet",
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_parquet_atomic(target)
-
write_excel_atomic(df, path, worksheet=None, **write_options)[source]
Write a Polars dataframe to Excel with same-directory atomic replacement.
The dataframe is first written to a unique temporary workbook beside the
target, then moved into place with os.replace. All keyword options are
forwarded to pl.DataFrame.write_excel.
- Parameters:
df (pl.DataFrame) – Eager Polars dataframe to write.
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
Examples
import polars as pl
from data_engine.helpers import write_excel_atomic
target = write_excel_atomic(
pl.DataFrame({"claim_id": [1, 2]}),
"workspaces/example/output/docs.xlsx",
worksheet="Docs",
table_name="docs",
autofit=True,
)
df = pl.DataFrame({"claim_id": [3]})
df.de.write_excel_atomic(target, worksheet="Docs")
-
sink_parquet_atomic(lf, path, **sink_options)[source]
Sink a Polars lazy frame to parquet with same-directory atomic replacement.
The lazy query is executed into a unique temporary file beside the target,
then moved into place with os.replace. Use the default eager sink mode so
the helper can complete the replacement in the same call.
- Parameters:
lf (pl.LazyFrame) – Lazy Polars frame to execute and write.
path (PathLike) – Target parquet path.
**sink_options (object) – Keyword options forwarded to pl.LazyFrame.sink_parquet.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- Raises:
ValueError – If lazy=True is passed.
Examples
import polars as pl
import data_engine.helpers
lf = pl.DataFrame({"claim_id": [1, 2]}).lazy()
lf.de.sink_parquet_atomic("workspaces/example/output/docs.parquet")
-
class DataEngineDataFrameNamespace(df)[source]
Bases: object
Data Engine helpers available from pl.DataFrame.de.
- Parameters:
df (pl.DataFrame)
-
normalize_column_names(columns=None)[source]
Normalize column names on this dataframe.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all
dataframe columns are normalized.
- Returns:
Dataframe with normalized column names.
- Return type:
pl.DataFrame
-
remove_null_columns()[source]
Remove columns from this dataframe when every value is null.
- Returns:
Dataframe containing only columns with at least one non-null value.
- Return type:
pl.DataFrame
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this dataframe.
This is a convenience wrapper around data_engine.helpers.networkdays().
The returned value is still a normal pl.Expr, so it can be chained
into cumulative windows and other Polars expressions.
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count.
count_first_day (bool) – Whether to force the first day into the count when it would
normally be excluded.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to the signed business-day count.
- Return type:
pl.Expr
Example
df = df.with_columns(
business_days=df.de.networkdays(
"start_date",
"end_date",
holidays=[date(2026, 4, 15)],
)
)
df = df.sort(["claim_id", "sequence_number"]).with_columns(
cumulative_business_days=
pl.when(pl.col("use_days"))
.then(df.de.networkdays("start_date", "end_date"))
.otherwise(pl.lit(0))
.cum_sum()
.over("claim_id")
)
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this dataframe.
This is a convenience wrapper around data_engine.helpers.workday().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar
integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result.
count_first_day (bool) – Whether the start date itself can count as day 1.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to a Date result.
- Return type:
pl.Expr
Example
df = df.with_columns(
due_date=df.de.workday(
"received_date",
"sla_days",
holidays=[date(2026, 4, 15)],
)
)
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the last ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_last_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
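The broadcasting behavior described above can be sketched on plain row dictionaries. This is an illustration of the documented semantics only, under simplifying assumptions: a single by column, a single ascending sort_by column with no null sort keys, and where given as a Python callable instead of a pl.Expr. The function name is hypothetical.

```python
from collections import defaultdict


def propagate_last_value_sketch(rows, value, *, by, sort_by, where=None, ignore_nulls=True):
    """Return, for every input row, the last ordered value of its window."""
    windows = defaultdict(list)
    for row in rows:
        windows[row[by]].append(row)

    last_by_window = {}
    for key, members in windows.items():
        candidates = sorted(members, key=lambda r: r[sort_by])
        if where is not None:
            # Only rows passing the predicate may supply the value.
            candidates = [r for r in candidates if where(r)]
        if ignore_nulls:
            candidates = [r for r in candidates if r[value] is not None]
        last_by_window[key] = candidates[-1][value] if candidates else None

    # Broadcast: every row receives its window's value, in input order.
    return [last_by_window[row[by]] for row in rows]


rows = [
    {"claim_id": "A", "seq": 2, "status": None},
    {"claim_id": "A", "seq": 1, "status": "open"},
    {"claim_id": "B", "seq": 1, "status": "closed"},
]
skipping_nulls = propagate_last_value_sketch(rows, "status", by="claim_id", sort_by="seq")
keeping_nulls = propagate_last_value_sketch(
    rows, "status", by="claim_id", sort_by="seq", ignore_nulls=False
)
```

With ignore_nulls left at its default, claim A receives "open" because the null in its last row is skipped; with ignore_nulls=False the null itself is propagated.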
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the first ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_first_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return a per-value contiguous-run visit number expression.
This is a convenience wrapper around
data_engine.helpers.visit_counter().
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window
order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number
for each row’s current value.
- Return type:
pl.Expr
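One reading of "per-value contiguous-run visit number" is that each unbroken run of the same value counts as one visit to that value. The sketch below assumes that reading and operates on the values of a single window that is already in sort_by order; it is not the helper's implementation, and visit_counter_sketch is a hypothetical name.

```python
def visit_counter_sketch(values):
    """Number each row by how many separate runs of its value have started."""
    visits = {}
    numbers = []
    previous = object()  # sentinel that never equals a real value
    for current in values:
        if current != previous:
            # A new contiguous run begins: this is another visit to `current`.
            visits[current] = visits.get(current, 0) + 1
            previous = current
        numbers.append(visits[current])
    return numbers


# A claim that moves review -> pending -> review -> pending.
states = ["review", "review", "pending", "review", "pending", "pending"]
visit_numbers = visit_counter_sketch(states)
```

Under this reading, the second stretch of "review" rows is numbered 2 even though the rows are not adjacent to the first stretch.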
-
write_parquet_atomic(path, **write_options)[source]
Write this dataframe to parquet with atomic target replacement.
- Parameters:
-
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
write_excel_atomic(path, worksheet=None, **write_options)[source]
Write this dataframe to Excel with atomic target replacement.
- Parameters:
path (PathLike) – Target Excel workbook path.
worksheet (str | None) – Optional worksheet name forwarded to pl.DataFrame.write_excel.
**write_options (object) – Keyword options forwarded to pl.DataFrame.write_excel.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
compose_excel(path, *, sheet_name, table_name=None, template=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, **write_options)[source]
Compose this dataframe into a single-sheet Excel workbook.
This is a convenience wrapper around
data_engine.helpers.compose_excel().
- Parameters:
path (PathLike) – Target Excel workbook path.
sheet_name (str) – Worksheet name to write.
table_name (str | None) – Optional Excel table name for this dataframe.
template (PathLike | None) – Optional template workbook path.
position (str | tuple[int, int]) – Top-left cell where the dataframe table should be written.
table_style (str | dict[str, object] | None) – Optional table style.
autofit (bool) – Whether Polars should autofit columns in fresh-workbook mode.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting.
**write_options (object) – Additional write options forwarded to the sheet specification.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this dataframe’s
natural keys.
- Returns:
Mapping dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the
surrogate key.
- Returns:
Dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
-
denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.
drop_key (bool) – Whether to drop key_column after attaching the natural columns.
- Returns:
Dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
-
normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate
key.
returns (ReturnMode) – "df" for normalized input rows, "map" for only the key
mapping, or None to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or None according to
returns.
- Return type:
pl.DataFrame | None
-
replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store file_hash in the destination table.
return_df (bool) – Whether to return this dataframe with the file hash column attached.
- Returns:
Inserted rows with file_hash_column when return_df is true;
otherwise None.
- Return type:
pl.DataFrame | None
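The delete-then-append pattern behind this method makes reprocessing a source file idempotent. The sketch below illustrates the idea with the standard-library sqlite3 module standing in for DuckDB, and with lists of dictionaries standing in for dataframes; the function name and table layout are hypothetical, and table creation is assumed to have happened already.

```python
import sqlite3


def replace_rows_by_file_sketch(conn, table, rows, *, file_hash, file_hash_column="file_key"):
    """Delete the previous batch for file_hash, then insert the new rows."""
    stamped = [{**row, file_hash_column: file_hash} for row in rows]
    # "with conn" commits on success and rolls back if any statement raises,
    # so the delete and the inserts land together or not at all.
    with conn:
        conn.execute(
            f'DELETE FROM "{table}" WHERE "{file_hash_column}" = ?', (file_hash,)
        )
        for row in stamped:
            columns = ", ".join(f'"{name}"' for name in row)
            marks = ", ".join("?" for _ in row)
            conn.execute(
                f'INSERT INTO "{table}" ({columns}) VALUES ({marks})',
                tuple(row.values()),
            )
    return stamped


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_claim (claim_id INTEGER, file_key TEXT)")
replace_rows_by_file_sketch(conn, "fact_claim", [{"claim_id": 1}, {"claim_id": 2}], file_hash="abc")
replace_rows_by_file_sketch(conn, "fact_claim", [{"claim_id": 9}], file_hash="def")
# Reprocessing file "abc" replaces only the rows that came from "abc".
replace_rows_by_file_sketch(conn, "fact_claim", [{"claim_id": 3}], file_hash="abc")
```

After the third call the table holds claim 3 for file "abc" and claim 9 for file "def"; the earlier "abc" rows are gone.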
-
replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this dataframe’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this dataframe.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
class DataEngineLazyFrameNamespace(lf)[source]
Bases: object
Data Engine helpers available from pl.LazyFrame.de.
- Parameters:
lf (pl.LazyFrame)
-
normalize_column_names(columns=None)[source]
Normalize column names on this lazy frame.
- Parameters:
columns (Iterable[object] | None) – Optional subset of column names to normalize. When omitted, all
lazy-frame columns are normalized.
- Returns:
Lazy frame with normalized column names.
- Return type:
pl.LazyFrame
-
remove_null_columns()[source]
Remove columns from this lazy frame when every value is null.
- Returns:
Lazy frame containing only columns with at least one non-null
value.
- Return type:
pl.LazyFrame
-
networkdays(start, end, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style business-day count expression for this lazy frame.
This is a convenience wrapper around data_engine.helpers.networkdays().
The returned value stays lazy and can be chained into window
expressions before collect().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
end (ExprLike) – End date expression, column name, or scalar date/datetime.
holidays (HolidayDates) – Optional holiday dates removed from the business-day count.
count_first_day (bool) – Whether to force the first day into the count when it would
normally be excluded.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to the signed business-day count.
- Return type:
pl.Expr
-
workday(start, days, *, holidays=None, count_first_day=False, mask=None)[source]
Return an Excel-style workday offset expression for this lazy frame.
This is a convenience wrapper around data_engine.helpers.workday().
- Parameters:
start (ExprLike) – Start date expression, column name, or scalar date/datetime.
days (IntExprLike) – Signed business-day offset expression, column name, or scalar
integer.
holidays (HolidayDates) – Optional holiday dates skipped while calculating the result.
count_first_day (bool) – Whether the start date itself can count as day 1.
mask (Iterable[bool] | None) – Monday-first seven-item business-day mask.
- Returns:
Expression that evaluates to a Date result.
- Return type:
pl.Expr
-
propagate_last_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the last ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_last_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the last row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the last
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
propagate_first_value(value, *, by, sort_by, where=None, descending=False, nulls_last=False, ignore_nulls=True)[source]
Return an expression broadcasting the first ordered value per window.
This is a convenience wrapper around
data_engine.helpers.propagate_first_value().
- Parameters:
value (ColumnExpr) – Column name or expression containing the value to propagate.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define the first row in each
window.
where (pl.Expr | None) – Optional row predicate that limits which sorted rows can supply the
propagated value.
descending (DescendingLike) – Sort direction passed to Expr.sort_by.
nulls_last (bool) – Whether null sort-key values are ordered last.
ignore_nulls (bool) – Whether null value rows are skipped before taking the first
value.
- Returns:
Window expression suitable for with_columns or select.
- Return type:
pl.Expr
-
visit_counter(value, *, by, sort_by, descending=False, nulls_last=False)[source]
Return a per-value contiguous-run visit number expression.
This is a convenience wrapper around
data_engine.helpers.visit_counter().
- Parameters:
value (ColumnExpr) – Column name or expression containing the state to count visits for.
by (ColumnExprs) – Window column or columns.
sort_by (ColumnExprs) – Ordering column or columns used to define row sequence inside each
window.
descending (DescendingLike) – Sort direction passed to DataFrame.sort for the in-window
order.
nulls_last (bool) – Whether null sort-key values are ordered last.
- Returns:
Unsigned integer expression containing the one-based visit number
for each row’s current value.
- Return type:
pl.Expr
-
sink_parquet_atomic(path, **sink_options)[source]
Execute this lazy frame to parquet with atomic target replacement.
- Parameters:
-
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
compose_excel(path, *, sheet_name, table_name=None, template=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, **write_options)[source]
Compose this lazy frame into a single-sheet Excel workbook.
This is a convenience wrapper around
data_engine.helpers.compose_excel(). The lazy frame is collected
when the workbook is composed.
- Parameters:
path (PathLike) – Target Excel workbook path.
sheet_name (str) – Worksheet name to write.
table_name (str | None) – Optional Excel table name for this lazy frame.
template (PathLike | None) – Optional template workbook path.
position (str | tuple[int, int]) – Top-left cell where the lazy frame table should be written.
table_style (str | dict[str, object] | None) – Optional table style.
autofit (bool) – Whether Polars should autofit columns in fresh-workbook mode.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting.
**write_options (object) – Additional write options forwarded to the sheet specification.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
-
build_dimension(db_path, table, *, key_column='dimension_key', return_df=True)[source]
Build or extend one DuckDB dimension table from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column to create in the dimension table.
return_df (bool) – Whether to return the mapping dataframe for this lazy frame’s
natural keys.
- Returns:
Mapping dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
attach_dimension(db_path, table, *, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing DuckDB dimension key to this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to join to the dimension table.
key_column (str) – Surrogate key column to attach.
drop_key (bool) – Whether to drop the natural key columns after attaching the
surrogate key.
- Returns:
Collected dataframe with the surrogate key column attached.
- Return type:
pl.DataFrame
-
denormalize_columns(db_path, table, *, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing DuckDB dimension table.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
key_column (str) – Surrogate key column used to join to the dimension table.
select (ColumnNames) – Natural columns to attach, or "*" for all non-key columns.
drop_key (bool) – Whether to drop key_column after attaching the natural columns.
- Returns:
Collected dataframe with selected dimension columns attached.
- Return type:
pl.DataFrame
-
normalize_columns(db_path, table, *, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build dimension keys and attach them back onto this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Dimension table name, optionally schema-qualified.
on (ColumnNames) – Natural key column or columns used to build the dimension.
key_column (str) – Surrogate key column to create and attach.
drop_key (bool) – Whether to drop natural key columns after attaching the surrogate
key.
returns (ReturnMode) – "df" for normalized input rows, "map" for only the key
mapping, or None to only persist dimension rows.
- Returns:
Normalized dataframe, mapping dataframe, or None according to
returns.
- Return type:
pl.DataFrame | None
-
replace_rows_by_file(db_path, table, *, file_hash, file_hash_column='file_key', return_df=True)[source]
Replace one file’s DuckDB rows and append this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
file_hash (str) – Stable source-file identifier used to delete the previous batch.
file_hash_column (str) – Column name used to store file_hash in the destination table.
return_df (bool) – Whether to return the collected frame with the file hash column
attached.
- Returns:
Inserted rows with file_hash_column when return_df is true;
otherwise None.
- Return type:
pl.DataFrame | None
-
replace_rows_by_values(db_path, table, *, column, return_df=True)[source]
Replace DuckDB rows matching this lazy frame’s values for one column.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
column (str) – Column whose incoming values define the rows to replace.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
-
replace_table(db_path, table, *, return_df=True)[source]
Replace one DuckDB table wholesale from this lazy frame.
- Parameters:
db_path (PathLike) – DuckDB database file path.
table (str) – Destination table name, optionally schema-qualified.
return_df (bool) – Whether to return the inserted dataframe.
- Returns:
Inserted dataframe when return_df is true; otherwise None.
- Return type:
pl.DataFrame | None
Excel workbook composition helpers for Data Engine flow authoring.
-
class ExcelSheet(name, df, table_name=None, position='A1', table_style=None, autofit=True, autofilter=True, freeze_panes=None, write_options=<factory>)[source]
Bases: object
Specification for one dataframe-backed worksheet in an Excel workbook.
- Variables:
name (str) – Worksheet name. Names must follow Excel’s normal sheet-name limits.
df (ExcelFrame) – Dataframe to write. Lazy frames are collected when the workbook is
composed.
table_name (str | None) – Optional Excel table name for the written dataframe.
position (str | tuple[int, int]) – Top-left cell where the dataframe table should be written.
table_style (str | dict[str, Any] | None) – Optional table style forwarded to pl.DataFrame.write_excel.
autofit (bool) – Whether Polars should autofit columns after writing.
autofilter (bool) – Whether the generated table should include filter controls.
freeze_panes (object | None) – Optional freeze-pane setting forwarded to pl.DataFrame.write_excel.
write_options (dict[str, Any]) – Additional keyword options forwarded to pl.DataFrame.write_excel.
- Parameters:
name (str)
df (DataFrame | LazyFrame)
table_name (str | None)
position (str | tuple[int, int])
table_style (str | dict[str, Any] | None)
autofit (bool)
autofilter (bool)
freeze_panes (object | None)
write_options (dict[str, Any])
-
name: str
-
df: DataFrame | LazyFrame
-
table_name: str | None = None
-
position: str | tuple[int, int] = 'A1'
-
table_style: str | dict[str, Any] | None = None
-
autofit: bool = True
-
autofilter: bool = True
-
freeze_panes: object | None = None
-
write_options: dict[str, Any]
-
compose_excel(path, sheets, *, template=None)[source]
Compose one Excel workbook from dataframe-backed sheet specifications.
The workbook is written to a same-directory temporary file and then moved
into place with os.replace. Each sheet is written through Polars’
Excel writer, so sheet-level options map directly to
pl.DataFrame.write_excel behavior.
- Parameters:
path (PathLike) – Target Excel workbook path.
sheets (Sequence[ExcelSheet]) – One or more worksheet specifications to write.
template (PathLike | None) – Optional template workbook path. When provided, the template workbook is
copied to a temporary file, the requested sheets are updated, and the
composed copy atomically replaces path.
- Returns:
Absolute target path that was replaced.
- Return type:
Path
- Raises:
Exception – If workbook validation, dataframe collection, Excel writing, or atomic
replacement fails.
Examples
import polars as pl
from data_engine.helpers import ExcelSheet, compose_excel

compose_excel(
    "workspaces/example/output/report.xlsx",
    sheets=[
        ExcelSheet(
            name="Claims",
            df=pl.DataFrame({"claim_id": [1, 2]}),
            table_name="claims",
            freeze_panes="A2",
        ),
    ],
)
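The same-directory temporary file plus os.replace approach described for compose_excel can be sketched for arbitrary bytes using only the standard library. This is a minimal illustration of the pattern, not Data Engine's code; write_bytes_atomic is a hypothetical name.

```python
import os
import tempfile
from pathlib import Path


def write_bytes_atomic(path, payload):
    """Write payload to a sibling temp file, then swap it into place."""
    target = Path(path).resolve()
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so os.replace stays on one
    # filesystem, which is what lets the final rename be atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=target.parent, prefix=target.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
        os.replace(tmp_name, target)
    except BaseException:
        # Leave no partial temp file behind if writing or replacing failed.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target
```

Readers of the target path see either the previous complete file or the new complete file, never a half-written one.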
Public one-shot DuckDB helpers for flow authoring.
-
attach_dimension(db_path, table, *, df, on, key_column='dimension_key', drop_key=False)[source]
Attach an existing surrogate key mapping table to an input dataframe.
- Parameters:
-
-
build_dimension(db_path, table, *, df, key_column='dimension_key', return_df=True)[source]
Build or extend one dimension table from unique incoming row combinations.
- Parameters:
-
-
compact_database(db_path, *, tables=None, drop_all_null_columns=True, vacuum=True)[source]
Compact one DuckDB database by dropping all-null columns and optionally vacuuming.
Existing indexes on compacted tables are dropped before column removal and
recreated afterward when their original CREATE INDEX statement still
applies to the compacted table. Indexes that reference dropped columns are
reported as skipped.
- Parameters:
-
- Return type:
DataFrame
-
denormalize_columns(db_path, table, *, df, key_column='dimension_key', select='*', drop_key=False)[source]
Attach natural columns from an existing dimension table onto a keyed dataframe.
- Parameters:
-
-
ensure_index(db_path, table, *, columns, name=None)[source]
Create one DuckDB index if it does not already exist.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Target table name, optionally schema-qualified.
columns (str | list[str] | tuple[str, ...]) – Column or columns to index.
name (str | None) – Optional index name. When omitted, Data Engine generates a stable name
from the table and columns.
- Returns:
Index name that exists after the call.
- Return type:
str
- Raises:
ValueError – If the table does not exist, selected columns do not exist, or the
provided index name is empty.
Examples
Index a file-slice column before repeated replace_rows_by_file calls:
data_engine.helpers.duckdb.ensure_index(
    context.database("warehouse.duckdb"),
    "fact_claim",
    columns="file_key",
)
Index a lookup column before repeated read_rows_by_values calls:
data_engine.helpers.duckdb.ensure_index(
    context.database("warehouse.duckdb"),
    "fact_claim",
    columns="claim_id",
    name="idx_fact_claim_claim_id",
)
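The create-if-missing contract can be sketched with the standard-library sqlite3 module standing in for DuckDB. The generated-name scheme below (idx_ followed by the table and column names) is an assumption made for illustration; the page only says the generated name is stable. Schema-qualified table names are not handled, and ensure_index_sketch is a hypothetical name.

```python
import sqlite3


def ensure_index_sketch(conn, table, *, columns, name=None):
    """Create an index unless it already exists and return its name."""
    wanted = [columns] if isinstance(columns, str) else list(columns)
    # PRAGMA table_info yields no rows for a missing table; row[1] is the column name.
    existing = {row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')}
    if not existing:
        raise ValueError(f"table {table!r} does not exist")
    missing = [column for column in wanted if column not in existing]
    if missing:
        raise ValueError(f"columns not found in {table!r}: {missing}")
    if name is None:
        # Hypothetical naming scheme: deterministic for a given table and columns.
        name = "idx_" + "_".join([table, *wanted])
    elif not name.strip():
        raise ValueError("index name must not be empty")
    column_sql = ", ".join(f'"{column}"' for column in wanted)
    conn.execute(f'CREATE INDEX IF NOT EXISTS "{name}" ON "{table}" ({column_sql})')
    return name


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_claim (claim_id INTEGER, file_key TEXT)")
first = ensure_index_sketch(conn, "fact_claim", columns="file_key")
second = ensure_index_sketch(conn, "fact_claim", columns="file_key")  # no-op
```

Because the name is derived deterministically, calling the function repeatedly from a flow step leaves exactly one index behind.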
-
normalize_columns(db_path, table, *, df, on, key_column='dimension_key', drop_key=True, returns='df')[source]
Build missing surrogate keys and attach them back onto the input dataframe.
- Parameters:
-
-
read_rows_by_values(db_path, table, *, column, is_in, select)[source]
Return selected columns for rows whose one column matches provided values.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
column (str) – Column matched against is_in.
is_in (list[object] | tuple[object, ...]) – Values to include.
select (str | list[str] | tuple[str, ...]) – Columns to return, or "*" for all columns.
- Returns:
Selected matching rows, ordered by the first occurrence of each distinct lookup value in is_in.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the table, column, or selected columns are invalid.
-
read_sql(db_path, *, sql)[source]
Run one SQL query against DuckDB and return the result as a Polars dataframe.
- Parameters:
-
- Returns:
Query result as a Polars dataframe.
- Return type:
pl.DataFrame
- Raises:
ValueError – If sql is empty.
-
read_table(db_path, table, *, select='*', where=None, limit=None)[source]
Read rows from one DuckDB table into a Polars dataframe.
- Parameters:
db_path (str | Path) – DuckDB database file path.
table (str) – Source table name, optionally schema-qualified.
select (str | list[str] | tuple[str, ...]) – Columns to return, or "*" for all columns.
where (str | None) – Optional raw SQL predicate appended after WHERE.
limit (int | None) – Optional row limit.
- Returns:
Selected table rows.
- Return type:
pl.DataFrame
- Raises:
ValueError – If the table, selected columns, or limit are invalid.
-
replace_rows_by_file(db_path, table, *, df, file_hash, file_hash_column='file_key', return_df=True)[source]
Atomically replace one file’s fact rows and append the current batch.
- Parameters:
-
-
replace_rows_by_values(db_path, table, *, df, column, return_df=True)[source]
Atomically replace one value-slice of rows and append the current batch.
- Parameters:
-
-
replace_table(db_path, table, *, df, return_df=True)[source]
Replace one DuckDB table wholesale from the provided dataframe.
- Parameters:
-
Host Surfaces
APScheduler-backed host for scheduled flow execution.
-
class ScheduledFlowJob(job_id, flow_name, trigger_kind)[source]
Bases: object
Description of one scheduler job owned by the host.
- Parameters:
job_id (str)
flow_name (str)
trigger_kind (str)
-
job_id: str
-
flow_name: str
-
trigger_kind: str
-
class SchedulerHost(*, runtime_engine=None, scheduler=None, job_id_prefix='data-engine:schedule:')[source]
Bases: object
Own APScheduler timing while delegating flow meaning to the runtime engine.
- Parameters:
-
-
rebuild_jobs(flows)[source]
Replace scheduler jobs from discovered scheduled flows.
- Parameters:
flows (tuple['Flow', ...])
- Return type:
tuple[ScheduledFlowJob, ...]
-
start()[source]
Start the underlying scheduler.
- Return type:
None
-
shutdown(*, wait=True)[source]
Stop the underlying scheduler.
- Parameters:
wait (bool)
- Return type:
None
-
run_until_stopped(flows, stop_event)[source]
Run scheduled flow jobs until stop_event is set.
- Parameters:
-
- Return type:
tuple[ScheduledFlowJob, ...]
-
class SchedulerPort(*args, **kwargs)[source]
Bases: Protocol
Small scheduler surface used by the scheduler host.
-
add_job(func, *, trigger, id, replace_existing=False, max_instances=1)[source]
Add or replace one scheduled job.
- Parameters:
id (str)
replace_existing (bool)
max_instances (int)
-
remove_job(job_id)[source]
Remove one scheduled job by id.
- Parameters:
job_id (str)
- Return type:
None
-
start()[source]
Start the scheduler.
- Return type:
None
-
shutdown(wait=True)[source]
Stop the scheduler.
- Parameters:
wait (bool)
- Return type:
None
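Because SchedulerPort is a Protocol, any object with these four methods satisfies it structurally. The following in-memory stand-in is a sketch for tests, assuming that adding a duplicate id without replace_existing should raise; that conflict behavior and the class name InMemoryScheduler are assumptions, not part of the documented surface.

```python
class InMemoryScheduler:
    """Test double that records jobs instead of scheduling them."""

    def __init__(self):
        self.jobs = {}
        self.running = False

    def add_job(self, func, *, trigger, id, replace_existing=False, max_instances=1):
        if id in self.jobs and not replace_existing:
            # Assumed behavior: refuse to overwrite silently.
            raise ValueError(f"job {id!r} already exists")
        self.jobs[id] = {"func": func, "trigger": trigger, "max_instances": max_instances}

    def remove_job(self, job_id):
        del self.jobs[job_id]

    def start(self):
        self.running = True

    def shutdown(self, wait=True):
        self.running = False


scheduler = InMemoryScheduler()
scheduler.add_job(lambda: None, trigger="interval", id="data-engine:schedule:claims")
scheduler.start()
```

A double like this lets a test inspect scheduler.jobs after job registration without starting real timers.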
Application Services
Daemon IPC and lifecycle services.
-
class DaemonService(*, spawn_process_func=<function spawn_daemon_process>, request_func=<function daemon_request>, is_live_func=<function is_daemon_live>, force_shutdown_func=<function force_shutdown_daemon_process>, client_error_type=<class 'data_engine.hosts.daemon.client.DaemonClientError'>)[source]
Bases: object
Thin injectable wrapper around daemon lifecycle and IPC helpers.
- Parameters:
spawn_process_func (Callable[..., object])
request_func (Callable[..., dict[str, Any]])
is_live_func (Callable[[WorkspacePaths], bool])
force_shutdown_func (Callable[..., None])
client_error_type (type[Exception])
-
spawn(paths, *, lifecycle_policy=DaemonLifecyclePolicy.PERSISTENT)[source]
Start the local workspace daemon process for the given paths.
- Parameters:
-
- Return type:
object
-
request(paths, payload, *, timeout=0.0)[source]
Send one request to the local workspace daemon.
- Parameters:
paths (WorkspacePaths)
payload (dict[str, Any])
timeout (float)
- Return type:
dict[str, Any]
-
is_live(paths)[source]
Return whether the local workspace daemon is reachable.
- Parameters:
paths (WorkspacePaths)
- Return type:
bool
-
force_shutdown(paths, *, timeout=0.5)[source]
Force-stop the local workspace daemon for the given paths.
- Parameters:
paths (WorkspacePaths)
timeout (float)
- Return type:
None
-
property client_error_type: type[Exception]
Return the daemon client exception type.
Workspace daemon state and control services.
-
class DaemonLaneUpdate(lane, flow_names=(), run_ids=(), completed_run_ids=(), step_events=(), log_entries=())[source]
Bases: object
One lane-scoped daemon update derived from snapshot changes.
- Parameters:
lane (Literal['control', 'engine', 'flow_activity', 'run_lifecycle', 'step_activity', 'log_events'])
flow_names (tuple[str, ...])
run_ids (tuple[str, ...])
completed_run_ids (tuple[str, ...])
step_events (tuple[RuntimeStepEvent, ...])
log_entries (tuple[FlowLogEntry, ...])
-
lane: Literal['control', 'engine', 'flow_activity', 'run_lifecycle', 'step_activity', 'log_events']
-
flow_names: tuple[str, ...] = ()
-
run_ids: tuple[str, ...] = ()
-
completed_run_ids: tuple[str, ...] = ()
-
step_events: tuple[RuntimeStepEvent, ...] = ()
-
log_entries: tuple[FlowLogEntry, ...] = ()
-
class DaemonStateService(*, shared_state_adapter=None)[source]
Bases: object
Own workspace daemon-manager construction and normalized snapshot access.
- Parameters:
shared_state_adapter (DaemonSharedStateAdapter | None)
-
create_manager(paths)[source]
Create one daemon-state manager for a workspace.
- Parameters:
paths (WorkspacePaths)
- Return type:
WorkspaceDaemonManager
-
sync(manager)[source]
Fetch one normalized daemon snapshot.
- Parameters:
manager (WorkspaceDaemonManager)
- Return type:
WorkspaceDaemonSnapshot
-
wait_for_update(manager, *, timeout_seconds=5.0)[source]
Wait for one daemon projection update and return the normalized snapshot.
- Parameters:
-
- Return type:
WorkspaceDaemonSnapshot
-
run_subscription_loop(manager, *, stop_event, workspace_available, on_update, timeout_seconds=1.5)[source]
Drive one long-poll subscription loop until stopped.
The daemon-state service owns the transport semantics here:
authored-workspace gating, long-poll waiting, unchanged-snapshot
suppression, and lane-scoped batch derivation. Surfaces provide only the
stop signal and the update sink.
- Parameters:
manager (WorkspaceDaemonManager)
stop_event (Event)
workspace_available (Callable[[], bool])
on_update (Callable[[DaemonUpdateBatch], None])
timeout_seconds (float)
- Return type:
None
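The loop responsibilities listed above (workspace gating, long-poll waiting, and unchanged-snapshot suppression) can be sketched in a few lines. This is a simplified illustration, not the service's code: wait_for_update is a hypothetical callable standing in for the manager, snapshots are compared with ==, and lane-scoped batch derivation is omitted.

```python
import threading


def run_subscription_loop_sketch(
    wait_for_update, *, stop_event, workspace_available, on_update, timeout_seconds=1.5
):
    """Poll until stopped, forwarding only snapshots that changed."""
    previous = None
    while not stop_event.is_set():
        if not workspace_available():
            # Gate: idle without polling while the workspace is unavailable.
            stop_event.wait(timeout_seconds)
            continue
        snapshot = wait_for_update(timeout_seconds)
        if snapshot == previous:
            # Suppress updates that carry no change.
            continue
        previous = snapshot
        on_update(snapshot)


stop = threading.Event()
feed = ["s1", "s1", "s2"]
seen = []


def fake_wait(timeout_seconds):
    """Return the next canned snapshot and stop after the last one."""
    value = feed.pop(0)
    if not feed:
        stop.set()
    return value


run_subscription_loop_sketch(
    fake_wait,
    stop_event=stop,
    workspace_available=lambda: True,
    on_update=seen.append,
    timeout_seconds=0.01,
)
```

The caller supplies only the stop signal and the sink, which matches the division of responsibility described in the docstring.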
-
static diff_update_batch(previous, current)[source]
Return lane-scoped update information for one daemon snapshot change.
- Parameters:
-
- Return type:
DaemonUpdateBatch
-
static should_run_heartbeat(*, daemon_live, transport_mode, wait_worker_alive, now_monotonic, last_sync_monotonic, last_subscription_monotonic, stale_after_seconds=15.0)[source]
Return whether fallback heartbeat sync should run right now.
- Parameters:
daemon_live (bool)
transport_mode (str)
wait_worker_alive (bool)
now_monotonic (float)
last_sync_monotonic (float)
last_subscription_monotonic (float)
stale_after_seconds (float)
- Return type:
bool
-
control_state(manager, snapshot, *, daemon_startup_in_progress=False)[source]
Build structured workspace control state from one daemon snapshot.
- Parameters:
manager (WorkspaceDaemonManager)
snapshot (WorkspaceDaemonSnapshot)
daemon_startup_in_progress (bool)
- Return type:
WorkspaceControlState
-
request_control(manager)[source]
Request workspace control through one daemon-state manager.
- Parameters:
manager (WorkspaceDaemonManager)
- Return type:
str
-
class DaemonUpdateBatch(snapshot, updates, requires_full_sync=False)[source]
Bases: object
One daemon snapshot plus the narrow update lanes it changed.
- Parameters:
snapshot (WorkspaceDaemonSnapshot)
updates (tuple[DaemonLaneUpdate, ...])
requires_full_sync (bool)
-
snapshot: WorkspaceDaemonSnapshot
-
updates: tuple[DaemonLaneUpdate, ...]
-
requires_full_sync: bool = False
-
property changed_flow_names: tuple[str, ...]
Return the union of flow names mentioned by this batch.
-
property completed_run_ids: tuple[str, ...]
Return the union of completed run ids mentioned by this batch.
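The two union properties can be sketched with simplified dataclasses. The field subset, the Sketch class names, and the choice to keep first-seen order while removing duplicates are assumptions; the documented classes carry more fields and may order results differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LaneUpdateSketch:
    lane: str
    flow_names: tuple[str, ...] = ()
    completed_run_ids: tuple[str, ...] = ()


@dataclass(frozen=True)
class UpdateBatchSketch:
    updates: tuple[LaneUpdateSketch, ...]

    @property
    def changed_flow_names(self) -> tuple[str, ...]:
        # dict.fromkeys removes duplicates while keeping first-seen order.
        names = (name for update in self.updates for name in update.flow_names)
        return tuple(dict.fromkeys(names))

    @property
    def completed_run_ids(self) -> tuple[str, ...]:
        run_ids = (rid for update in self.updates for rid in update.completed_run_ids)
        return tuple(dict.fromkeys(run_ids))


batch = UpdateBatchSketch(
    updates=(
        LaneUpdateSketch("flow_activity", flow_names=("claims", "payments")),
        LaneUpdateSketch("run_lifecycle", flow_names=("claims",), completed_run_ids=("run-1",)),
    )
)
```

A consumer can then refresh only the flows named in batch.changed_flow_names instead of reloading every flow on each update.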
-
class DaemonUpdateSubscription(*, daemon_state_service, manager, clock, timeout_seconds=1.5, stale_after_seconds=15.0)[source]
Bases: object
Own client-side daemon subscription state for one workspace manager.
- Parameters:
daemon_state_service (DaemonStateService)
manager (WorkspaceDaemonManager)
clock (Callable[[], float])
timeout_seconds (float)
stale_after_seconds (float)
-
thread: Thread | None
-
is_alive()[source]
Return whether the background subscription worker is alive.
- Return type:
bool
-
mark_sync(now_monotonic=None)[source]
Record one successful foreground sync timestamp.
- Parameters:
now_monotonic (float | None)
- Return type:
None
-
mark_subscription(now_monotonic=None)[source]
Record one successful subscription update timestamp.
- Parameters:
now_monotonic (float | None)
- Return type:
None
-
should_run_heartbeat(snapshot)[source]
Return whether fallback heartbeat sync should run for the current snapshot.
- Return type:
bool
-
ensure_started(*, workspace_available, on_update, start_worker)[source]
Start the background subscription worker if it is not already alive.
start_worker must start the thread before returning it.
- Parameters:
workspace_available (Callable[[], bool])
on_update (Callable[[DaemonUpdateBatch], None])
start_worker (Callable[[Callable[[], None]], Thread])
- Return type:
Thread | None
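The `start_worker` contract (receive the worker target, return a thread that is already started) can be met with a small standard-library helper. This is a minimal sketch, assuming `Thread` refers to `threading.Thread`. The names `start_daemon_thread`, `finished`, and `worker` are illustrative and not part of the package.

```python
import threading
from collections.abc import Callable


def start_daemon_thread(target: Callable[[], None]) -> threading.Thread:
    """Create a daemon thread for target, start it, then return it."""
    thread = threading.Thread(target=target, name="daemon-subscription", daemon=True)
    thread.start()  # the contract: the thread is started before it is returned
    return thread


# Stand-in worker body; in real use the subscription supplies its own target.
finished = threading.Event()
worker = start_daemon_thread(finished.set)
worker.join(timeout=2.0)
```

A callable shaped like this could be passed as `start_worker=start_daemon_thread`. Marking the thread as a daemon is a design choice of the sketch so that a stuck worker does not block interpreter exit.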
-
stop()[source]
Request the background subscription worker to stop.
- Return type:
None
-
merge_update_batches(existing, incoming)[source]
Merge two pending daemon update batches into one coalesced batch.
- Parameters:
-
- Return type:
DaemonUpdateBatch
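To make the idea of a coalesced batch concrete, here is a self-contained sketch. `LaneUpdate`, `UpdateBatch`, and `merge_batches` are hypothetical stand-ins, and the fields on `LaneUpdate` are assumptions because this reference does not list the attributes of `DaemonLaneUpdate`. The merge rule shown (newest snapshot wins, updates are concatenated, the full-sync flag is sticky) is one plausible policy, not necessarily the one `merge_update_batches` applies.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LaneUpdate:
    """Hypothetical stand-in for DaemonLaneUpdate; field names are assumed."""
    lane: str
    flow_names: tuple[str, ...] = ()
    run_ids: tuple[str, ...] = ()


@dataclass(frozen=True)
class UpdateBatch:
    """Hypothetical stand-in mirroring the documented DaemonUpdateBatch fields."""
    snapshot: object
    updates: tuple[LaneUpdate, ...]
    requires_full_sync: bool = False

    @property
    def changed_flow_names(self) -> tuple[str, ...]:
        # Ordered union: keep first-seen order and drop duplicates.
        return tuple(dict.fromkeys(n for u in self.updates for n in u.flow_names))

    @property
    def completed_run_ids(self) -> tuple[str, ...]:
        return tuple(dict.fromkeys(r for u in self.updates for r in u.run_ids))


def merge_batches(existing: UpdateBatch, incoming: UpdateBatch) -> UpdateBatch:
    """Coalesce two pending batches (assumed policy, see lead-in)."""
    return UpdateBatch(
        snapshot=incoming.snapshot,  # the newer snapshot replaces the older one
        updates=existing.updates + incoming.updates,
        requires_full_sync=existing.requires_full_sync or incoming.requires_full_sync,
    )


a = UpdateBatch("snap-1", (LaneUpdate("flows", ("orders", "claims")),))
b = UpdateBatch("snap-2", (LaneUpdate("runs", ("orders",), ("run-7",)),), requires_full_sync=True)
merged = merge_batches(a, b)
```

Coalescing like this lets a consumer that falls behind apply one merged batch instead of replaying every intermediate one.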
Flow catalog loading services.
-
class FlowCatalogService(*, discover_definitions_func=<function discover_flow_module_definitions>)[source]
Bases: object
Own flow catalog loading through an explicit discovery dependency.
- Parameters:
discover_definitions_func (Callable[..., tuple[FlowModuleDefinition, ...]])
-
load_entries(*, workspace_root=None)[source]
Return discovered flow catalog entries for the requested workspace root.
- Parameters:
workspace_root (Path | None)
- Return type:
tuple[FlowCatalogEntry, ...]
-
flow_catalog_entry_from_flow(flow, *, description)[source]
- Parameters:
flow (Flow)
description (str | None)
- Return type:
FlowCatalogEntry
Executable flow loading services.
-
class FlowExecutionService(*, load_flow_func=<function _default_load_flow>, discover_flows_func=<function _default_discover_flows>)[source]
Bases: object
Own executable flow loading through an explicit loader dependency.
- Parameters:
load_flow_func (Callable[..., 'CoreFlow'])
discover_flows_func (Callable[..., tuple['CoreFlow', ...]])
-
load_flow(name, *, workspace_root=None)[source]
Return one executable flow definition by name.
- Parameters:
-
- Return type:
CoreFlow
-
load_flows(names, *, workspace_root=None)[source]
Return executable flow definitions for the requested names.
- Parameters:
-
- Return type:
tuple['CoreFlow', ...]
-
discover_flows(*, workspace_root=None)[source]
Return all executable flow definitions for the requested workspace root.
- Parameters:
workspace_root (Path | None)
- Return type:
tuple['CoreFlow', ...]
Runtime control-ledger services.
-
LedgerService
alias of RuntimeControlLedgerService
-
class RuntimeControlLedgerService(open_ledger_func=None, *, runtime_layout_policy=None)[source]
Bases: object
Own workspace-local runtime control-ledger access and client-session bookkeeping.
- Parameters:
open_ledger_func (Callable[[Path], RuntimeControlStore] | None)
runtime_layout_policy (RuntimeLayoutPolicy | None)
-
open_for_workspace(workspace_root)[source]
Open the configured runtime control ledger for one workspace root.
- Parameters:
workspace_root (Path)
- Return type:
RuntimeControlStore
-
close(ledger)[source]
Close one runtime control-ledger connection.
- Parameters:
ledger (RuntimeControlStore)
- Return type:
None
-
register_client_session(ledger, *, client_id, workspace_id, client_kind, pid)[source]
Register or refresh one active local client session.
- Parameters:
-
- Return type:
None
-
remove_client_session(ledger, client_id)[source]
Remove one active local client session row.
- Parameters:
-
- Return type:
None
-
purge_process_client_sessions(ledger, *, workspace_id, client_kind, pid)[source]
Remove all client sessions for one workspace/client-kind/process tuple.
- Parameters:
-
- Return type:
None
-
count_live_client_sessions(ledger, workspace_id, *, exclude_client_id=None)[source]
Return the number of currently live client sessions for one workspace.
- Parameters:
-
- Return type:
int
Operator log history services.
-
class LogService[source]
Bases: object
Own operator log-store construction and log history queries.
-
DEFAULT_VISIBLE_LOG_LIMIT = 20000
-
create_store(runtime_cache_ledger=None)[source]
Create one log store hydrated from the given runtime cache store.
- Parameters:
runtime_cache_ledger (RuntimeCacheStore | None)
- Return type:
FlowLogStore
-
reload(store, runtime_cache_ledger)[source]
Reload one log store from an explicit runtime cache store.
- Parameters:
-
- Return type:
None
-
append_entry(store, entry)[source]
Append one log entry to the current store.
- Parameters:
store (FlowLogStore)
entry (FlowLogEntry)
- Return type:
None
-
clear_flow(store, flow_name)[source]
Clear one flow’s visible log history from the current store.
- Parameters:
store (FlowLogStore)
flow_name (str | None)
- Return type:
None
-
all_entries(store)[source]
Return every entry currently held in the store.
- Parameters:
store (FlowLogStore)
- Return type:
tuple[FlowLogEntry, ...]
-
transient_entries(store)[source]
Return only non-persisted live entries still held in the store.
- Parameters:
store (FlowLogStore)
- Return type:
tuple[FlowLogEntry, ...]
-
entries_for_flow(store, flow_name)[source]
Return flow-scoped entries for one selected flow.
- Parameters:
store (FlowLogStore)
flow_name (str | None)
- Return type:
tuple[FlowLogEntry, ...]
-
runs_for_flow(store, flow_name)[source]
Return grouped run history for one selected flow.
- Parameters:
store (FlowLogStore)
flow_name (str | None)
- Return type:
tuple[FlowRunState, ...]
-
entries_from_ledger(runtime_cache_ledger, *, flow_name=None, run_id=None, after_id=None, limit=None)[source]
Return visible log entries loaded directly from one runtime cache ledger.
- Parameters:
-
- Return type:
tuple[FlowLogEntry, ...]
Workspace runtime binding services for operator surfaces.
-
class WorkspaceRuntimeBinding(workspace_paths, runtime_cache_ledger, runtime_control_ledger, log_store, daemon_manager)[source]
Bases: object
Concrete runtime resources bound to one selected workspace.
- Parameters:
workspace_paths (WorkspacePaths)
runtime_cache_ledger (RuntimeCacheStore)
runtime_control_ledger (RuntimeControlStore)
log_store (FlowLogStore)
daemon_manager (WorkspaceDaemonManager)
-
workspace_paths: WorkspacePaths
-
runtime_cache_ledger: RuntimeCacheStore
-
runtime_control_ledger: RuntimeControlStore
-
log_store: FlowLogStore
-
daemon_manager: WorkspaceDaemonManager
-
class WorkspaceRuntimeBindingService(*, ledger_service, log_service, daemon_state_service, runtime_history_service, runtime_io_layer=None)[source]
Bases: object
Own concrete runtime binding lifecycle for GUI/TUI surfaces.
- Parameters:
-
-
open_binding(workspace_paths)[source]
Open one concrete runtime binding for a workspace selection.
- Parameters:
workspace_paths (WorkspacePaths)
- Return type:
WorkspaceRuntimeBinding
-
close_binding(binding)[source]
Close one concrete runtime binding.
- Parameters:
binding (WorkspaceRuntimeBinding)
- Return type:
None
-
register_client_session(binding, *, client_id, client_kind, pid=None)[source]
Register or refresh one local client session for the binding workspace.
- Parameters:
-
- Return type:
None
-
remove_client_session(binding, client_id)[source]
Remove one active local client session row.
- Parameters:
-
- Return type:
None
-
purge_process_client_sessions(binding, *, client_kind, pid=None)[source]
Remove all client sessions for this workspace/client-kind/process tuple.
- Parameters:
-
- Return type:
None
-
count_live_client_sessions(binding, *, exclude_client_id=None)[source]
Return the number of live local client sessions for the binding workspace.
- Parameters:
-
- Return type:
int
-
sync_runtime_state(binding, *, runtime_application, flow_cards, daemon_startup_in_progress=False)[source]
Return daemon/runtime sync state for one bound workspace.
- Parameters:
-
- Return type:
object
-
reload_logs(binding)[source]
Reload the binding log store from its runtime cache store.
- Parameters:
binding (WorkspaceRuntimeBinding)
- Return type:
None
-
invalidate_flow_history(binding, *, flow_name)[source]
Drop one flow’s cached logs and derived step-output state after destructive resets.
- Parameters:
-
- Return type:
None
-
rebuild_step_outputs(binding, flow_cards)[source]
Rebuild latest successful per-step output paths for visible flows.
- Parameters:
-
- Return type:
StepOutputIndex
-
error_text_for_entry(binding, run_group, entry)[source]
Return one user-facing error title and persisted error text.
- Parameters:
-
- Return type:
tuple[str, str | None]
-
recent_run_count(binding, *, days)[source]
Return the number of persisted runs started in the recent window.
- Parameters:
-
- Return type:
int
Runtime execution services for flow runs and grouped engine runs.
-
class RuntimeExecutionService(*, flow_runtime_type=<class 'data_engine.runtime.execution.single.FlowRuntime'>, grouped_runtime_type=<class 'data_engine.runtime.execution.grouped.GroupedFlowRuntime'>, runtime_engine_type=<class 'data_engine.runtime.engine.RuntimeEngine'>, scheduler_host_factory=<class 'data_engine.hosts.scheduler.SchedulerHost'>, run_stop_controller=None)[source]
Bases: object
Own executable runtime construction for manual and grouped runs.
- Parameters:
-
-
run_once(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, workspace_id=None)[source]
Run one flow as a one-shot execution.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
run_source(flow, source_path, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, workspace_id=None)[source]
Run one flow for a specific source path.
- Parameters:
flow (CoreFlow)
source_path (str)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
run_batch(flow, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, workspace_id=None)[source]
Run one flow once in batch mode.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
preview(flow, *, use=None, runtime_ledger=None, workspace_id=None)[source]
Preview one flow through the one-shot runtime path.
- Parameters:
-
- Return type:
object
-
run_manual(flow, *, runtime_ledger, runtime_stop_event, flow_stop_event=None, workspace_id=None)[source]
Run one flow as a manual one-shot execution.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore)
runtime_stop_event (Event)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
run_manual_and_discard(flow, *, runtime_ledger, runtime_stop_event, flow_stop_event=None, workspace_id=None)[source]
Run one flow manually while discarding completed results immediately.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore)
runtime_stop_event (Event)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
None
-
run_continuous(flow, *, runtime_ledger=None, flow_stop_event=None, workspace_id=None)[source]
Run one flow continuously.
- Parameters:
flow (CoreFlow)
runtime_ledger (RuntimeCacheStore | None)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
run_grouped(flows, *, runtime_ledger, runtime_stop_event, flow_stop_event, workspace_id=None)[source]
Run grouped automated flows continuously.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore)
runtime_stop_event (Event)
flow_stop_event (Event)
workspace_id (str | None)
- Return type:
object
-
run_automated(flows, *, runtime_ledger=None, runtime_stop_event, flow_stop_event, workspace_id=None)[source]
Run automated poll and schedule flows through separate host timing surfaces.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event)
flow_stop_event (Event)
workspace_id (str | None)
- Return type:
object
-
run_grouped_continuous(flows, *, runtime_ledger=None, runtime_stop_event=None, flow_stop_event=None, workspace_id=None)[source]
Run grouped automated flows continuously with optional runtime controls.
- Parameters:
flows (tuple['CoreFlow', ...])
runtime_ledger (RuntimeCacheStore | None)
runtime_stop_event (Event | None)
flow_stop_event (Event | None)
workspace_id (str | None)
- Return type:
object
-
stop(run_id, *, flow_stop_event=None)[source]
Request that an active runtime stop a run by id.
- Parameters:
-
- Return type:
None
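Most run methods above accept a `runtime_stop_event` and a `flow_stop_event`. Assuming `Event` refers to `threading.Event`, cooperative stopping usually looks like the loop below. The function `run_until_stopped` and its `poll_seconds` and `max_ticks` parameters are hypothetical and only illustrate how a worker might honor both events; they are not the runtime's actual loop.

```python
import threading


def run_until_stopped(runtime_stop_event, flow_stop_event=None, *, poll_seconds=0.01, max_ticks=1000):
    """Loop until either stop event is set; return the number of completed ticks."""
    ticks = 0
    while ticks < max_ticks:
        if runtime_stop_event.is_set():
            break  # the whole runtime was asked to stop
        if flow_stop_event is not None and flow_stop_event.is_set():
            break  # only this flow was asked to stop
        ticks += 1  # stand-in for one unit of real work
        # Event.wait doubles as an interruptible sleep: it returns early
        # as soon as the runtime stop event is set.
        runtime_stop_event.wait(poll_seconds)
    return ticks


runtime_stop = threading.Event()
flow_stop = threading.Event()
timer = threading.Timer(0.05, flow_stop.set)  # request a flow-level stop shortly
timer.start()
ticks_done = run_until_stopped(runtime_stop, flow_stop)
timer.join()
```

Keeping the two events separate lets a host stop a single flow without tearing down the shared runtime.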
Runtime history query services.
-
class RuntimeHistoryService[source]
Bases: object
Own persisted run/step history queries used by operator surfaces.
-
rebuild_step_outputs(ledger, flow_cards)[source]
Rebuild latest successful per-step output paths for visible flows.
- Parameters:
-
- Return type:
StepOutputRefresh
-
refresh_step_outputs(ledger, flow_cards, *, current_index, last_seen_step_run_id)[source]
Incrementally merge newly finished successful step outputs into the current index.
- Parameters:
ledger (RuntimeCacheStore)
flow_cards (dict[str, FlowCatalogLike])
current_index (StepOutputIndex)
last_seen_step_run_id (int | None)
- Return type:
StepOutputRefresh
-
error_text_for_entry(ledger, run_group, entry)[source]
Return one user-facing error title and persisted error text for a failed entry.
- Parameters:
-
- Return type:
tuple[str, str | None]
-
class StepOutputRefresh(last_step_run_id, index)[source]
Bases: object
One step-output refresh result with cache watermark.
- Parameters:
-
-
last_step_run_id: int | None
-
index: StepOutputIndex
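The "cache watermark" idea behind `refresh_step_outputs` can be sketched without the package. Everything below is a hypothetical stand-in: `Refresh` mirrors the two documented fields of `StepOutputRefresh`, the index is modeled as a plain dict, and the row layout `(step_run_id, flow_name, step_label, output_path)` is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Refresh:
    """Hypothetical stand-in for StepOutputRefresh."""
    last_step_run_id: Optional[int]
    index: dict


def refresh(rows, current_index, last_seen_step_run_id):
    """Merge rows newer than the watermark into a copy of the current index."""
    index = dict(current_index)
    watermark = last_seen_step_run_id
    for step_run_id, flow_name, step_label, output_path in sorted(rows):
        if last_seen_step_run_id is not None and step_run_id <= last_seen_step_run_id:
            continue  # already merged during an earlier refresh
        index[(flow_name, step_label)] = output_path  # latest output wins
        watermark = step_run_id if watermark is None else max(watermark, step_run_id)
    return Refresh(watermark, index)


rows = [
    (1, "orders", "load", "out/a.parquet"),
    (2, "orders", "clean", "out/b.parquet"),
    (3, "orders", "load", "out/c.parquet"),
]
first = refresh(rows[:2], {}, None)                              # cold start
second = refresh(rows, first.index, first.last_step_run_id)      # only row 3 is new
```

Passing the returned watermark back in on the next call is what keeps each refresh proportional to the number of new step runs rather than the full history.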
Machine-local settings services.
-
class SettingsService(store)[source]
Bases: object
Own machine-local settings persistence for operator surfaces.
- Parameters:
store (LocalSettingsStore)
-
classmethod default_store(*, app_root=None)[source]
Open the default local settings store for one app root.
- Parameters:
app_root (Path | None)
- Return type:
LocalSettingsStore
-
classmethod open_default(*, app_root=None, store_factory=None)[source]
Open the default local settings store for the current app root.
- Parameters:
-
- Return type:
SettingsService
-
workspace_collection_root()[source]
Return the saved local workspace collection root override, when present.
- Return type:
Path | None
-
set_workspace_collection_root(value)[source]
Persist the local workspace collection root override.
- Parameters:
value (Path | str | None)
- Return type:
None
-
default_workspace_id()[source]
Return the saved default workspace id, when present.
- Return type:
str | None
-
set_default_workspace_id(value)[source]
Persist the default workspace id.
- Parameters:
value (str | None)
- Return type:
None
-
runtime_root()[source]
Return the saved runtime/artifact root override, when present.
- Return type:
Path | None
-
set_runtime_root(value)[source]
Persist the runtime/artifact root override.
- Parameters:
value (Path | str | None)
- Return type:
None
Shared workspace lease metadata and runtime snapshot services.
-
class SharedStateService(*, workspace_io=None)[source]
Bases: object
Own lease-based shared snapshot hydration for operator surfaces.
- Parameters:
workspace_io (WorkspaceIoLayer | None)
-
hydrate_local_runtime(paths, ledger)[source]
Replace one local runtime ledger from the shared workspace snapshots.
- Parameters:
-
- Return type:
None
-
read_lease_metadata(paths)[source]
Return current workspace lease metadata, if present.
- Parameters:
paths (WorkspacePaths)
- Return type:
dict[str, Any] | None
-
lease_is_stale(paths, *, stale_after_seconds)[source]
Return whether current workspace lease metadata is stale.
- Parameters:
-
- Return type:
bool
-
reset_flow_state(paths, *, flow_name)[source]
Delete one flow’s shared snapshot history and freshness state.
- Parameters:
paths (WorkspacePaths)
flow_name (str)
- Return type:
None
-
reset_workspace_state(paths)[source]
Delete all shared coordination and snapshot state for one workspace.
- Parameters:
paths (WorkspacePaths)
- Return type:
None
Shared theme resolution services.
-
class ThemeService(*, themes={'dark': ThemePalette(name='dark', window_bg='#0d1117', app_bg='#0d1117', panel_bg='#161b22', panel_border='#30363d', text='#c9d1d9', muted_text='#8b949e', section_text='#7d8590', accent_text='#2ea043', warning_text='#d29922', error_text='#f85149', button_bg='#21262d', button_hover='#30363d', button_checked_bg='#1f6feb', button_checked_border='#388bfd', button_disabled_bg='#161b22', button_disabled_border='#30363d', button_disabled_text='#6e7681', input_bg='#0d1117', input_border='#30363d', hover_bg='#1b2230', hover_border='#3b4556', selection_bg='#1f6feb', selection_text='#f0f6fc', selection_border='#388bfd', tab_bg='#161b22', tab_hover_bg='#1b2230', tab_selected_bg='#21262d', progress_bg='#0d1117', progress_chunk='#2ea043', summary_bg='#21262d', summary_border='#30363d', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26'), 'light': ThemePalette(name='light', window_bg='#ffffff', app_bg='#f6f8fa', panel_bg='#ffffff', panel_border='#d0d7de', text='#1f2328', muted_text='#656d76', section_text='#57606a', accent_text='#1a7f37', warning_text='#9a6700', error_text='#cf222e', button_bg='#f6f8fa', button_hover='#eef2f6', button_checked_bg='#ddf4ff', button_checked_border='#54aeff', button_disabled_bg='#f6f8fa', button_disabled_border='#d8dee4', button_disabled_text='#8c959f', input_bg='#ffffff', input_border='#d0d7de', hover_bg='#f6f8fa', hover_border='#c7d2dd', selection_bg='#0969da', selection_text='#ffffff', selection_border='#54aeff', tab_bg='#f6f8fa', tab_hover_bg='#eef2f6', tab_selected_bg='#ffffff', progress_bg='#eef2f6', progress_chunk='#1a7f37', summary_bg='#f6f8fa', summary_border='#d0d7de', request_control_bg='#F04A00', request_control_border='#c23c00', request_control_hover='#d84300', engine_start_bg='#1f883d', engine_start_border='#1a7f37', engine_start_hover='#1a7f37', engine_stop_bg='#cf222e', engine_stop_border='#a40e26', engine_stop_hover='#a40e26')}, default_theme_name='system', resolve_theme_name_func=<function resolve_theme_name>, system_theme_name_func=<function system_theme_name>, toggle_theme_name_func=<function toggle_theme_name>, theme_button_text_func=<function theme_button_text>)[source]
Bases: object
Thin injectable wrapper around shared theme state decisions.
- Parameters:
themes (Mapping[str, ThemePalette])
default_theme_name (str)
resolve_theme_name_func (Callable[[str], str])
system_theme_name_func (Callable[[], str])
toggle_theme_name_func (Callable[[str], str])
theme_button_text_func (Callable[[str], str])
-
resolve_name(theme_name='system')[source]
Resolve one explicit or system-bound theme name.
- Parameters:
theme_name (str)
- Return type:
str
-
system_name()[source]
Return the host-system theme name.
- Return type:
str
-
toggle_name(theme_name)[source]
Return the opposite theme name.
- Parameters:
theme_name (str)
- Return type:
str
-
button_text(theme_name)[source]
Return the user-facing theme toggle text.
- Parameters:
theme_name (str)
- Return type:
str
-
palette(theme_name='system')[source]
Return the resolved semantic palette.
- Parameters:
theme_name (str)
- Return type:
ThemePalette
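Because the service takes its lookup functions as constructor arguments, the host-system lookup can be replaced in tests. The sketch below shows that injection pattern with a hypothetical `ThemeResolver` class; it is not `ThemeService` itself. The rule that any name outside `themes` (including `'system'`) defers to the injected function is an assumption, and the palettes are reduced to plain dicts holding the `window_bg` values from the signature above.

```python
class ThemeResolver:
    """Hypothetical stand-in showing the injection pattern, not ThemeService."""

    def __init__(self, *, themes, system_theme_name_func):
        self._themes = themes
        self._system_theme_name = system_theme_name_func

    def resolve_name(self, theme_name="system"):
        # Assumption: explicit known names pass through unchanged and
        # anything else (including "system") defers to the host lookup.
        if theme_name in self._themes:
            return theme_name
        return self._system_theme_name()

    def palette(self, theme_name="system"):
        return self._themes[self.resolve_name(theme_name)]


themes = {
    "dark": {"window_bg": "#0d1117"},
    "light": {"window_bg": "#ffffff"},
}
# Inject a fixed answer instead of querying the real host system.
resolver = ThemeResolver(themes=themes, system_theme_name_func=lambda: "light")
system_resolved = resolver.resolve_name()
explicit_resolved = resolver.resolve_name("dark")
```

Swapping the lambda for one that returns `"dark"` is enough to exercise the other branch without touching operating-system settings.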
Workspace provisioning helpers shared by CLI and GUI surfaces.
-
class WorkspaceProvisioningResult(workspace_root, created_paths, preserved_paths)[source]
Bases: object
Describe which workspace assets were created during provisioning.
- Parameters:
workspace_root (Path)
created_paths (tuple[Path, ...])
preserved_paths (tuple[Path, ...])
-
workspace_root: Path
-
created_paths: tuple[Path, ...]
-
preserved_paths: tuple[Path, ...]
-
property created_anything: bool
Return whether provisioning created any new files or directories.
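A result object of this shape pairs naturally with provisioning that never overwrites existing content. The following is a minimal sketch under stated assumptions: `ProvisioningResult` and `provision` are hypothetical stand-ins, the folder names `flows` and `config` are invented for the demo, and `created_anything` is assumed to mean "`created_paths` is non-empty".

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class ProvisioningResult:
    """Hypothetical stand-in for WorkspaceProvisioningResult."""
    workspace_root: Path
    created_paths: tuple
    preserved_paths: tuple

    @property
    def created_anything(self) -> bool:
        return bool(self.created_paths)


def provision(workspace_root: Path, folder_names) -> ProvisioningResult:
    """Create missing folders and leave existing ones untouched."""
    created, preserved = [], []
    for name in folder_names:
        path = workspace_root / name
        if path.exists():
            preserved.append(path)  # never overwrite existing content
        else:
            path.mkdir(parents=True)
            created.append(path)
    return ProvisioningResult(workspace_root, tuple(created), tuple(preserved))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    first = provision(root, ("flows", "config"))    # creates both folders
    second = provision(root, ("flows", "config"))   # finds both already present
```

Running provisioning twice is safe in this sketch: the second call reports everything as preserved and creates nothing.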
-
class WorkspaceProvisioningService[source]
Bases: object
Own safe workspace-folder provisioning for operator surfaces.
-
provision_workspace(workspace_paths, *, interpreter_path=None)[source]
Provision missing authored-workspace folders without overwriting existing content.
- Parameters:
-
- Return type:
WorkspaceProvisioningResult
-
collection_vscode_settings(collection_root, *, app_root, interpreter_path=None)[source]
Return VS Code settings for one workspace collection root.
- Parameters:
-
- Return type:
dict[str, object]
-
checkout_source_dir(app_root)[source]
Return the repo-local source directory when app_root points at a checkout.
- Parameters:
app_root (Path)
- Return type:
Path | None
-
checkout_tests_dir(app_root)[source]
Return the repo-local tests directory when app_root points at a checkout.
- Parameters:
app_root (Path)
- Return type:
Path | None
-
write_collection_vscode_settings(collection_root, *, app_root, interpreter_path=None, overwrite=False)[source]
Write collection-root VS Code settings unless an existing file should be preserved.
- Parameters:
-
- Return type:
Path | None
-
workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None)[source]
Return VS Code settings for one workspace root.
- Parameters:
-
- Return type:
dict[str, object]
-
write_workspace_vscode_settings(workspace_root, *, app_root, interpreter_path=None, overwrite=False)[source]
Write workspace-local VS Code settings unless an existing file should be preserved.
- Parameters:
-
- Return type:
Path | None
Workspace path and discovery services.
-
class WorkspaceService(*, app_state_policy=None, discovery_policy=None, runtime_layout_policy=None, discover_workspaces_func=None, resolve_workspace_paths_func=None)[source]
Bases: object
Own workspace discovery and path resolution through explicit collaborators.
- Parameters:
app_state_policy (AppStatePolicy | None)
discovery_policy (WorkspaceDiscoveryPolicy | None)
runtime_layout_policy (RuntimeLayoutPolicy | None)
discover_workspaces_func (Callable[..., tuple[DiscoveredWorkspace, ...]] | None)
resolve_workspace_paths_func (Callable[..., WorkspacePaths] | None)
-
discover(*, app_root=None, workspace_collection_root=None)[source]
Return discoverable workspaces for the current app and collection roots.
- Parameters:
-
- Return type:
tuple[DiscoveredWorkspace, ...]
-
resolve_paths(*, workspace_id=None, workspace_root=None, data_root=None, workspace_collection_root=None)[source]
Resolve one workspace path set with the current override-aware rules.
- Parameters:
workspace_id (str | None)
workspace_root (Path | None)
data_root (Path | None)
workspace_collection_root (Path | None)
- Return type:
WorkspacePaths