Metadata-Version: 2.4
Name: cezam-lib
Version: 0.1.17
Summary: Bibliothèque partagée pour les microservices CEZAM
Project-URL: Repository, https://gitlab.com/cezamdev/cezam-lib
License-Expression: MIT
Keywords: cezam,microservices,pipeline
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.11
Requires-Dist: minio>=7.2.0
Requires-Dist: opentelemetry-api>=1.20.0
Requires-Dist: opentelemetry-exporter-otlp>=1.20.0
Requires-Dist: opentelemetry-instrumentation>=0.41b0
Requires-Dist: opentelemetry-sdk>=1.20.0
Requires-Dist: pika>=1.3.0
Requires-Dist: pydantic>=2.0.0
Description-Content-Type: text/markdown

# cezam-lib

Bibliothèque partagée pour les microservices CEZAM. Ce package regroupe deux sous-packages sous un namespace unique `cezam_lib` :

- **`cezam_shared`** — Clients d'infrastructure (MinIO, S3, RabbitMQ), configuration OpenTelemetry, et exceptions partagées
- **`pipeline_template`** — Classes de base pour construire des pipelines d'extraction spécialisés

> Python >= 3.11 requis

## Installation

```bash
# Avec uv (recommandé)
uv add cezam-lib

# Avec pip
pip install cezam-lib
```

## Structure du package

```
cezam_lib/
├── __init__.py              # __version__, __all__
├── cezam_shared/
│   ├── __init__.py          # Exports publics
│   ├── minio_client.py      # MinIOClient
│   ├── datalake_client.py   # DatalakeClient
│   ├── source_client.py     # SourceClient
│   ├── datalake_paths.py    # Fonctions de chemins normalisés
│   ├── rabbitmq.py          # RabbitMQPublisher, RabbitMQConsumer
│   ├── otel.py              # setup_otel, inject/extract_trace_context
│   └── exceptions.py        # MinIOError, RabbitMQError, etc.
└── pipeline_template/
    ├── __init__.py           # Exports publics
    ├── base_pipeline.py      # BasePipeline
    ├── extractor.py          # DataExtractor (ABC)
    └── messages.py           # PipelineMessage, FusionMessage
```

Imports :

```python
from cezam_lib.cezam_shared import MinIOClient, DatalakeClient, SourceClient
from cezam_lib.cezam_shared import RabbitMQPublisher, RabbitMQConsumer
from cezam_lib.cezam_shared import datalake_paths
from cezam_lib.cezam_shared import setup_otel

from cezam_lib.pipeline_template import BasePipeline, DataExtractor
from cezam_lib.pipeline_template import PipelineMessage, FusionMessage
```

## Composants `cezam_shared`

### MinIOClient

Client legacy pour les opérations JSON sur MinIO.

```python
from cezam_lib.cezam_shared import MinIOClient

client = MinIOClient(
    endpoint="localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    bucket="my-bucket",
)

client.put_json("path/to/doc.json", {"key": "value"})
data = client.get_json("path/to/doc.json")
exists = client.exists("path/to/doc.json")
files = client.list_prefix("path/to/")
```

### DatalakeClient

Client S3 pour le bucket datalake avec préfixage automatique par environnement (lecture/écriture).

```python
from cezam_lib.cezam_shared import DatalakeClient

client = DatalakeClient(
    endpoint="s3.sbg.io.cloud.ovh.net",
    access_key="key",
    secret_key="secret",
    bucket="datalake",
    env_prefix="prod",
    secure=True,
)

client.put_json("sim123/ocr/doc.json", {"text": "..."})
data = client.get_json("sim123/ocr/doc.json")
client.put_bytes("sim123/pages/page1.png", png_bytes)
raw = client.get_bytes("sim123/pages/page1.png")
```

### SourceClient

Client S3 en lecture seule pour le bucket source de production.

```python
from cezam_lib.cezam_shared import SourceClient

client = SourceClient(
    endpoint="s3.eu-west-par.io.cloud.ovh.net",
    access_key="key",
    secret_key="secret",
    bucket="source",
    secure=True,
)

data = client.get_json("path/to/doc.json")
raw = client.get_bytes("path/to/file.pdf")
client.download_file("path/to/file.pdf", local_path)
```

### datalake_paths

Fonctions pures de construction de chemins normalisés pour le datalake. Le préfixage par environnement est géré par `DatalakeClient`.

```python
from cezam_lib.cezam_shared import datalake_paths

path = datalake_paths.original_path("sim123", "doc.pdf")
# → "sim123/original/doc.pdf"

path = datalake_paths.ocr_path("sim123", "doc.json")
# → "sim123/ocr/doc.json"

path = datalake_paths.pipeline_result_path("sim123", "ddp", "result.json")
# → "sim123/ddp/result.json"
```

### RabbitMQPublisher

Publisher RabbitMQ avec propagation automatique du contexte OpenTelemetry.

```python
from cezam_lib.cezam_shared import RabbitMQPublisher

with RabbitMQPublisher(
    host="localhost", port=5672, user="guest", password="guest"
) as publisher:
    publisher.publish(
        exchange="",
        routing_key="my_queue",
        message={"simulation_id": "sim123", "status": "ready"},
    )
```

### RabbitMQConsumer

Consumer RabbitMQ avec gestion automatique des ack/nack et propagation OTel.

- Callback réussit → ack automatique
- `RetryableError` → nack avec requeue
- `NonRetryableError` ou autre exception → nack sans requeue

```python
from cezam_lib.cezam_shared import RabbitMQConsumer

def handle_message(message: dict) -> None:
    print(f"Reçu: {message}")

with RabbitMQConsumer(
    host="localhost", port=5672, user="guest", password="guest"
) as consumer:
    consumer.consume(queue="my_queue", callback=handle_message)
```

## Composants `pipeline_template`

### BasePipeline

Classe de base abstraite pour les pipelines d'extraction spécialisés. Gère le flux complet :

1. Parse le `PipelineMessage` entrant
2. Lit les données OCR depuis le datalake
3. Appelle l'extracteur spécialisé
4. Écrit le résultat sur le datalake
5. Publie un `FusionMessage` vers la queue fusion

```python
from cezam_lib.pipeline_template import BasePipeline, DataExtractor
from pydantic import BaseModel


class MyResult(BaseModel):
    status: str
    confidence: float
    field_count: int


class MyExtractor(DataExtractor[MyResult]):
    def extract(self, ocr_data: dict) -> MyResult:
        return MyResult(status="SUCCESS", confidence=0.95, field_count=10)


pipeline = BasePipeline(
    datalake_client=datalake_client,
    publisher=publisher,
    consumer=consumer,
    extractor=MyExtractor(),
    queue_name="my_pipeline",
    pipeline_name="my_pipeline",
)
pipeline.run()
```

### DataExtractor

Interface abstraite générique pour l'extraction de données depuis l'OCR. Les extracteurs concrets héritent de `DataExtractor[T]` et implémentent `extract()`.

### PipelineMessage / FusionMessage

Modèles Pydantic pour la communication inter-pipelines :

- `PipelineMessage` — Message reçu du Doc Classifier (simulation_id, doc_name, document_type, ocr_json_path, etc.)
- `FusionMessage` — Message envoyé vers la queue Fusion avec les métriques d'extraction (status, quality, action, confidence_avg, etc.)

## Exceptions

```python
from cezam_lib.cezam_shared import (
    MinIOError,
    RabbitMQError,
    RetryableError,
    NonRetryableError,
)
```

| Exception | Usage |
|-----------|-------|
| `MinIOError` | Erreur lors d'une opération S3/MinIO |
| `RabbitMQError` | Erreur lors d'une opération RabbitMQ |
| `RetryableError` | Erreur temporaire, le message sera requeue |
| `NonRetryableError` | Erreur définitive, le message est rejeté |

## Configuration OpenTelemetry

```python
from cezam_lib.cezam_shared import (
    setup_otel,
    inject_trace_context,
    extract_trace_context,
)

# Initialiser OTel pour un service
tracer, meter = setup_otel(
    service_name="doc_classifier",
    otel_endpoint="localhost:4317",
)

# Propager le contexte de trace dans des headers
headers = {}
inject_trace_context(headers)

# Extraire et activer le contexte depuis des headers entrants
extract_trace_context(incoming_headers)
```

## Développement local

```bash
# Installer les dépendances (dev inclus)
uv sync

# Lancer les tests
uv run pytest

# Tests avec couverture
uv run pytest --cov=cezam_lib tests/

# Linting
uv run ruff check src/ tests/
```

## Licence

MIT
