Metadata-Version: 2.4
Name: slonq
Version: 0.0.0
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: License :: OSI Approved :: MIT License
License-File: LICENSE
Summary: A reliable, lightweight PostgreSQL job queue. Uses FOR UPDATE SKIP LOCKED for atomic, concurrent task distribution without external dependencies.
Author-email: Borislav Borisov <chmodas@astutebits.com>
License-Expression: MIT
Requires-Python: >=3.11
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Homepage, https://github.com/astutebits/slonq
Project-URL: Issues, https://github.com/astutebits/slonq/issues
Project-URL: Repository, https://github.com/astutebits/slonq

# slonq

`slonq` (from the Slavic slon, meaning elephant, + q for queue) is a reliable, lightweight PostgreSQL job queue.

This is the Python package for `slonq`, providing high-performance bindings to the Rust core.

Built on top of `FOR UPDATE SKIP LOCKED`, it provides atomic, concurrent task distribution without the need for an external message broker. It is designed for small to medium-scale projects that already use PostgreSQL and require an ‘at-least-once’ delivery guarantee without adding infrastructure complexity.

## Features

- **Atomic concurrency:** Utilises Postgres `SKIP LOCKED` to ensure multiple workers can dequeue tasks simultaneously without collisions.
- **Idempotency support:** Built-in support for idempotency keys to prevent duplicate job insertion.
- **Delayed jobs:** Schedule tasks to become visible at a specific time in the future.
- **Lease mechanism:** Jobs are ‘leased’ to workers; if a worker crashes, the lease expires and the job becomes visible for retry.
- **High performance:** Core logic implemented in Rust with Python bindings via PyO3.

## Installation

```bash
pip install slonq
```

## Quick start

The following example demonstrates the full lifecycle of a job, including enqueuing, dequeuing, heartbeat (touching), and final acknowledgement using `asyncio`.

```python
import asyncio
from slonq import PgQueue

async def main():
    # 1. Initialise the connection to Postgres
    queue = await PgQueue.connect("postgres://postgres@localhost:5432/db")

    # 2. Enqueue a job with an idempotency key and a 5-minute (300s) lease
    await queue.enqueue(
        "unique-request-id-123", 
        {"type": "process_video", "path": "/uploads/vid.mp4"}, 
        300
    )

    # 3. Dequeue a batch of up to 5 jobs for 'worker-01'
    # The batch is leased atomically; each job is allowed up to 3 attempts
    jobs = await queue.dequeue("worker-01", 5, 3)

    for job in jobs:
        lease = job.lease_key()
        if not lease:
            continue

        # 4. Extend the lease (Heartbeat)
        # If the task is taking longer than expected, 'touch' it to prevent others from picking it up
        await queue.touch(lease, 60)

        # 5. Success vs failure logic
        success = True  # Replace with actual processing logic

        if success:
            # 6. Acknowledge (mark as done)
            await queue.ack(lease)
        else:
            # 7. Negatively acknowledge (return to queue with a 10s delay)
            await queue.nack(lease, 3, delay_seconds=10.0)
    
    # 8. Batch Acknowledgement
    # If you have a list of processed jobs, you can ack them all at once
    # await queue.ack_batch(list_of_leases)

if __name__ == "__main__":
    asyncio.run(main())
```
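
For long-running jobs, the heartbeat in step 4 usually needs to repeat rather than run once. The sketch below shows one possible way to keep a lease alive in the background while a job is processed. `run_with_heartbeat` is a hypothetical helper and not part of `slonq`; it assumes only a `touch` coroutine that takes a lease and a number of seconds, matching the `queue.touch(lease, 60)` call above.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_heartbeat(
    touch: Callable[[Any, int], Awaitable[Any]],
    lease: Any,
    work: Awaitable[Any],
    extend_seconds: int = 60,
    interval_seconds: float = 20.0,
) -> Any:
    """Await `work` while periodically extending the lease.

    Hypothetical helper: `touch` is expected to behave like `queue.touch`.
    """

    async def beat() -> None:
        while True:
            await asyncio.sleep(interval_seconds)
            await touch(lease, extend_seconds)

    heartbeat = asyncio.create_task(beat())
    try:
        return await work
    finally:
        # Stop the heartbeat whether the work succeeded or raised,
        # and wait for the task to finish without re-raising its errors.
        heartbeat.cancel()
        await asyncio.gather(heartbeat, return_exceptions=True)
```

Inside the loop above, this could be called as `await run_with_heartbeat(queue.touch, lease, process(job))`, where `process` is a placeholder for your own coroutine. Keep `interval_seconds` well below `extend_seconds` so that a single delayed heartbeat does not let the lease expire.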

## How it works

`slonq` manages job states through a visibility-based lease system:

1. **Enqueue:** A job is inserted with a `visible_at` timestamp.
2. **Dequeue:** A worker selects a batch of jobs where `visible_at <= now()`. Using `FOR UPDATE SKIP LOCKED`, Postgres ensures no two workers grab the same job. The job's `visible_at` is then moved forward by its lease timeout (`lease_timeout_seconds`), effectively ‘locking’ the job for that worker.
3. **Heartbeat:** If a job is long-running, the worker can call `touch()` to extend the lease.
4. **Ack/Nack:**
    - **Ack:** Successfully processed jobs are marked as `done`.
    - **Nack:** If processing fails, the worker can negatively acknowledge the job, making it visible for retry either immediately or after a delay.
5. **Recovery:** If a worker crashes, the `visible_at` time eventually passes, and the job naturally becomes available for another worker to attempt.

## Operational considerations

- **Database schema:** You must run the migration SQL from the Database schema section below to create the required type, table, and index before using the queue.
- **Visibility:** Because slonq relies on `now()`, ensure your application servers and database server have synchronised clocks.
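- **At-least-once delivery:** slonq guarantees that every job is delivered at least once, which means a job may be delivered more than once (for example, when a lease expires before the worker acknowledges it). Ensure your worker logic is idempotent.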
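
Because a job can be delivered more than once, a handler should tolerate duplicates. The following is a minimal sketch of one way to do that, assuming each job carries a stable key such as its idempotency key. `make_idempotent` is a hypothetical helper, and the in-memory set stands in for a durable store; in a real deployment the marker would typically be written in the same transaction as the job's side effect.

```python
from typing import Callable, Hashable


def make_idempotent(handler: Callable[[dict], None]) -> Callable[[Hashable, dict], bool]:
    """Wrap `handler` so repeated deliveries of the same job key run it once.

    Hypothetical helper; the in-memory set stands in for a durable store.
    """
    seen: set[Hashable] = set()

    def handle(job_key: Hashable, payload: dict) -> bool:
        if job_key in seen:
            return False  # duplicate delivery: nothing to do
        handler(payload)
        seen.add(job_key)  # recorded only after the handler succeeds
        return True

    return handle


sent: list[str] = []
send_email = make_idempotent(lambda payload: sent.append(payload["to"]))
send_email("unique-request-id-123", {"to": "a@example.com"})
send_email("unique-request-id-123", {"to": "a@example.com"})  # redelivery is skipped
assert sent == ["a@example.com"]
```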

## Database schema

`slonq` requires a specific table structure and a custom ENUM type to manage job states. You can apply the following migration to your PostgreSQL instance:

```sql
-- Required ONLY for PostgreSQL versions prior to 13 to support gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Define the job lifecycle states
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'done', 'failed');

CREATE TABLE jobs
(
   id                    BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
   idempotency_key       TEXT UNIQUE NOT NULL,
   status                job_status  NOT NULL DEFAULT 'pending',
   payload               JSONB       NOT NULL,

   -- 'Next eligible time' logic:
   -- Pending: When the job becomes available for its first attempt.
   -- In_progress: When the current lease is set to expire.
   visible_at            TIMESTAMPTZ NOT NULL DEFAULT now(),

   -- Tracks attempts to manage retry logic and dead-lettering
   attempt_count         INT         NOT NULL DEFAULT 0,

   -- Per-job lease duration (in seconds)
   lease_timeout_seconds INT         NOT NULL DEFAULT 60 CHECK (lease_timeout_seconds > 0),

   -- Unique lease identifier (regenerated on each dequeue)
   lease_id              UUID NULL,
   leased_by             TEXT NULL,

   created_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
   updated_at            TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Index for the dequeue operation (FOR UPDATE SKIP LOCKED)
CREATE INDEX idx_jobs_dequeue
   ON jobs (visible_at) WHERE status IN ('pending', 'in_progress');
```

## License

This project is licensed under the MIT License.

