Metadata-Version: 2.4
Name: pgqrs
Version: 0.15.2
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: License :: OSI Approved :: Apache Software License
Requires-Dist: psycopg[binary]>=3.0
Requires-Dist: pytest>=7.0 ; extra == 'test'
Requires-Dist: pytest-asyncio>=0.21 ; extra == 'test'
Requires-Dist: testcontainers[postgres]>=3.7 ; extra == 'test'
Requires-Dist: boto3>=1.34 ; extra == 'test'
Requires-Dist: urllib3<2.0 ; extra == 'test'
Provides-Extra: test
Summary: PostgreSQL-backed job queue with Rust and Python bindings
Keywords: postgresql,queue,task-queue,background-jobs,job-queue
Author: Rajat Venkatesh
License-Expression: MIT OR Apache-2.0
Requires-Python: >=3.11
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Documentation, https://pgqrs.vrajat.com
Project-URL: Homepage, https://github.com/vrajat/pgqrs
Project-URL: Issue Tracker, https://github.com/vrajat/pgqrs/issues
Project-URL: Repository, https://github.com/vrajat/pgqrs

# py-pgqrs

**pgqrs is a Postgres-native, library-only durable execution engine.**

py-pgqrs provides Python bindings for the Rust core. It is built for Postgres and also supports SQLite and Turso.

## What is Durable Execution?

A durable execution engine ensures that workflows survive application crashes and pauses: each step executes exactly once, state persists in the database, and a restarted process resumes from the last completed step.
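The core idea can be sketched in plain Python. This is a hypothetical illustration of checkpoint-and-resume, not the pgqrs API: the `completed` dict stands in for state that pgqrs persists in the database, and `run_step`/`workflow` are invented names.

```python
completed = {}  # stands in for step state persisted in the database

def run_step(name, fn):
    """Execute fn at most once; on re-runs, return the checkpointed result."""
    if name in completed:       # step already finished: resume, don't re-execute
        return completed[name]
    result = fn()
    completed[name] = result    # checkpoint the result before moving on
    return result

def workflow():
    files = run_step("list_files", lambda: ["/tmp/report.csv"])
    archive = run_step("create_archive", lambda: files[0] + ".zip")
    return archive

first = workflow()   # executes both steps
second = workflow()  # simulated restart: both steps are skipped, same result
assert first == second == "/tmp/report.csv.zip"
```

A real engine writes each checkpoint inside a database transaction, so a crash between steps leaves the last completed step's result intact.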

## Key Properties

- **Postgres-native:** Leverages `SKIP LOCKED` and ACID transactions
- **Library-only:** Runs in-process with your application
- **Multi-backend:** Postgres (production), SQLite/Turso (testing, CLI, embedded)
- **Type-safe:** Rust core with idiomatic Python bindings
- **Transaction-safe:** Exactly-once step execution within database transactions
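
The Postgres-native claim pattern (`SELECT ... FOR UPDATE SKIP LOCKED`) lets many consumers pull jobs without contending on the same rows. Here is a rough single-process sketch of the claim step using the standard-library `sqlite3` in place of Postgres; the `jobs` table and its columns are illustrative, not the pgqrs schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, payload TEXT, status TEXT)")
conn.execute("INSERT INTO jobs (payload, status) VALUES ('process_image', 'pending')")
conn.commit()

# Claim one pending job atomically inside a transaction. In Postgres the
# SELECT would add FOR UPDATE SKIP LOCKED, so concurrent workers skip rows
# another worker has already locked instead of blocking on them.
with conn:
    row = conn.execute(
        "SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 1"
    ).fetchone()
    if row:
        conn.execute("UPDATE jobs SET status = 'running' WHERE id = ?", (row[0],))

print(row)  # (1, 'process_image')
```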

## Installation

```bash
pip install pgqrs
```

For local development:

```bash
make requirements
```

## Backend Support

py-pgqrs supports all three backends (Postgres, SQLite, and Turso); choose the one that fits your use case:

```python
# PostgreSQL (production)
store = await pgqrs.connect("postgresql://user:pass@localhost:5432/db")

# SQLite (embedded, testing)
store = await pgqrs.connect("sqlite:///path/to/database.db")

# Turso (SQLite-compatible, embedded)
store = await pgqrs.connect("turso:///path/to/database.db")
```

## Usage

### Producer + Consumer

```python
import asyncio
import pgqrs

async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")

    admin = pgqrs.admin(store)
    await admin.install()
    await store.queue("tasks")

    producer = await store.producer("tasks")
    msg_id = await producer.enqueue({"task": "process_image", "url": "..."})
    print(f"Enqueued job {msg_id}")

    consumer = await store.consumer("tasks")
    messages = await consumer.dequeue(batch_size=1)
    for msg in messages:
        print(f"Processing {msg.id}: {msg.payload}")
        await consumer.archive(msg.id)

asyncio.run(main())
```

### Durable Workflow (Python)

```python
import asyncio
import pgqrs

async def main():
    store = await pgqrs.connect("postgresql://localhost/mydb")
    admin = pgqrs.admin(store)
    await admin.install()

    await pgqrs.workflow().name("archive_files").store(store).create()
    consumer = await pgqrs.consumer("worker-1", 8080, "archive_files").create(store)

    await pgqrs.workflow() \
        .name("archive_files") \
        .store(store) \
        .trigger({"path": "/tmp/report.csv"}) \
        .execute()

    messages = await consumer.dequeue(batch_size=1)
    msg = messages[0]

    run = await pgqrs.run().message(msg).store(store).execute()
    step = await run.acquire_step("list_files", current_time=run.current_time)
    if step.status == "EXECUTE":
        await step.guard.success([msg.payload["path"]])

    step = await run.acquire_step("create_archive", current_time=run.current_time)
    if step.status == "EXECUTE":
        await step.guard.success(f"{msg.payload['path']}.zip")

    await run.complete({"archive": f"{msg.payload['path']}.zip"})
    await consumer.archive(msg.id)

asyncio.run(main())
```

## Testing

```bash
make test-py PGQRS_TEST_BACKEND=postgres
```

## Documentation

- **[Full Documentation](https://pgqrs.vrajat.com)** - Complete guides and API reference
- **[Docs Home](../docs/index.md)** - Master documentation source
- **[Python Examples](tests/test_pgqrs.py)** - Python test suite with examples
- **[API Reference](../docs/user-guide/api/consumer.md)** - Consumer/producer API details

