Metadata-Version: 2.4
Name: durable-workflow
Version: 0.4.53
Summary: Python SDK for the Durable Workflow server (language-neutral HTTP protocol)
Author: Durable Workflow Contributors
License-Expression: MIT
Project-URL: Homepage, https://github.com/durable-workflow/sdk-python
Project-URL: Documentation, https://durable-workflow.github.io/docs/2.0/polyglot/python
Project-URL: Repository, https://github.com/durable-workflow/sdk-python
Project-URL: Issues, https://github.com/durable-workflow/sdk-python/issues
Keywords: workflow,durable,orchestration,temporal,saga
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: httpx>=0.27
Requires-Dist: avro<2,>=1.12
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: mypy>=1.10; extra == "dev"
Requires-Dist: ruff>=0.4; extra == "dev"
Provides-Extra: prometheus
Requires-Dist: prometheus-client>=0.20; extra == "prometheus"
Provides-Extra: docs
Requires-Dist: mkdocs-material>=9.5; extra == "docs"
Requires-Dist: mkdocstrings[python]>=0.25; extra == "docs"
Dynamic: license-file

# Durable Workflow (Python SDK)

A Python SDK for the [Durable Workflow server](https://github.com/durable-workflow/server). Speaks the server's language-neutral HTTP/JSON worker protocol — no PHP runtime required.

Status: **Beta** — production-readiness validation in progress for the first `1.0.0` release candidate. Core features implemented: workflows, activities, schedules, signals, timers, child workflows, continue-as-new, side effects, version markers, worker-applied accepted updates, replay-verify and history-bundle-verify CLIs, the in-process `WorkflowEnvironment` test harness, and invocable activity carriers for HTTP and serverless runtimes. Client calls for queries and updates exist; Python workflow-side query receiver metadata is available, while server-routed Python query execution and pre-accept update validator routing are still in progress on the server side. Full language-neutral protocol support for cross-PHP/Python orchestration is the release goal.

## Install

```bash
pip install durable-workflow
```

Or for development:

```bash
pip install -e '.[dev]'
```

## Quickstart

```python
import asyncio

from durable_workflow import Client, Worker, workflow, activity

@activity.defn(name="greet")
def greet(name: str) -> str:
    return f"hello, {name}"

@workflow.defn(name="greeter")
class GreeterWorkflow:
    def run(self, ctx, name):
        result = yield ctx.schedule_activity("greet", [name])
        return result

async def main():
    client = Client("http://server:8080", token="dev-token-123", namespace="default")
    worker = Worker(
        client,
        task_queue="python-workers",
        workflows=[GreeterWorkflow],
        activities=[greet],
    )
    handle = await client.start_workflow(
        workflow_type="greeter",
        workflow_id="greet-1",
        task_queue="python-workers",
        input=["world"],
    )
    await worker.run_until(workflow_id="greet-1", timeout=30.0)
    result = await client.get_result(handle)
    print(result)  # "hello, world"

if __name__ == "__main__":
    asyncio.run(main())
```

For a fuller deployable example, see
[`examples/order_processing`](examples/order_processing), which runs a
multi-activity order workflow against a local server with Docker Compose.

## Retry policy scopes

Retry and timeout settings are scoped to the layer where you configure them:

- `TransportRetryPolicy` on `Client(...)` retries SDK HTTP requests only. It handles transient connection failures, request timeouts, 5xx responses, and 429 rate limits. It does not retry workflow code, activity code, child workflows, or failed workflow runs.
- `ActivityRetryPolicy` on `ctx.schedule_activity(...)` is recorded into durable history with that activity command. It controls server-side attempts for that one activity execution.
- `ChildWorkflowRetryPolicy` on `ctx.start_child_workflow(...)` is recorded with that child-start command. It controls server-side attempts for that child workflow execution.
- `non_retryable_error_types` belongs to durable activity/child retry policies. `non_retryable=True` on an activity failure bypasses the activity retry budget and surfaces the failure to the workflow.

Timeout names are also layer-specific. `start_to_close_timeout` limits one activity attempt, `schedule_to_start_timeout` limits queue wait before an activity starts, `schedule_to_close_timeout` limits the whole activity execution including retries, and `heartbeat_timeout` limits the gap between activity heartbeats. For child workflows, `execution_timeout_seconds` covers the overall child workflow execution and `run_timeout_seconds` covers one run.

## Activity failure payloads

When replay raises `ActivityFailed`, the top-level attributes expose the
stable cross-language fields: `activity_type`, `failure_category`,
`exception_type`, `message`, `non_retryable`, and `code`. The
`exception_payload` dictionary is filtered to language-neutral keys such as
`type`, `message`, `details`, `details_payload_codec`, and `non_retryable`.
Runtime diagnostics like PHP or Python exception classes, source file paths,
line numbers, and traces are not included by default unless the history event
contains an explicit `diagnostics` or `runtime_diagnostics` envelope.

## Activity retries and timeouts

Configure per-call activity retries and deadlines from workflow code:

```python
from durable_workflow import ActivityRetryPolicy

result = yield ctx.schedule_activity(
    "charge-card",
    [order],
    retry_policy=ActivityRetryPolicy(
        max_attempts=4,
        initial_interval_seconds=1,
        backoff_coefficient=2,
        maximum_interval_seconds=30,
        non_retryable_error_types=["ValidationError"],
    ),
    start_to_close_timeout=120,
    schedule_to_close_timeout=300,
    heartbeat_timeout=15,
)
```

Child workflow starts use the same retry policy shape and workflow-level
execution/run timeout names:

```python
from durable_workflow import ChildWorkflowRetryPolicy

receipt = yield ctx.start_child_workflow(
    "payment.child",
    [order],
    retry_policy=ChildWorkflowRetryPolicy(
        max_attempts=3,
        initial_interval_seconds=2,
        backoff_coefficient=2,
        non_retryable_error_types=["ValidationError"],
    ),
    execution_timeout_seconds=600,
    run_timeout_seconds=120,
)
```

## Workflow signals, queries, and updates

Signals mutate workflow state during replay:

```python
@workflow.defn(name="approval")
class ApprovalWorkflow:
    def __init__(self) -> None:
        self.approved = False

    @workflow.signal("approve")
    def approve(self, by: str) -> None:
        self.approved = True

    @workflow.query("status")
    def status(self) -> dict:
        return {"approved": self.approved}

    @workflow.update("set_approval")
    def set_approval(self, approved: bool) -> dict:
        self.approved = approved
        return {"approved": self.approved}

    @set_approval.validator
    def validate_set_approval(self, approved: bool) -> None:
        if not isinstance(approved, bool):
            raise ValueError("approved must be boolean")
```

The Python SDK records query and update receiver metadata on workflow classes,
exposes a query-state replay helper, and applies accepted updates on Python
workflow tasks by emitting `complete_update` or `fail_update` back to the
server. Query routing and synchronous pre-accept update validator execution are
still server-side follow-ups; use those paths only with deployments that
advertise support for the target workflow type.

Malformed signal and query payloads are reported as typed client errors with
the server's documented reason and status preserved:

```python
from durable_workflow import Client, QueryFailed, SignalFailed

client = Client("http://localhost:8080")

try:
    await client.signal_workflow("counter-1", "increment", args=["not-an-int"])
except SignalFailed as exc:
    assert exc.reason == "invalid_signal_arguments"
    assert exc.status == 422
    assert exc.validation_errors is not None

try:
    await client.query_workflow("counter-1", "current-at", args=["not-an-int"])
except QueryFailed as exc:
    assert exc.reason == "invalid_query_arguments"
    assert exc.status == 422
    assert exc.validation_errors is not None
```

Use `yield ctx.wait_condition(lambda: self.approved, key="approved",
timeout=30)` to wait for signal- or update-mutated workflow state without
polling timers by hand. The SDK sends a stable predicate fingerprint with the
durable wait command and rejects replay if history records a different wait
key or predicate fingerprint, so condition changes fail visibly instead of
silently resolving a different wait.

Workers fingerprint registered workflow class definitions and advertise those
fingerprints during registration. Re-registering the same `worker_id` with a
changed class body for an already advertised workflow type raises immediately;
restart the worker process with a new id before serving changed workflow code.

Workers also advertise their local workflow and activity concurrency limits
during registration. Tune `max_concurrent_workflow_tasks` and
`max_concurrent_activity_tasks` on `Worker(...)` to align local semaphores with
the server's task-queue admission and operator visibility surfaces. Use
`Client.list_task_queues()` or `Client.describe_task_queue("orders")` to read
the server-side workflow, activity, and query-task admission status before
tuning those local limits:

```python
queues = await client.list_task_queues()
for queue in queues.task_queues:
    workflow_admission = queue.admission.workflow_tasks if queue.admission else None
    print(queue.name, workflow_admission.status if workflow_admission else "unknown")
```

The workflow and activity admission objects expose both queue-level and
namespace-level server budgets, including active lease caps and per-minute
dispatch-rate limits, so automation can detect whether local worker slots,
queue caps, namespace caps, or downstream dispatch budget groups are
constraining throughput.

## Replay captured histories

Use `Replayer` to debug a captured history without connecting to a live server:

```python
from durable_workflow import Replayer

replayer = Replayer(workflows=[ApprovalWorkflow])
outcome = replayer.replay(history_export)

for command in outcome.commands:
    print(command)
```

`history_export` can be the server's event list or a dictionary with an
`events` key. When the history contains a `WorkflowStarted` event, the replayer
infers the workflow type and input from that event; otherwise pass
`workflow_type=` and `start_input=` explicitly. The returned `ReplayOutcome`
contains the commands the workflow would emit next, including determinism
failures surfaced as workflow failure commands.

For CI and operator replay gates, the package also installs offline
verification commands:

```bash
durable-workflow-replay-verify tests/fixtures/golden_history \
  --workflows my_app.workflows:all_workflows \
  --output replay-report.json

durable-workflow-replay-verify exported-history-bundles \
  --simulate-bundles \
  --output replay-simulation.json

durable-workflow-history-bundle-verify exported-history-bundles/run-001.json \
  --output integrity-report.json
```

`durable-workflow-replay-verify` emits the same verdict and
`promotion_decision` vocabulary as the platform replay contract. Golden-history
mode replays cross-runtime fixtures against registered workflow classes;
`--simulate-bundles` integrity-checks every exported history bundle in a
directory and reports missing bundle evidence as a blocking result. Because
bundle simulation does not execute workflow code in Python, a clean
integrity-only simulation recommends `review_before_promote` rather than
`safe_to_promote`.

## External payload storage

Large payload offload is opt-in. `serializer.external_storage_envelope(...)`
keeps small encoded payloads inline and stores larger bytes through an
`ExternalStorageDriver`, returning a stable reference envelope with URI, codec,
size, optional expiry, and SHA-256 integrity metadata.
`serializer.decode_envelope(...)` fetches referenced bytes through the same
driver and verifies size/hash before decode.

The SDK includes a local filesystem driver for development plus dependency-free
S3, GCS, and Azure Blob adapters. Cloud SDKs stay application-owned: pass an
already-configured boto3-compatible S3 client, google-cloud-storage client, or
azure-storage-blob container client when your deployment enables external
payload storage.
Retention cleanup should delete by typed reference rather than by raw URI:
`delete_external_payload(storage, reference, cache=cache)` calls the configured
driver and evicts any verified replay-cache entry for the same reference.
When the server or Cloud API returns an external payload storage policy, use
`ExternalPayloadStoragePolicy.from_dict(...)` plus
`external_storage_driver_from_policy(...)` to turn that control-plane payload
into the matching SDK driver while keeping provider clients application-owned.

```python
from durable_workflow import (
    ExternalPayloadStoragePolicy,
    external_storage_driver_from_policy,
    serializer,
)

policy = ExternalPayloadStoragePolicy.from_dict(namespace_response)
storage = external_storage_driver_from_policy(policy, s3_client=s3_client)
payload = serializer.external_storage_envelope(
    {"large": "value"},
    external_storage=storage,
    threshold_bytes=policy.threshold_bytes or 2 * 1024 * 1024,
)
```

## Features

- **Async-first**: Built on `httpx` and `asyncio`
- **Type-safe**: Full type hints, passes `mypy --strict`
- **Polyglot**: Works alongside PHP workers on the same task queue
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
- **External payload references**: opt-in reference envelopes, local filesystem/S3/GCS/Azure Blob drivers, and a bounded verified-byte cache for large-payload offload experiments
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, schedule, signal, update, query, or search-attribute payloads reach the server
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
- **Deterministic workflow helpers**: `ctx.now()`, `ctx.random()`, `ctx.uuid4()`, and `ctx.uuid7()` replay from workflow state
- **Worker interceptors**: Typed hooks around workflow tasks, activity calls, and query tasks for tracing, logging, and custom metrics
- **Metrics hooks**: Pluggable counters and histograms, with an optional Prometheus adapter

## Payload-size warnings

The SDK logs a structured warning before an encoded payload reaches 80% of the
default 2 MiB server payload limit. Warnings include context such as
`workflow_id`, `workflow_type`, `activity_name`, `schedule_id`, `signal_name`,
`update_name`, `query_name`, `payload_size`, `threshold_bytes`, and
`limit_bytes` when those fields are known at the call site.

Tune or disable the warning threshold on the client:

```python
client = Client(
    "https://workflow.example.internal",
    payload_size_limit_bytes=4 * 1024 * 1024,
    payload_size_warning_threshold_percent=75,
)

quiet_client = Client(
    "https://workflow.example.internal",
    payload_size_warnings=False,
)
```

## Avro payload type boundaries

The default Avro codec uses a generic JSON wrapper so PHP, Python, and other
workers can exchange the same wire format. It preserves JSON-native values:
`None`, booleans, numbers, strings, lists, and dictionaries with string keys.

Class-carrying values are not encoded with type metadata. Convert pydantic
models, attrs classes, dataclasses, pendulum values, `datetime` / `date` /
`time`, `UUID`, `Decimal`, and plain `Enum` values to explicit dictionaries or
scalars before passing them to the SDK. `IntEnum` and `StrEnum` encode because
they are JSON scalar subclasses, but they decode as `int` and `str`.
`OrderedDict` decodes as a plain `dict`.

Use `to_avro_payload_value(...)` when a rich value should enter durable
history through the default Avro envelope:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from uuid import UUID

from durable_workflow import Client, to_avro_payload_value


class OrderStatus(Enum):
    PENDING = "pending"


@dataclass
class OrderInput:
    order_id: UUID
    placed_at: datetime
    amount: Decimal
    status: OrderStatus


order = OrderInput(
    order_id=UUID("12345678-1234-5678-1234-567812345678"),
    placed_at=datetime.now(timezone.utc),
    amount=Decimal("10.25"),
    status=OrderStatus.PENDING,
)

client = Client("http://server:8080", token="dev-token-123")
await client.start_workflow(
    "order-workflow",
    task_queue="orders",
    workflow_id="order-123",
    input=[to_avro_payload_value(order)],
)
```

The helper also accepts pydantic-style models with `model_dump(mode="json")`
and attrs-style classes. Rebuild domain objects explicitly inside workflows or
activities, for example `OrderInput(order_id=UUID(data["order_id"]), ...)`.
Adapter output is part of the durable history contract, so changing that shape
is a workflow compatibility change.

## Authentication

For local servers that use one shared bearer token, pass `token=`:

```python
client = Client("http://server:8080", token="shared-token", namespace="default")
```

For production servers with role-scoped tokens, pass separate credentials for
control-plane calls and worker-plane polling:

```python
client = Client(
    "https://workflow.example.internal",
    control_token="operator-token",
    worker_token="worker-token",
    namespace="orders",
)
```

Create one client per namespace when your deployment issues namespace-scoped
tokens. The SDK sends the configured token as `Authorization: Bearer ...` and
the namespace as `X-Namespace` on every request.

## Metrics

Pass a recorder to `Client(metrics=...)` or `Worker(metrics=...)` to collect request, poll, and task metrics. The SDK ships a no-op default, an `InMemoryMetrics` recorder for tests or custom exporter loops, and `PrometheusMetrics` for deployments that install the optional extra:

```bash
pip install 'durable-workflow[prometheus]'
```

```python
from durable_workflow import Client, PrometheusMetrics

metrics = PrometheusMetrics()
client = Client("http://server:8080", token="dev-token-123", metrics=metrics)
```

Custom recorders implement `increment(name, value=1.0, tags=None)` and `record(name, value, tags=None)`.

## Worker interceptors

Use `Worker(interceptors=[...])` when instrumentation needs the task payload,
result, or exception around worker execution instead of only aggregate counters.
Interceptors run in list order; the first interceptor is the outermost wrapper.

```python
from durable_workflow import (
    ActivityInterceptorContext,
    ActivityHandler,
    PassthroughWorkerInterceptor,
    Worker,
)

class LoggingInterceptor(PassthroughWorkerInterceptor):
    async def execute_activity(
        self,
        context: ActivityInterceptorContext,
        next: ActivityHandler,
    ) -> object:
        print("activity started", context.activity_type)
        try:
            result = await next(context)
        except Exception:
            print("activity failed", context.activity_type)
            raise
        print("activity completed", context.activity_type)
        return result

worker = Worker(
    client,
    task_queue="python-workers",
    workflows=[GreeterWorkflow],
    activities=[greet],
    interceptors=[LoggingInterceptor()],
)
```

## Documentation

Full documentation is available at
[durable-workflow.github.io/docs/2.0/polyglot/python](https://durable-workflow.github.io/docs/2.0/polyglot/python):

- [Python SDK guide](https://durable-workflow.com/docs/2.0/polyglot/python)
- [API reference](https://python.durable-workflow.com/)

## Requirements

- Python ≥ 3.10
- A running [Durable Workflow server](https://github.com/durable-workflow/server)

## Compatibility

SDK version 0.4.x is compatible with servers that advertise these protocol
manifests from `GET /api/cluster/info`:

- `control_plane.version: "2"`
- `control_plane.request_contract.schema: durable-workflow.v2.control-plane-request.contract` version `1`
- `auth_composition_contract.schema: durable-workflow.v2.auth-composition.contract` version `1`
- `worker_protocol.version: "1.1"`
- `worker_protocol.external_task_input_contract.schema: durable-workflow.v2.external-task-input.contract` version `1`
- `worker_protocol.external_task_result_contract.schema: durable-workflow.v2.external-task-result.contract` version `1`

The top-level server `version` is build identity only. The worker checks these
protocol manifests at startup and fails closed when compatibility is missing,
unknown, or undiscoverable.

Carriers and support tooling can validate `auth_composition_contract` with
`parse_auth_composition_contract()` before resolving connection, namespace,
token, TLS, profile, and redacted effective-configuration diagnostics.

External task carriers can validate fixture artifacts from
`worker_protocol.external_task_input_contract.fixtures` with
`parse_external_task_input_artifact()` and parse leased task envelopes with
`parse_external_task_input()`.

They can also validate result fixture artifacts from
`worker_protocol.external_task_result_contract.fixtures` with
`parse_external_task_result_artifact()` and parse result envelopes with
`parse_external_task_result()`. The result parser exposes stable carrier
decisions for success, retryability, malformed output, cancellation, deadline
exceeded, handler crash, decode failure, and unsupported payload
codec/reference states without treating stderr as a machine signal.

Invocable activity carriers can use `InvocableActivityHandler` as a reference
adapter for HTTP or serverless runtimes. It accepts the same external-task input
envelope, invokes a registered activity handler, and returns the same
external-task result envelope while rejecting workflow-task inputs:

```python
from durable_workflow import InvocableActivityHandler

adapter = InvocableActivityHandler({"billing.charge-card": charge_card})
result_envelope = await adapter.handle(request_json)
```

Bridge adapters can hand bounded webhook ingress into the server through
`Client.send_webhook_bridge_event()`. The method returns the server's typed
bridge outcome for accepted, duplicate, and rejected events, including
machine-readable HTTP 422 rejection outcomes:

```python
outcome = await client.send_webhook_bridge_event(
    "pagerduty",
    action="signal_workflow",
    idempotency_key="pagerduty-event-3003",
    target={"workflow_id": "wf-remediation-42", "signal_name": "incident_escalated"},
    input={"severity": "critical", "service": "checkout"},
    correlation={"provider": "pagerduty", "event_type": "incident.triggered"},
)

if outcome.accepted:
    print(outcome.workflow_id, outcome.control_plane_outcome)
else:
    print(outcome.outcome, outcome.reason)
```

## Development

```bash
# Install dev dependencies
pip install -e '.[dev]'

# Run tests
pytest

# Run integration tests (requires Docker)
pytest -m integration

# Type check
mypy src/durable_workflow/

# Lint
ruff check src/ tests/

# Preview the API reference site locally
pip install -e '.[docs]'
mkdocs serve
```

The API reference is published to [python.durable-workflow.com](https://python.durable-workflow.com/) and rebuilt automatically on push to `main`.

## License

MIT

## Public Boundary Checks

This is a public repository. Do not add private tracker names, workspace-only absolute paths, or internal automation metadata to files or new commit metadata. Run `scripts/check-public-boundary.sh` before publishing changes; CI runs the same scan on pushes and pull requests.
