Metadata-Version: 2.4
Name: topoqueue
Version: 0.1.0
Summary: Single-machine DAG ready-queue with state machine and thread-safe API
License: Apache-2.0
Project-URL: Homepage, https://github.com/zijian-optics/TopoQueue
Project-URL: Repository, https://github.com/zijian-optics/TopoQueue
Project-URL: Documentation, https://github.com/zijian-optics/TopoQueue#readme
Keywords: dag,scheduler,workflow,dependency-graph,topology,thread-safe,state-machine
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: dev
Requires-Dist: pytest>=7; extra == "dev"
Dynamic: license-file

![architecture](topoqueue.png)

# TopoQueue

A dependency-aware ready queue for DAG tasks.

TopoQueue behaves similarly to `queue.Queue`, but tasks are released only
when their dependencies are satisfied. It provides:

- DAG validation
- ready-queue scheduling
- task state machine
- thread-safe dispatch

TopoQueue is designed as a lightweight scheduling primitive for local DAG
task orchestration.
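The core idea, releasing a task only once all of its dependencies have completed, can be sketched in plain Python. This is a simplified Kahn-style illustration, not TopoQueue's actual implementation:

```python
from collections import deque

def ready_queue_order(deps):
    """Release tasks only when all dependencies are satisfied (Kahn's algorithm)."""
    indegree = {n: len(d) for n, d in deps.items()}
    dependents = {n: [] for n in deps}
    for node, ds in deps.items():
        for d in ds:
            dependents[d].append(node)
    ready = deque(n for n, k in indegree.items() if k == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)  # a worker would execute the task here
        for child in dependents[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(deps):
        raise ValueError("cycle detected")  # DAG validation failure
    return order
```

A cycle leaves some nodes with a nonzero in-degree, which is how validation detects it.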

Non-goals:

- distributed scheduling
- task execution frameworks
- workflow UI or monitoring systems

[![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/) [![Apache 2.0](https://img.shields.io/badge/license-Apache%202.0-green.svg)](LICENSE)

## Installation

Requires **Python 3.9+** (as declared in the package metadata).

TopoQueue includes Cython-compiled extensions for performance.  
Install dependencies before building:

- [Cython](https://cython.org/)
- a C compiler  
  - Windows: [Visual Studio Build Tools](https://visualstudio.microsoft.com/visual-cpp-build-tools/)
  - Linux/macOS: system default compiler (gcc/clang)

Install for development:

```bash
pip install -e .
```

Install development dependencies:

```bash
pip install -e ".[dev]"
```

Run tests (install the package first, then run pytest):

```bash
pip install -e ".[dev]"
pytest tests/ -v
```

Alternatively, if you do not install the package, build Cython extensions in place first,
then run tests with `PYTHONPATH`:

```bash
pip install -e ".[dev]"   # still need dev deps for pytest
python setup.py build_ext --inplace
# Linux/macOS:
PYTHONPATH=src pytest tests/ -v
# Windows PowerShell:
$env:PYTHONPATH="src"; pytest tests/ -v
```


## Quick Start

```python
import queue
from topoqueue import DAGData, DAGQueue, Node, State

data = DAGData(nodes={
    "a": Node("a", deps=[]),
    "b": Node("b", deps=["a"]),
    "c": Node("c", deps=["b"]),
}, metadata={})

q = DAGQueue(data=data)
q.validate()

while not q.is_finished():
    try:
        node = q.get_ready(block=False)
    except queue.Empty:
        break
    q.mark_done(node.node_id, result="ok")

q.join(timeout=5.0)
```

If a node fails:

```python
q.mark_failed(node.node_id, cascade=True)
```

When `cascade=True`, downstream nodes transition to **CANCELED**.
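The cascade behavior amounts to a walk over the reverse edges of the DAG. A minimal sketch of the idea, using a hypothetical standalone helper rather than TopoQueue's API:

```python
def cascade_cancel(deps, failed, states):
    """Mark `failed` FAILED and every node downstream of it CANCELED."""
    dependents = {}
    for node, ds in deps.items():
        for d in ds:
            dependents.setdefault(d, []).append(node)
    states[failed] = "FAILED"
    stack = [failed]
    while stack:
        for child in dependents.get(stack.pop(), []):
            if states[child] != "CANCELED":
                states[child] = "CANCELED"
                stack.append(child)
    return states
```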

## When to Use TopoQueue

TopoQueue is designed as a lightweight scheduling primitive for dependency-based task execution.

Typical use cases include:

- Local DAG task runners
- Workflow engine foundations
- Build systems or CI dependency scheduling
- Task orchestration inside a single service
- Research or experimentation with DAG schedulers

TopoQueue is most useful when you need:

- deterministic dependency resolution
- a ready queue for DAG nodes
- thread-safe worker dispatch
- a minimal core without heavy framework constraints


## When Not to Use TopoQueue

TopoQueue is not intended to replace full workflow orchestration systems.

You should consider other tools if you need:

- distributed workers across multiple machines
- persistent task storage or recovery
- scheduling based on time (cron / intervals)
- built-in task execution environments
- monitoring dashboards or workflow UI
- complex retry, backoff, or SLA management

In those cases, a workflow engine such as Airflow, Prefect, or Ray may be more appropriate.

## State Machine

```mermaid
stateDiagram-v2
    [*] --> PENDING

    PENDING --> READY: dependencies satisfied
    DIRTY --> READY: dependencies satisfied
    READY --> RUNNING: get_ready()

    RUNNING --> DONE: mark_done()
    RUNNING --> FAILED: mark_failed()
    RUNNING --> CANCELED: cancel()

    DONE --> DIRTY: invalidate()
    READY --> PENDING: invalidate()

    FAILED --> READY: retry (attempts < max_retries)
    FAILED --> CANCELED: cascade downstream
```

State meanings:

**PENDING**  
Dependencies are not yet satisfied.

**READY**  
Dependencies completed; node is waiting in the ready queue.

**RUNNING**  
Node has been dispatched to a worker.

**DONE**  
Node finished successfully.

**DIRTY**  
Node was DONE (or READY) but invalidated; must be recomputed. When all dependencies are DONE again, it becomes READY.

**FAILED**  
Node execution failed.

**CANCELED**  
Node was canceled (e.g. by `cancel()` or upstream failed with cascade).
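Invalidation propagates downstream much like cascade failure, except DONE descendants become DIRTY and must recompute. A simplified sketch of the idea (the READY-to-PENDING transition is omitted; this is not the library's internals):

```python
def invalidate(deps, node, states):
    """Mark `node` and its DONE descendants DIRTY so they recompute."""
    dependents = {}
    for n, ds in deps.items():
        for d in ds:
            dependents.setdefault(d, []).append(n)
    stack = [node]
    while stack:
        n = stack.pop()
        if states.get(n) == "DONE":
            states[n] = "DIRTY"
            stack.extend(dependents.get(n, []))  # visit each DONE node once
    return states
```

Each node is pushed at most once after flipping to DIRTY, so the walk stays O(nodes + edges).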

## Retry and Monitoring

Nodes support retry control using `max_retries`.

If `mark_failed()` is called and `attempts < max_retries`,
the node will automatically transition back to **READY** and be re-queued.

Each failure increments `attempts` automatically.
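The retry rule can be expressed as a small state transition. This toy class is illustrative only, not TopoQueue's `Node`:

```python
class RetryableNode:
    """Toy model of the failure/retry transition."""

    def __init__(self, max_retries):
        self.max_retries = max_retries
        self.attempts = 0
        self.state = "RUNNING"

    def mark_failed(self):
        # Each failure increments `attempts`; retry while budget remains.
        self.attempts += 1
        self.state = "READY" if self.attempts < self.max_retries else "FAILED"
        return self.state
```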

Queue statistics:

```python
q.stats()
```

`stats()` returns a count for each state, plus:

- `blocked`: the number of PENDING nodes whose dependency chain contains a FAILED node.
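The blocked figure can be computed by walking upstream from each PENDING node. A rough standalone sketch, not the library's implementation:

```python
def blocked_count(deps, states):
    """Count PENDING nodes with a FAILED node anywhere upstream."""
    def has_failed_upstream(node, seen):
        for d in deps[node]:
            if d in seen:
                continue
            seen.add(d)
            if states[d] == "FAILED" or has_failed_upstream(d, seen):
                return True
        return False

    return sum(1 for n, s in states.items()
               if s == "PENDING" and has_failed_upstream(n, set()))
```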

## Queue Policy

Task dispatch order is determined by **QueuePolicy**.

If no policy is provided, **DefaultQueuePolicy** is used
(priority first, then nodes with larger fan-out).

Example custom policy:

```python
from topoqueue import DAGQueue, QueuePolicy

class MyPolicy(QueuePolicy):
    def score(self, node, context):
        # Smaller score tuples are dispatched first.
        return (-node.priority, 0)

q = DAGQueue(data=data, policy=MyPolicy())
```
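To see how score tuples drive dispatch order, sorting by the tuple suffices. The `default_score` below is an assumption about what "priority first, then larger fan-out" looks like as a tuple, not the actual `DefaultQueuePolicy` code:

```python
def default_score(priority, fanout):
    # Illustrative only: smaller tuples dispatch first, so negate both
    # fields to prefer high priority, then large fan-out.
    return (-priority, -fanout)

# (name, priority, fanout) triples for three ready nodes
nodes = [("low", 1, 5), ("high", 3, 0), ("mid", 2, 9)]
order = sorted(nodes, key=lambda t: default_score(t[1], t[2]))
```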

## Bulk Graph Construction and Persistence

Large DAGs can be built incrementally and validated once:

```python
q = DAGQueue()
q.add_nodes([...])
q.validate()
```

DAG structures can be serialized:

```python
data.to_json()
DAGData.from_json(s)
```

## Node identity

The core treats nodes with equal **node_id** as the same node. Two common strategies:

- **Stable node_id** — Same id across runs; `merge_dag()` identifies nodes by id and invalidates when `deps` or `priority` change.
- **Hash-based node_id** — e.g. `hash(payload + deps)` so semantic change implies a new id; merge then naturally adds/removes nodes. Caller can invalidate downstream as needed.

Snapshot and restore use the same node shape (no payload/results); persistence is up to the caller.
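A hash-based id can, for example, be derived from the payload and dependency ids. This is an illustrative recipe, not a TopoQueue API:

```python
import hashlib
import json

def content_node_id(payload, deps):
    """Deterministic id: any change to payload or deps yields a new identity."""
    blob = json.dumps({"payload": payload, "deps": sorted(deps)}, sort_keys=True)
    return hashlib.sha256(blob.encode()).hexdigest()[:16]
```

With such ids, `merge_dag()` sees a semantically changed task as a brand-new node rather than a mutation of an existing one.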

## API Overview

| Component | Description |
|------|------|
| **State** | PENDING, READY, RUNNING, DONE, DIRTY, FAILED, CANCELED |
| **Node** | `node_id`, `deps` (tuple), `priority`, `max_retries`, `payload`, `attempts`, `state`; `to_dict()`; accepts `**kwargs` |
| **DAGData** | `nodes`, `metadata`; `to_json()` / `from_json()` |
| **DAGQueue** | `validate()`, `add_node()`, `add_nodes()`, `merge_dag()`, `snapshot()`, `restore()`, `get_ready()`, `mark_done()`, `mark_failed()`, `invalidate()`, `cancel()`, `stats()`, `is_finished()`, `join()`, `rebuild_set(changed)`, `visualization_snapshot()`; optional `policy`; optional `result_store` and `event_sink`; `graph_version` property |
| **VisualizationSnapshot** | Read-only snapshot for debugging: `node_ids`, `edges`, `node_states`, `ready_ids`, `running_ids`, `dirty_or_pending_ids`, `graph_version`; serializable via `to_dict()`; cannot be restored |
| **ResultStore** | Protocol: `load(node_id)`, `save(node_id, result)`, `invalidate(node_id)`; implemented by the caller, the core only invokes it |
| **Events** | `node_started`, `node_done`, `node_failed`, `node_invalidated`, `dag_merged`; pass an `event_sink` to receive execution log events |
| **SnapshotData** | `metadata`, `nodes`; `to_dict()` / `from_dict()`; snapshots exclude payloads |
| **QueuePolicy** | `QueuePolicy`, `QueueContext`, `DefaultQueuePolicy` |

## Performance

Critical operations are implemented in Cython and compiled to C extensions.

Optimized paths include:

- DAG validation
- dependency resolution
- state transitions
- scheduling statistics

**Targets (see `docs/plans/2025-03-07-phase3-performance-constraints.md`):** 100k+ nodes DAG; O(nodes+edges) invalidation propagation.

Run the benchmark:

```bash
python -m benchmarks.bench_scheduler
```

## Examples

The directory `examples/wish_engine` demonstrates a simple DAG executor.

The example loads a DAG from CSV and runs multiple workers that consume
tasks from the ready queue.

Run the example:

```bash
python -m examples.wish_engine.run_scheduler examples/wish_engine/sample_dag.csv -n 2
```

## License

[Apache License 2.0](LICENSE)
