Metadata-Version: 2.4
Name: oxidizer
Version: 0.0.2
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Rust
Classifier: Topic :: Software Development :: Libraries
Requires-Dist: pyyaml>=6.0
Requires-Dist: structlog>=24.0
Requires-Dist: fastmcp>=2.0
Requires-Dist: pytz>=2024.1
License-File: LICENSE
Summary: Distributed DAG pipeline engine with Rust core and Python bindings
Keywords: pipeline,dag,distributed,rust,data-engineering
Home-Page: https://github.com/moos-engineering/oxidizer
License: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Documentation, https://github.com/moos-engineering/oxidizer/tree/main/docs
Project-URL: Homepage, https://github.com/moos-engineering/oxidizer
Project-URL: Issues, https://github.com/moos-engineering/oxidizer/issues
Project-URL: Repository, https://github.com/moos-engineering/oxidizer

# Oxidizer ALPHA

A distributed DAG execution engine built with **Python** and **Rust** (via [PyO3](https://pyo3.rs) / [Maturin](https://www.maturin.rs)). Oxidizer orchestrates multi-tier data pipelines across workers using Redis Streams for messaging, Redis JSON for state, and S3/MinIO for config storage.

Oxidizer supports both **batch DAG runs** and **live (streaming) topologies**. Live nodes use a stateless **batch+republish** model — each worker processes one batch, re-publishes the task back to the stream, and any worker can claim it next. Redistribution across workers is automatic; scaling requires zero coordination.
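The batch+republish loop can be sketched with an in-memory stand-in for the stream (task fields and node names here are invented for illustration; the real engine uses Redis Streams and its own task schema):

```python
from collections import deque

# A deque stands in for the Redis Stream; one task is "live" at a time.
stream = deque([{"node": "sensor_agg", "cursor": 0}])
processed = []

def process_batch(task, batch_size=3):
    """Process one batch for the task, then republish it with an advanced cursor."""
    start = task["cursor"]
    batch = list(range(start, start + batch_size))  # stand-in for stream records
    processed.extend(batch)
    # Republish: the task goes back on the stream, so ANY worker can claim
    # the next batch — no worker owns the node, hence zero coordination.
    stream.append({"node": task["node"], "cursor": start + batch_size})

# Four claim/process/republish cycles, as if workers alternated claims.
for _ in range(4):
    task = stream.popleft()
    process_batch(task)

print(processed)  # → [0, 1, 2, ..., 11] — 12 records across 4 cycles
```

Because all progress lives in the republished task (the cursor), a worker that crashes mid-cycle simply never republishes, and recovery reduces to reclaiming the pending stream entry.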

## Architecture

| Component | Role |
|---|---|
| **Oxidizer** | DAG controller — builds execution tiers, dispatches nodes to workers |
| **Reagent** | Worker — consumes tasks from a stream and runs user-defined processing |
| **Microscope** | FastMCP control plane — HTTP API, MCP tools, resources, and prompts |
| **Catalyst** | Redis/Valkey cache client (Rust core) — streams, state, locking |
| **Alloy** | Pipeline config manager — loads, validates, and stores YAML configs in S3/MinIO |
| **Anvil** | S3 operations toolkit — static helpers for object storage via Rust engine |
| **Formula** | DAG engine — constructs runs from cached configs, manages DAG lifecycle |
| **Residue** | Structured logging — structlog + Rust `pyo3_log` bridge, optional Redis log sink |

All I/O (Redis, S3, stream reads/writes) runs in async Rust via Tokio. Python only handles business logic.

## Project Structure

```
oxidizer/
├── examples/                   # Self-contained pipeline examples
│   ├── Dockerfile              # Shared multi-stage build
│   ├── main.py                 # Controller entrypoint
│   ├── api.py                  # API entrypoint
│   ├── skeleton/               # Minimal starter template (batch)
│   ├── batch_etl/              # E-Commerce ETL pipeline (batch)
│   ├── live_events/            # IoT sensor monitoring (live)
│   └── mixed_pipeline/         # Fintech transactions (mixed)
├── oxidizer/                   # Python package
│   ├── ui.html                 # Dashboard UI (served by Microscope)
│   └── *.py                    # Core modules
├── rust/src/                   # Rust source (PyO3 extension)
├── tests/                      # Pytest + Rust unit tests
├── Cargo.toml
├── pyproject.toml
└── requirements.txt
```

Each example folder contains its own `docker-compose.yml`, worker file(s), and YAML config. See [examples/README.md](examples/README.md) for details.
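To give a feel for the shape of such a config, here is a hypothetical sketch — every field name below is illustrative, not the actual schema; see [Configuration](docs/configuration.md) for the real Alloy YAML reference:

```yaml
# Hypothetical alloy config sketch (field names are illustrative).
name: example
layers:
  - name: bronze
    nodes:
      - id: ingest
        batch_size: 100
  - name: silver
    nodes:
      - id: transform
        inputs: [ingest]
```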

## Prerequisites

- Python 3.10+
- Rust 1.70+ (for local builds)
- Docker & Docker Compose (for containerized runs)
- Redis (with JSON module) and MinIO (or S3-compatible store)

## Quickstart — Docker Compose

The fastest way to run the full stack (using the skeleton example):

```bash
docker compose -f examples/skeleton/docker-compose.yml up --build -d
```

Each example starts the following core services:

| Service | Description |
|---|---|
| `main` | Oxidizer controller — executes DAG runs |
| `worker` (×N) | Reagent workers — process nodes from streams |
| `api` | Microscope — FastMCP server on port 8000 (HTTP + MCP + Dashboard) |
| `redis` | Redis Stack (with JSON module) on port 6379 |
| `minio` | MinIO object store on ports 9000/9001 |
| `minio-init` | Sidecar — creates the bucket and uploads alloy configs |

Some examples include additional workers (e.g. dedicated stream workers). See each example's `docker-compose.yml` for specifics.

Once running, the dashboard is at `http://localhost:8000`, the API at `http://localhost:8000/health`, the MCP endpoint at `http://localhost:8000/mcp`, and the MinIO console at `http://localhost:9001`.

To stop:

```bash
docker compose -f examples/skeleton/docker-compose.yml down
```

See [examples/README.md](examples/README.md) for all four examples and their deployment instructions.

## Quickstart — Local Development

### 1. Install

```bash
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop          # builds the Rust extension in-place
pip install -r requirements.txt
```

### 2. Controller (Oxidizer)

The controller builds a DAG from an alloy config, splits it into execution tiers, and dispatches each node to a worker stream.

```python
from oxidizer import configure_logging
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.oxidizer import Oxidizer

configure_logging()  # structlog + Rust pyo3_log bridge

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))

oxidizer = Oxidizer(catalyst=catalyst, alloy=alloy)

# Submit a DAG run by alloy name (non-blocking — queued to the alloy stream)
oxidizer.submit_run("example")

# Start the controller loop (blocks forever, dispatches runs as they arrive)
oxidizer.start()
```

### 3. Worker (Reagent)

Workers consume tasks from the oxidizer stream. Define your processing logic with the `@reagent.react()` decorator:

```python
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.reagent import Reagent

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
reagent = Reagent(catalyst=catalyst)

@reagent.react()
def process(data: dict, context: dict):
    node_id = context["node_id"]
    run_id = context["run_id"]
    print(f"Processing {node_id} for run {run_id}")
    # data: dict mapping alias → resolved upstream records
    # context: run_id, node_id, alloy, layer, node_config, connections
    return {"data": data}
```

The decorated function receives `(data, context)`:
- **data**: dict mapping alias → resolved upstream records (tiered retrieval handled by the framework)
- **context**: dict with keys `run_id`, `node_id`, `alloy`, `layer`, `node_config`, `connections`
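A self-contained sketch of this `(data, context)` contract — alias names, record fields, and config values are invented for illustration:

```python
# Hypothetical payload the framework would hand to a reagent function:
# each upstream alias is resolved to its list of records before the call.
upstream = {
    "bronze_ingest": [{"id": 1, "value": 10}, {"id": 2, "value": 20}],
}
context = {
    "run_id": "run-123",
    "node_id": "silver_transform",
    "alloy": "example",
    "layer": "silver",
    "node_config": {"batch_size": 100},
    "connections": {},
}

def process(data: dict, context: dict):
    # Aggregate upstream records and return under the "data" key,
    # matching the return shape in the example above.
    total = sum(r["value"] for r in data["bronze_ingest"])
    return {"data": {"total": total, "node": context["node_id"]}}

result = process(upstream, context)
print(result)  # → {'data': {'total': 30, 'node': 'silver_transform'}}
```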

For dedicated streams (e.g. routing specific nodes to specialized workers):

```python
@reagent.react(dedicated_stream="gold_summary_stream")
def process(data: dict, context: dict):
    ...
```

### 4. UI / API / MCP (Microscope)

Microscope is a [FastMCP](https://gofastmcp.com) server that exposes HTTP endpoints for dashboards and curl, plus MCP tools, resources, and prompts for AI agent integration.

```python
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.microscope import Microscope

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))

microscope = Microscope(catalyst=catalyst, alloy=alloy)
microscope.run(host="0.0.0.0", port=8000)
```

See [API Reference](docs/api.md) for the full list of HTTP endpoints, MCP tools, resources, and prompts.

## Live Topologies

Oxidizer supports **live (streaming) topologies** alongside batch DAG runs. A live topology deploys long-running nodes that continuously process data from Redis Streams using a **batch+republish** model — each worker processes one batch, re-publishes the task, and any worker can claim it next.

```bash
# Deploy a live topology
curl -X POST http://localhost:8000/topology/deploy \
  -H 'Content-Type: application/json' \
  -d '{"alloy_name": "example"}'

# Check status
curl http://localhost:8000/topology/{run_id}/status

# Graceful stop
curl -X POST http://localhost:8000/topology/{run_id}/stop
```

See [Live Topologies](docs/live-topologies.md) for the full architecture, node pause/unpause, rebalancing, and crash recovery details.

## Documentation

Detailed documentation lives in the [`docs/`](docs/) folder:

| Document | Description |
|---|---|
| [Architecture](docs/architecture.md) | Component overview, data flow, node state machines, locking, error handling |
| [API Reference](docs/api.md) | All HTTP endpoints, MCP tools, resources, prompts, metrics schemas |
| [Configuration](docs/configuration.md) | Alloy YAML reference — layers, nodes, output blocks, scheduling, retention |
| [Live Topologies](docs/live-topologies.md) | Batch+republish model, topology entity, deploy/stop/rebalance, pause/unpause |
| [Batching](docs/batching.md) | XRANGE cursor loop, per-record fan-out, auto-flatten, batch_size config |
| [Retention](docs/retention.md) | Task stream cleanup, producer-owned output, hierarchical data_retention |
| [Plugins](docs/plugins.md) | Python + Rust plugin system with Maturin/PyO3, entry points |
| [Testing](docs/testing.md) | Pytest suite, integration tests, Docker Compose test process, pass/fail criteria |
| [Examples](examples/README.md) | Four self-contained pipeline examples (skeleton, batch_etl, live_events, mixed_pipeline) |
| [PyPI Publishing](PYPI.md) | Publishing to PyPI with GitHub Actions, trusted publisher, versioning strategies |

## Testing

```bash
# Rust unit tests (no external services needed)
cargo test

# Python tests (no external services needed)
python -m pytest tests/ -v
```

## License

MIT. See the [LICENSE](LICENSE) file.

