Metadata-Version: 2.4
Name: sayiir
Version: 0.4.0
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Rust
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Typing :: Typed
Requires-Dist: pytest>=7.0 ; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21 ; extra == 'dev'
Requires-Dist: pydantic>=2.0 ; extra == 'pydantic'
Provides-Extra: dev
Provides-Extra: pydantic
Summary: Durable workflow engine with Rust core and Python bindings — checkpointing, fork/join, distributed workers
Keywords: workflow,orchestration,durable,rust,distributed
Author-email: Yacine Bouraoui <ybsoft2011@live.com>
License: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Changelog, https://github.com/sayiir/sayiir/releases
Project-URL: Discord, https://discord.gg/A2jWBFZsNK
Project-URL: Documentation, https://docs.sayiir.dev
Project-URL: Homepage, https://docs.sayiir.dev
Project-URL: Issues, https://github.com/sayiir/sayiir/issues
Project-URL: Repository, https://github.com/sayiir/sayiir

# Sayiir

**Durable workflows for Python, powered by a Rust runtime.**

[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/sayiir/sayiir/blob/main/LICENSE)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![Discord](https://img.shields.io/badge/Discord-Join-7289da)](https://discord.gg/A2jWBFZsNK)
[![Socket Badge](https://badge.socket.dev/pypi/package/sayiir)](https://socket.dev/pypi/package/sayiir)

Write plain Python functions. Sayiir makes them durable — automatic checkpointing, crash recovery, and parallel execution with zero infrastructure.

```python
from sayiir import task, Flow, run_workflow

@task
def fetch_user(user_id: int) -> dict:
    return {"id": user_id, "name": "Alice"}

@task
def send_email(user: dict) -> str:
    return f"Sent welcome to {user['name']}"

workflow = Flow("welcome").then(fetch_user).then(send_email).build()
result = run_workflow(workflow, 42)
# "Sent welcome to Alice"
```

No DSL. No YAML. No determinism constraints. No infrastructure to deploy.

## Why Sayiir?

- **No replay, no determinism rules** — Unlike Temporal, Restate, and other replay-based engines, Sayiir checkpoints after each task and resumes from the last checkpoint. Your tasks can call any API, use any library, read the clock, generate random values. No restrictions.
- **A library, not a platform** — `pip install sayiir` and write workflows. No server cluster, no separate services. Optional PostgreSQL for production persistence.
- **Rust core** — All orchestration, checkpointing, and execution runs in Rust via PyO3. You write Python; Rust handles the hard parts.
- **Pydantic integration** — Automatic input validation and output serialization for `BaseModel` types.
- **Type-safe** — Full type stubs (`.pyi`) and PEP 561 `py.typed` marker. Works with mypy and pyright.

## Installation

```bash
pip install sayiir
```

**From source (development):**

```bash
git clone https://github.com/sayiir/sayiir.git
cd sayiir/sayiir-python
pip install -e ".[dev]"
```

Requires a Rust toolchain (`rustup`) for building from source.

## Quickstart

### Inline lambdas — zero boilerplate

```python
from sayiir import Flow, run_workflow

workflow = (
    Flow("pipeline")
    .then(lambda x: x * 2)
    .then(lambda x: x + 1)
    .then(lambda x: str(x))
    .build()
)
result = run_workflow(workflow, 5)
# "11"  (5 * 2 = 10, 10 + 1 = 11, str(11))
```

No decorators, no registration — just pass any callable. Use `@task` when you need metadata (retries, timeouts, tags) or explicit naming.

### Sequential workflow

```python
from sayiir import task, Flow, run_workflow

@task
def double(x: int) -> int:
    return x * 2

@task
def add_ten(x: int) -> int:
    return x + 10

workflow = Flow("math").then(double).then(add_ten).build()
result = run_workflow(workflow, 5)
# 20  (5 * 2 = 10, 10 + 10 = 20)
```

### Durable workflow (survives crashes)

```python
from sayiir import task, Flow, run_durable_workflow

@task(timeout="30s")
def process_order(order_id: int) -> dict:
    return {"order_id": order_id, "status": "processed"}

@task
def send_confirmation(order: dict) -> str:
    return f"Confirmed order {order['order_id']}"

workflow = Flow("order").then(process_order).then(send_confirmation).build()

# Checkpoints after each task — resumes from last checkpoint on crash
status = run_durable_workflow(workflow, "order-123", 42)
print(status.output)           # "Confirmed order 42"
print(status.is_completed())   # True
```

### PostgreSQL persistence

```python
from sayiir import task, Flow, PostgresBackend, run_durable_workflow

@task
def process(x: int) -> int:
    return x * 2

workflow = Flow("persistent").then(process).build()

# Auto-runs migrations on first connect
backend = PostgresBackend("postgresql://localhost/sayiir")
status = run_durable_workflow(workflow, "run-001", 21, backend=backend)
```

### Retry policy

```python
from sayiir import task, RetryPolicy

# Int shorthand (1s initial delay, 2x backoff)
@task(retries=3)
def flaky_call(url: str) -> dict:
    return requests.get(url).json()

# Full control
@task(retries=RetryPolicy(max_retries=3, initial_delay_secs=0.5, backoff_multiplier=2.0))
def precise_retry(url: str) -> dict:
    return requests.get(url).json()
```

### Parallel execution (fork/join)

```python
from sayiir import task, Flow, run_workflow

@task
def validate_payment(order: dict) -> dict:
    return {"payment": "valid"}

@task
def check_inventory(order: dict) -> dict:
    return {"stock": "available"}

@task
def finalize(results: dict) -> str:
    return f"Order complete: {results}"

workflow = (
    Flow("checkout")
    .fork()
        .branch(validate_payment)
        .branch(check_inventory)
    .join(finalize)
    .build()
)
result = run_workflow(workflow, {"order_id": 1})
```

### Multi-step branches

```python
workflow = (
    Flow("pipeline")
    .fork()
        .branch(fetch_data, transform, validate)  # 3-step branch
        .branch(fetch_metadata)                    # 1-step branch
    .join(merge_results)
    .build()
)
```

### Loops

Repeat a task until it signals completion with `LoopResult.done()`.

```python
from sayiir import task, Flow, LoopResult, run_workflow

@task
def refine(draft: str) -> dict:
    improved = improve(draft)
    if is_good_enough(improved):
        return LoopResult.done(improved).to_dict()
    return LoopResult.again(improved).to_dict()

workflow = (
    Flow("iterative")
    .then(initial_draft)
    .loop(refine, max_iterations=5)
    .then(publish)
    .build()
)
result = run_workflow(workflow, "rough draft")
```

The body task returns `LoopResult.again(value)` to continue iterating or `LoopResult.done(value)` to exit. When `max_iterations` is reached, the default behavior is to fail; pass `on_max="exit_with_last"` to exit with the last value instead.
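The loop contract can be sketched in plain Python. This is illustrative only, not Sayiir's implementation (the real driver lives in the Rust core), and the `{"done": ..., "value": ...}` dict shape is an assumption about what `LoopResult.to_dict()` produces:

```python
# Illustrative sketch of the loop contract above. NOT Sayiir's implementation;
# the {"done": ..., "value": ...} dict shape is an assumption about what
# LoopResult.to_dict() produces.

def drive_loop(body, value, max_iterations=10, on_max="fail"):
    """Call `body` until it reports done, or the iteration cap is hit."""
    last = value
    for _ in range(max_iterations):
        result = body(last)      # body returns a LoopResult-style dict
        last = result["value"]
        if result["done"]:       # LoopResult.done(value): exit the loop
            return last
    if on_max == "exit_with_last":
        return last              # keep the most recent value
    raise RuntimeError(f"loop hit max_iterations={max_iterations}")

def keep_doubling(x):
    """Example body: double until the value exceeds 100."""
    nxt = x * 2
    return {"done": nxt > 100, "value": nxt}

print(drive_loop(keep_doubling, 1))             # 128
print(drive_loop(keep_doubling, 1,
                 max_iterations=3,
                 on_max="exit_with_last"))      # 8
```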

### Task execution context

Access workflow and task metadata from within a running task using `get_task_context()`.

```python
from sayiir import task, get_task_context

@task(timeout="30s", tags=["io"])
def fetch_data(url: str) -> dict:
    ctx = get_task_context()
    if ctx is not None:
        print(f"Running task {ctx.task_id} in workflow {ctx.workflow_id}")
        print(f"Instance: {ctx.instance_id}")
        print(f"Timeout: {ctx.metadata.timeout_secs}s")
        print(f"Tags: {ctx.metadata.tags}")
        print(f"Workflow metadata: {ctx.workflow_metadata}")
    return do_fetch(url)
```

`get_task_context()` returns a `TaskExecutionContext` with `workflow_id`, `instance_id`, `task_id`, `metadata` (timeout, retries, tags, version, etc.), and `workflow_metadata` (the dict passed via `Flow("name", metadata={...})`), or `None` if called outside of a task execution.

### Pydantic integration

```python
from pydantic import BaseModel
from sayiir import task, Flow, run_workflow

class OrderInput(BaseModel):
    order_id: int
    amount: float

class OrderResult(BaseModel):
    status: str
    message: str

@task
def process(order: OrderInput) -> OrderResult:
    return OrderResult(status="ok", message=f"Processed ${order.amount}")

workflow = Flow("typed").then(process).build()
result = run_workflow(workflow, {"order_id": 1, "amount": 99.99})
# Automatic validation on input, serialization on output
```

### Conditional branching

```python
from sayiir import task, Flow, run_workflow

@task
def classify(ticket: dict) -> str:
    return "billing" if ticket["type"] == "invoice" else "tech"

@task
def handle_billing(ticket: dict) -> str:
    return f"Billing handled: {ticket['id']}"

@task
def handle_tech(ticket: dict) -> str:
    return f"Tech resolved: {ticket['id']}"

@task
def fallback(ticket: dict) -> str:
    return f"Routed to general: {ticket['id']}"

workflow = (
    Flow("support-router")
    .route(classify, keys=["billing", "tech"])
        .branch("billing", handle_billing)
        .branch("tech", handle_tech)
        .default_branch(fallback)
    .done()
    .build()
)
result = run_workflow(workflow, {"id": 1, "type": "invoice"})
# {"branch": "billing", "result": "Billing handled: 1"}
```

The key function returns a string routing key. The matching branch runs; if no match and no default, the workflow fails. The output is a `BranchEnvelope` with `branch` (the key) and `result` (the branch output).
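The routing rule can be sketched in plain Python (illustrative only, not Sayiir's code; the envelope shape follows the `{"branch": ..., "result": ...}` output shown above):

```python
# Illustrative sketch of the routing semantics above (not Sayiir's code).

def run_route(key_fn, branches, default, value):
    """Pick a branch by key; wrap its output in a BranchEnvelope-style dict."""
    key = key_fn(value)
    handler = branches.get(key, default)
    if handler is None:
        raise RuntimeError(f"no branch for key {key!r} and no default")
    return {"branch": key, "result": handler(value)}

ticket = {"id": 1, "type": "invoice"}
envelope = run_route(
    lambda t: "billing" if t["type"] == "invoice" else "tech",
    {"billing": lambda t: f"Billing handled: {t['id']}"},
    lambda t: f"Routed to general: {t['id']}",
    ticket,
)
print(envelope)   # {'branch': 'billing', 'result': 'Billing handled: 1'}
```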

### Task metadata

```python
@task(
    "Process Payment",
    timeout="60s",
    retries=3,
    tags=["payments", "critical"],
    description="Charges the customer's payment method",
)
def process_payment(order: dict) -> dict:
    ...
```

## API Reference

### Decorators

- **`@task`** — Mark a function as a workflow task. Accepts a positional name string: `@task("name")`. Optional params: `name`, `timeout` (duration string or seconds), `retries` (int shorthand or `RetryPolicy`), `tags`, `description`.

### Flow Builder

- **`Flow(name)`** — Create a new workflow builder.
- **`.then(task_fn, *, name=None)`** — Append a task to the workflow. Accepts `@task`-decorated functions, plain functions, or lambdas. Use `name` to set an explicit task ID.
- **`.loop(task_fn, *, max_iterations=10, on_max="fail", name=None)`** — Add a loop. Body returns `LoopResult.again(value)` or `LoopResult.done(value)`.
- **`.fork()`** — Start parallel branches. Returns a `ForkBuilder`.
- **`.branch(task_fn, ...)`** — Add a branch (one or more chained tasks).
- **`.join(task_fn)`** — Merge parallel branches. Join function receives `dict[str, value]`.
- **`.delay(name, duration)`** — Add a durable delay (`"30s"`, `"5m"`, `"1h"`, seconds, or `timedelta`).
- **`.wait_for_signal(signal_name, *, timeout=None)`** — Wait for an external signal.
- **`.route(key_fn, *, keys=["a", "b"])`** — Start conditional branching. Returns a `BranchBuilder`.
- **`BranchBuilder.branch(key, *tasks)`** — Add a named branch for a routing key.
- **`BranchBuilder.default_branch(*tasks)`** — Set the fallback branch for unmatched keys.
- **`BranchBuilder.done()`** — Finish branching and return to the `Flow` builder.
- **`.build()`** — Finalize and return a `Workflow`.

### Task Context

- **`get_task_context()`** — Returns a `TaskExecutionContext` with `workflow_id`, `instance_id`, `task_id`, `metadata`, and `workflow_metadata`, or `None` outside of task execution.

### Execution

- **`run_workflow(workflow, input, *, instance_id=None, backend=None)`** — Execute a workflow. Without `instance_id`, runs in-memory. With `instance_id` and `backend`, runs with full checkpointing (raises `WorkflowError` if the workflow doesn't complete). Returns the final output.
- **`run_durable_workflow(workflow, instance_id, input, backend=None)`** — Execute with checkpointing. Returns a `WorkflowStatus`.
- **`resume_workflow(workflow, instance_id, backend)`** — Resume a workflow from its last checkpoint.
- **`cancel_workflow(instance_id, backend, reason=None, cancelled_by=None)`** — Cancel a running workflow.
- **`pause_workflow(instance_id, backend, reason=None, paused_by=None)`** — Pause a running workflow.
- **`unpause_workflow(instance_id, backend)`** — Unpause a paused workflow.
- **`send_signal(instance_id, signal_name, payload, backend)`** — Send an external signal.

### WorkflowStatus

- **`.output`** — The final output value (if completed).
- **`.status`** — `"completed"`, `"failed"`, `"cancelled"`, or `"in_progress"`.
- **`.is_completed()`** / **`.is_failed()`** / **`.is_cancelled()`** / **`.is_paused()`** / **`.is_in_progress()`** — Status checks.
- **`.error`** — Error message (if failed).
- **`.reason`** / **`.cancelled_by`** — Cancellation details.

### Retry

- **`RetryPolicy(max_retries=2, initial_delay_secs=1.0, backoff_multiplier=2.0)`** — Exponential backoff retry policy for tasks.
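The parameters imply a delay schedule that can be sketched in plain Python. The exact formula is an assumption based on the parameter names (check the docs for the authoritative behavior):

```python
# Delay schedule implied by RetryPolicy's parameters (exponential backoff).
# The exact formula is an assumption based on the parameter names.

def retry_delays(max_retries=2, initial_delay_secs=1.0, backoff_multiplier=2.0):
    """Seconds to wait before retry attempt n, for n = 0 .. max_retries - 1."""
    return [initial_delay_secs * backoff_multiplier**n for n in range(max_retries)]

print(retry_delays())                                        # [1.0, 2.0]
print(retry_delays(max_retries=3, initial_delay_secs=0.5))   # [0.5, 1.0, 2.0]
```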

### Loop Control

- **`LoopResult.again(value)`** — Continue iterating with a new value.
- **`LoopResult.done(value)`** — Exit the loop with a final value.
- **`OnMax.FAIL`** / **`OnMax.EXIT_WITH_LAST`** — Policy when max iterations is reached.

### Backends

- **`InMemoryBackend()`** — In-memory storage for development and testing (default).
- **`PostgresBackend(url)`** — PostgreSQL persistence. Auto-runs migrations on first connect.

### WorkflowClient (distributed)

- **`WorkflowClient(backend, *, conflict_policy=None)`** — Client for submitting and controlling workflow instances without executing tasks. Used with `Worker` for the distributed model.
- **`.submit(workflow, instance_id, input)`** — Submit a workflow for execution. Returns a `WorkflowStatus`.
- **`.cancel(instance_id, *, reason=None, cancelled_by=None)`** — Cancel a workflow instance.
- **`.pause(instance_id, *, reason=None, paused_by=None)`** — Pause a workflow instance.
- **`.unpause(instance_id)`** — Unpause a paused workflow.
- **`.send_signal(instance_id, signal_name, payload)`** — Send an external signal.
- **`.status(instance_id)`** — Get the current status. Returns a `WorkflowStatus`.

## Architecture

```mermaid
graph LR
    A["Your Python code<br/><b>@task</b> functions"] -->|input| B["Sayiir · Rust<br/>Orchestration<br/>Checkpointing<br/>Crash recovery<br/>Fork/join/branch<br/>Loops &amp; routing<br/>Serialization"]
    B -->|checkpoint<br/>after each task| C["Storage"]
    C -->|resume| B
    B -->|output| A
```

Python provides task implementations. Rust handles everything else: building the execution graph, running tasks in order, checkpointing results, recovering from crashes, and managing parallel branches.

The project follows hexagonal architecture — the core domain has zero infrastructure dependencies, all dependencies flow inward, and every integration point (storage, serialization, execution) is a swappable trait-based adapter.

## Requirements

- Python 3.10+
- Optional: `pydantic >= 2.0` for automatic model validation

## License

MIT

## Links

- [Documentation](https://docs.sayiir.dev/getting-started/python/)
- [Examples](https://github.com/sayiir/sayiir/tree/main/examples)
- [GitHub](https://github.com/sayiir/sayiir)
- [Discord](https://discord.gg/A2jWBFZsNK)
- [Roadmap](https://docs.sayiir.dev/roadmap/)

---

> ⭐ If you find Sayiir useful, [give us a star on GitHub](https://github.com/sayiir/sayiir) ⭐

