# azure-functions-db-python — Full LLM Reference

> Database trigger and input/output bindings for Azure Functions Python v2, powered by SQLAlchemy.

## Package Info

- PyPI: `pip install azure-functions-db-python`
- Version: 0.1.0
- Python: >=3.10, <3.15
- License: MIT
- Docs: https://yeongseon.github.io/azure-functions-db-python/
- Repository: https://github.com/yeongseon/azure-functions-db-python
- Azure Functions programming model: v2

## Installation

```bash
# Pick your database driver
pip install azure-functions-db-python[postgres]   # PostgreSQL (psycopg)
pip install azure-functions-db-python[mysql]      # MySQL (PyMySQL)
pip install azure-functions-db-python[mssql]      # SQL Server (pyodbc)
pip install azure-functions-db-python[all]        # All drivers
```

## Public API

### Core Classes

```python
from azure_functions_db import (
    DbBindings,
    DbOut,
    DbReader,
    DbWriter,
    SqlAlchemySource,
    BlobCheckpointStore,
    PollTrigger,
    EngineProvider,
    RowChange,
    PollContext,
    RetryPolicy,
    MetricsCollector,
)
```

### DbBindings — Decorator Namespace

```python
class DbBindings:
    def input(
        self,
        arg_name: str,
        *,
        url: str,
        table: str | None = None,
        schema: str | None = None,
        pk: dict[str, object] | Callable[..., dict[str, object]] | None = None,
        query: str | None = None,
        params: dict[str, object] | Callable[..., dict[str, object]] | None = None,
        model: type[BaseModel] | None = None,
        on_not_found: Literal["none", "raise"] = "none",
        engine_provider: EngineProvider | None = None,
    ) -> Callable: ...

    def output(
        self,
        arg_name: str,
        *,
        url: str,
        table: str,
        schema: str | None = None,
        action: Literal["insert", "upsert"] = "insert",
        conflict_columns: list[str] | None = None,
        engine_provider: EngineProvider | None = None,
    ) -> Callable: ...

    def trigger(
        self,
        *,
        arg_name: str,
        source: SourceAdapter,
        checkpoint_store: StateStore,
        name: str | None = None,
        normalizer: EventNormalizer | None = None,
        batch_size: int = 100,
        max_batches_per_tick: int = 1,
        lease_ttl_seconds: int = 120,
        retry_policy: RetryPolicy | None = None,
        metrics: MetricsCollector | None = None,
    ) -> Callable: ...

    def inject_reader(
        self,
        arg_name: str,
        *,
        url: str,
        table: str | None = None,
        schema: str | None = None,
        engine_provider: EngineProvider | None = None,
    ) -> Callable: ...

    def inject_writer(
        self,
        arg_name: str,
        *,
        url: str,
        table: str,
        schema: str | None = None,
        engine_provider: EngineProvider | None = None,
    ) -> Callable: ...
```

### DbOut — Output Binding Parameter

```python
class DbOut:
    def set(
        self,
        data: dict | list[dict] | BaseModel | list[BaseModel] | None,
    ) -> None: ...
```

**Purpose**: Injected by `output` decorator. Call `.set()` to write data explicitly.
Handler's return value is independent — use it for HTTP responses.

Accepted types:
- `dict` — single row insert
- `list[dict]` — batch insert
- `BaseModel` / `list[BaseModel]` — auto-dumped to dict
- `None` — no-op (skip write)

### DbReader — Imperative Reader

```python
class DbReader:
    def get(self, *, pk: dict[str, object]) -> dict[str, object] | None: ...
    def query(self, sql: str, *, params: dict[str, object] | None = None) -> list[dict[str, object]]: ...
    def close(self) -> None: ...
```

### DbWriter — Imperative Writer

```python
class DbWriter:
    def insert(self, *, data: dict[str, object]) -> None: ...
    def insert_many(self, *, rows: list[dict[str, object]]) -> None: ...
    def upsert(self, *, data: dict[str, object], conflict_columns: list[str]) -> None: ...
    def upsert_many(self, *, rows: list[dict[str, object]], conflict_columns: list[str]) -> None: ...
    def update(self, *, data: dict[str, object], pk: dict[str, object]) -> None: ...
    def delete(self, *, pk: dict[str, object]) -> None: ...
    def close(self) -> None: ...
```

### SqlAlchemySource — Trigger Source Adapter

```python
class SqlAlchemySource:
    def __init__(
        self,
        url: str,
        table: str,
        cursor_column: str,
        pk_columns: list[str],
        *,
        schema: str | None = None,
        engine_provider: EngineProvider | None = None,
    ): ...
```

### BlobCheckpointStore — Checkpoint Persistence

```python
class BlobCheckpointStore:
    def __init__(
        self,
        container_client: ContainerClient,
        source_fingerprint: str,
    ): ...
```

### RowChange — Trigger Event

```python
@dataclass
class RowChange:
    pk: dict[str, object]
    op: str           # "insert" | "update"
    cursor: object    # cursor column value
    before: dict[str, object] | None
    after: dict[str, object] | None
```

### Error Hierarchy

```python
class DbError(Exception): ...             # Base
class ConfigurationError(DbError): ...     # Bad decorator config
class DbConnectionError(DbError): ...     # Connection failure
class QueryError(DbError): ...            # Read failure
class WriteError(DbError): ...            # Write failure
class NotFoundError(DbError): ...         # Row not found (on_not_found="raise")
class CursorSerializationError(DbError): ... # Cursor value encoding failure

# Trigger-specific
class FetchError(DbError): ...            # Trigger fetch failure
class HandlerError(DbError): ...          # Handler raised during trigger
class CommitError(DbError): ...           # Checkpoint commit failure
class LeaseAcquireError(DbError): ...     # Lease acquisition failure
class LostLeaseError(DbError): ...        # Lease lost during processing
```

## Common Patterns

### 1. Input Binding — PK Lookup

```python
from azure_functions_db import DbBindings

db = DbBindings()

@db.input("user", url="%DB_URL%", table="users", pk={"id": 42})
def load_user(user: dict | None) -> None:
    if user:
        print(user["name"])

# Dynamic PK from handler kwargs
@db.input("user", url="%DB_URL%", table="users",
          pk=lambda req: {"id": req.params["id"]})
def get_user(req, user: dict | None) -> None:
    print(user)
```

### 2. Input Binding — Query Mode

```python
@db.input("users", url="%DB_URL%",
          query="SELECT * FROM users WHERE active = :active",
          params={"active": True})
def list_active_users(users: list[dict]) -> None:
    for user in users:
        print(user["email"])
```

### 3. Output Binding — Insert and Upsert

```python
from azure_functions_db import DbBindings, DbOut

db = DbBindings()

@db.output("out", url="%DB_URL%", table="orders")
def create_order(out: DbOut) -> str:
    out.set({"id": 1, "status": "pending", "total": 99.99})
    return "Created"

@db.output("out", url="%DB_URL%", table="orders",
           action="upsert", conflict_columns=["id"])
def upsert_orders(out: DbOut) -> str:
    out.set([
        {"id": 1, "status": "shipped"},
        {"id": 2, "status": "pending"},
    ])
    return "Upserted"
```

### 4. Client Injection — Imperative Control

```python
from azure_functions_db import DbBindings, DbReader, DbWriter

db = DbBindings()

@db.inject_reader("reader", url="%DB_URL%", table="users")
def complex_read(reader: DbReader) -> None:
    user = reader.get(pk={"id": 42})
    orders = reader.query("SELECT * FROM orders WHERE user_id = :uid", params={"uid": 42})

@db.inject_writer("writer", url="%DB_URL%", table="orders")
def complex_write(writer: DbWriter) -> None:
    writer.insert(data={"id": 1, "status": "pending"})
    writer.update(data={"status": "shipped"}, pk={"id": 1})
    writer.delete(pk={"id": 1})
```

### 5. Trigger — Poll-Based Change Detection

```python
import azure.functions as func
from azure.storage.blob import ContainerClient
from azure_functions_db import BlobCheckpointStore, DbBindings, RowChange, SqlAlchemySource

app = func.FunctionApp()
db = DbBindings()

source = SqlAlchemySource(
    url="%ORDERS_DB_URL%",
    table="orders",
    cursor_column="updated_at",
    pk_columns=["id"],
)

checkpoint_store = BlobCheckpointStore(
    container_client=ContainerClient.from_connection_string(
        conn_str="%AzureWebJobsStorage%",
        container_name="db-state",
    ),
    source_fingerprint=source.source_descriptor.fingerprint,
)

@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
def orders_poll(timer: func.TimerRequest, events: list[RowChange]) -> None:
    for event in events:
        print(f"Order {event.pk}: {event.op}")
```

### 6. Combined: Trigger + Output Binding

```python
@app.schedule(schedule="0 */1 * * * *", arg_name="timer", use_monitor=True)
@db.trigger(arg_name="events", source=source, checkpoint_store=checkpoint_store)
@db.output("out", url="%DEST_DB_URL%", table="processed_orders",
           action="upsert", conflict_columns=["order_id"])
def orders_poll(timer, events: list[RowChange], out: DbOut) -> None:
    out.set([
        {"order_id": e.pk["id"], "processed_at": str(e.cursor)}
        for e in events if e.after is not None
    ])
```

### 7. Shared Engine Provider

```python
from azure_functions_db import EngineProvider

engine_provider = EngineProvider()

# Reuse across source and bindings for connection pooling
source = SqlAlchemySource(url="%DB_URL%", ..., engine_provider=engine_provider)

@db.output("out", url="%DB_URL%", table="results", engine_provider=engine_provider)
def handler(out: DbOut) -> None: ...
```

### 8. Async Handlers

```python
@db.input("users", url="%DB_URL%",
          query="SELECT * FROM users", params={})
async def async_handler(users: list[dict]) -> None:
    # DB I/O is automatically offloaded via asyncio.to_thread()
    for user in users:
        print(user["email"])
```

## Design Principles

1. **Decorator-first**: All integration via decorators on handler functions
2. **Data injection vs Client injection**: `input`/`output` inject data; `inject_reader`/`inject_writer` inject clients
3. **SQLAlchemy-centric**: Single ORM layer for all databases
4. **Pseudo-trigger**: Poll-based change detection on top of timer trigger
5. **At-least-once delivery**: Handlers must be idempotent
6. **Connection pooling**: `EngineProvider` for shared engine management
7. **Env-var substitution**: `%VAR%` syntax in connection URLs
8. **Async-safe**: Blocking I/O offloaded via `asyncio.to_thread()` for async handlers

## Limitations

1. **Pseudo-trigger only**: No native Azure Functions trigger extension — requires timer trigger
2. **SQLAlchemy 2.0+**: Does not support SQLAlchemy 1.x
3. **At-least-once**: Duplicates possible during crashes/lease transitions
4. **No CDC**: Uses polling, not database change data capture
5. **Sync trigger handlers only**: `trigger` decorator rejects async handlers

## Supported Databases

| Database | Extra | Driver |
|----------|-------|--------|
| PostgreSQL | `[postgres]` | psycopg |
| MySQL | `[mysql]` | PyMySQL |
| SQL Server | `[mssql]` | pyodbc |

## Ecosystem Integration

Part of **Azure Functions Python DX Toolkit**:
- `azure-functions-openapi-python` — OpenAPI and Swagger UI
- `azure-functions-validation-python` — Request/response validation
- **azure-functions-db-python** — Database trigger and bindings
- `azure-functions-logging-python` — Structured logging
- `azure-functions-doctor-python` — Pre-deploy diagnostics
- `azure-functions-scaffold-python` — Project scaffolding
