Metadata-Version: 2.4
Name: soprano-sdk
Version: 0.2.134
Summary: YAML-driven workflow engine with AI agent integration for building conversational SOPs
Author: Arvind Thangamani
License: MIT
License-File: LICENSE
Keywords: agent,ai,conversational,langgraph,sop,soprano,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.12
Requires-Dist: agno>=2.0.7
Requires-Dist: crewai<1.3.0,>=1.1.0
Requires-Dist: instructor>=1.8.3
Requires-Dist: jsonschema>=4.0.0
Requires-Dist: langchain-aws>=0.2.0
Requires-Dist: langchain-community>=0.4.1
Requires-Dist: langchain-core>=0.3.67
Requires-Dist: langchain-litellm>=0.1.0
Requires-Dist: langchain-openai>=1.0.3
Requires-Dist: langchain>=1.0.7
Requires-Dist: langfuse>=3.10.1
Requires-Dist: langgraph>=1.0.2
Requires-Dist: litellm>=1.74.9
Requires-Dist: openai>=1.92.1
Requires-Dist: pydantic-ai>=1.22.0
Requires-Dist: pydantic-monty>=0.0.8
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pytest>=9.0.1
Requires-Dist: pyyaml>=6.0
Provides-Extra: dev
Requires-Dist: gradio>=5.46.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: respx==0.22.0; extra == 'dev'
Requires-Dist: ruff==0.14.13; extra == 'dev'
Provides-Extra: guardrails
Requires-Dist: boto3>=1.35.0; extra == 'guardrails'
Provides-Extra: persistence
Requires-Dist: langgraph-checkpoint-mongodb>=0.2.0; extra == 'persistence'
Requires-Dist: pymongo>=4.0.0; extra == 'persistence'
Provides-Extra: supervisors
Requires-Dist: crewai>=1.1.0; extra == 'supervisors'
Requires-Dist: langchain-openai>=0.3.34; extra == 'supervisors'
Description-Content-Type: text/markdown

# Conversational SOP Framework

A YAML-driven workflow engine with AI agent integration for building conversational Standard Operating Procedures (SOPs).

## Features

- **YAML Configuration**: Define workflows declaratively using YAML
- **AI Agent Integration**: Built-in support for conversational data collection using OpenAI models
- **State Management**: Powered by LangGraph for robust workflow execution
- **External Context Injection**: Support for pre-populated fields from external orchestrators
- **Pattern Matching**: Flexible transition logic based on patterns and conditions
- **Visualization**: Generate workflow graphs as images or Mermaid diagrams
- **Follow-up Conversations**: Handle user follow-up questions with full workflow context
- **Intent Detection**: Route users between collector nodes based on detected intent
- **Out-of-Scope Detection**: Signal when user queries are unrelated to the current workflow
- **Outcome Humanization**: LLM-powered transformation of outcome messages into natural, context-aware responses
- **Per-Turn Localization**: Dynamic language and script switching for multi-language support
- **Real-Time Streaming**: Generator-based streaming API with mid-node event emission via `emit()`
- **Grounding Check**: AWS Bedrock Guardrails integration to prevent agent hallucinations

## Installation

```bash
pip install soprano-sdk
```

Or using uv:

```bash
uv add soprano-sdk
```

## Quick Start

### 1. Define a Workflow in YAML

```yaml
name: "User Greeting Workflow"
description: "Collects user information and provides a personalized greeting"
version: "1.0"

data:
  - name: name
    type: text
    description: "User's name"
    label: "Full Name"  # Optional: User-friendly label for UI display
  - name: age
    type: number
    description: "User's age in years"
    label: "Age (years)"  # Optional: User-friendly label for UI display

steps:
  - id: get_name
    action: collect_input_with_agent
    field: name
    max_attempts: 3
    agent:
      name: "NameCollector"
      model: "gpt-4o-mini"
      instructions: |
        Your goal is to capture the user's name.
        Start with a friendly greeting and ask for their name.
        Once you have a clear name, respond with: 'NAME_CAPTURED: [name]'
    transitions:
      - pattern: "NAME_CAPTURED:"
        next: get_age
      - pattern: "NAME_FAILED:"
        next: end_failed

  - id: get_age
    action: collect_input_with_agent
    field: age
    max_attempts: 3
    agent:
      name: "AgeCollector"
      model: "gpt-4o-mini"
      instructions: |
        Ask for the user's age.
        Once you have a valid age, respond with: 'AGE_CAPTURED: [age]'
    transitions:
      - pattern: "AGE_CAPTURED:"
        next: end_success
      - pattern: "AGE_FAILED:"
        next: end_failed

outcomes:
  - id: end_success
    type: success
    message: "Hello {name}! You are {age} years old."

  - id: end_failed
    type: failure
    message: "Sorry, I couldn't complete the workflow."
```

### 2. Load and Execute the Workflow

```python
from soprano_sdk import load_workflow
from langgraph.types import Command
import uuid

# Load workflow
graph, engine = load_workflow("greeting_workflow.yaml")

# Setup execution
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

# Start workflow
result = graph.invoke({}, config=config)

# Interaction loop
while True:
    if "__interrupt__" in result and result["__interrupt__"]:
        # Get prompt from workflow
        prompt = result["__interrupt__"][0].value
        print(f"Bot: {prompt}")

        # Get user input
        user_input = input("You: ")

        # Resume workflow with user input
        result = graph.invoke(Command(resume=user_input), config=config)
    else:
        # Workflow completed
        message = engine.get_outcome_message(result)
        print(f"Bot: {message}")
        break
```

## Data Fields Configuration

Data fields define the information collected and processed by your workflow. Each field supports the following properties:

### Field Properties

```yaml
data:
  - name: field_name       # Required: Unique identifier
    type: text             # Required: text, number, boolean, date, etc.
    description: "..."     # Required: Used for agent understanding
    label: "Display Name"  # Optional: User-friendly label for UI display
    default: "value"       # Optional: Default value
```

### Label Field

The `label` field provides a user-friendly display name for fields. When present, it appears in the `field_details` object returned by the workflow tool, making it easier to build consumer UIs.

**Example:**
```yaml
data:
  - name: email
    type: text
    description: "User's email address"
    label: "Email Address"
  
  - name: phone
    type: text
    description: "User's phone number"
    label: "Phone Number"
  
  - name: return_reason
    type: text
    description: "Reason for return"
    # No label - field_details will only include name and value
```

**Field Details Output:**
```python
# When workflow interrupts for user input
result = tool.execute(thread_id="123", user_message="hi")

# Result includes field_details with labels
result.field_details
# [
#   {"name": "email", "value": "user@example.com", "label": "Email Address"},
#   {"name": "phone", "value": "1234567890", "label": "Phone Number"},
#   {"name": "return_reason", "value": "damaged item"}  # No label
# ]
```

**Use Cases:**
- **UI Rendering**: Display friendly labels instead of field names (`"Email Address"` vs `"email"`)
- **Internationalization**: Use labels for localized field names while keeping internal field names in English
- **Better UX**: Show contextual labels (`"Age (years)"` is clearer than `"age"`)
- **Backward Compatibility**: Optional - existing workflows without labels continue to work
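
The label fallback a consumer UI needs can be sketched in a few lines (the helper name here is ours, not part of the SDK):

```python
def display_name(field: dict) -> str:
    """Prefer the optional label; fall back to the raw field name."""
    return field.get("label", field["name"])

field_details = [
    {"name": "email", "value": "user@example.com", "label": "Email Address"},
    {"name": "return_reason", "value": "damaged item"},  # no label
]

for f in field_details:
    print(f"{display_name(f)}: {f['value']}")
# Email Address: user@example.com
# return_reason: damaged item
```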

### External Context Injection

You can inject external context into workflows:

```python
# Pre-populate fields from external orchestrator
result = graph.invoke({
    "name": "Alice",
    "age": 30
}, config=config)

# Workflow will automatically skip collection steps
# and proceed to validation/processing
```

### Persistence

The library supports pluggable persistence through LangGraph's checkpointer system.

#### In-Memory (Default)

```python
# No persistence - state lost when process ends
graph, engine = load_workflow("workflow.yaml")
```

#### MongoDB Persistence

```python
from soprano_sdk import load_workflow
from langgraph.checkpoint.mongodb import MongoDBSaver
from pymongo import MongoClient

# Setup MongoDB persistence (local)
client = MongoClient("mongodb://localhost:27017")
checkpointer = MongoDBSaver(client=client, db_name="workflows")

# Or MongoDB Atlas (cloud)
client = MongoClient("mongodb+srv://user:pass@cluster.mongodb.net")
checkpointer = MongoDBSaver(client=client, db_name="workflows")

# Load workflow with persistence
graph, engine = load_workflow("workflow.yaml", checkpointer=checkpointer)

# Execute with thread_id for state tracking
config = {"configurable": {"thread_id": "user-123-return"}}
result = graph.invoke({}, config=config)

# Later, resume using same thread_id
result = graph.invoke(Command(resume="continue"), config=config)
```

#### Thread ID Strategies

Choose a thread_id strategy based on your use case:

| Strategy | Thread ID Pattern | Best For |
|----------|-------------------|----------|
| **Entity-Based** | `f"return_{order_id}"` | One workflow per business entity |
| **Conversation** | `str(uuid.uuid4())` | Multiple concurrent workflows |
| **User+Workflow** | `f"{user_id}_{workflow_type}"` | One workflow type per user |
| **Session-Based** | `session_id` | Web apps with sessions |

**Examples**: See `examples/persistence/` for detailed examples of each strategy.
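
The strategies above are just string-formatting conventions; a minimal sketch (the helper function is hypothetical, not part of the SDK):

```python
import uuid

def make_thread_id(strategy: str, **ctx) -> str:
    """Build a thread_id for the chosen strategy (hypothetical helper)."""
    if strategy == "entity":
        # One workflow per business entity, e.g. one return per order
        return f"return_{ctx['order_id']}"
    if strategy == "conversation":
        # Fresh UUID per conversation; allows concurrent workflows
        return str(uuid.uuid4())
    if strategy == "user_workflow":
        # One workflow of each type per user
        return f"{ctx['user_id']}_{ctx['workflow_type']}"
    raise ValueError(f"unknown strategy: {strategy}")

print(make_thread_id("entity", order_id="ORDER-123"))  # return_ORDER-123
```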

## Workflow Actions

### collect_input_with_agent

Collects user input using an AI agent with conversation history.

```yaml
- id: collect_field
  action: collect_input_with_agent
  field: field_name
  max_attempts: 5
  agent:
    name: "CollectorAgent"
    model: "gpt-4o-mini"
    instructions: |
      Instructions for the agent...
  transitions:
    - pattern: "SUCCESS:"
      next: next_step
    - pattern: "FAILED:"
      next: failure_outcome
```

### call_function

Calls a Python function with workflow state.

```yaml
- id: process_data
  action: call_function
  function: "my_module.my_function"
  inputs:
    field1: "{field_name}"
    field2: "static_value"
  output: result_field
  transitions:
    - condition: true
      next: success_step
    - condition: false
      next: failure_step
```
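
A sketch of what the target function might look like, assuming (as the step above suggests) that keyword arguments match the resolved `inputs` and that the boolean return value, stored under `output`, drives the `condition: true`/`condition: false` transitions. The module and validation logic are hypothetical:

```python
# my_module.py (hypothetical) - target of action: call_function
def my_function(field1: str, field2: str) -> bool:
    """Receives the resolved inputs from the YAML step.

    "{field_name}" placeholders are resolved from workflow state
    before the call; the return value is stored under `output`
    and matched against the condition transitions.
    """
    # Example check: the dynamic field must be non-empty
    # and the static field must carry the expected value
    return bool(field1.strip()) and field2 == "static_value"
```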

### call_async_function

Calls an async function that may return a pending status, triggering an interrupt until the async operation completes.

```yaml
- id: verify_payment
  action: call_async_function
  function: "payments.start_verification"
  output: verification_result
  transitions:
    - condition: "verified"
      next: payment_approved
    - condition: "failed"
      next: payment_rejected
```

### follow_up

Handles follow-up questions from users. Unlike `collect_input_with_agent` where the agent asks first, here the **user initiates** by asking questions. The agent responds using full workflow context.

```yaml
- id: handle_questions
  action: follow_up
  next: final_confirmation  # Where to go when user says "done"
  closure_patterns:  # Optional: customize closure detection
    - "ok"
    - "thank you"
    - "done"
  agent:
    name: "FollowUpAssistant"
    model: "gpt-4o-mini"
    description: "Answering questions about the order"
    instructions: |
      Help the user with any questions about their order.
      Be concise and helpful.
    detect_out_of_scope: true  # Signal when user asks unrelated questions
  transitions:  # Optional: route based on patterns
    - pattern: "ROUTE_TO_PAYMENT:"
      next: payment_step
```

**Key features:**
- **User initiates**: No initial prompt - waits for user to ask a question
- **Full state context**: Agent sees all collected workflow data
- **Closure detection**: Detects "ok", "thanks", "done" → proceeds to next step
- **Intent change**: Routes to collector nodes when user wants to change data
- **Out-of-scope**: Signals to parent orchestrator for unrelated queries

## Interrupt Types

The workflow engine uses three interrupt types to pause execution and communicate with the caller:

| Type | Marker | Triggered By | Use Case |
|------|--------|--------------|----------|
| **USER_INPUT** | `__WORKFLOW_INTERRUPT__` | `collect_input_with_agent`, `follow_up` | Waiting for user input |
| **ASYNC** | `__ASYNC_INTERRUPT__` | `call_async_function` | Waiting for async operation callback |
| **OUT_OF_SCOPE** | `__OUT_OF_SCOPE_INTERRUPT__` | `collect_input_with_agent`, `follow_up` | User query unrelated to current task |

### Handling Interrupts

```python
result = graph.invoke({}, config=config)

if "__interrupt__" in result and result["__interrupt__"]:
    interrupt_value = result["__interrupt__"][0].value

    # Check interrupt type
    if isinstance(interrupt_value, dict):
        if interrupt_value.get("type") == "async":
            # Async interrupt - wait for external callback
            pending_metadata = interrupt_value.get("pending")
            # ... handle async operation ...
            result = graph.invoke(Command(resume=async_result), config=config)

        elif interrupt_value.get("type") == "out_of_scope":
            # Out-of-scope - user asking unrelated question
            reason = interrupt_value.get("reason")
            user_message = interrupt_value.get("user_message")
            # ... route to different workflow or handle appropriately ...
    else:
        # User input interrupt - prompt is a string
        prompt = interrupt_value
        user_input = input(f"Bot: {prompt}\nYou: ")
        result = graph.invoke(Command(resume=user_input), config=config)
```

### Out-of-Scope Detection

Data collector and follow-up nodes can detect when user queries are unrelated to the current task. This is useful for multi-workflow systems where a supervisor agent needs to route users to different SOPs.

**Configuration:**
```yaml
agent:
  detect_out_of_scope: true  # Disabled by default, set to true to enable
  scope_description: "collecting order information for returns"  # Optional
```

**Response format:**
```
__OUT_OF_SCOPE_INTERRUPT__|{thread_id}|{workflow_name}|{"reason":"...","user_message":"..."}
```

## Outcome Types

Workflows can define three types of outcomes to represent different completion states:

### 1. Success Outcome

Represents successful workflow completion.

```yaml
outcomes:
  - id: order_approved
    type: success
    message: "Order {{order_id}} has been approved!"
    humanize: true  # Optional, default: true
```

### 2. Failure Outcome

Represents workflow completion with an error or failure state.

```yaml
outcomes:
  - id: order_rejected
    type: failure
    message: "Order {{order_id}} could not be processed."
    humanize: true  # Optional, default: true
```

### 3. Redirect Outcome

Redirects to another workflow or system. Returns a special formatted response for external orchestrators to route the conversation.

```yaml
outcomes:
  - id: transfer_to_support
    type: redirect
    redirect_to: "customer_support_workflow"
    message: "Let me transfer you to customer support for {{issue_type}}."
    humanize: true
```

**Response format:**
```
__REDIRECT__|{thread_id}|{workflow_name}|{redirect_to}|{message}
```

**Key features:**
- `redirect_to` is mandatory when `type` is `redirect`
- If `redirect_to` is provided, `type` must be `redirect`
- The `redirect_to` field supports Jinja2 templates (e.g., `"support_{{issue_type}}"`)
- Messages are humanized (if enabled) before being included in the redirect response
- Orchestrators can parse this format to route users to appropriate workflows

**Example:**
```yaml
outcomes:
  - id: escalate_to_billing
    type: redirect
    redirect_to: "billing_workflow"
    message: "I'll connect you with our billing department for assistance with {{issue_type}}."
```

If triggered with `issue_type: "refund"`, returns:
```
__REDIRECT__|thread-123|order_workflow|billing_workflow|I'll connect you with our billing department for assistance with refund.
```

## Outcome Humanization

Outcome messages can be automatically humanized using an LLM to transform template-based messages into natural, context-aware responses. This feature uses the full conversation history to generate responses that match the tone and context of the interaction.

### How It Works

1. **Template rendering**: The outcome message template is first rendered with state values (e.g., `{{order_id}}` → `1234`)
2. **LLM humanization**: The rendered message is passed to an LLM along with the conversation history
3. **Natural response**: The LLM generates a warm, conversational response while preserving all factual details

### Configuration

Humanization is **enabled by default**. Configure it at the workflow level:

```yaml
name: "Return Processing Workflow"
version: "1.0"

# Humanization configuration (optional - enabled by default)
humanization_agent:
  model: "gpt-4o"  # Override model for humanization (optional)
  base_url: "https://custom-api.com/v1"  # Override base URL (optional)
  instructions: |  # Custom instructions (optional)
    You are a friendly customer service representative.
    Rewrite the message to be warm and empathetic.
    Always thank the customer for their patience.

outcomes:
  - id: success
    type: success
    message: "Return approved for order {{order_id}}. Reason: {{return_reason}}."

  - id: technical_error
    type: failure
    humanize: false  # Disable humanization for this specific outcome
    message: "Error code: {{error_code}}. Contact support."
```

### Example Transformation

| Template Message | Humanized Response |
|-----------------|-------------------|
| `"Return approved for order 1234. Reason: damaged item."` | `"Great news! I've approved the return for your order #1234. I completely understand about the damaged item - that's so frustrating. You'll receive an email shortly with return instructions. Is there anything else I can help you with?"` |

### Disabling Humanization

**Globally** (for entire workflow):
```yaml
humanization_agent:
  enabled: false
```

**Per-outcome**:
```yaml
outcomes:
  - id: error_code
    type: failure
    humanize: false  # Keep exact message for debugging/logging
    message: "Error: {{error_code}}"
```

### Model Configuration

The humanization agent inherits the workflow's runtime `model_config`. You can override specific settings:

```python
config = {
    "model_config": {
        "model_name": "gpt-4o-mini",  # Base model for all agents
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}

# In YAML, humanization_agent.model overrides model_name for humanization only
```

## Per-Turn Localization

The framework supports per-turn localization, allowing dynamic language and script switching during workflow execution. Each call to `execute()` can specify a different target language/script.

### How It Works

1. **Per-turn parameters**: Pass `target_language` and `target_script` to `execute()`
2. **Instruction injection**: Localization instructions are prepended to agent system prompts
3. **No extra LLM calls**: The same agent that generates the response handles localization

### Usage

**Per-turn language switching:**
```python
from soprano_sdk import WorkflowTool

tool = WorkflowTool(
    yaml_path="return_workflow.yaml",
    name="return_processor",
    description="Process returns",
    checkpointer=checkpointer,
    config=config
)

# Turn 1: English (no localization)
result = tool.execute(thread_id="123", user_message="hi")

# Turn 2: Switch to Tamil
result = tool.execute(
    thread_id="123",
    user_message="my order id is 1234",
    target_language="Tamil",
    target_script="Tamil"
)

# Turn 3: Back to English (no localization params)
result = tool.execute(thread_id="123", user_message="yes")
```

### YAML Defaults (Optional)

You can set default localization in the workflow YAML. These are used when `target_language`/`target_script` are not passed to `execute()`:

```yaml
name: "Return Workflow"
version: "1.0"

localization:
  language: "Tamil"
  script: "Tamil"
  instructions: |  # Optional: custom instructions
    Use formal Tamil suitable for customer service.
    Always be polite and respectful.

# ... rest of workflow
```

### Key Points

- **Localization affects**: Data collector prompts, follow-up responses, and humanized outcome messages
- **Outcome messages require humanization**: If `humanize: false`, outcome messages stay in English (template output)
- **Per-turn override**: Runtime parameters always override YAML defaults

## Real-Time Streaming

The streaming API is a generator-based alternative to `WorkflowTool.execute()`: `stream()` yields typed events as each node runs. This enables Server-Sent Events, WebSocket pushes, or any real-time transport.

### Quick Start

```python
from soprano_sdk.streaming import (
    WorkflowStreamer,
    InterruptEvent,
    CompleteEvent,
    CustomEvent,
    ErrorEvent,
)

streamer = WorkflowStreamer(
    yaml_path="return_workflow.yaml",
    name="returns",
    description="Process customer returns",
    config={"model_config": {"model_name": "gpt-4o-mini", "api_key": "..."}},
)

for event in streamer.stream(thread_id="t1", user_message="hello"):
    match event:
        case InterruptEvent():
            print(f"Bot: {event.prompt}")
        case CompleteEvent():
            print(f"Done: {event.message}")
        case CustomEvent():
            print(f"[{event.payload['type']}] {event.payload}")
        case ErrorEvent():
            print(f"Error: {event.error}")
```

### Emitting Events from Functions

Use `emit()` inside `call_function` or `call_async_function` handlers to push mid-execution events to streaming consumers. Outside streaming mode (`WorkflowTool.execute()`), `emit()` is a silent no-op.

```python
from soprano_sdk.streaming import emit

def check_eligibility(state):
    order_id = state.get("order_id")

    emit("step", {"label": f"Looking up order {order_id}..."})
    order = crm.lookup(order_id)

    emit("step", {"label": "Checking return window..."})
    eligible = order["days_since_purchase"] <= 30

    emit("result", {"label": "Eligible" if eligible else "Not eligible"})
    return eligible
```

### Event Types

| Event | When | Contains |
|-------|------|----------|
| `NodeCompleteEvent` | A graph node finished | `node`, `state_update` |
| `CustomEvent` | `emit()` called mid-node | `payload` (arbitrary dict) |
| `InterruptEvent` | Waiting for user input | `prompt`, `options`, `field_details` |
| `AsyncInterruptEvent` | Waiting for external callback | `thread_id`, `step_id`, `pending` |
| `OutOfScopeEvent` | User query is off-topic | `thread_id`, `reason` |
| `CompleteEvent` | Workflow finished | `message`, `options` |
| `ErrorEvent` | Unrecoverable error | `error` |
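
To bridge these events to Server-Sent Events, each one can be serialized into an SSE frame. A minimal sketch using stand-in event classes (the real ones live in `soprano_sdk.streaming`; the serializer is ours):

```python
import json
from dataclasses import dataclass, asdict

# Stand-ins for two of the streaming event types
@dataclass
class InterruptEvent:
    prompt: str

@dataclass
class CompleteEvent:
    message: str

def to_sse(event) -> str:
    """Format an event as an SSE frame: an `event:` line naming the
    type, a `data:` line with the JSON payload, and a blank line."""
    name = type(event).__name__
    return f"event: {name}\ndata: {json.dumps(asdict(event))}\n\n"

frame = to_sse(InterruptEvent(prompt="What is your order ID?"))
print(frame)
# event: InterruptEvent
# data: {"prompt": "What is your order ID?"}
```

In a web handler, each event yielded by `streamer.stream(...)` would be passed through `to_sse()` and written to the response stream.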

### Streaming API vs WorkflowTool

The streaming API accepts the same parameters as `WorkflowTool.execute()`:

```python
# WorkflowTool (synchronous, returns WorkflowResult)
result = tool.execute(
    thread_id="t1",
    user_message="hello",
    initial_context={"order_id": "123"},
    target_language="Tamil",
    target_script="Tamil",
)

# WorkflowStreamer (streaming, yields events)
for event in streamer.stream(
    thread_id="t1",
    user_message="hello",
    initial_context={"order_id": "123"},
    target_language="Tamil",
    target_script="Tamil",
):
    ...
```

Both run the same underlying LangGraph nodes with the same tracing, checkpointing, and interrupt behaviour. The only difference is observation: `execute()` blocks and returns one result; `stream()` yields events as they happen.

## Grounding Check

The grounding check validates agent responses against the available context (instructions, workflow data, and tool outputs) using [AWS Bedrock Guardrails](https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html). This prevents agents from hallucinating information — fabricating URLs, phone numbers, prices, or other details not present in their context.

### How It Works

1. After the agent generates a response, it is sent to the Bedrock ApplyGuardrail API along with the grounding source (agent instructions + collected workflow data + tool outputs) and the user's query
2. Bedrock evaluates whether the response is supported by the grounding source
3. If grounded, the response is returned as-is
4. If not grounded, the response is discarded, the agent receives feedback, and regenerates once
5. **Fail-open**: If Bedrock is unavailable or errors occur, the response passes through

### Prerequisites

1. An AWS account with Bedrock access
2. A Bedrock Guardrail with **Contextual Grounding** policy enabled (configure grounding and relevance thresholds in the AWS console)
3. AWS credentials configured (env vars, profile, or IAM role)
4. Install the guardrails dependency:

```bash
pip install soprano-sdk[guardrails]
# or
uv add soprano-sdk --extra guardrails
```

### Environment Variables

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `BEDROCK_GUARDRAIL_ID` | Yes | — | Your Bedrock Guardrail identifier |
| `BEDROCK_GUARDRAIL_VERSION` | No | `DRAFT` | Guardrail version (`DRAFT` or a version number) |
| `AWS_REGION_NAME` | No | boto3 default | AWS region for Bedrock runtime |

Standard AWS credential env vars (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) or an IAM role are also required.

### YAML Configuration

Enable grounding check on any data collector agent by setting `grounding_check: true`:

```yaml
steps:
  - id: collect_order_id
    action: collect_input_with_agent
    field: order_id
    agent:
      name: "OrderCollector"
      model: "gpt-4o-mini"
      grounding_check: true  # Enable grounding check for this agent
      instructions: |
        Ask the customer for their order ID.
        Once captured, respond with: 'ORDER_ID_CAPTURED: [order_id]'
    transitions:
      - pattern: "ORDER_ID_CAPTURED:"
        next: check_eligibility
```

### What Gets Checked

The grounding source sent to Bedrock is assembled from three parts:

- **Agent instructions**: The system prompt from the YAML `instructions` field
- **Workflow data**: All previously collected field values (e.g., order ID, customer name)
- **Tool outputs**: Results from any tools called by the agent during the current turn (LangGraph adapter only)

Only conversational responses are checked. Extraction patterns (e.g., `ORDER_ID_CAPTURED: 12345`) skip the grounding check automatically.
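
The assembly of the grounding source can be sketched as plain string concatenation (the three parts are as described above; the function and the section separators are assumptions, not the SDK's exact implementation):

```python
def build_grounding_source(instructions: str,
                           workflow_data: dict,
                           tool_outputs: list[str]) -> str:
    """Concatenate the three grounding parts into one text block
    to send to Bedrock alongside the agent's response."""
    parts = ["# Agent instructions", instructions, "# Workflow data"]
    parts += [f"{k}: {v}" for k, v in workflow_data.items()]
    if tool_outputs:
        parts.append("# Tool outputs")
        parts += tool_outputs
    return "\n".join(parts)

source = build_grounding_source(
    "Returns are accepted within 30 days of purchase.",
    {"order_id": "1234", "customer_name": "Alice"},
    ["order 1234: purchased 12 days ago"],
)
```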

### Example: What Gets Caught

Given an agent with instructions about a 30-day return policy:

| Response | Grounded? | Why |
|----------|-----------|-----|
| "Your order is eligible for return within 30 days." | Yes | Matches instructions |
| "Call 1-800-555-RETURN to process your return." | No | Phone number not in context |
| "Your refund of $149.99 has been initiated." | No | Price/action not in context |

### Backward Compatibility

- Grounding check is **disabled by default** (`grounding_check: false`)
- Existing workflows without `grounding_check` in their YAML continue to work unchanged
- If `BEDROCK_GUARDRAIL_ID` is not set but `grounding_check: true`, a warning is logged and the check is skipped

## Examples

See the `examples/` directory for complete workflow examples:

- `greeting_workflow.yaml` - Simple user greeting workflow
- `return_workflow.yaml` - Customer return processing workflow
- Function modules with business logic (`greeting_functions.py`, `return_functions.py`)
- `persistence/` - Persistence strategy examples (entity-based, conversation-based, SQLite demo)
- `streaming/` - Interactive streaming example with real-time UI updates via `emit()`

## Running Workflows

### CLI Demo

```bash
# Basic usage (in-memory)
python scripts/workflow_demo.py examples/greeting_workflow.yaml

# With MongoDB persistence (local)
python scripts/workflow_demo.py examples/greeting_workflow.yaml --mongodb mongodb://localhost:27017

# Resume existing workflow
python scripts/workflow_demo.py examples/greeting_workflow.yaml --mongodb mongodb://localhost:27017 --thread-id abc-123

# With MongoDB Atlas
python scripts/workflow_demo.py examples/greeting_workflow.yaml --mongodb mongodb+srv://user:pass@cluster.mongodb.net
```

### Gradio UI

```bash
# Basic usage (in-memory)
python scripts/workflow_demo_ui.py examples/greeting_workflow.yaml

# With MongoDB persistence
python scripts/workflow_demo_ui.py examples/greeting_workflow.yaml --mongodb mongodb://localhost:27017

# With MongoDB Atlas
python scripts/workflow_demo_ui.py examples/greeting_workflow.yaml --mongodb mongodb+srv://user:pass@cluster.mongodb.net
```

### Persistence Examples

```bash
cd examples/persistence

# Entity-based (order ID as thread ID)
python entity_based.py ORDER-123

# Conversation-based (UUID with supervisor pattern)
python conversation_based.py ../return_workflow.yaml --order-id ORDER-456

# MongoDB demo with pause/resume
python mongodb_demo.py

# Use MongoDB Atlas
python mongodb_demo.py --mongodb mongodb+srv://user:pass@cluster.mongodb.net
```

### Visualize Workflow

```bash
python scripts/visualize_workflow.py examples/greeting_workflow.yaml
```

## Development

### Setup

```bash
git clone https://github.com/dnivra26/soprano_sdk_framework.git
cd soprano_sdk_framework
uv sync --dev
```

### Run Tests

```bash
python tests/test_external_values.py
```

## Architecture

- **soprano_sdk/**: Core library package
  - `engine.py`: Workflow engine implementation
  - `__init__.py`: Public API exports

- **examples/**: Example workflows and persistence patterns
  - Workflow YAML definitions
  - Function modules with business logic
  - `persistence/`: Different persistence strategy examples

- **scripts/**: Utility tools for running and visualizing workflows
  - `workflow_demo.py`: CLI runner with persistence support
  - `workflow_demo_ui.py`: Gradio UI with thread management
  - `visualize_workflow.py`: Workflow graph generator

- **tests/**: Test suite
- **legacy/**: Previous implementations (FSM, direct LangGraph)

## Requirements

### Core Dependencies

- Python >= 3.12
- agno >= 2.0.7
- langgraph >= 1.0.2
- openai >= 1.92.1
- pyyaml >= 6.0

### Optional Dependencies

For MongoDB persistence:
```bash
# Using pip
pip install langgraph-checkpoint-mongodb pymongo

# Using uv (recommended)
uv add langgraph-checkpoint-mongodb pymongo --optional persistence

# Or install library with persistence support
pip install soprano-sdk[persistence]
```

For Bedrock Guardrails grounding check:
```bash
pip install soprano-sdk[guardrails]
# or
uv add soprano-sdk --extra guardrails
```

For development (includes Gradio UI and tests):
```bash
pip install soprano-sdk[dev]
# or
uv sync --dev
```

## License

MIT

## Contributing

Contributions are welcome! Please open an issue or submit a pull request.

## To Do

- ✅ Database persistence (SqliteSaver, PostgresSaver supported)
- ✅ Pluggable checkpointer system
- ✅ Thread ID strategies and examples
- ✅ Follow-up node for conversational Q&A
- ✅ Out-of-scope detection for multi-workflow routing
- ✅ Outcome humanization with LLM
- ✅ Per-turn localization for multi-language support
- ✅ Grounding check via AWS Bedrock Guardrails
- ✅ Real-time streaming with mid-node event emission
- Additional action types (webhook, conditional branching, parallel execution)
- More workflow examples (customer onboarding, support ticketing, approval flows)
- Workflow testing utilities
- Metrics and monitoring hooks

## Links

- [GitHub Repository](https://github.com/dnivra26/soprano_sdk_framework)
- [Issues](https://github.com/dnivra26/soprano_sdk_framework/issues)
