Metadata-Version: 2.4
Name: processcube_client
Version: 6.1.2
Summary: A Client for the workflow engine of the ProcessCube platform.
Home-page: https://github.com/5minds/processcube_client.py
Author: ProcessCube UG
Author-email: cuby@processcube.io
Keywords: workflow engine processcube client bpmn
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Description-Content-Type: text/markdown
Requires-Dist: aiohttp>=3.9.0
Requires-Dist: nest-asyncio<1.7.0,>=1.5.6
Requires-Dist: dataclasses-json>=0.6.0
Requires-Dist: requests>=2.28.1
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: requires-dist
Dynamic: summary

# ProcessCube® Python Client

Python-Client-Bibliothek für die [ProcessCube®](https://processcube.io)-Workflow-Engine (Atlas Engine).

Ermöglicht die Anbindung an die ProcessCube® Engine aus Python-Anwendungen heraus — sowohl asynchron (für ExternalTasks, Notifications) als auch synchron (für Robot Framework und Scripting).

## Installation

```bash
pip install processcube_client
```

**Unterstützte Python-Versionen:** 3.11, 3.12, 3.13

## Schnellstart

```python
from processcube_client import ExternalTaskClient

def handle_task(payload):
    print(f"Verarbeite: {payload}")
    return {"ergebnis": "fertig"}

client = ExternalTaskClient("http://localhost:56100")
client.subscribe_to_external_task_topic("meinTopic", handle_task)
client.start()
```

## Architektur

Der Client ist in zwei Schichten aufgebaut:

```mermaid
graph TB
    subgraph "Schicht 2 — Domain-Clients (async)"
        ETClient["ExternalTaskClient<br/><i>Long-Polling + Worker</i>"]
        PDClient["ProcessDefinitionClient<br/><i>Prozesse starten</i>"]
        PIClient["ProcessInstanceClient<br/><i>Instanzen verwalten</i>"]
        UTClient["UserTaskClient<br/><i>User Tasks bearbeiten</i>"]
        EvClient["EventClient<br/><i>Messages & Signals</i>"]
        NotClient["NotificationClient<br/><i>Event-Subscriptions</i>"]
        FNIClient["FlowNodeInstanceClient<br/><i>Flow-Node-Events</i>"]
        AIClient["AppInfoClient<br/><i>Engine-Info</i>"]
    end

    subgraph "Schicht 1 — Low-Level HTTP"
        BaseAsync["BaseClient<br/><i>async, aiohttp</i>"]
        BaseSync["BaseClient<br/><i>sync, requests</i>"]
    end

    subgraph "Synchroner Aggregations-Client"
        SyncClient["Client<br/><i>core/api — für Robot Framework</i>"]
    end

    ETClient --> BaseAsync
    PDClient --> BaseAsync
    PIClient --> BaseAsync
    UTClient --> BaseAsync
    EvClient --> BaseAsync
    NotClient --> BaseAsync
    FNIClient --> BaseAsync
    AIClient --> BaseAsync

    SyncClient --> BaseSync

    CF["ClientFactory"] --> ETClient
    CF --> PDClient
    CF --> PIClient
    CF --> UTClient
    CF --> EvClient
    CF --> NotClient
    CF --> FNIClient
    CF --> AIClient
```

Alle Clients kommunizieren mit der Engine über REST-Endpunkte unter:

```
{engine_url}/atlas_engine/api/v1/{endpoint}
```

## ExternalTasks

### Konzept

ExternalTasks sind Service Tasks in einem BPMN-Prozess, deren Logik **außerhalb** der Engine ausgeführt wird. Die Engine stellt die Aufgabe bereit, ein externer Worker holt sie ab, verarbeitet sie und meldet das Ergebnis zurück.

Dieses Muster ermöglicht:

- **Entkopplung** — Die Geschäftslogik lebt im Python-Code, nicht in der Engine
- **Skalierung** — Mehrere Worker können dasselbe Topic parallel bedienen
- **Technologiefreiheit** — Der Worker kann beliebige Bibliotheken und Services nutzen

### Ablauf: Fetch-and-Lock-Zyklus

Der `ExternalTaskClient` verwendet **Long Polling**, um auf neue Aufgaben zu warten. Die Engine hält die Verbindung offen, bis entweder ein Task verfügbar ist oder das Timeout erreicht wird.

```mermaid
sequenceDiagram
    participant W as ExternalTaskClient<br/>(Worker)
    participant E as ProcessCube® Engine

    loop Endlosschleife pro Topic
        W->>+E: POST /external_tasks/fetch_and_lock<br/>{ topicName, workerId, maxTasks,<br/>  longPollingTimeout, lockDuration }
        Note over E: Engine hält Verbindung offen<br/>(Long Polling) bis Task<br/>verfügbar oder Timeout

        alt Task(s) verfügbar
            E-->>-W: [ ExternalTask, ... ]

            par Für jeden ExternalTask
                Note over W: Handler aufrufen mit payload

                alt Handler erfolgreich
                    W->>E: PUT /external_tasks/{id}/finish<br/>{ workerId, result }
                    E-->>W: 200 OK
                else FunctionalError
                    W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
                    E-->>W: 200 OK
                else Technischer Fehler
                    W->>E: PUT /external_tasks/{id}/error<br/>{ workerId, error: { errorCode, errorMessage } }
                    E-->>W: 200 OK
                end
            end

        else Timeout (keine Tasks)
            E-->>W: [ ]
        end
    end
```

### Lock-Verlängerung

Wenn die Verarbeitung länger dauert als die Lock-Duration, verlängert der Worker den Lock automatisch. Der Timer wird bei **90% der Lock-Duration** ausgelöst und wiederholt sich, bis der Task abgeschlossen ist.

```mermaid
sequenceDiagram
    participant W as ExternalTaskWorker
    participant E as ProcessCube® Engine

    Note over W: Task gelockt<br/>lock_duration_in_ms = 60000

    par Task-Verarbeitung
        Note over W: Handler wird ausgeführt...
    and Lock-Verlängerung (alle 54s)
        loop Alle 90% der lock_duration
            W->>E: PUT /external_tasks/{id}/extend_lock<br/>{ workerId, additionalDuration }
            E-->>W: 200 OK
        end
    end

    Note over W: Handler fertig
    W->>E: PUT /external_tasks/{id}/finish
    Note over W: Lock-Timer wird gestoppt
```

### ExternalTask-Lebenszyklus

```mermaid
stateDiagram-v2
    [*] --> Wartend: BPMN-Prozess erreicht<br/>ExternalTask-Node

    Wartend --> Gelockt: Worker ruft<br/>fetch_and_lock auf

    Gelockt --> Gelockt: Lock wird verlängert<br/>(extend_lock)

    Gelockt --> Abgeschlossen: Worker meldet<br/>Ergebnis (finish)

    Gelockt --> Fehler: Worker meldet<br/>Fehler (error)

    Gelockt --> Wartend: Lock abgelaufen<br/>(Timeout)

    Fehler --> Wartend: Engine setzt Task<br/>erneut auf wartend

    Abgeschlossen --> [*]: Prozess läuft weiter
```

### API

#### ExternalTaskClient

```python
from processcube_client import ExternalTaskClient

client = ExternalTaskClient(url, session=None, identity=None, loop=None, **kwargs)
```

| Parameter | Beschreibung |
|-----------|-------------|
| `url` | Engine-URL, z.B. `"http://localhost:56100"` |
| `identity` | Optional: Dict `{"token": "..."}` oder Callable das ein solches Dict liefert |
| `loop` | Optional: Eigener asyncio Event Loop |
| `worker_id` | Optional (in kwargs): Eigene Worker-ID (Default: UUID) |

#### Topic abonnieren

```python
client.subscribe_to_external_task_topic(topic, handler, **options)
```

| Option | Beschreibung | Default |
|--------|-------------|---------|
| `max_tasks` | Maximale Anzahl gleichzeitig abgeholter Tasks | `10` |
| `long_polling_timeout_in_ms` | Timeout für Long Polling in Millisekunden | `10000` (10s) |
| `lock_duration_in_ms` | Dauer des Locks in Millisekunden | `100000` (100s) |
| `payload_filter` | Filter für den Task-Payload | `None` |

#### Starten und Stoppen

```python
# Startet den Event Loop und verarbeitet Tasks (blockierend)
client.start()

# Stoppt den Client
client.stop()
```

### Handler

Der Handler ist eine Funktion, die vom Worker für jeden ExternalTask aufgerufen wird. Er empfängt den **Payload** und gibt ein **Ergebnis-Dictionary** zurück.

#### Einfacher Handler (nur Payload)

```python
def handle_task(payload):
    name = payload.get("name", "Welt")
    return {"greeting": f"Hallo {name}!"}
```

#### Erweiterter Handler (Payload + ExternalTask)

Wenn der Handler einen zweiten Parameter akzeptiert, erhält er zusätzlich das gesamte ExternalTask-Objekt mit Metadaten:

```python
def handle_task(payload, external_task):
    task_id = external_task["id"]
    correlation_id = external_task["correlationId"]
    process_instance_id = external_task["processInstanceId"]

    print(f"Task {task_id} in Prozess {process_instance_id}")
    return {"status": "verarbeitet"}
```

Das ExternalTask-Objekt enthält:

| Feld | Beschreibung |
|------|-------------|
| `id` | Eindeutige Task-ID |
| `workerId` | ID des Workers, der den Task gelockt hat |
| `topic` | Topic-Name |
| `flowNodeInstanceId` | ID der Flow-Node-Instanz |
| `correlationId` | Korrelations-ID des Prozesses |
| `processDefinitionId` | ID der Prozessdefinition |
| `processInstanceId` | ID der Prozessinstanz |
| `payload` | Task-Payload (Dictionary) |
| `state` | Status: `pending` oder `finished` |
| `lockExpirationTime` | Zeitpunkt, wann der Lock abläuft |
| `createdAt` | Erstellungszeitpunkt |

#### Async-Handler

Async-Handler werden ebenfalls unterstützt:

```python
import aiohttp

async def handle_task(payload):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/{payload['id']}") as resp:
            data = await resp.json()
    return {"result": data}
```

### Fehlerbehandlung

#### FunctionalError (BPMN-Fehler)

Ein `FunctionalError` signalisiert einen **fachlichen Fehler**, der im BPMN-Prozess behandelt werden kann (z.B. über einen Error Boundary Event):

```python
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError

def handle_task(payload):
    if not payload.get("email"):
        raise FunctionalError(
            code="VALIDATION_ERROR",
            message="E-Mail-Adresse fehlt",
            details="Das Feld 'email' ist erforderlich"
        )
    return {"status": "ok"}
```

```mermaid
flowchart LR
    A[ExternalTask-Handler] -->|FunctionalError| B[Engine: Error Boundary Event]
    A -->|Exception| C[Engine: Technischer Fehler]
    A -->|return result| D[Engine: Task abgeschlossen]

    B --> E[BPMN-Fehlerbehandlung]
    C --> F[Task wird als fehlerhaft markiert]
    D --> G[Prozess läuft weiter]
```

#### Technische Fehler

Jede andere Exception wird als **technischer Fehler** an die Engine gemeldet. Der Fehlercode ist der Exception-Typ, die Nachricht enthält den Stacktrace:

```python
def handle_task(payload):
    # Jede unbehandelte Exception wird automatisch als
    # technischer Fehler an die Engine gemeldet
    result = 1 / 0  # ZeroDivisionError → technischer Fehler
    return result
```

### Vollständiges Beispiel

```python
import logging
from processcube_client import ExternalTaskClient
from processcube_client.external_task import FunctionalError

logging.basicConfig(level=logging.INFO)

ENGINE_URL = "http://localhost:56100"

def bestellung_pruefen(payload):
    """Prüft eine Bestellung und gibt das Ergebnis zurück."""
    artikel = payload.get("artikel", [])

    if not artikel:
        raise FunctionalError(
            code="LEERE_BESTELLUNG",
            message="Bestellung enthält keine Artikel"
        )

    gesamtpreis = sum(a.get("preis", 0) * a.get("menge", 1) for a in artikel)

    return {
        "gesamtpreis": gesamtpreis,
        "artikelanzahl": len(artikel),
        "status": "geprueft"
    }

def versand_vorbereiten(payload, external_task):
    """Bereitet den Versand vor — nutzt ExternalTask-Metadaten."""
    print(f"Versand für Prozess: {external_task['processInstanceId']}")
    return {"versand_status": "vorbereitet"}

client = ExternalTaskClient(ENGINE_URL)

client.subscribe_to_external_task_topic(
    "BestellungPruefen",
    bestellung_pruefen,
    max_tasks=5,
    long_polling_timeout_in_ms=30000,
    lock_duration_in_ms=60000
)

client.subscribe_to_external_task_topic(
    "VersandVorbereiten",
    versand_vorbereiten
)

client.start()
```

### Mehrere Topics mit ClientFactory

```python
from processcube_client import ClientFactory

factory = ClientFactory()
client = factory.create_external_task_client("http://localhost:56100")

et_client = client.subscribe_to_external_task_topic("TopicA", handler_a)
et_client.start()
```

## Weitere Clients

### ProcessDefinitionClient

Prozesse starten und Definitionen verwalten.

```python
from processcube_client import ClientFactory

factory = ClientFactory()
client = factory.create_process_definition_client("http://localhost:56100")

# Prozess starten und auf Ende warten
result = client.start_process_instance_and_await_end_event(
    "MeinProzess",
    start_event_id="StartEvent_1",
    initial_token={"eingabe": "wert"}
)
```

| Methode | Beschreibung |
|---------|-------------|
| `start_process_instance(process_model_id, **options)` | Startet Prozess, gibt sofort zurück |
| `start_process_instance_and_await_end_event(process_model_id, **options)` | Startet Prozess und wartet auf Ende |
| `start_process_instance_and_await_specific_end_event(process_model_id, end_event_id=..., **options)` | Wartet auf bestimmtes End-Event |
| `get_process_definition(process_model_id)` | Gibt die Prozessdefinition zurück |

### ProcessInstanceClient

Laufende Prozessinstanzen verwalten.

```python
client = factory.create_process_instance_client("http://localhost:56100")

client.terminate(process_instance_id)
client.retry(process_instance_id)
```

| Methode | Beschreibung |
|---------|-------------|
| `terminate(process_instance_id)` | Bricht eine Prozessinstanz ab |
| `retry(process_instance_id)` | Wiederholt eine fehlgeschlagene Instanz |

### UserTaskClient

User Tasks abfragen und bearbeiten.

```python
client = factory.create_user_task_client("http://localhost:56100")

tasks = client.get_user_tasks(state="suspended")
for task in tasks:
    print(f"Task: {task['name']}")

client.finish_user_task(user_task_instance_id, {"approved": True})
```

| Methode | Beschreibung |
|---------|-------------|
| `get_user_tasks(state='suspended')` | Alle User Tasks im angegebenen Status |
| `reserve_user_task(id, owner_id)` | Reserviert einen Task für einen Benutzer |
| `cancel_reservation_user_task(id)` | Hebt die Reservierung auf |
| `finish_user_task(id, answer)` | Schließt einen Task mit Ergebnis ab |

### EventClient

BPMN-Events auslösen.

```python
client = factory.create_event_client("http://localhost:56100")

# Message-Event auslösen
client.trigger_message("BestellungEingegangen", payload={"order_id": "123"})

# Signal-Event auslösen
client.trigger_signal("NotfallStop")
```

| Methode | Beschreibung |
|---------|-------------|
| `trigger_message(event_name, payload={}, process_instance_id=None)` | Message-Event senden |
| `trigger_signal(signal_name)` | Signal-Event broadcasten |

### NotificationClient

Echtzeit-Benachrichtigungen über Prozess-Events per Long Polling.

```python
client = factory.create_notification_client("http://localhost:56100")

client.on_process_started(lambda event: print(f"Prozess gestartet: {event}"))
client.on_process_ended(lambda event: print(f"Prozess beendet: {event}"))
client.on_user_task_waiting(lambda event: print(f"User Task wartet: {event}"))

client.start()
```

| Methode | Beschreibung |
|---------|-------------|
| `on_process_started(callback)` | Prozess gestartet |
| `on_process_ended(callback)` | Prozess beendet |
| `on_process_error(callback)` | Prozess-Fehler |
| `on_activity_reached(callback)` | Aktivität erreicht |
| `on_activity_finished(callback)` | Aktivität abgeschlossen |
| `on_user_task_waiting(callback)` | User Task wartet |
| `on_user_task_finished(callback)` | User Task abgeschlossen |
| `on_user_task_reserved(callback)` | User Task reserviert |
| `on_user_task_reservation_canceled(callback)` | Reservierung aufgehoben |
| `on_manual_task_waiting(callback)` | Manual Task wartet |
| `on_manual_task_finished(callback)` | Manual Task abgeschlossen |
| `on_empty_activity_waiting(callback)` | Empty Activity wartet |
| `on_empty_activity_finished(callback)` | Empty Activity abgeschlossen |
| `on_boundary_event_triggered(callback)` | Boundary Event ausgelöst |
| `on_intermediate_throw_event_triggered(callback)` | Intermediate Throw Event |
| `on_intermediate_catch_event_reached(callback)` | Intermediate Catch Event erreicht |
| `on_intermediate_catch_event_finished(callback)` | Intermediate Catch Event abgeschlossen |

### FlowNodeInstanceClient

Events auf Flow-Node-Ebene auslösen.

```python
client = factory.create_flow_node_instance_client("http://localhost:56100")

client.trigger_message_event("MeineNachricht", process_instance_id="...")
client.trigger_signal_event("MeinSignal")
```

### AppInfoClient

Engine-Informationen abfragen.

```python
client = factory.create_app_info_client("http://localhost:56100")

info = client.get_info()
authority = client.get_authority()
```

## Synchroner Client (core/api)

Für synchrone Kontexte (z.B. Robot Framework) steht eine aggregierte `Client`-Klasse bereit, die alle Handler in einer einzigen Schnittstelle bündelt:

```python
from processcube_client.core.api.client import Client

client = Client("http://localhost:56100")

# Engine-Info
info = client.info()

# Prozess starten
from processcube_client.core.api.helpers.process_models import ProcessStartRequest
result = client.process_model_start("MeinProzess", ProcessStartRequest(
    start_event_id="StartEvent_1"
))

# User Tasks abfragen
from processcube_client.core.api.helpers.user_tasks import UserTaskQuery
tasks = client.user_task_query(UserTaskQuery(
    process_instance_id=result.process_instance_id
))

# BPMN-Dateien deployen
result = client.deploy_bpmn_from_path("prozesse/")
```

## Authentifizierung

Standardmäßig wird ein Dummy-Token (`ZHVtbXlfdG9rZW4=`) verwendet. Für eigene Authentifizierung kann ein Callable übergeben werden:

```python
def get_identity():
    return {"token": "mein_jwt_token"}

# Async Client
client = ExternalTaskClient("http://localhost:56100", identity=get_identity)

# Sync Client
from processcube_client.core.api.client import Client
client = Client("http://localhost:56100", identity=get_identity)
```

## API-Endpunkte

Alle Endpunkte liegen unter `{engine_url}/atlas_engine/api/v1/`.

| Methode | Endpunkt | Beschreibung |
|---------|----------|-------------|
| `GET` | `/info` | Engine-Informationen |
| `GET` | `/authority` | Authority-URL |
| `POST` | `/process_definitions` | Prozessdefinition hochladen |
| `DELETE` | `/process_definitions/{id}` | Prozessdefinition löschen |
| `POST` | `/process_models/{id}/start` | Prozessinstanz starten |
| `GET` | `/process_instances/query` | Prozessinstanzen abfragen |
| `PUT` | `/process_instances/{id}/terminate` | Prozessinstanz abbrechen |
| `POST` | `/external_tasks/fetch_and_lock` | ExternalTasks abholen und locken |
| `PUT` | `/external_tasks/{id}/extend_lock` | Lock verlängern |
| `PUT` | `/external_tasks/{id}/finish` | ExternalTask abschließen |
| `PUT` | `/external_tasks/{id}/error` | Fehler melden |
| `GET` | `/user_tasks` | User Tasks abfragen |
| `PUT` | `/user_tasks/{id}/finish` | User Task abschließen |
| `PUT` | `/user_tasks/{id}/reserve` | User Task reservieren |
| `DELETE` | `/user_tasks/{id}/cancel-reservation` | Reservierung aufheben |
| `PUT` | `/manual_tasks/{id}/finish` | Manual Task abschließen |
| `POST` | `/messages/{name}/trigger` | Message-Event auslösen |
| `POST` | `/signals/{name}/trigger` | Signal-Event auslösen |
| `GET` | `/flow_node_instances` | Flow-Node-Instanzen abfragen |
| `GET` | `/data_object_instances/query` | Datenobjekt-Instanzen abfragen |

## Lizenz

MIT — [ProcessCube UG](https://processcube.io)
