Getting Started

This guide is for someone new to the code-defined Data Engine API and app surfaces.

By the end, you should understand:

  • what a flow is

  • where flow modules live

  • what a workspace contains

  • how discovery and runtime execution work at a high level

  • how to run a first flow end to end

  • how batch workflows fit into the model

The mental model

Data Engine has one source of truth for per-flow behavior: the Flow returned by build().

In practice:

  • the flow-module filename is the stable flow identity; Flow(label=...) is an optional display title

  • the flow definition provides a group, runtime mode, optional source and mirror bindings, and ordered steps

  • step functions do real work with native libraries such as Polars, DuckDB, and plain Python

  • the GUI, TUI, and egui surfaces discover those flow modules inside the selected workspace and show them as configurable runnable flows

The fluent API owns orchestration, while the step callables own your actual business logic.

Flow builder methods are immutable. Each call returns a new Flow, which is why flow definitions usually chain methods and return the final object from build().
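The immutable-builder idea can be illustrated without Data Engine at all. The sketch below uses a hypothetical SketchFlow dataclass (not the real Flow class, whose internals are not shown in this guide) to demonstrate why a builder call has to be chained or reassigned.

```python
from dataclasses import dataclass, replace
from typing import Callable, Tuple


@dataclass(frozen=True)
class SketchFlow:
    """Hypothetical stand-in for Flow; not the data_engine class."""

    group: str
    steps: Tuple[Callable, ...] = ()

    def step(self, fn: Callable) -> "SketchFlow":
        # Return a new object instead of mutating self.
        return replace(self, steps=self.steps + (fn,))


def noop(context):
    return context


base = SketchFlow(group="Reference")
extended = base.step(noop)

# The original is unchanged; only the returned object carries the step.
print(len(base.steps), len(extended.steps))
```

If the return value of base.step(noop) were discarded, the step would be lost, which is the practical reason to chain the calls and return the final object.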

The basic workspace layout

A typical authored workspace looks like this:

workspaces/
  example_workspace/
    flow_modules/
      flow_helpers/
    config/
    databases/
    .workspace_state/

The parts you will usually author directly are:

  • flow_modules/: runnable flows in .py or .ipynb

  • flow_modules/flow_helpers/: reusable helper modules imported from flows

  • config/: workspace-local TOML files available through context.config

  • databases/: a conventional home for workspace-local databases used through context.database(...)

The app can provision that shape for you without overwriting existing content.
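As a rough illustration of non-destructive provisioning, the sketch below creates the same folder shape with pathlib. It is not the app's own provisioning code; the function name provision_workspace and the sample settings.toml file are hypothetical.

```python
from pathlib import Path
import tempfile

# Directory names taken from the layout above.
WORKSPACE_DIRS = (
    "flow_modules",
    "flow_modules/flow_helpers",
    "config",
    "databases",
    ".workspace_state",
)


def provision_workspace(root: Path, workspace_id: str) -> Path:
    """Create any missing workspace folders; leave existing content alone."""
    workspace = root / "workspaces" / workspace_id
    for relative in WORKSPACE_DIRS:
        # exist_ok=True makes the call a no-op for folders that already exist.
        (workspace / relative).mkdir(parents=True, exist_ok=True)
    return workspace


with tempfile.TemporaryDirectory() as tmp:
    ws = provision_workspace(Path(tmp), "example_workspace")
    marker = ws / "config" / "settings.toml"  # hypothetical authored file
    marker.write_text('name = "demo"\n')
    provision_workspace(Path(tmp), "example_workspace")  # second call is safe
    survived = marker.read_text()
    all_present = all((ws / d).is_dir() for d in WORKSPACE_DIRS)
```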

Where flow module sources live

Flow module sources are authored in:

  • workspaces/<workspace_id>/flow_modules/<name>.ipynb

  • workspaces/<workspace_id>/flow_modules/<name>.py

Reusable helper modules live in:

  • workspaces/<workspace_id>/flow_modules/flow_helpers/<name>.py

Notebook-authored flow modules (.ipynb) run in the normal install. The optional notebook extra only adds Jupyter authoring tools.

Compiled runtime modules are generated into machine-local artifacts. Those runtime artifacts are isolated per workspace, so helper imports with the same module names stay workspace-local.

Each flow module should export:

  • optional DESCRIPTION

  • build() -> Flow

Display titles come from Flow(label=...) when provided. Otherwise the UI derives them from the flow-module filename. Avoid setting name= in normal flow modules; the module loader provides the discovered name.

Your first flow

A minimal scheduled flow can create data in memory and write it out:

from data_engine import Flow
import polars as pl


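def build_dates(context):
    # Build the frame in memory; the return value becomes context.current.
    return pl.DataFrame({"day": [1, 2, 3]})


def write_dates(context):
    # use="dates_df" has loaded the saved frame back into context.current.
    output = context.mirror.file("dates.parquet")
    context.current.write_parquet(output)
    # Return the written path so the UI can inspect the output.
    return output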


def build():
    return (
        Flow(group="Reference", label="Date Dimension")
        .watch(mode="schedule", run_as="batch", interval="1h")
        .mirror(root="../../example_data/Output/date_dimension")
        .step(build_dates, save_as="dates_df")
        .step(write_dates, use="dates_df", label="Write Parquet")
    )

That example shows the full shape:

  1. create Flow(group=...)

  2. attach a runtime mode with watch(...)

  3. optionally attach mirror(...)

  4. add ordered step(...) callables

  5. return the built flow from build()

The return value from each step becomes context.current. save_as= stores a named intermediate in context.objects, and use= loads a saved object back into context.current before a later step runs.
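The interplay of context.current, save_as=, and use= can be modelled in a few lines of plain Python. The runner below is a simplified sketch of the rules just described, not the Data Engine runtime; run_steps and the three toy steps are hypothetical.

```python
from types import SimpleNamespace


def run_steps(steps):
    """Run (fn, use, save_as) triples in order against one context."""
    context = SimpleNamespace(current=None, objects={})
    for fn, use, save_as in steps:
        if use is not None:
            # use= loads a saved object into context.current before the step.
            context.current = context.objects[use]
        # The step's return value becomes the new context.current.
        context.current = fn(context)
        if save_as is not None:
            # save_as= keeps a named reference to the step's result.
            context.objects[save_as] = context.current
    return context


def build_days(context):
    return [1, 2, 3]


def double(context):
    return [day * 2 for day in context.current]


def count(context):
    return len(context.current)


final = run_steps([
    (build_days, None, "days"),
    (double, None, "doubled"),
    (count, "days", None),  # rewinds to the saved "days" list first
])
print(final.current)
print(final.objects)
```

The last step counts the original list, not the doubled one, because use="days" replaces context.current before the step runs.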

What the app actually does with that flow

Once the flow is discovered, the desktop app uses it for:

  • grouping and labels in the home view

  • deciding whether the flow is manual, poll, or schedule

  • deciding whether work runs per source file or once for a watched root

  • deciding whether the flow participates in the engine

  • rendering step names and inspectable outputs

  • manual runs and engine runs for the selected workspace

The app itself binds to one workspace at a time, so when you switch workspaces, the discovered flows, runtime ledger, daemon state, and visible runs all switch with it.

A starter-style polling flow

This shape maps directly to starter flows such as example_mirror and example_poll:

from data_engine import Flow
import polars as pl


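def read_docs(context):
    # context.source identifies the concrete file that is active for this run.
    return pl.read_excel(context.source.path)


def keep_open(context):
    # Keep only the rows whose status column equals "OPEN".
    return context.current.filter(pl.col("status") == "OPEN")


def write_target(context):
    # context.mirror returns a write-ready output path, here with a .parquet suffix.
    output = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output)
    # Return the written path so the result is inspectable in the UI.
    return output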


def build():
    return (
        Flow(group="Docs")
        .watch(
            mode="poll",
            source="../../example_data/Input/docs_dated",
            interval="5s",
            extensions=[".xlsx", ".xlsm"],
            settle=1,
        )
        .mirror(root="../../example_data/Output/example_poll")
        .step(read_docs, save_as="raw_df")
        .step(keep_open, use="raw_df", save_as="filtered_df")
        .step(write_target, use="filtered_df", label="Write Parquet")
    )

This is a good first mental model for source-driven flows:

  • watch(...) tells the runtime what to listen to

  • context.source tells the step which concrete file is active

  • mirror(...) defines where mirrored outputs belong

  • returning the written path makes the result inspectable in the UI
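One way to picture a mirrored output path is as the source file's position under the watched root, replayed under the mirror root with a new suffix. The sketch below works under that assumption; the guide does not spell out the exact mapping, and mirrored_path is a hypothetical helper rather than part of the API.

```python
from pathlib import PurePosixPath


def mirrored_path(source_root: str, source_path: str, mirror_root: str, suffix: str) -> PurePosixPath:
    """Map a source file to the same relative location under mirror_root."""
    # Assumption: the mirror preserves the path relative to the watched root.
    relative = PurePosixPath(source_path).relative_to(source_root)
    return PurePosixPath(mirror_root) / relative.with_suffix(suffix)


target = mirrored_path(
    source_root="Input/docs_dated",
    source_path="Input/docs_dated/2024/report.xlsx",
    mirror_root="Output/example_poll",
    suffix=".parquet",
)
print(target)
```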

Batch-oriented files

When you want to treat a folder of files as one runtime object, use Flow.collect(...) and either Flow.map(...) or Flow.step_each(...).

from data_engine import Flow


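def validate_pdf(file_ref):
    # Called once per FileRef in the collected Batch.
    return {"name": file_ref.name, "ok": file_ref.exists()}


def summarize_results(context):
    # context.current holds the per-file results produced by map(...).
    return tuple(item["name"] for item in context.current if item["ok"])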


def build():
    return (
        Flow(group="Docs")
        .watch(mode="schedule", run_as="batch", interval="15m", source="../../example_data/Input/pdfs")
        .collect(extensions=[".pdf"], save_as="pdf_files")
        .map(fn=validate_pdf, use="pdf_files", save_as="pdf_results")
        .step(summarize_results, use="pdf_results")
    )

Flow.collect(...) returns a Batch of FileRef items. If root= is omitted, it collects from the active source root supplied by the runtime context.

Flow.map(...) runs one callable per item and returns a new Batch.

Flow.step_each(...) is the same operation with a name that can read more clearly in some flows.

If the batch is empty, both Flow.map(...) and Flow.step_each(...) raise immediately. That behavior makes empty-batch outcomes explicit and easy to diagnose.

Batch is iterable and also exposes small helpers such as names() and paths() when every item has the matching attribute.
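The batch behavior described above (map per item, fail fast on empty input, small attribute helpers) can be sketched with a toy container. SketchBatch is hypothetical and much smaller than the real Batch; in particular, raising ValueError is an assumption, since this guide does not name the exception type.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Any, Callable, Iterator, Tuple


@dataclass(frozen=True)
class SketchBatch:
    """Hypothetical stand-in for Batch; not the data_engine class."""

    items: Tuple[Any, ...]

    def __iter__(self) -> Iterator[Any]:
        return iter(self.items)

    def map(self, fn: Callable[[Any], Any]) -> "SketchBatch":
        if not self.items:
            # Assumed exception type; the point is that empty input fails loudly.
            raise ValueError("cannot map over an empty batch")
        # One call per item, collected into a new batch.
        return SketchBatch(tuple(fn(item) for item in self.items))

    def names(self) -> Tuple[str, ...]:
        # Works only when every item exposes a .name attribute.
        return tuple(item.name for item in self.items)


files = SketchBatch((PurePosixPath("pdfs/a.pdf"), PurePosixPath("pdfs/b.pdf")))
results = files.map(lambda ref: {"name": ref.name, "ok": True})
print(files.names())
print([item["name"] for item in results])
```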

Running flows from Python

Load one discovered flow:

from data_engine import load_flow

built = load_flow("example_poll")
results = built.run_once()

Discover everything the workspace exposes:

from data_engine import discover_flows, run

flows = discover_flows()
run(*flows)

run_once() returns one completed FlowContext per executed source. Inspect context.current, context.objects, context.source, and context.metadata when you need details from a run.

For preview-oriented authoring outside a compiled flow module, you can still inspect a flow with:

build().preview()
build().preview(use="raw_df")

That is often the fastest way to sanity-check a flow while you are still writing it. preview() is not available from inside compiled flow modules, so use it from an external notebook, REPL, or script.

For poll flows that watch a folder, preview(...) picks one startup source deterministically and uses it as the representative preview.

Manual, poll, and schedule at a glance

Manual

  • no watch(...) call, or watch(mode="manual") when you want to set run_as, max_parallel, or a source binding explicitly

  • context.current starts as None

  • useful for ad hoc or UI-driven runs

  • works well for flows that build data in memory or start from operator actions

Poll

  • watch(mode="poll", ...)

  • watches either one file or a directory of source files

  • the first step receives the active source through context.source

  • freshness compares the current source file signature against the runtime ledger

  • the watch(...) arguments extensions= and settle= apply only in poll mode

  • max_parallel= controls concurrent source-file runs for run_as="individual"
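The freshness check in poll mode can be pictured as comparing a cheap file signature with the one recorded after the last run. The sketch below assumes a signature of modification time plus size and an in-memory dict as the ledger; the real signature fields and ledger storage are not described in this guide, and all three helper names are hypothetical.

```python
import tempfile
from pathlib import Path


def file_signature(path: Path) -> tuple[int, int]:
    # Assumed signature: nanosecond mtime plus size in bytes.
    stat = path.stat()
    return (stat.st_mtime_ns, stat.st_size)


def is_stale(path: Path, ledger: dict) -> bool:
    """True when the file has never run or has changed since its last run."""
    return ledger.get(str(path)) != file_signature(path)


def record_run(path: Path, ledger: dict) -> None:
    ledger[str(path)] = file_signature(path)


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "report.csv"
    source.write_text("a,b\n1,2\n")
    ledger = {}
    first_seen = is_stale(source, ledger)   # never recorded
    record_run(source, ledger)
    after_run = is_stale(source, ledger)    # signature matches the ledger
    source.write_text("a,b\n1,2\n3,4\n")    # content and size change
    after_edit = is_stale(source, ledger)   # signature differs again
```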

Schedule

  • watch(mode="schedule", ...)

  • runs on an interval or on one or more wall-clock times

  • supports one time="HH:MM" value or a collection of times

  • accepts exactly one of interval= or time=

  • often starts by building data in memory or loading from a known source root
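The "exactly one of interval= or time=" rule is easy to express as a small validation function. This is a sketch of the rule only, with a hypothetical normalize_schedule helper; it does not parse interval strings and is not the validation that watch(...) itself performs.

```python
from datetime import datetime


def normalize_schedule(interval=None, time=None):
    """Accept exactly one of interval= or time= and return a normalized dict."""
    if (interval is None) == (time is None):
        raise ValueError("pass exactly one of interval= or time=")
    if interval is not None:
        return {"interval": interval}
    # A single "HH:MM" string and a collection of them are both allowed.
    times = (time,) if isinstance(time, str) else tuple(time)
    for value in times:
        datetime.strptime(value, "%H:%M")  # raises ValueError on malformed input
    return {"times": times}


hourly = normalize_schedule(interval="1h")
once_daily = normalize_schedule(time="06:30")
twice_daily = normalize_schedule(time=["06:30", "18:00"])
```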

For poll and schedule, run_as="individual" means one run per concrete source file. run_as="batch" means one run for the watched root, which is the usual companion to collect(...).
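The difference between the two run_as values comes down to how many runs a watched root produces. A minimal sketch, assuming a hypothetical plan_runs helper that returns one target per run:

```python
from pathlib import PurePosixPath


def plan_runs(root, files, run_as):
    """Return the list of run targets, one entry per run."""
    if run_as == "individual":
        # One run per concrete source file.
        return list(files)
    if run_as == "batch":
        # One run for the watched root, however many files it holds.
        return [root]
    raise ValueError(f"unknown run_as: {run_as!r}")


root = PurePosixPath("Input/pdfs")
files = [root / "a.pdf", root / "b.pdf", root / "c.pdf"]
individual = plan_runs(root, files, "individual")
batch = plan_runs(root, files, "batch")
print(len(individual), len(batch))
```

In the batch case the single run would then typically call collect(...) to gather the files under that root.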

Runtime context essentials

Inside a step, context is the runtime surface:

  • context.current is the value flowing through the pipeline

  • context.objects contains values saved by save_as=

  • context.source is available for source-backed runs and can report path, root, folder, and sidecar paths

  • context.mirror is available after mirror(root=...) and returns write-ready output paths

  • context.config lazily reads workspace-local TOML files from config/

  • context.database("name.duckdb") returns a write-ready path under the workspace databases/ directory

  • context.source_metadata() returns file metadata for the active source when one exists

  • context.debug is available when debug artifact capture is enabled by the runtime

A few good habits early

  • keep import-time code side-effect free

  • keep expensive work inside steps

  • return output paths from writer steps when you want the UI Inspect action

  • move reusable SQL, parsing helpers, and constants into flow_modules/flow_helpers/

  • use context.config for workspace-local TOML configuration

  • use context.database(...) when you want a conventional workspace-local database path

  • use context.debug.save_frame(...) or context.debug.save_json(...) sparingly for runtime diagnostics when the debug context is available

Next steps