Metadata-Version: 2.4
Name: dimq-load-task
Version: 0.1.2
Classifier: Development Status :: 3 - Alpha
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Rust
Classifier: License :: OSI Approved :: MIT License
Summary: Rust-powered load generation task for DIMQ
Author-email: Walnut Geek <wg@walnutgeek.com>
License-Expression: MIT
Requires-Python: >=3.9
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Homepage, https://github.com/walnutgeek/dimq
Project-URL: Repository, https://github.com/walnutgeek/dimq

# DIMQ - Distributed In-Memory Queue

A general-purpose distributed task processing framework built on ZeroMQ. Workers connect to a central orchestrator, receive tasks, and return results. The orchestrator adaptively tunes per-worker parallelization to maximize throughput.

## Architecture

```
                   +-------------------+
  Clients -------->| Orchestrator      |<-------- Workers
  (DEALER)         | (2x ROUTER)       |          (DEALER)
                   |                   |
  SUBMIT/STATUS/   | - FIFO task queue |  READY/HEARTBEAT/
  RESULT queries   | - Result storage  |  RESULT messages
                   | - Retry logic     |
                   | - Adaptive tuning |
                   +-------------------+
```

**Orchestrator** runs two ZMQ ROUTER sockets: one for workers (registration, heartbeats, task dispatch/results) and one for clients (task submission, status queries, result retrieval).

**Workers** connect via ZMQ DEALER sockets, register with their CPU count, and pull tasks. They run tasks concurrently (sync tasks in a thread pool, async tasks natively) and report results back. Workers handle timeout cancellation locally; the orchestrator owns all retry decisions.
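To illustrate the split of responsibilities (workers report, the orchestrator decides), here is a minimal sketch of an orchestrator-side retry decision. `TaskState` and `on_worker_failure` are hypothetical names, and reading `max_retries` as "extra attempts after the first" is an assumption, not DIMQ's confirmed behavior.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class TaskState:
    """Hypothetical bookkeeping record for one submitted task."""
    task_id: str
    max_retries: int       # assumed: extra attempts allowed after the first
    attempts: int = 0      # attempts that have failed or timed out so far
    status: str = "PENDING"


def on_worker_failure(task: TaskState, queue: deque) -> None:
    """Handle a failure or timeout report from a worker.

    The worker only reports what happened; the choice between
    re-queueing and giving up is made here, on the orchestrator side.
    """
    task.attempts += 1
    if task.attempts <= task.max_retries:
        task.status = "PENDING"
        queue.append(task)  # back of the FIFO queue for another attempt
    else:
        task.status = "FAILED"


queue: deque = deque()
task = TaskState(task_id="task-001", max_retries=2)
for _ in range(3):          # three consecutive failure reports
    on_worker_failure(task, queue)
    if queue:
        queue.popleft()     # simulate dispatching the task again
print(task.status)          # FAILED: 1 initial attempt + 2 retries exhausted
```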

**Adaptive parallelization** starts each worker's parallel task limit at its CPU count, then probes upward. If throughput plateaus or drops, the orchestrator scales the limit back by one and enters steady mode, re-probing periodically.
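The probing behavior can be modeled in a few lines. This is a toy sketch, not the code in `adaptive.py`: the class name `AdaptiveLimit`, the `min_gain` threshold that defines a "plateau", and the `reprobe_every` window count are all assumptions chosen for illustration.

```python
class AdaptiveLimit:
    """Toy model of one worker's parallel task limit (hypothetical names)."""

    def __init__(self, cpu_count: int, min_gain: float = 0.05, reprobe_every: int = 10):
        self.limit = cpu_count              # start at the worker's CPU count
        self.mode = "probing"
        self.min_gain = min_gain            # assumed: relative gain that counts as improvement
        self.reprobe_every = reprobe_every  # assumed: steady windows before probing again
        self._last_throughput = None
        self._steady_windows = 0

    def observe(self, throughput: float) -> int:
        """Feed the throughput measured over one window; return the new limit."""
        if self.mode == "steady":
            self._steady_windows += 1
            if self._steady_windows >= self.reprobe_every:
                # Periodically try one step higher again.
                self.mode = "probing"
                self._steady_windows = 0
                self._last_throughput = throughput
                self.limit += 1
            return self.limit

        last = self._last_throughput
        if last is None or throughput > last * (1 + self.min_gain):
            # Still improving: remember this level and probe one higher.
            self._last_throughput = throughput
            self.limit += 1
        else:
            # Plateau or drop: back off by one and hold steady.
            self.limit = max(1, self.limit - 1)
            self.mode = "steady"
            self._steady_windows = 0
        return self.limit


tuner = AdaptiveLimit(cpu_count=4)
for measured in [100.0, 120.0, 121.0]:  # throughput at limits 4, 5, 6
    tuner.observe(measured)
print(tuner.limit, tuner.mode)          # 5 steady
```

In this run the step from 4 to 5 parallel tasks pays off, the step to 6 does not, so the limit settles at 5 until the next re-probe.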

**Tasks** are plain Python functions with Pydantic-typed input and output. No base class or decorator needed -- the framework introspects types via `inspect`.

## Prerequisites

- Python 3.9+
- [uv](https://docs.astral.sh/uv/)
- Rust toolchain (for the LoadTask extension only)

## Setup

```bash
# Install Python dependencies
uv sync

# (Optional) Build the Rust LoadTask extension
cd dimq_load_task
uv tool run maturin develop --uv
cd ..
```

## Running

Create a config file:

```yaml
# config.yaml
endpoint: "tcp://0.0.0.0:5555"
client_endpoint: "tcp://0.0.0.0:5556"
heartbeat_interval_seconds: 5
heartbeat_timeout_missed: 3

tasks:
  - name: "my_app.tasks:process"
    max_retries: 3
    timeout_seconds: 30
```
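The two heartbeat settings presumably combine into a liveness deadline. The sketch below assumes a worker is considered lost once `heartbeat_interval_seconds * heartbeat_timeout_missed` seconds pass without a heartbeat (15 seconds with the values above); the function name and the exact rule are illustrative assumptions, not taken from the DIMQ source.

```python
import time

HEARTBEAT_INTERVAL_SECONDS = 5  # value from config.yaml above
HEARTBEAT_TIMEOUT_MISSED = 3    # value from config.yaml above


def is_worker_alive(last_heartbeat: float, now: float) -> bool:
    """Return False once more than the allowed number of intervals has elapsed.

    Assumed rule: deadline = interval * allowed missed heartbeats.
    """
    deadline = HEARTBEAT_INTERVAL_SECONDS * HEARTBEAT_TIMEOUT_MISSED
    return (now - last_heartbeat) <= deadline


now = time.monotonic()
print(is_worker_alive(last_heartbeat=now - 10, now=now))  # True: 10s is within 15s
print(is_worker_alive(last_heartbeat=now - 16, now=now))  # False: 16s exceeds 15s
```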

Start the orchestrator and one or more workers:

```bash
# Terminal 1: start orchestrator
uv run dimq orchestrator --config config.yaml

# Terminal 2: start a worker (same machine)
uv run dimq worker --config config.yaml

# Terminal 3: start a worker on another machine (override endpoint)
uv run dimq worker --config config.yaml --endpoint tcp://orchestrator-host:5555
```

### Submitting tasks programmatically

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5556")

# Submit a task
sock.send_multipart([
    b"SUBMIT",
    b"my_app.tasks:process",    # task type
    b"task-001",                 # task ID
    b'{"input_field": "value"}', # JSON payload matching the Pydantic input model
])

# Receive ACK
ack = sock.recv_multipart()  # [b"ACK", b"task-001"]

# Later, query the result
sock.send_multipart([b"RESULT", b"task-001"])
reply = sock.recv_multipart()
# [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
```
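If you send these messages from several places, small helpers for building and parsing frames keep the wire format in one spot. The sketch below mirrors only the frame layouts shown above; the helper names are hypothetical, and treating the payload frame as optional for non-completed tasks is an assumption.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


def build_submit(task_type: str, task_id: str, payload: Dict[str, Any]) -> List[bytes]:
    """Frames for a SUBMIT message: command, task type, task ID, JSON payload."""
    return [b"SUBMIT", task_type.encode(), task_id.encode(), json.dumps(payload).encode()]


def build_result_query(task_id: str) -> List[bytes]:
    """Frames for a RESULT query."""
    return [b"RESULT", task_id.encode()]


def parse_result_reply(frames: List[bytes]) -> Tuple[str, str, Optional[Dict[str, Any]]]:
    """Split a RESULT_REPLY into (task_id, status, decoded_payload).

    Assumption: the payload frame may be missing or empty when the task
    has not completed, in which case the payload is returned as None.
    """
    if len(frames) < 3 or frames[0] != b"RESULT_REPLY":
        raise ValueError(f"unexpected reply: {frames!r}")
    task_id, status = frames[1].decode(), frames[2].decode()
    payload = json.loads(frames[3]) if len(frames) > 3 and frames[3] else None
    return task_id, status, payload


reply = [b"RESULT_REPLY", b"task-001", b"COMPLETED", b'{"output_field": "result"}']
print(parse_result_reply(reply))
# ('task-001', 'COMPLETED', {'output_field': 'result'})
```

With a connected socket, `sock.send_multipart(build_submit(...))` and `parse_result_reply(sock.recv_multipart())` slot into the example above unchanged.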

## Testing

```bash
# Run all tests (e2e tests are skipped when Docker is unavailable)
uv run pytest -v

# Run specific test modules
uv run pytest tests/test_orchestrator.py -v
uv run pytest tests/test_integration.py -v

# Run LoadTask tests (requires Rust extension to be built)
uv run pytest tests/test_load_task.py -v

# Run end-to-end Docker test (requires Docker daemon)
# Spins up 3 worker containers, submits load tasks, verifies adaptive tuning
uv run pytest tests/test_e2e_docker.py -v -s
```

## Writing Custom Tasks

A task is a plain function with Pydantic-typed input and output. It can be sync or async.

```python
# my_app/tasks.py
from pydantic import BaseModel

class ImageInput(BaseModel):
    url: str
    width: int
    height: int

class ImageOutput(BaseModel):
    thumbnail_path: str
    original_size_bytes: int

def resize(input: ImageInput) -> ImageOutput:
    # Your logic here
    return ImageOutput(
        thumbnail_path=f"/tmp/{input.width}x{input.height}.jpg",
        original_size_bytes=1024,
    )

# Async tasks work the same way
async def fetch_and_resize(input: ImageInput) -> ImageOutput:
    ...
```

Register the tasks in your config:

```yaml
tasks:
  - name: "my_app.tasks:resize"
    max_retries: 2
    timeout_seconds: 60
  - name: "my_app.tasks:fetch_and_resize"
    max_retries: 3
    timeout_seconds: 120
```

The framework uses `inspect` to automatically extract:
- Input type from the first parameter's annotation
- Output type from the return annotation
- Whether the function is sync or async
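
The three items above can be extracted with the standard library alone. The following is a minimal sketch of that kind of introspection, not the contents of `task.py`; `TaskSpec` and `describe_task` are hypothetical names, and plain classes stand in for the Pydantic models so the snippet has no third-party dependencies.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, get_type_hints


@dataclass
class TaskSpec:
    """Hypothetical container for what introspection finds."""
    input_type: type
    output_type: type
    is_async: bool


def describe_task(func: Callable[..., Any]) -> TaskSpec:
    """Read the input type, output type, and sync/async flavor of a task."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        raise TypeError(f"{func.__name__} must accept one input parameter")
    hints = get_type_hints(func)  # also resolves string annotations
    first = params[0].name
    if first not in hints or "return" not in hints:
        raise TypeError(f"{func.__name__} needs input and return annotations")
    return TaskSpec(hints[first], hints["return"], inspect.iscoroutinefunction(func))


# Stand-ins for Pydantic models such as ImageInput / ImageOutput.
class DemoInput: ...
class DemoOutput: ...


def resize(input: DemoInput) -> DemoOutput:
    return DemoOutput()


async def fetch_and_resize(input: DemoInput) -> DemoOutput:
    return DemoOutput()


sync_spec = describe_task(resize)
async_spec = describe_task(fetch_and_resize)
print(sync_spec.input_type.__name__, sync_spec.is_async)    # DemoInput False
print(async_spec.output_type.__name__, async_spec.is_async)  # DemoOutput True
```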

Sync tasks run in a thread pool. Async tasks run natively in the event loop. If a task exceeds `timeout_seconds`, the worker cancels it and reports a timeout to the orchestrator, which handles retries.
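
A rough sketch of this execution model using only `asyncio` is shown below. `run_with_timeout` and the status strings are hypothetical, and the real worker may be structured differently. One caveat worth knowing in any design like this: a timed-out coroutine is genuinely cancelled, but a sync function already running in a thread cannot be interrupted, so the caller can only stop waiting for it.

```python
import asyncio
import inspect
import time


async def run_with_timeout(func, arg, timeout_seconds: float):
    """Run a sync or async task; return ("COMPLETED", result) or ("TIMEOUT", None)."""
    if inspect.iscoroutinefunction(func):
        awaitable = func(arg)                     # runs natively on the event loop
    else:
        awaitable = asyncio.to_thread(func, arg)  # runs in the default thread pool
    try:
        result = await asyncio.wait_for(awaitable, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # The coroutine case is cancelled; a sync function keeps running in
        # its thread until it returns, and its result is discarded.
        return "TIMEOUT", None
    return "COMPLETED", result


def slow_sync(x):
    time.sleep(0.2)
    return x * 2


async def fast_async(x):
    await asyncio.sleep(0)
    return x + 1


async def main():
    print(await run_with_timeout(fast_async, 1, timeout_seconds=1.0))  # ('COMPLETED', 2)
    print(await run_with_timeout(slow_sync, 1, timeout_seconds=0.05))  # ('TIMEOUT', None)


asyncio.run(main())
```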

## LoadTask (Rust Extension)

A built-in stress-testing task implemented in Rust (PyO3). It creates configurable CPU, I/O, and memory pressure while releasing the GIL, so other Python threads keep running while it executes.

```python
import dimq_load_task

result = dimq_load_task.run(
    duration_seconds=5.0,
    concurrency=4,       # number of CPU threads
    cpu_load=0.7,        # fraction of duration for CPU work (SHA-256 hashing)
    io_load=0.2,         # fraction for I/O (temp file writes)
    memory_mb=100,       # memory to allocate and touch
)
# result = {
#     "phases": [
#         {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
#         {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
#         {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
#     ],
#     "total_duration_seconds": 4.501,
#     "peak_memory_mb": 100,
# }
```
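
To compare what actually ran against what was requested, the per-phase timings can be totaled. The helper below is a hypothetical convenience that assumes only the result shape shown in the comment above; it works on a hand-written sample dict, so it does not need the Rust extension to be built.

```python
def summarize_phases(result: dict) -> dict:
    """Return total seconds spent per phase type."""
    totals: dict = {}
    for phase in result["phases"]:
        kind = phase["type"]
        totals[kind] = totals.get(kind, 0.0) + phase["duration_seconds"]
    return totals


# Sample copied from the example result above, not produced by a real run.
sample = {
    "phases": [
        {"type": "memory", "start_seconds": 0.0, "duration_seconds": 0.001},
        {"type": "cpu", "start_seconds": 0.001, "duration_seconds": 3.5},
        {"type": "io", "start_seconds": 3.501, "duration_seconds": 1.0},
    ],
    "total_duration_seconds": 4.501,
    "peak_memory_mb": 100,
}
print(summarize_phases(sample))  # {'memory': 0.001, 'cpu': 3.5, 'io': 1.0}
```

Here the CPU and I/O totals match the requested `cpu_load=0.7` and `io_load=0.2` fractions of the 5-second duration.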

## Project Structure

```
DIMQ/
├── pyproject.toml
├── Dockerfile              # Multi-stage build for worker containers
├── docker-compose.yml      # 3 worker services for e2e testing
├── src/dimq/
│   ├── orchestrator.py     # ZMQ ROUTER, dispatch, heartbeat, retry
│   ├── worker.py           # ZMQ DEALER, task execution, timeout
│   ├── adaptive.py         # Throughput-based parallelization tuning
│   ├── task.py             # Task loading and introspection via inspect
│   ├── models.py           # Pydantic models (TaskRecord, DimqConfig, etc.)
│   ├── config.py           # YAML config loading
│   ├── cli.py              # CLI entry points
│   └── tasks/
│       └── load.py         # Pydantic wrapper for dimq_load_task
├── dimq_load_task/         # Rust extension (pyo3 + maturin)
│   ├── Cargo.toml
│   ├── pyproject.toml
│   └── src/lib.rs
├── e2e/
│   └── config.yaml         # Worker config for Docker containers
└── tests/
    ├── test_orchestrator.py
    ├── test_worker.py
    ├── test_adaptive.py
    ├── test_integration.py
    ├── test_e2e_docker.py   # Docker-based e2e test
    ├── test_load_task.py
    ├── test_tasks_load.py   # LoadTask wrapper tests
    ├── test_models.py
    ├── test_config.py
    ├── test_task.py
    └── sample_tasks.py      # Test fixtures
```

