Metadata-Version: 2.4
Name: athanore
Version: 0.0.2
Summary: Polls project management APIs for ready tickets and spawns AI agents to work on them
Author-email: Scott <me@scottrussell.net>
License-Expression: MIT
License-File: LICENSE
Keywords: agent,ai,automation,dispatcher
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Build Tools
Requires-Python: >=3.13
Requires-Dist: starlette>=0.30.0
Requires-Dist: uvicorn>=0.20.0
Description-Content-Type: text/markdown

# athanore

Polls task queues, dispatches agent subprocesses, and exposes an HTTP API so agents can interact with tasks without knowing which backend is in use.

## How it works

The router polls configured queues for ready tasks. When it finds one, it moves the task to an in-progress queue, spawns an agent (via an `AgentAdapter`), and passes task details to the agent. The subprocess uses a local HTTP API to read task details, post comments, update fields, and move the task when done. The agent never talks to the backend directly.

## Key concepts

- **Queue adapters** — Protocol-based (`QueueAdapter`, 11 methods). Ships with a JSON file adapter. A Planka adapter is included as a user-land example (`planka_backend.py`). Implement the protocol for anything else (Jira, Trello, Linear, SQLite, SQS, etc.).
- **Agent adapters** — Protocol-based (`AgentAdapter`). Owns the full agent run lifecycle (spawn, monitor, cleanup). Ships with `SubprocessAgentAdapter` (generic subprocess management). A Claude adapter using the `claude-code-sdk` is included as a user-land example (`claude_adapter.py`).
- **Routes** — Flask-style `@dispatcher.route()` decorators map queues to prompt-generating functions. Each route can specify its own agent adapter.
- **HTTP API** — Agents hit localhost. No credentials, no backend coupling.

## Quick start

Requires Python 3.13+.

```sh
uv pip install -e .     # preferred (pip install -e . also works)
uv pip show athanore    # verify the install succeeded
```

Create a Python script (e.g. `run.py`):

```python
from athanore import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="claude",
    poll_interval=30,
    agent_timeout=600,
    max_concurrent_agents=3,
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

@dispatcher.route(
    args=["--agent", "engineer", "-p"],
    queue_name="Todo",
    in_progress_queue="In Progress",
)
def engineer_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."

if __name__ == "__main__":
    dispatcher.run(debug=True)  # enable DEBUG logging (default: False)
```

> For Planka users, see `planka_backend.py` at the repo root for a ready-made backend.

```sh
python run.py
```

This starts two things:

1. **Router** — polls configured queues, picks up tasks, moves them to in-progress, and spawns agents via agent adapters.
2. **HTTP API** — listens on `http://{api_host}:{api_port}` so spawned agents can interact with tasks.

## Configuration reference

All configuration is done via the `AgentDispatcher` constructor.

### Constructor arguments

| Argument | Type | Default | Description |
|---|---|---|---|
| `command` | `str` | `""` | Base command to run for all routes (e.g. `"claude"`). Optional when using per-route `agent=` adapters. |
| `poll_interval` | `int` | `30` | Seconds between polls |
| `agent_timeout` | `int \| None` | `None` | Default timeout in seconds for all agents |
| `max_concurrent_agents` | `int` | `3` | Max agent processes at once |
| `api_host` | `str` | `"127.0.0.1"` | HTTP API bind address |
| `api_port` | `int` | `8000` | HTTP API port |
| `queue_backend` | `QueueAdapter \| None` | `None` | Task backend (required before calling `run()`). Any object with a `create_adapter()` method also works. |
| `default_agent` | `AgentAdapter \| None` | `None` | Default agent adapter for all routes. Defaults to `SubprocessAgentAdapter()` if not provided. |
| `max_retries` | `int` | `0` | Default max retry attempts for failed or timed-out agents. `0` disables retries. |
| `dead_letter_queue` | `str \| None` | `None` | Default queue name where tasks are moved after exhausting all retries. |
| `enable_queue_management` | `bool` | `False` | Enable queue CRUD HTTP endpoints |

### Route decorator

The `@dispatcher.route()` decorator registers a queue-to-command mapping. The decorated function receives `(task_id, task_name)` and returns a prompt string appended to the command arguments.

```python
@dispatcher.route(
    queue_name="My Project.My Board.Todo",          # required: queue to poll
    in_progress_queue="My Project.My Board.WIP",    # default: "In Progress"
    args=["--agent", "engineer", "-p"],              # extra args before prompt
    timeout=1800,                                    # route-specific timeout (optional)
    poll_interval=10,                                # route-specific poll interval (optional)
    priority=1,                                      # dispatch priority (optional)
    max_retries=3,                                   # max retry attempts (optional)
    dead_letter_queue="My Project.My Board.Failed",  # DLQ for exhausted retries (optional)
    agent=ClaudeAgentAdapter(subagent="eng"),         # per-route agent adapter (optional)
)
def my_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."
```

### Agent adapters

Agent adapters own the full lifecycle of running an agent: spawning, monitoring stdout, enforcing timeouts, and cleanup.

- **`SubprocessAgentAdapter`** — Generic subprocess management. Uses `route.format_command()` to build the command, spawns it, reads stdout, and enforces timeouts. Subclass to customize behavior.
- **`ClaudeAgentAdapter`** (`claude_adapter.py`) — Uses the `claude-code-sdk` to run Claude agents natively via the SDK's `query()` API. No subprocess management or stdout parsing needed. Picks up existing `.claude/` configuration automatically. Supports `subagent`, `model`, `permission_mode`, `max_turns`, `max_budget_usd`, and `cwd` constructor params.

Install the SDK separately: `uv pip install claude-code-sdk`

Per-route agent assignment:

```python
from athanore import AgentDispatcher
from claude_adapter import ClaudeAgentAdapter

dispatcher = AgentDispatcher(
    poll_interval=30,
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Engineering",
    agent=ClaudeAgentAdapter(subagent="software-engineer"),
)
def eng(task_id, task_name):
    return f"Implement {task_id}: {task_name}"

@dispatcher.route(
    queue_name="Research",
    agent=ClaudeAgentAdapter(subagent="research"),
)
def research(task_id, task_name):
    return f"Research {task_id}: {task_name}"
```

### Agent timeouts

You can configure timeouts to automatically terminate agent processes that run too long:

- **`agent_timeout`** (constructor): Sets a global timeout in seconds for all agents. If not specified, agents run indefinitely.
- **`timeout`** (per-route): Sets a route-specific timeout in seconds. Overrides `agent_timeout` for that route.

When an agent times out:
1. The process receives a TERM signal and has 5 seconds to exit gracefully
2. If it doesn't exit, it receives a KILL signal
3. A comment is added to the task noting the timeout
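The escalation in steps 1 and 2 is the standard terminate-then-kill pattern. A generic sketch of that pattern (illustrative only, not athanore's actual implementation):

```python
import subprocess

def stop(proc: subprocess.Popen, grace: float = 5.0) -> int:
    """Terminate, wait up to `grace` seconds for a clean exit, then kill."""
    proc.terminate()                 # step 1: SIGTERM, ask nicely
    try:
        return proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        proc.kill()                  # step 2: SIGKILL, force it
        return proc.wait()

# A well-behaved process exits promptly on SIGTERM:
proc = subprocess.Popen(["sleep", "60"])
print(stop(proc))  # negative return code on POSIX (terminated by signal)
```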

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    agent_timeout=3600,  # 1 hour default for all agents
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Quick Tasks",
    timeout=300,  # 5 minutes for quick tasks (overrides default)
)
def quick(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Long Tasks")
# No timeout — uses default of 3600 seconds
def long_running(task_id, task_name):
    return f"Handle {task_id}"
```

### Route priority

When `max_concurrent_agents` is limited, routes with lower `priority` values are dispatched first. This lets you ensure downstream queues (closer to completion) are serviced before upstream ones, so a task flows all the way through a pipeline before new work begins.

Routes without an explicit `priority` use their registration order as a tiebreaker.

```python
@dispatcher.route(queue_name="QA",          priority=1)  # serviced first
def qa(task_id, task_name):
    return f"Review {task_id}"

@dispatcher.route(queue_name="Engineering", priority=2)
def eng(task_id, task_name):
    return f"Implement {task_id}"

@dispatcher.route(queue_name="Todo",        priority=3)  # serviced last
def todo(task_id, task_name):
    return f"Handle {task_id}"
```

### Per-queue poll intervals

By default, all queues are polled at the global `poll_interval` rate. You can override this per-route to poll high-priority queues more frequently or low-priority queues less often. The router's internal tick rate automatically adjusts to the shortest configured interval, so no queue is ever starved:

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    poll_interval=60,  # default for all queues
    queue_backend=my_backend,
)

@dispatcher.route(queue_name="High Priority", poll_interval=10)
def urgent(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Background", poll_interval=1800)
def background(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No poll_interval — uses global default of 60s
def normal(task_id, task_name):
    return f"Handle {task_id}"
```
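The tick-rate behavior described above can be sketched like this (illustrative only, not athanore's actual scheduler; queue names reuse the example above):

```python
# Each queue becomes due once its own interval has elapsed;
# the router wakes up at the shortest configured interval.
intervals = {"High Priority": 10, "Normal": 60, "Background": 1800}
tick = min(intervals.values())  # router ticks every 10s

def due_queues(now: float, last_polled: dict[str, float]) -> list[str]:
    """Queues whose interval has elapsed since they were last polled."""
    return [q for q, iv in intervals.items() if now - last_polled.get(q, 0.0) >= iv]
```

On the tick at t=10 only `High Priority` is due; by t=60, `Normal` becomes due as well.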

### Retry and dead-letter queues

The dispatcher can automatically retry failed agents and optionally route exhausted tasks to a dead-letter queue (DLQ).

**What triggers a retry:**

- Agent failure (non-zero exit code)
- Agent timeout
- **Not** cancellation (e.g., router shutdown or `KeyboardInterrupt`)

**Retry behavior** — when a retry is triggered and the task's retry count is below `max_retries`:

1. The task's retry count is incremented.
2. A comment is added to the task (e.g., "Retry 1/3: timed out. Moving back to source queue for retry.").
3. The task is moved back to the source queue (the original watched queue), where it will be picked up again on the next poll cycle.

**Exhaustion without DLQ** — when the retry count reaches `max_retries` and no `dead_letter_queue` is configured:

- A comment is added: "Max retries (N) exhausted. No dead-letter queue configured; leaving in place."
- The task stays in the in-progress queue. Manual intervention is required.

**Exhaustion with DLQ** — when the retry count reaches `max_retries` and a `dead_letter_queue` is configured:

- A comment is added: "Max retries (N) exhausted. Moving to dead-letter queue."
- The task is moved to the configured dead-letter queue.

**Configuration** — retry and DLQ settings can be configured at two levels:

- **Global defaults**: `max_retries` and `dead_letter_queue` on the `AgentDispatcher` constructor apply to all routes.
- **Per-route overrides**: `max_retries` and `dead_letter_queue` on `@dispatcher.route()` override the global defaults for that route.

> **Note:** Setting `dead_letter_queue=None` on a per-route basis does **not** disable a global DLQ — it falls through to the global default. There is currently no way to explicitly disable a globally configured DLQ for a single route.

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    max_retries=2,                      # default: retry up to 2 times
    dead_letter_queue="Failed Tasks",   # default DLQ for all routes
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Critical",
    max_retries=5,                              # override: more retries for critical tasks
    dead_letter_queue="Critical.Failed",        # override: separate DLQ
)
def critical(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(
    queue_name="Best Effort",
    max_retries=1,                              # override: only one retry
    # No dead_letter_queue override — uses global "Failed Tasks"
)
def best_effort(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No overrides — uses global defaults (2 retries, DLQ = "Failed Tasks")
def normal(task_id, task_name):
    return f"Handle {task_id}"
```

## HTTP API

| Method | Endpoint | Description |
|---|---|---|
| `GET` | `/status` | Server metadata: capacity, timing, active agent count |
| `GET` | `/agents` | List running agents with task IDs, runtime, timeout |
| `GET` | `/agents/{task_id}` | Single agent detail with buffered event history |
| `GET` | `/tasks/{task_id}` | Full task info: description, labels, assignees, retry_count, comments |
| `GET` | `/tasks/{task_id}/events` | SSE stream of agent events for a task |
| `POST` | `/tasks/{task_id}/comments` | Post a comment on a task (`{"comment": "text"}`) |
| `POST` | `/tasks/{task_id}/move` | Move a task to a different queue (`{"target_queue": "name"}`) |
| `PATCH` | `/tasks/{task_id}` | Update task fields (`{"name": "...", "description": "...", "labels": [...], "assignees": [...]}`) |
| `POST` | `/tasks` | Create a new task (`{"queue_name": "...", "name": "...", "description": "..."}`) |
| `GET` | `/queues` | List all queues with task counts |
| `GET` | `/queues/{queue_name}` | Get details for a specific queue |
| `POST` | `/queues` | Create a new queue (`{"name": "..."}`) |
| `PATCH` | `/queues/{queue_name}` | Update/rename a queue (`{"name": "..."}`) |
| `DELETE` | `/queues/{queue_name}` | Delete an empty queue |
| `GET` | `/routes` | List all configured routes |
| `GET` | `/routes/{queue_name}` | Get details for a specific route |
| `POST` | `/routes` | Create a new route (`{"queue_name": "...", ...}`) |
| `PATCH` | `/routes/{queue_name}` | Update route fields |
| `DELETE` | `/routes/{queue_name}` | Delete a route |

Queue/route write endpoints (`POST`, `PATCH`, `DELETE`) require `enable_queue_management=True`.
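From an agent's side these endpoints are plain JSON over localhost, so the stdlib is enough. A sketch (the task ID `42` is hypothetical, and the URL assumes the default `api_host`/`api_port`):

```python
import json
import urllib.request

API = "http://127.0.0.1:8000"  # default api_host / api_port

def api_post(path: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST against the dispatcher's local HTTP API."""
    return urllib.request.Request(
        f"{API}{path}",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send(req: urllib.request.Request) -> dict:
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# With the dispatcher running, an agent might report progress and finish:
# send(api_post("/tasks/42/comments", {"comment": "Tests pass; opening PR."}))
# send(api_post("/tasks/42/move", {"target_queue": "Done"}))
```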

## Task lifecycle

1. Task sits in a watched queue (e.g. `Todo`)
2. Router picks it up, moves it to the in-progress queue, and assigns the authenticated user
3. Router spawns an agent via the route's `AgentAdapter`
4. The agent uses the HTTP API to read task details, add comments, etc.
5. When finished, the agent calls the move endpoint to move the task to a done queue

## Backends

### Planka

The Planka backend is provided as a user-land file (`planka_backend.py` at the repo root), not as part of the library. It requires `plankapy>=2.3.0` to be installed separately:

```sh
uv pip install "plankapy>=2.3.0"
```

Uses dot-notation for queue naming: `Project.Board.List`.

```python
from athanore import AgentDispatcher
from planka_backend import PlankaBackend

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=PlankaBackend(url="http://localhost:1337"),
)

@dispatcher.route(
    queue_name="My Project.My Board.Todo",
    in_progress_queue="My Project.My Board.In Progress",
    args=["-p"],
)
def handle(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}"
```

#### Planka authentication

Credentials can be passed directly as kwargs or resolved from environment variables:

```python
# Option 1: API token (kwarg)
PlankaBackend(url="http://localhost:3000", token="your-token-here")

# Option 2: Username + password (kwargs)
PlankaBackend(url="http://localhost:3000", username="admin", password="secret")

# Option 3: Environment variables (default when no kwargs are given)
# PLANKA_TOKEN=your-token-here
# — or —
# PLANKA_USER=admin  +  PLANKA_PASSWORD=secret
PlankaBackend(url="http://localhost:3000")
```

Credentials are resolved at `dispatcher.run()` time, not at import time. If you use `.env` files, call `dotenv.load_dotenv()` in your script before `dispatcher.run()`.

### JSON file

For development/testing or lightweight use without external services.

```python
from athanore import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)
```

The JSON file structure:

```json
{
  "queues": {
    "Todo": [
      {"id": "1", "name": "Fix crash", "description": "...", "labels": [], "assignees": [], "comments": [], "tasks": []}
    ],
    "In Progress": [],
    "Done": []
  }
}
```
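To try the quick-start example without writing the file by hand, you can seed a board from Python (the field set mirrors the structure above):

```python
import json
from pathlib import Path

# Seed a board with one ready task in the watched "Todo" queue.
board = {
    "queues": {
        "Todo": [
            {"id": "1", "name": "Fix crash", "description": "Steps to reproduce attached.",
             "labels": [], "assignees": [], "comments": [], "tasks": []},
        ],
        "In Progress": [],
        "Done": [],
    }
}
Path("/tmp/board.json").write_text(json.dumps(board, indent=2))
```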

### Custom

Implement the `QueueAdapter` protocol (11 methods, defined in `athanore/queue_adapters/base.py`):

- `get_ready_tasks(queue_names)` — Return tasks from the given queues
- `get_task(task_id)` — Return a single task by ID
- `move_task(task_id, target_queue)` — Move a task between queues
- `add_comment(task_id, text)` — Add a comment to a task
- `update_task(task_id, *, assignees, name, description, labels)` — Update task fields
- `create_task(queue_name, name, description)` — Create a new task
- `list_queues()` — List all queues with task counts
- `get_queue(queue_name)` — Get a single queue's info
- `create_queue(queue_name)` — Create a new empty queue
- `update_queue(queue_name, *, new_name)` — Rename a queue
- `delete_queue(queue_name)` — Delete an empty queue

Pass your custom adapter directly to the constructor:

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=MyCustomAdapter(),
)
```

## Development

```sh
uv pip install -e ".[dev]"   # pip install -e ".[dev]" also works
pytest
```
