Flow Methods
This page is the author-facing tour of the Flow builder surface. The method
docstrings remain the source of truth for exact signatures and API reference
rendering; use this guide for examples and rules that matter while writing flow
modules.
from data_engine import Flow
Flow(group, name=None, label=None)
Create a new immutable flow definition.
flow = Flow(group="Reports")
flow = Flow(group="Reports", label="Daily Report Build")
Rules:
- group must be a non-empty string.
- name, when provided, must be a non-empty string.
- label, when provided, must be a non-empty string and is the human-readable flow title shown by operator surfaces.
- When a flow module omits name, the module loader can derive the stable flow identity from the module filename.
- Each builder method returns a new Flow; the previous instance is unchanged.
Use label= when the UI title should be friendlier than the module identity.
Keep name= stable when code outside the module needs to refer to the flow by
identifier.
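The immutability rule can be illustrated with a small stand-in. The sketch below is not data_engine's Flow: SketchFlow and its single step() method are hypothetical names chosen for illustration. It assumes a frozen dataclass is a fair model of "each builder method returns a new Flow" and reuses the documented non-empty-string checks.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


# Hypothetical stand-in for illustration; not data_engine's Flow class.
@dataclass(frozen=True)
class SketchFlow:
    group: str
    name: Optional[str] = None
    label: Optional[str] = None
    steps: Tuple[str, ...] = ()

    def __post_init__(self) -> None:
        # Documented rule: group is required and must be a non-empty string.
        if not isinstance(self.group, str) or not self.group:
            raise ValueError("group must be a non-empty string")
        # Documented rule: name and label are optional, but non-empty when given.
        for field_name in ("name", "label"):
            value = getattr(self, field_name)
            if value is not None and (not isinstance(value, str) or not value):
                raise ValueError(f"{field_name} must be a non-empty string")

    def step(self, step_name: str) -> "SketchFlow":
        # Build a new instance with one more step; self is left unchanged.
        return replace(self, steps=self.steps + (step_name,))


base = SketchFlow(group="Reports", label="Daily Report Build")
extended = base.step("read_source")
print(base.steps, extended.steps)  # () ('read_source',)
```

Because the original instance is never mutated, a partially built flow can be shared and extended in different directions without one branch affecting another.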
watch(...)
Configure how the runtime starts flow runs.
manual_flow = Flow(group="Reports").watch(mode="manual")
poll_flow = Flow(group="Reports").watch(
    mode="poll",
    source="incoming",
    interval="30s",
    extensions=[".csv", "json"],
    settle=2,
    max_parallel=4,
)
scheduled_flow = Flow(group="Reports").watch(
    mode="schedule",
    run_as="batch",
    source="incoming",
    time=["08:00", "16:30"],
)
Parameters:
- mode is required and must be "manual", "poll", or "schedule".
- run_as defaults to "individual". Use "individual" for one run per concrete source file, or "batch" for one run against the watched source root.
- max_parallel controls how many source-file runs one individual-mode flow can execute concurrently. It must be an integer greater than zero.
- source is a file or directory path used by source-backed runs.
- interval accepts positive duration strings such as "500ms", "30s", "5m", "2h", "1d", or "1w".
- time accepts one HH:MM string or a collection of HH:MM strings.
- extensions limits source discovery to matching suffixes. Values are normalized to lowercase and may be written with or without the leading dot.
- settle is the poll-only delay, in seconds, before a changed file is queued. It must be an integer greater than or equal to zero.
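To make the accepted value shapes concrete, here is a minimal sketch of how interval=, extensions=, and time= values could be normalized. The helper names parse_interval, normalize_extensions, and parse_times are hypothetical, and the sketch assumes an interval is a single positive integer followed by one of the listed units; the runtime's own parser may accept more forms.

```python
import re
from datetime import time, timedelta
from typing import Iterable, List, Union

# Units taken from the documented examples ("500ms", "30s", "5m", "2h", "1d", "1w").
_UNITS = {
    "ms": timedelta(milliseconds=1),
    "s": timedelta(seconds=1),
    "m": timedelta(minutes=1),
    "h": timedelta(hours=1),
    "d": timedelta(days=1),
    "w": timedelta(weeks=1),
}
# Assumed format: one integer followed by exactly one unit suffix.
_DURATION = re.compile(r"(\d+)(ms|s|m|h|d|w)")
# Strict 24-hour HH:MM clock time.
_CLOCK = re.compile(r"([01]\d|2[0-3]):([0-5]\d)")


def parse_interval(text: str) -> timedelta:
    """Turn a duration string such as "30s" into a timedelta."""
    match = _DURATION.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    amount = int(match.group(1))
    if amount <= 0:
        raise ValueError("interval must be positive")
    return amount * _UNITS[match.group(2)]


def normalize_extensions(values: Iterable[str]) -> List[str]:
    """Lowercase each suffix and add the leading dot when it is missing."""
    normalized = []
    for value in values:
        suffix = value.strip().lower()
        if not suffix:
            raise ValueError("extensions must be non-empty")
        normalized.append(suffix if suffix.startswith(".") else "." + suffix)
    return normalized


def parse_times(value: Union[str, Iterable[str]]) -> List[time]:
    """Accept one "HH:MM" string or a collection of them."""
    items = [value] if isinstance(value, str) else list(value)
    result = []
    for item in items:
        match = _CLOCK.fullmatch(item)
        if match is None:
            raise ValueError(f"expected HH:MM, got {item!r}")
        result.append(time(int(match.group(1)), int(match.group(2))))
    return result
```

For example, parse_interval("30s") gives timedelta(seconds=30), and normalize_extensions([".csv", "json"]) gives [".csv", ".json"], matching the mixed spelling used in the poll example.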
Mode rules:
- manual rejects interval=, time=, and custom settle= values.
- poll requires both source= and interval=, and rejects time=.
- schedule accepts exactly one of interval= or time=.
- schedule may include source= when the scheduled run should be source-backed.
- poll and schedule both support run_as="individual" and run_as="batch".
Use manual flows for button-driven or explicit Python execution, poll flows when file changes should trigger work, and scheduled flows when time should trigger work. Use batch mode when the step logic needs to reason about a whole folder or root as one unit.
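The mode rules can be read as a small validation table. This hypothetical watch_errors() helper (not part of data_engine) returns the rule violations for a given combination of arguments. It assumes that an argument left as None was not supplied, which is how "custom settle=" is modeled for manual mode.

```python
from typing import List, Optional


def watch_errors(
    mode: str,
    source: Optional[str] = None,
    interval: Optional[str] = None,
    time: Optional[object] = None,
    settle: Optional[int] = None,
    run_as: str = "individual",
) -> List[str]:
    """Return the documented mode-rule violations; an empty list means valid."""
    if mode not in ("manual", "poll", "schedule"):
        return [f"unknown mode {mode!r}"]
    errors: List[str] = []
    if run_as not in ("individual", "batch"):
        errors.append("run_as must be 'individual' or 'batch'")
    if mode == "manual":
        # Manual flows are started explicitly, so they take no trigger settings.
        for arg_name, value in (("interval", interval), ("time", time), ("settle", settle)):
            if value is not None:
                errors.append(f"manual rejects {arg_name}=")
    elif mode == "poll":
        if source is None or interval is None:
            errors.append("poll requires both source= and interval=")
        if time is not None:
            errors.append("poll rejects time=")
    else:
        # schedule: exactly one trigger; source= is optional.
        if (interval is None) == (time is None):
            errors.append("schedule accepts exactly one of interval= or time=")
    return errors
```

Collecting every violation, rather than stopping at the first, makes it easier to see why a particular watch(...) call is rejected.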
mirror(root=...)
Bind a mirrored output namespace rooted at one directory.
flow = Flow(group="Reports").mirror(root="processed")
mirror(...) configures context.mirror; it does not write files by itself.
Steps can use the mirror helpers to build write-ready paths for any output type:
def write_summary(context):
    output_path = context.mirror.with_suffix(".parquet")
    context.current.write_parquet(output_path)
    return output_path
You can omit mirror(...) when a flow has no mirrored output namespace. root
may be a string or Path.
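One way to picture the mirrored namespace is the sketch below. SketchMirror is hypothetical, and it assumes that with_suffix() keeps the source file's path relative to the watched root and places it under the mirror root, while root_file() addresses a file directly under the mirror root. It only computes paths; it does not create directories or write anything.

```python
from pathlib import Path
from typing import Union

PathLike = Union[str, Path]


# Hypothetical model of context.mirror; the real helper may behave differently.
class SketchMirror:
    def __init__(self, source_root: PathLike, source_path: PathLike, mirror_root: PathLike) -> None:
        self.root = Path(mirror_root)
        # Assumption: outputs keep the source file's position below the watched root.
        self._relative = Path(source_path).relative_to(Path(source_root))

    def with_suffix(self, suffix: str) -> Path:
        # Same relative location under the mirror root, with a new file suffix.
        return (self.root / self._relative).with_suffix(suffix)

    def root_file(self, name: str) -> Path:
        # A file placed directly under the mirror root.
        return self.root / name


mirror = SketchMirror("incoming", "incoming/2024/sales.csv", "processed")
print(mirror.with_suffix(".parquet").as_posix())  # processed/2024/sales.parquet
print(mirror.root_file("rows.json").as_posix())   # processed/rows.json
```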
step(fn, use=None, save_as=None, label=None)
Append one callable step.
import json
def read_source(context):
    return context.source.path.read_text(encoding="utf-8")
def parse_rows(context):
    return [line.split(",") for line in context.current.splitlines()]
def write_rows(context):
    # Serialize with json.dumps so rows.json contains valid JSON (str() would not).
    output_path = context.mirror.root_file("rows.json")
    output_path.write_text(json.dumps(context.current), encoding="utf-8")
    return output_path
flow = (
    Flow(group="Reports")
    .watch(mode="poll", source="incoming", interval="30s", extensions=[".csv"])
    .mirror(root="processed")
    .step(read_source, save_as="raw_text", label="Read Source")
    .step(parse_rows, use="raw_text", save_as="rows", label="Parse Rows")
    .step(write_rows, use="rows", label="Write Output")
)
Rules:
- fn must be callable and must accept exactly one context parameter.
- The callable's return value becomes the next context.current.
- save_as= stores the return value in context.objects under that name.
- use= loads a previously saved object into context.current before the step runs.
- label= overrides the display name used in logs, UI summaries, and debug artifact grouping.
step(...) is the default workhorse. It is the right choice when a callable
needs the full FlowContext or needs to operate on the current value as one
object.
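The interaction between context.current, context.objects, use=, and save_as= can be modeled with a tiny sequential runner. SketchContext, SketchStep, and run_steps() are hypothetical stand-ins written for illustration, not the runtime's implementation; they only show the data flow described in the step rules.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class SketchContext:
    current: Any = None
    objects: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SketchStep:
    fn: Callable[[SketchContext], Any]
    use: Optional[str] = None
    save_as: Optional[str] = None


def run_steps(steps: List[SketchStep]) -> SketchContext:
    context = SketchContext()
    for step in steps:
        if step.use is not None:
            # use= swaps a previously saved object into current before the call.
            context.current = context.objects[step.use]
        result = step.fn(context)
        # The return value always becomes the next current...
        context.current = result
        if step.save_as is not None:
            # ...and save_as= also keeps it addressable by name.
            context.objects[step.save_as] = result
    return context


context = run_steps([
    SketchStep(lambda ctx: "a,b\nc,d", save_as="raw_text"),
    SketchStep(
        lambda ctx: [line.split(",") for line in ctx.current.splitlines()],
        use="raw_text",
        save_as="rows",
    ),
    SketchStep(lambda ctx: len(ctx.current), use="rows"),
])
print(context.current)  # 2
```

Saved objects stay available after later steps overwrite current, which is what lets a step reach back to an earlier value with use=.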
collect(extensions, root=None, recursive=False, use=None, save_as=None, label=None)
Append a step that collects matching files into a Batch of FileRef items.
flow = (
    Flow(group="Reports")
    .watch(mode="schedule", run_as="batch", source="incoming", interval="15m")
    .collect([".csv", ".json"], recursive=True, save_as="source_files")
)
Behavior:
- extensions is required and must include at least one non-empty extension.
- root= chooses the search root. When root= is omitted, collection uses context.source.root.
- recursive=True searches child directories.
- Missing collection roots return an empty Batch.
- Each FileRef exposes .path, .name, .stem, .suffix, .parent, and .exists().
- Batch is iterable and also exposes .items, .names(), and .paths().
collect(...) accepts the same use=, save_as=, and label= step options as
step(...).
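The collection behavior can be approximated with pathlib. This collect_files() helper is a hypothetical sketch that returns a sorted list of Path objects rather than a Batch of FileRef items; the case-insensitive, dot-optional extension matching is an assumption borrowed from how watch(extensions=...) normalizes suffixes.

```python
from pathlib import Path
from typing import Iterable, List, Union


def collect_files(root: Union[str, Path], extensions: Iterable[str], recursive: bool = False) -> List[Path]:
    # Assumption: suffixes match case-insensitively and the leading dot is optional.
    wanted = set()
    for ext in extensions:
        ext = ext.strip().lower()
        if ext:
            wanted.add(ext if ext.startswith(".") else "." + ext)
    if not wanted:
        raise ValueError("extensions must include at least one non-empty extension")
    root_path = Path(root)
    if not root_path.is_dir():
        # A missing (or non-directory) root yields an empty result, not an error.
        return []
    # recursive=True walks child directories; otherwise only the root's own entries.
    candidates = root_path.rglob("*") if recursive else root_path.iterdir()
    return sorted(p for p in candidates if p.is_file() and p.suffix.lower() in wanted)
```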
map(fn, use=None, save_as=None, label=None)
Append a step that maps a callable across the current iterable value.
def summarize_file(file_ref):
    return {"name": file_ref.name, "bytes": file_ref.path.stat().st_size}
flow = (
    Flow(group="Reports")
    .collect([".csv"], root="incoming", save_as="files")
    .map(summarize_file, use="files", save_as="summaries", label="Summarize Files")
)
The mapped callable may accept either (item) or (context, item):
def tag_file(context, file_ref):
    return {"flow": context.flow_name, "name": file_ref.name}
flow = Flow(group="Reports").collect([".json"], root="incoming").map(tag_file)
Rules:
- fn must be callable and accept either one or two parameters.
- The current value must be iterable. Strings, bytes, dictionaries, and None are rejected as map inputs.
- Empty iterables are rejected.
- Results are returned as a Batch.
- use=, save_as=, and label= work the same way they do for step(...).
Use map(...) when the same operation should run once per item. Use
step(...) when a callable should combine, reduce, or validate the whole
collection.
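The map rules boil down to an input check plus an arity dispatch. The hypothetical map_items() helper below returns a plain list instead of a Batch and uses inspect.signature to count parameters; how the real runtime detects the one- versus two-parameter form is an assumption here.

```python
import inspect
from typing import Any, Callable, List


def map_items(fn: Callable[..., Any], current: Any, context: Any = None) -> List[Any]:
    # Reject the input types the rules call out, even though some are iterable.
    if current is None or isinstance(current, (str, bytes, dict)):
        raise TypeError("map input must be a non-string, non-mapping iterable")
    try:
        items = list(current)
    except TypeError:
        raise TypeError("map input must be iterable") from None
    if not items:
        raise ValueError("map input must not be empty")
    # Dispatch on how many parameters fn declares: (item) or (context, item).
    arity = len(inspect.signature(fn).parameters)
    if arity == 1:
        return [fn(item) for item in items]
    if arity == 2:
        return [fn(context, item) for item in items]
    raise TypeError("fn must accept one or two parameters")


print(map_items(lambda n: n * 2, [1, 2, 3]))  # [2, 4, 6]
```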
step_each(fn, use=None, save_as=None, label=None)
step_each(...) is an alias for map(...).
flow = Flow(group="Reports").collect([".csv"], root="incoming").step_each(summarize_file)
Choose whichever name makes the chain read better. The signature, validation,
and returned Batch behavior are the same as map(...).
run_once()
Run the flow once and return completed FlowContext objects.
contexts = flow.run_once()
first_result = contexts[0].current
This is useful for one-off Python-driven execution. A source-backed individual flow may return more than one context, one for each executed source.
run()
Run continuously according to the flow trigger.
Flow(group="Reports").watch(mode="poll", source="incoming", interval="30s").run()
Use this for long-lived poll or schedule execution from Python. For running
several flows together, import the module-level run helper:
from data_engine import Flow, run
run(flow_a, flow_b)
Grouped execution runs flows sequentially within each group and allows different groups to run in parallel.
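The grouping rule can be sketched with a thread pool: one worker per group, with the jobs inside a group executed in order. run_grouped() is hypothetical, it takes finite (group, callable) pairs as stand-ins for long-lived flows, and it assumes flows are grouped by their group= value.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


def run_grouped(jobs: List[Tuple[str, Callable[[], object]]]) -> Dict[str, List[object]]:
    # Bucket jobs by group name, preserving the order they were passed in.
    groups: Dict[str, List[Callable[[], object]]] = defaultdict(list)
    for group, job in jobs:
        groups[group].append(job)

    def run_group(group_jobs: List[Callable[[], object]]) -> List[object]:
        # Within one group, run strictly one after another.
        return [job() for job in group_jobs]

    # One worker thread per group, so different groups may overlap in time.
    with ThreadPoolExecutor(max_workers=max(1, len(groups))) as pool:
        futures = {name: pool.submit(run_group, group_jobs) for name, group_jobs in groups.items()}
        return {name: future.result() for name, future in futures.items()}


log = []


def make_job(tag):
    def job():
        log.append(tag)
        return tag
    return job


results = run_grouped([("A", make_job("a1")), ("B", make_job("b1")), ("A", make_job("a2"))])
print(results)  # {'A': ['a1', 'a2'], 'B': ['b1']}
```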
preview(use=None)
Run one flow for authoring-time inspection and return the actual object it produced.
build().preview()
build().preview(use="raw_text")
build().preview(use="summaries")
Behavior:
- Without use=, preview returns the final context.current.
- With use="name", preview runs only until that save_as slot exists. Later write/debug steps are skipped once the requested object is available.
- If a poll flow has several startup source files, preview uses the first deterministic source candidate.
- preview(...) is not available from inside compiled flow modules.
preview(...) is for notebooks, REPLs, and direct flow-module authoring. Use
run_once() when you need completed runtime contexts rather than just the
previewed value.
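The use= short-circuit can be sketched as a loop that returns as soon as the requested slot is filled. preview_steps() and its (fn, save_as) step tuples are hypothetical; the sketch assumes each step receives the previous value and the saved objects, and it raises KeyError if no step ever saves the requested name.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical step shape: (fn(current, objects) -> value, save_as or None).
Step = Tuple[Callable[[Any, Dict[str, Any]], Any], Optional[str]]


def preview_steps(steps: List[Step], use: Optional[str] = None) -> Any:
    current: Any = None
    objects: Dict[str, Any] = {}
    for fn, save_as in steps:
        current = fn(current, objects)
        if save_as is not None:
            objects[save_as] = current
        if use is not None and use in objects:
            # Requested slot exists: stop here, so later write/debug steps never run.
            return objects[use]
    if use is not None:
        raise KeyError(f"no step saved {use!r}")
    # Without use=, return the final current value.
    return current


calls = []


def make_step(name, value):
    def fn(current, objects):
        calls.append(name)
        return value
    return fn


steps = [
    (make_step("read", "a,b"), "raw_text"),
    (make_step("parse", [["a", "b"]]), "rows"),
    (make_step("write", "rows.json"), None),
]
print(preview_steps(steps, use="raw_text"), calls)  # a,b ['read']
```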