Metadata-Version: 2.4
Name: py-kafka-producer-client
Version: 0.1.0
Summary: High-performance, reusable Kafka producer library with synchronous and asynchronous support.
Author-email: Gaian <gaian@example.com>
License: MIT
Project-URL: Homepage, https://github.com/gaian/Py-Kafka
Project-URL: Repository, https://github.com/gaian/Py-Kafka
Project-URL: Issues, https://github.com/gaian/Py-Kafka/issues
Keywords: kafka,producer,asyncio,confluent-kafka
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: confluent-kafka>=2.3.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21; extra == "dev"

# kafka-producer-client

Reusable Kafka producer library — install once, use across all your Python projects.  
Works in **sync scripts**, **Django**, and **Celery**, as well as **FastAPI** or any other async framework.

## Install

```bash
# From your private registry
pip install py-kafka-producer-client

# Or directly from git
pip install git+https://github.com/your-org/kafka-producer-client.git
```

## Quick Start

### 1. Sync usage (scripts, Django, Celery workers)

```python
from kafka_producer_client import KafkaProducerClient, KafkaProducerConfig

config = KafkaProducerConfig(
    bootstrap_servers="broker1:9092,broker2:9092",
    default_topic="user-events",
    client_id="my-service",
)

producer = KafkaProducerClient(config)

# Single event
producer.send({"user": "alice", "action": "login"})

# Batch
producer.send_batch([
    {"user": "alice", "action": "login"},
    {"user": "bob", "action": "signup"},
])

# Flush before exit
producer.flush()
```

### 2. Async usage (FastAPI)

```python
from fastapi import FastAPI, Depends
from kafka_producer_client import KafkaProducerConfig
from kafka_producer_client.fastapi import create_lifespan, get_producer

config = KafkaProducerConfig(
    bootstrap_servers="broker:9092",
    default_topic="api-events",
)

app = FastAPI(lifespan=create_lifespan(config))

@app.post("/events")
async def post_event(producer=Depends(get_producer)):
    result = await producer.send_async({"event": "click"})
    return result
```

### 3. Async usage (standalone asyncio)

```python
import asyncio
from kafka_producer_client import KafkaProducerClient, KafkaProducerConfig

async def main():
    client = KafkaProducerClient(KafkaProducerConfig(
        bootstrap_servers="broker:9092",
    ))
    client.start_background_poll()

    result = await client.send_async(
        {"order_id": 123},
        topic="orders",
        key="order-123",
    )
    print(result)  # {"topic": "orders", "partition": 0, "offset": 42}

    client.close()

asyncio.run(main())
```

## Configuration

All fields on `KafkaProducerConfig` are optional with sensible defaults:

| Field                | Default                 | Description                          |
|----------------------|-------------------------|--------------------------------------|
| `bootstrap_servers`  | `localhost:9092`        | Kafka broker addresses               |
| `client_id`          | `kafka-producer-client` | Client identifier                    |
| `default_topic`      | `None`                  | Fallback topic if none given at send |
| `acks`               | `all`                   | Broker ack level                     |
| `retries`            | `3`                     | Send retry count                     |
| `linger_ms`          | `5`                     | Micro-batching delay (ms)            |
| `compression_type`   | `snappy`                | Message compression                  |
| `enable_idempotence` | `True`                  | Exactly-once, in-order delivery      |
| `batch_size`         | `16384`                 | Batch size in bytes                  |
| `batch_num_messages` | `100`                   | Max messages per batch               |
| `extra`              | `{}`                    | Any additional librdkafka config     |
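
For example, to trade a little latency for larger batches, override just the fields you need (the values below are illustrative, not recommendations):

```python
from kafka_producer_client import KafkaProducerConfig

config = KafkaProducerConfig(
    bootstrap_servers="broker:9092",
    default_topic="metrics",
    linger_ms=50,            # wait up to 50 ms to fill a batch
    batch_num_messages=500,  # allow more messages per batch
    compression_type="lz4",  # override the snappy default
)
```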

### Passing extra librdkafka config

```python
config = KafkaProducerConfig(
    bootstrap_servers="broker:9092",
    extra={
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "key",
        "sasl.password": "secret",
    },
)
```

## API Reference

### `KafkaProducerClient`

| Method                    | Returns          | Description                               |
|---------------------------|------------------|-------------------------------------------|
| `send(value, ...)`        | `None`           | Fire-and-forget produce                   |
| `send_batch(events)`      | `int`            | Produce many, returns count               |
| `send_async(value)`       | `dict`           | Await broker ACK, returns offset metadata |
| `send_batch_async()`      | `list[dict]`     | Concurrent async batch                    |
| `flush(timeout)`          | `int`            | Block until queue drains                  |
| `close()`                 | `None`           | Flush + stop poll thread                  |
| `start_background_poll()` | `None`           | Start daemon poll thread                  |
| `queue_length`            | `int` (property) | Messages still in internal queue          |
