Metadata-Version: 2.4
Name: ondine
Version: 1.7.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Dist: litellm>=1.80.0,!=1.82.7,!=1.82.8
Requires-Dist: instructor>=1.0.0
Requires-Dist: diskcache>=5.6.0
Requires-Dist: aiohttp>=3.9.0
Requires-Dist: pandas>=1.5.0
Requires-Dist: polars>=0.20.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: tqdm>=4.66.0
Requires-Dist: tenacity>=8.2.0
Requires-Dist: tiktoken>=0.5.0
Requires-Dist: structlog>=23.1.0
Requires-Dist: jinja2>=3.1.0
Requires-Dist: click>=8.1.0
Requires-Dist: rich>=13.0.0
Requires-Dist: anthropic>=0.84.0
Requires-Dist: redis>=5.0.0 ; extra == 'all'
Requires-Dist: openpyxl>=3.1.0 ; extra == 'all'
Requires-Dist: pyarrow>=15.0.0 ; extra == 'all'
Requires-Dist: uvloop>=0.19.0 ; sys_platform != 'win32' and extra == 'all'
Requires-Dist: llama-index>=0.12.0 ; extra == 'all'
Requires-Dist: opentelemetry-api>=1.20.0 ; extra == 'all'
Requires-Dist: opentelemetry-sdk>=1.20.0 ; extra == 'all'
Requires-Dist: opentelemetry-exporter-jaeger>=1.20.0 ; extra == 'all'
Requires-Dist: deprecated>=1.2.0 ; extra == 'all'
Requires-Dist: langfuse>=2.0.0 ; extra == 'all'
Requires-Dist: prometheus-client>=0.20.0 ; extra == 'all'
Requires-Dist: mlx>=0.29.0 ; extra == 'all'
Requires-Dist: mlx-lm>=0.28.0 ; extra == 'all'
Requires-Dist: azure-identity>=1.15.0 ; extra == 'all'
Requires-Dist: pymupdf>=1.24 ; extra == 'all'
Requires-Dist: sentence-transformers>=3.0 ; extra == 'all'
Requires-Dist: azure-identity>=1.15.0 ; extra == 'azure'
Requires-Dist: pytest>=8.0.0 ; extra == 'dev'
Requires-Dist: pytest-cov>=4.1.0 ; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0 ; extra == 'dev'
Requires-Dist: ruff>=0.8.0 ; extra == 'dev'
Requires-Dist: mypy>=1.13.0 ; extra == 'dev'
Requires-Dist: pre-commit>=4.0.0 ; extra == 'dev'
Requires-Dist: bandit>=1.7.0 ; extra == 'dev'
Requires-Dist: pip-audit>=2.7.0 ; extra == 'dev'
Requires-Dist: pip-licenses>=4.3.0 ; extra == 'dev'
Requires-Dist: ipython>=8.20.0 ; extra == 'dev'
Requires-Dist: jupyter>=1.0.0 ; extra == 'dev'
Requires-Dist: pydocstyle>=6.3.0 ; extra == 'dev'
Requires-Dist: interrogate>=1.7.0 ; extra == 'dev'
Requires-Dist: mkdocs>=1.5.0 ; extra == 'docs'
Requires-Dist: mkdocs-material>=9.5.0 ; extra == 'docs'
Requires-Dist: mkdocstrings[python]>=0.24.0 ; extra == 'docs'
Requires-Dist: mkdocs-gen-files>=0.5.0 ; extra == 'docs'
Requires-Dist: mkdocs-literate-nav>=0.6.0 ; extra == 'docs'
Requires-Dist: mkdocs-section-index>=0.3.0 ; extra == 'docs'
Requires-Dist: openpyxl>=3.1.0 ; extra == 'excel'
Requires-Dist: pymupdf>=1.24 ; extra == 'knowledge'
Requires-Dist: sentence-transformers>=3.0 ; extra == 'knowledge'
Requires-Dist: llama-index>=0.12.0 ; extra == 'llama-index'
Requires-Dist: mlx>=0.29.0 ; extra == 'mlx'
Requires-Dist: mlx-lm>=0.28.0 ; extra == 'mlx'
Requires-Dist: opentelemetry-api>=1.20.0 ; extra == 'observability'
Requires-Dist: opentelemetry-sdk>=1.20.0 ; extra == 'observability'
Requires-Dist: opentelemetry-exporter-jaeger>=1.20.0 ; extra == 'observability'
Requires-Dist: deprecated>=1.2.0 ; extra == 'observability'
Requires-Dist: langfuse>=2.0.0 ; extra == 'observability'
Requires-Dist: prometheus-client>=0.20.0 ; extra == 'observability'
Requires-Dist: pyarrow>=15.0.0 ; extra == 'parquet'
Requires-Dist: uvloop>=0.19.0 ; sys_platform != 'win32' and extra == 'performance'
Requires-Dist: redis>=5.0.0 ; extra == 'redis'
Requires-Dist: textual>=1.0.0 ; extra == 'tui'
Requires-Dist: zep-cloud>=2.0 ; extra == 'zep'
Provides-Extra: all
Provides-Extra: azure
Provides-Extra: dev
Provides-Extra: docs
Provides-Extra: excel
Provides-Extra: knowledge
Provides-Extra: llama-index
Provides-Extra: mlx
Provides-Extra: observability
Provides-Extra: parquet
Provides-Extra: performance
Provides-Extra: redis
Provides-Extra: tui
Provides-Extra: zep
Summary: Ondine - The LLM Dataset Engine. SDK for processing tabular datasets using LLMs with reliability, observability, and cost control
Keywords: llm,dataset,batch-processing,data-engineering,ai,machine-learning,data-transformation
Author-email: Binblok <git@binblok.com>
License: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Documentation, https://ptimizeroracle.github.io/ondine
Project-URL: Homepage, https://github.com/ptimizeroracle/Ondine
Project-URL: Issues, https://github.com/ptimizeroracle/Ondine/issues
Project-URL: Repository, https://github.com/ptimizeroracle/Ondine

<div align="center">
  <img src="https://raw.githubusercontent.com/ptimizeroracle/ondine/main/assets/images/ondine-logo.png" alt="Ondine Logo" width="600"/>

  # LLM Dataset Engine

  **Batch process millions of rows with LLMs — 100x fewer API calls, 40-50% cost savings, 99.9% completion rate**

  [![PyPI version](https://img.shields.io/pypi/v/ondine.svg)](https://pypi.org/project/ondine/)
  [![Downloads](https://static.pepy.tech/badge/ondine/month)](https://pepy.tech/project/ondine)
  [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
  [![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
  [![GitHub stars](https://img.shields.io/github/stars/ptimizeroracle/ondine.svg?style=social)](https://github.com/ptimizeroracle/ondine)
  [![Tests](https://github.com/ptimizeroracle/Ondine/actions/workflows/ci.yml/badge.svg)](https://github.com/ptimizeroracle/Ondine/actions/workflows/ci.yml)
  [![Documentation](https://img.shields.io/badge/docs-MkDocs%20Material-blue.svg)](https://ptimizeroracle.github.io/ondine)

  <img src="assets/images/demo.gif" alt="Ondine Demo" width="700"/>

</div>

> **For data engineers and ML practitioners** who need to process millions of tabular rows with LLMs, Ondine is an open-source SDK that delivers **100x fewer API calls** via multi-row batching and **40-50% cost reduction** via prefix caching — with cost estimation, budget limits, checkpointing, and 100+ provider support built in.

## Features

- **Quick API**: 3-line hello world with smart defaults and auto-detection
- **Simple API**: Fluent builder pattern for full control when needed
- **Multi-Row Batching**: Process N rows per API call for 100× speedup (NEW!)
- **Prefix Caching**: 40-50% cost reduction by caching system prompts across millions of rows
- **Reliability**: Automatic retries, checkpointing, error policies (99.9% completion rate)
- **Cost Control**: Pre-execution estimation, budget limits, real-time tracking
- **Observability**: Progress bars, structured logging, metrics, cost reports
- **Extensibility**: Plugin architecture, custom stages, multiple LLM providers
- **Fault Tolerant**: Zero data loss on crashes, resume from checkpoint
- **100+ Providers**: Native LiteLLM integration supporting OpenAI, Azure, Anthropic, Groq, Cerebras, Moonshot, and 100+ others
- **Smart Routing**: Built-in LiteLLM Router with **latency-based routing** (fastest wins) and automatic failover for high availability
- **Local Inference**: Run models locally with MLX (Apple Silicon) or Ollama - 100% free, private, offline-capable
- **Multi-Column Processing**: Generate multiple output columns with composition or JSON parsing
- **Custom Providers**: Integrate any OpenAI-compatible API (Together.AI, vLLM, Ollama, custom endpoints)
- **Context Store (Anti-Hallucination)**: Post-LLM quality layer with grounding verification, contradiction detection, and confidence scoring (Rust/SQLite/FTS5)
- **Knowledge Store (RAG)**: Pre-LLM knowledge retrieval with document ingestion, hybrid search, and reranking
- **Evidence Priming**: Pre-LLM injection of prior validated answers for cross-run consistency

## Quick Start

### Option 1: Quick API (Recommended)

The simplest way to get started is to provide your data, a prompt, and a model:

```python
from ondine import QuickPipeline

# Process data with smart defaults
pipeline = QuickPipeline.create(
    data="data.csv",
    prompt="Clean this text: {description}",
    model="gpt-4o-mini"
)

# Execute pipeline
result = pipeline.execute()
print(f"Processed {result.metrics.processed_rows} rows")
print(f"Total cost: ${result.costs.total_cost:.4f}")
```

**What's auto-detected:**
- Input columns from `{placeholders}` in prompt
- Provider from model name (gpt-4 → openai, claude → anthropic)
- Parser type (JSON for multi-column, text for single column)
- Sensible batch size and concurrency for the provider
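
As a rough illustration, the placeholder detection boils down to a small regex pass (a sketch of the idea only; `detect_input_columns` is a hypothetical helper, not Ondine's internal code):

```python
import re

def detect_input_columns(prompt: str) -> list[str]:
    """Collect unique {placeholder} names, preserving order of appearance."""
    return list(dict.fromkeys(re.findall(r"\{(\w+)\}", prompt)))

print(detect_input_columns("Clean this text: {description} ({category})"))
# ['description', 'category']
```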

### Option 2: Builder API (Full Control)

For advanced use cases requiring explicit configuration:

```python
from ondine import PipelineBuilder

# Build with explicit settings
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["description"],
              output_columns=["cleaned"])
    .with_prompt("Clean this text: {description}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_batch_size(100)
    .with_concurrency(5)
    .build()
)

# Estimate cost before running
estimate = pipeline.estimate_cost()
print(f"Estimated cost: ${estimate.total_cost:.4f}")

# Execute pipeline
result = pipeline.execute()
print(f"Total cost: ${result.costs.total_cost:.4f}")
```

## Why Ondine?

| Feature | Ondine | LangChain | DSPy | Custom Scripts |
|---------|--------|-----------|------|----------------|
| **Purpose-built for tabular data** | ✅ | ❌ General purpose | ❌ Prompt optimization | ⚠️ Manual |
| **Multi-row batching (100x fewer calls)** | ✅ Built-in | ❌ | ❌ | ⚠️ DIY |
| **Prefix caching (40-50% savings)** | ✅ Automatic | ❌ | ❌ | ⚠️ DIY |
| **Pre-run cost estimation** | ✅ | ❌ | ❌ | ❌ |
| **Budget limits & real-time tracking** | ✅ | ❌ | ❌ | ❌ |
| **Checkpointing & resume** | ✅ Automatic | ❌ | ❌ | ⚠️ DIY |
| **100+ LLM providers** | ✅ Via LiteLLM | ✅ Via LangChain Hub | ⚠️ Limited | ⚠️ Manual |
| **Structured output (Pydantic)** | ✅ Via Instructor | ✅ | ✅ | ⚠️ DIY |
| **Setup complexity** | `pip install ondine` | Complex chains | Research-oriented | Significant engineering |

## Installation

### Using uv (recommended)

```bash
# Basic installation
uv add ondine

# With observability support
uv add "ondine[observability]"

# With Excel support
uv add "ondine[excel]"

# With Parquet support
uv add "ondine[parquet]"

# With MLX support (Apple Silicon only)
uv add "ondine[mlx]"

# Everything included
uv add "ondine[all]"
```

### Using pip

```bash
# Basic installation
pip install ondine

# With observability support
pip install "ondine[observability]"

# With Excel support
pip install "ondine[excel]"

# With Parquet support
pip install "ondine[parquet]"

# With MLX support (Apple Silicon only)
pip install "ondine[mlx]"

# Everything included
pip install "ondine[all]"
```

### Set up API keys

```bash
# For cloud providers
export OPENAI_API_KEY="your-key-here"  # pragma: allowlist secret
# or
export AZURE_OPENAI_API_KEY="your-key-here"  # pragma: allowlist secret
export AZURE_OPENAI_ENDPOINT="https://your-endpoint.openai.azure.com/"
# or
export ANTHROPIC_API_KEY="your-key-here"
# or
export GROQ_API_KEY="your-key-here"
# or
export TOGETHER_API_KEY="your-key-here"

# For MLX (Apple Silicon)
export HUGGING_FACE_HUB_TOKEN="your-token-here"  # For model downloads

# Local providers (Ollama, vLLM) don't need API keys
```

## Documentation

**Complete documentation is available at: https://ptimizeroracle.github.io/ondine**

The documentation includes:
- Installation and setup guides
- Quickstart tutorial (build your first pipeline in 5 minutes)
- Core concepts and architecture
- Execution modes (sync, async, streaming)
- Structured output with Pydantic
- Cost control and optimization
- Provider-specific guides
- Complete API reference (auto-generated from source)

## Usage Examples

### 1. Simple Data Processing

```python
from ondine import DatasetProcessor

# Minimal configuration for simple use cases
processor = DatasetProcessor(
    data="reviews.csv",
    input_column="customer_review",
    output_column="sentiment",
    prompt="Classify sentiment as: Positive, Negative, or Neutral\nReview: {customer_review}\nSentiment:",
    llm_config={"provider": "openai", "model": "gpt-4o-mini"}
)

# Test on sample first
sample = processor.run_sample(n=10)
print(sample)

# Process full dataset
result = processor.run()
```

### 2. Structured Data Extraction (JSON)

```python
from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_dataframe(
        df,
        input_columns=["product_description"],
        output_columns=["brand", "model", "price", "condition"]
    )
    .with_prompt("""
        Extract structured information and return JSON:
        {
          "brand": "...",
          "model": "...",
          "price": "...",
          "condition": "new|used|refurbished"
        }

        Description: {product_description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .build()
)

result = pipeline.execute()
```

### 3. Prefix Caching for Cost Reduction (NEW!)

Reduce costs by 40-50% on large datasets by caching system prompts:

```python
from ondine import PipelineBuilder

# Define shared context once (cached across all stages and rows)
SHARED_CONTEXT = """You are an expert data analyst.
[General domain knowledge and principles - 1024+ tokens for OpenAI caching]
"""

# Stage 1: First transformation
pipeline1 = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result1"])
    .with_prompt("TASK: Analyze text\nINPUT: {text}\nOUTPUT:")
    .with_system_prompt(SHARED_CONTEXT)  # Cached!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Stage 2: Second transformation (reuses Stage 1's cache!)
pipeline2 = (
    PipelineBuilder.create()
    .from_csv("data_stage1.csv",
              input_columns=["text", "result1"],
              output_columns=["result2"])
    .with_prompt("TASK: Further analysis\nINPUT: {text}, {result1}\nOUTPUT:")
    .with_system_prompt(SHARED_CONTEXT)  # Same cache!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Execute both stages
result1 = pipeline1.execute()
result2 = pipeline2.execute()

# Cost savings: 40-50% reduction from caching!
```

**How it works:**
- System prompt (1024+ tokens) cached automatically by provider
- The first request populates the cache; subsequent requests reuse it automatically (no explicit warm-up step)
- Only pay full price for dynamic data (your row-specific content)
- 50% discount on cached tokens (OpenAI), up to 90% (Anthropic)
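
For intuition, here is a back-of-the-envelope cost model (all numbers are illustrative assumptions, not measured Ondine output):

```python
# Rough cost model for prefix caching (illustrative numbers only).
rows = 1_000_000
system_tokens = 1_500    # static, cacheable system prompt
row_tokens = 300         # dynamic per-row content
price_per_1k = 0.00015   # assumed input price per 1K tokens; check current rates
cache_discount = 0.5     # OpenAI bills cached input tokens at ~50%

without_cache = rows * (system_tokens + row_tokens) / 1000 * price_per_1k
with_cache = rows * (system_tokens * cache_discount + row_tokens) / 1000 * price_per_1k
print(f"${without_cache:.2f} -> ${with_cache:.2f} "
      f"({1 - with_cache / without_cache:.0%} saved)")
# $270.00 -> $157.50 (42% saved)
```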

**Requirements:**
- OpenAI: System prompt >1024 tokens for automatic caching
- Anthropic: System message separation (automatic caching)
- Groq: Model-specific support (check provider docs)

**Use cases:**
- Multi-stage pipelines (classification, enrichment, validation)
- Large datasets with repeated instructions
- Any workflow with static context + dynamic data

See `examples/20_prefix_caching.py` for a complete example.

### 4. Multi-Row Batching for 100× Speedup (NEW!)

Pack up to 100 rows into a single API call, cutting the number of round trips by 100×:

```python
from ondine import PipelineBuilder

# Traditional (slow): 5M rows = 5M API calls
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# With batching (fast): 5M rows = 50K API calls (100× fewer!)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("Classify: {text}")
    .with_batch_size(100)  # Process 100 rows per API call!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)
```

**How it works:**
- Aggregates N rows into a single JSON-formatted prompt
- LLM processes all rows in one call and returns JSON array
- Automatically disaggregates response back to individual rows
- Handles partial failures (retries failed rows individually)
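
Conceptually, the aggregate/disaggregate cycle looks roughly like this (a simplified sketch with hypothetical helpers, not Ondine's internal implementation):

```python
import json

def build_batch_prompt(rows: list[dict], task: str) -> str:
    """Pack N rows into one prompt; the LLM returns one JSON array of results."""
    payload = [{"id": i, **row} for i, row in enumerate(rows)]
    return (
        "For each item below, apply the task and return a JSON array of "
        '{"id": ..., "output": ...} objects.\n'
        f"Task: {task}\n"
        f"Items: {json.dumps(payload, separators=(',', ':'))}"  # minified to save tokens
    )

def split_batch_response(text: str) -> dict[int, str]:
    """Map results back to row ids; missing ids can be retried individually."""
    return {item["id"]: item["output"] for item in json.loads(text)}

prompt = build_batch_prompt([{"text": "great!"}, {"text": "awful"}], "Classify: {text}")
```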

**Benefits:**
- 100× fewer API calls (5M → 50K with batch_size=100)
- 100× faster processing (69 hours → 42 minutes)
- Same token cost, eliminates API overhead
- Automatic context window validation

**Requirements:**
- Batch size limited by model context window (auto-validated)
- Works with all providers (OpenAI, Anthropic, Groq, custom)
- Recommended: Start with batch_size=10-50, increase based on results

See `examples/21_multi_row_batching.py` for complete examples and benchmarks.

### 5. Type-Safe Structured Output (Pydantic)

```python
from pydantic import BaseModel
from ondine import PipelineBuilder
from ondine.stages.response_parser_stage import PydanticParser

# Define your Pydantic model for type-safe validation
class ProductInfo(BaseModel):
    brand: str
    model: str
    price: float
    condition: str

pipeline = (
    PipelineBuilder.create()
    .from_dataframe(
        df,
        input_columns=["product_description"],
        output_columns=["brand", "model", "price", "condition"]
    )
    .with_prompt("""
        Extract product information and return JSON:
        {
          "brand": "manufacturer name",
          "model": "product model",
          "price": 999.99,
          "condition": "new|used|refurbished"
        }

        Description: {product_description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .with_parser(PydanticParser(ProductInfo, strict=True))  # Type-safe validation!
    .build()
)

result = pipeline.execute()
# Results are validated against ProductInfo model
```

### 6. With Cost Control

```python
pipeline = (
    PipelineBuilder.create()
    .from_csv("large_dataset.csv",
              input_columns=["text"],
              output_columns=["summary"])
    .with_prompt("Summarize in 10 words: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Cost control settings
    .with_max_budget(10.0)  # Maximum $10
    .with_batch_size(100)
    .with_concurrency(5)
    .with_rate_limit(60)  # 60 requests/min
    .with_checkpoint_interval(500)  # Checkpoint every 500 rows
    .build()
)

# Estimate first
estimate = pipeline.estimate_cost()
if estimate.total_cost > 10.0:
    print("Cost too high!")
    exit()

result = pipeline.execute()
```

### 7. Multiple Input Columns

```python
pipeline = (
    PipelineBuilder.create()
    .from_csv("products.csv",
              input_columns=["title", "description", "category"],
              output_columns=["optimized_title"])
    .with_prompt("""
        Optimize this product title for SEO.

        Current Title: {title}
        Description: {description}
        Category: {category}

        Optimized Title:
    """)
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_output("optimized_products.csv", format="csv")
    .build()
)

result = pipeline.execute()
```

### 8. Azure OpenAI

```python
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(
        provider="azure_openai",
        model="gpt-4",
        azure_endpoint="https://your-endpoint.openai.azure.com/",
        azure_deployment="your-deployment-name",
        api_version="2024-02-15-preview"
    )
    .build()
)
```

### 9. Anthropic Claude

```python
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["analysis"])
    .with_prompt("Analyze: {text}")
    .with_llm(
        provider="anthropic",
        model="claude-3-opus-20240229",
        temperature=0.0,
        max_tokens=1024
    )
    .build()
)
```

### 10. Local Inference with MLX (Apple Silicon)

```python
# 100% free, private, offline-capable inference on M1/M2/M3/M4 Macs
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["summary"])
    .with_prompt("Summarize: {text}")
    .with_llm(
        provider="mlx",
        model="mlx-community/Qwen3-1.7B-4bit",  # Fast, small model
        max_tokens=100,
        input_cost_per_1k_tokens=0.0,  # Free!
        output_cost_per_1k_tokens=0.0
    )
    .with_concurrency(1)  # MLX works best with concurrency=1
    .build()
)
```

**Requirements**:
- macOS with Apple Silicon (M1/M2/M3/M4)
- Install with: `pip install "ondine[mlx]"`

### 11. Provider Presets (Simplified Configuration)

```python
from ondine import PipelineBuilder
from ondine.core.specifications import LLMProviderPresets

# Use pre-configured providers (80% less boilerplate!)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm_spec(LLMProviderPresets.TOGETHER_AI_LLAMA_70B)  # One line!
    .build()
)

# Available presets:
# - LLMProviderPresets.GPT4O_MINI
# - LLMProviderPresets.GPT4O
# - LLMProviderPresets.TOGETHER_AI_LLAMA_70B
# - LLMProviderPresets.TOGETHER_AI_LLAMA_8B
# - LLMProviderPresets.OLLAMA_LLAMA_70B (free, local)
# - LLMProviderPresets.OLLAMA_LLAMA_8B (free, local)
# - LLMProviderPresets.GROQ_LLAMA_70B
# - LLMProviderPresets.CLAUDE_SONNET_4

# Override preset settings:
custom = LLMProviderPresets.GPT4O_MINI.model_copy(
    update={"temperature": 0.9, "max_tokens": 500}
)
# ...then pass it to a builder chain via .with_llm_spec(custom)

# Custom provider via factory:
custom_vllm = LLMProviderPresets.create_custom_openai_compatible(
    provider_name="My vLLM Server",
    model="mistral-7b-instruct",
    base_url="http://my-server:8000/v1"
)
# ...likewise used with .with_llm_spec(custom_vllm)
```

**Benefits**:
- Zero configuration errors (pre-validated)
- Correct pricing and URLs built-in
- IDE autocomplete for discovery
- 80% code reduction vs parameter-based config

### 12. Custom OpenAI-Compatible APIs (Parameter-Based)

```python
# Alternative: Configure providers with individual parameters
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(
        provider="openai_compatible",
        provider_name="Together.AI",  # Or "Ollama", "vLLM", etc.
        model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        base_url="https://api.together.xyz/v1",  # Custom endpoint
        api_key="${TOGETHER_API_KEY}",
        input_cost_per_1k_tokens=0.0006,
        output_cost_per_1k_tokens=0.0006
    )
    .build()
)
```

**Supported APIs**:
- **Ollama** (local): `http://localhost:11434/v1`
- **Together.AI** (cloud): `https://api.together.xyz/v1`
- **vLLM** (self-hosted): Your custom endpoint
- **Any OpenAI-compatible API**

### 13. Multi-Column Output with JSON Parsing

```python
# Single LLM call generates multiple output columns
pipeline = (
    PipelineBuilder.create()
    .from_csv("products.csv",
              input_columns=["description"],
              output_columns=["brand", "category", "price"])  # Multiple outputs!
    .with_prompt("""
        Extract structured data from this product description.
        Return JSON format:
        {
          "brand": "...",
          "category": "...",
          "price": "..."
        }

        Description: {description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .build()
)

result = pipeline.execute()
# Result has 3 new columns: brand, category, price
```
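
The JSON-to-columns fan-out is conceptually a one-liner per column (sketch only; `json_to_columns` is a hypothetical helper, not Ondine's parser):

```python
import json

def json_to_columns(raw: str, columns: list[str]) -> dict:
    """Map a JSON object response onto the declared output columns."""
    data = json.loads(raw)
    return {col: data.get(col) for col in columns}

row = json_to_columns(
    '{"brand": "Acme", "category": "tools", "price": "9.99"}',
    ["brand", "category", "price"],
)
# {'brand': 'Acme', 'category': 'tools', 'price': '9.99'}
```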

### 14. Pipeline Composition (Multi-Column with Dependencies)

```python
from ondine import PipelineComposer

# Create multiple pipelines with dependencies
composer = PipelineComposer(input_data=df)

# Pipeline 1: Generate sentiment score
sentiment_pipeline = (
    PipelineBuilder.create()
    .from_dataframe(df, input_columns=["review"], output_columns=["sentiment"])
    .with_prompt("Rate sentiment (0-100): {review}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Pipeline 2: Generate explanation (depends on sentiment)
explanation_pipeline = (
    PipelineBuilder.create()
    .from_dataframe(df,
                    input_columns=["review", "sentiment"],
                    output_columns=["explanation"])
    .with_prompt("Explain why this review has {sentiment}% sentiment: {review}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Compose and execute
result = (
    composer
    .add_column("sentiment", sentiment_pipeline)
    .add_column("explanation", explanation_pipeline, depends_on=["sentiment"])
    .execute()
)
```

### 15. Context Store — Anti-Hallucination Quality Layer (NEW!)

Add post-LLM grounding, contradiction detection, and confidence scoring to catch hallucinations before they reach your output:

```python
from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["summary"])
    .with_prompt("Summarize: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Anti-hallucination quality layer (post-LLM)
    .with_context_store()                # Enable evidence graph (Rust/SQLite/FTS5)
    .with_grounding()                    # Verify claims against source text
    .with_contradiction_detection()      # Flag conflicting statements across rows
    .with_confidence_scoring()           # Score each output 0.0-1.0
    .build()
)

result = pipeline.execute()
# Outputs include grounding status, contradiction flags, and confidence scores
```

**How it works:**
- Stores validated claims in a persistent evidence graph (SQLite + FTS5 full-text search)
- Grounding stage checks LLM output against input text to flag unsupported claims
- Contradiction detection compares new outputs against previously validated evidence
- Confidence scoring combines grounding + contradiction signals into a 0-1 score
- Powered by a Rust backend via PyO3 for sub-millisecond lookups
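
One plausible way such signals could combine into a single score (purely illustrative; Ondine's actual scoring lives in the Rust backend):

```python
def confidence(grounded_ratio: float, has_contradiction: bool) -> float:
    """Toy combination: grounding coverage, halved if a contradiction is found."""
    score = max(0.0, min(1.0, grounded_ratio))
    return round(score * (0.5 if has_contradiction else 1.0), 2)

print(confidence(grounded_ratio=0.8, has_contradiction=False))  # 0.8
print(confidence(grounded_ratio=0.8, has_contradiction=True))   # 0.4
```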

### 16. Knowledge Store — RAG Retrieval (NEW!)

Inject relevant knowledge from your documents into each prompt before the LLM call:

```python
from ondine import PipelineBuilder
from ondine.knowledge import KnowledgeStore, SentenceTransformerEmbedder

# Build a knowledge store from your documents
kb = KnowledgeStore(embedder=SentenceTransformerEmbedder())
kb.ingest("docs/")

pipeline = (
    PipelineBuilder.create()
    .from_csv("questions.csv", input_columns=["question"], output_columns=["answer"])
    .with_prompt("Answer using the provided context: {question}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # RAG: retrieve top-k chunks and inject as {_kb_context} (pre-LLM)
    .with_knowledge_base(kb, top_k=5)
    .build()
)

result = pipeline.execute()
# Each row's prompt is augmented with the most relevant knowledge chunks
```

**How it works:**
- Ingests documents (PDF, text, images via OCR) into chunked embeddings
- Hybrid search combines BM25 keyword matching with vector similarity
- Optional reranking stage scores candidates for higher precision
- Retrieved chunks are injected into the system prompt before each LLM call
- Rust-powered chunking and indexing for fast ingestion at scale
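
The hybrid ranking idea in miniature (a simplified sketch; the real weighting and normalization are internal to the Rust backend):

```python
def hybrid_score(bm25_norm: float, cosine_sim: float, alpha: float = 0.5) -> float:
    """Blend a normalized BM25 keyword score with vector cosine similarity."""
    return alpha * bm25_norm + (1 - alpha) * cosine_sim

candidates = [
    {"chunk": "Refund policy: 30 days", "bm25_norm": 0.9, "cosine_sim": 0.4},
    {"chunk": "Returns accepted within a month", "bm25_norm": 0.2, "cosine_sim": 0.8},
]
ranked = sorted(
    candidates,
    key=lambda c: hybrid_score(c["bm25_norm"], c["cosine_sim"]),
    reverse=True,
)
```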

### 17. Evidence Priming — Cross-Run Consistency (NEW!)

Re-inject prior validated answers so the LLM stays consistent across pipeline runs:

```python
from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["category"])
    .with_prompt("Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_context_store()  # Required: evidence graph stores validated answers
    # Inject top-k prior answers matching the current input (pre-LLM)
    .with_evidence_priming(
        query_columns=["text"],  # Columns to search against evidence graph
        top_k=3,                 # Number of prior answers to inject
        min_score=0.1            # Minimum similarity threshold
    )
    .build()
)

result = pipeline.execute()
# LLM sees relevant prior answers, improving consistency across runs
```

**How it works:**
- Queries the evidence graph for previously validated outputs similar to the current input
- Matched evidence is prepended to the LLM prompt as reference examples
- Helps the model produce consistent classifications and extractions over time
- Works with `with_context_store()` to build a growing knowledge loop
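
The priming step amounts to formatting matched evidence into the prompt, roughly like this (sketch with a hypothetical `prime_prompt` helper):

```python
def prime_prompt(prompt: str, evidence: list[dict]) -> str:
    """Prepend prior validated answers as reference examples."""
    if not evidence:
        return prompt
    examples = "\n".join(
        f"- input: {e['input']!r} -> answer: {e['answer']!r}" for e in evidence
    )
    return f"Previously validated answers:\n{examples}\n\n{prompt}"

primed = prime_prompt(
    "Classify: great battery life",
    [{"input": "battery lasts all day", "answer": "positive"}],
)
```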

## CLI Usage

Ondine includes a powerful command-line interface for processing datasets without writing code.

### List Available Providers

```bash
# See all supported LLM providers
ondine list-providers
```

This shows:
- Provider IDs (openai, azure_openai, anthropic, groq, mlx, openai_compatible)
- Platform requirements
- Cost estimates
- Use cases
- Required environment variables

### Process Datasets

```bash
# Basic usage
ondine process --config config.yaml

# Override input/output
ondine process --config config.yaml --input data.csv --output results.csv

# Override provider and model
ondine process --config config.yaml --provider groq --model llama-3.3-70b-versatile

# Set budget limit
ondine process --config config.yaml --max-budget 10.0

# Dry run (estimate only, don't execute)
ondine process --config config.yaml --dry-run

# Estimate cost
ondine estimate --config config.yaml --input data.csv

# Inspect data
ondine inspect --input data.csv --head 10
```

### Example Config File

```yaml
# config.yaml
dataset:
  source_type: csv
  source_path: data.csv
  input_columns: [text]
  output_columns: [sentiment]

prompt:
  template: "Classify sentiment: {text}"

llm:
  provider: openai
  model: gpt-4o-mini
  temperature: 0.0

processing:
  batch_size: 100
  concurrency: 5
  max_budget: 10.0

output:
  destination_type: csv
  destination_path: output.csv
```

## Architecture

The SDK follows a **layered architecture**:

```
┌─────────────────────────────────────────┐
│  Layer 5: High-Level API                │
│  (Pipeline, PipelineBuilder)            │
├─────────────────────────────────────────┤
│  Layer 4: Orchestration Engine          │
│  (PipelineExecutor, StateManager)       │
├─────────────────────────────────────────┤
│  Layer 3: Quality & Knowledge (NEW)     │
│  Pre-LLM: KnowledgeRetrieval,           │
│           EvidencePriming               │
│  Post-LLM: Grounding, Contradiction,    │
│            ConfidenceScoring            │
├─────────────────────────────────────────┤
│  Layer 2: Processing Stages             │
│  (DataLoader, LLMInvocation, Parser)    │
├─────────────────────────────────────────┤
│  Layer 1: Infrastructure Adapters       │
│  (LLMClient, DataReader, Checkpoint,    │
│   ContextStore, KnowledgeStore)         │
├─────────────────────────────────────────┤
│  Layer 0: Core Utilities (Rust/PyO3)    │
│  (RetryHandler, RateLimiter, Logging,   │
│   SQLite/FTS5, EvidenceGraph)           │
└─────────────────────────────────────────┘
```

### Key Design Principles

- **Simple**: Straightforward solutions
- **DRY**: No code duplication
- **Type Safe**: Type hints throughout
- **Separation of Concerns**: Configuration vs. execution

## Supported LLM Providers

| Provider | Platform | Cost | Use Case | Setup |
|----------|----------|------|----------|-------|
| **OpenAI** | Cloud (All) | $$ | Production, high quality | `OPENAI_API_KEY` |
| **Azure OpenAI** | Cloud (All) | $$ | Enterprise, compliance, **Managed Identity support** | `AZURE_OPENAI_API_KEY` or Managed Identity |
| **Anthropic** | Cloud (All) | $$$ | Long context, Claude models | `ANTHROPIC_API_KEY` |
| **Groq** | Cloud (All) | Free tier | Fast inference, development | `GROQ_API_KEY` |
| **MLX** | macOS (M1/M2/M3/M4) | Free | Local, private, offline | `pip install "ondine[mlx]"` |
| **OpenAI-Compatible** | Custom/Local/Cloud | Varies | Ollama, vLLM, Together.AI | `base_url` + optional API key |

Run `ondine list-providers` to see detailed information about each provider.

## Use Cases

- **Data Cleaning**: Clean, normalize, standardize text data
- **Sentiment Analysis**: Classify sentiment at scale
- **Information Extraction**: Extract structured data from unstructured text
- **Categorization**: Auto-categorize products, documents, emails
- **Content Generation**: Generate descriptions, summaries, titles
- **Translation**: Translate content to multiple languages
- **Data Enrichment**: Enhance datasets with LLM-generated insights
- **Product Matching**: Compare and score product similarity
- **Content Moderation**: Flag inappropriate content at scale

## Performance

- **Throughput**: Process 1,000 rows in < 5 minutes (GPT-4o-mini, concurrency=5)
- **Reliability**: 99.9% completion rate with automatic retries
- **Cost Efficiency**: Pre-execution estimation within 10% accuracy
- **Memory**: < 500MB for datasets up to 50K rows

## Observability & Debugging

Ondine supports multiple observability backends via **LiteLLM callbacks** for automatic instrumentation of all LLM calls. Add observability with a single line:

```python
from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Add observability - automatically tracks ALL LLM calls!
    .with_observer("langfuse", config={
        "public_key": "pk-lf-...",
        "secret_key": "sk-lf-..."  # pragma: allowlist secret
    })
    .build()
)

result = pipeline.execute()
```

### Supported Observers

**Langfuse** - LLM-specific observability (recommended):
```python
.with_observer("langfuse", config={
    "public_key": "pk-lf-...",
    "secret_key": "sk-lf-...",  # pragma: allowlist secret
    "host": "https://cloud.langfuse.com"  # optional
})
```
Tracks: prompts, completions, tokens, costs, latency, model info

**OpenTelemetry** - Infrastructure monitoring:
```python
.with_observer("opentelemetry", config={})
```
Tracks: spans, traces, metrics - works with Jaeger, Datadog, Grafana

**Logging** - Simple console output:
```python
.with_observer("logging", config={})
```
Tracks: basic LLM call logs to console

**Multiple observers simultaneously**:
```python
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_observer("langfuse", config={...})
    .with_observer("opentelemetry", config={...})
    .with_observer("logging", config={})
    .build()
)
```

### What's Tracked Automatically

**Powered by LiteLLM callbacks:**
- Full prompt and completion text
- Token usage (input, output, cached tokens)
- Cost per call (automatic via litellm.completion_cost)
- Latency metrics
- Model and provider information
- Router failover events
- Cache hit/miss metrics
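
For reference, LiteLLM's callback hook can be exercised standalone, outside Ondine (a minimal sketch; cost fields vary by provider):

```python
import litellm

def log_cost(kwargs, completion_response, start_time, end_time):
    """Custom LiteLLM success callback: print model, cost, and latency."""
    cost = litellm.completion_cost(completion_response=completion_response)
    elapsed = (end_time - start_time).total_seconds()
    print(f"{kwargs.get('model')}: ${cost:.6f} in {elapsed:.2f}s")

litellm.success_callback = [log_cost]
response = litellm.completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hi"}],
)
```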

### Examples

See complete examples:
- `examples/15_observability_logging.py` - Simple console logging
- `examples/16_observability_opentelemetry.py` - OpenTelemetry + Jaeger
- `examples/17_observability_langfuse.py` - Langfuse integration
- `examples/18_observability_multi.py` - Multiple observers

### Setup Langfuse (Recommended for LLM Observability)

1. Sign up at https://cloud.langfuse.com (free tier available)
2. Get your API keys
3. Add to your pipeline:
```python
.with_observer("langfuse", config={
    "public_key": "pk-lf-...",
    "secret_key": "sk-lf-..."  # pragma: allowlist secret
})
```
4. View detailed traces in Langfuse dashboard

## Configuration Options

### Execution Modes

**Standard Execution** (default)
```python
pipeline = PipelineBuilder.create().from_csv(...).build()
result = pipeline.execute()
```
Use when: Dataset fits in memory (< 50K rows typical), straightforward processing.

**Async Execution** (concurrent processing)
```python
pipeline = (
    PipelineBuilder.create()
    .from_csv(...)
    .with_async_execution(max_concurrency=10)
    .build()
)
result = await pipeline.execute_async()
```
Use when: Need high throughput, LLM API supports async, running in async context (FastAPI, aiohttp).

**Streaming Execution** (memory-efficient)
```python
pipeline = (
    PipelineBuilder.create()
    .from_csv(...)
    .with_streaming(chunk_size=1000)
    .build()
)
for i, chunk_result in enumerate(pipeline.execute_stream()):
    # Append each chunk as it completes (write the CSV header only once)
    chunk_result.data.to_csv("output.csv", mode="a", header=(i == 0), index=False)
```
Use when: Large datasets (100K+ rows), limited memory, need constant memory footprint, early results desired.

**When NOT to use streaming:**
- Dataset under 50K rows (overhead not justified)
- Need entire dataset in memory for post-processing
- Pipeline has dependencies between rows

See `examples/08_streaming_large_files.py` for detailed streaming example.

### Processing Configuration

```python
.with_batch_size(100)          # Rows per batch
.with_concurrency(5)            # Parallel requests
.with_checkpoint_interval(500)  # Checkpoint frequency
.with_rate_limit(60)            # Requests per minute
.with_max_budget(10.0)          # Maximum USD budget
```

### LLM Configuration

```python
.with_llm(
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.0,        # 0.0-2.0
    max_tokens=1024,        # Max output tokens
    api_key="..."           # Or from env
)
```

### Output Configuration

```python
.with_output(
    path="output.csv",
    format="csv",              # csv, excel, parquet
    merge_strategy="replace"   # replace, append, update
)
```

## Testing

```bash
# Run tests
uv run pytest

# With coverage
uv run pytest --cov=src --cov-report=html

# Run specific test
uv run pytest tests/test_pipeline.py
```

## Documentation

**Full documentation**: https://ptimizeroracle.github.io/ondine

Additional resources:
- **README.md** (this file): Quick start and usage guide
- **examples/**: Example scripts demonstrating various features
- **Code docstrings**: Inline documentation for all public APIs

## Contributing

Contributions welcome! Please follow:

1. Fork the repository at https://github.com/ptimizeroracle/Ondine
2. Create a feature branch
3. Follow the existing code style (Black, Ruff)
4. Add tests for new features
5. Update documentation
6. Submit a pull request

## License

MIT License - see LICENSE file for details

## Acknowledgments

- Built with [LiteLLM](https://docs.litellm.ai/) for native multi-provider LLM integration (100+ providers)
- Uses [Instructor](https://python.useinstructor.com/) for type-safe structured output with Pydantic models
- LlamaIndex integration preserved for future RAG capabilities
- Ondine adds: batch processing, Router for load balancing, automatic cost tracking, checkpointing, YAML configuration, and dataset orchestration
- Thanks to the open-source community

## Support

- **Repository**: https://github.com/ptimizeroracle/Ondine
- **Issues**: Open an issue on GitHub
- **Discussions**: Use GitHub Discussions for questions
- **Email**: git@binblok.com

## Recent Updates

### Version 1.4.1 (November 27, 2025)

**Native Caching Upgrade:**
- 🚀 **Native LiteLLM Caching**: Switched to `litellm.cache` for robust, multi-backend caching (Redis, S3, Memory)
- 💾 **Disk Caching Support**: New `with_disk_cache()` for zero-setup local persistence (using `diskcache`)
- 🧹 **Code Cleanup**: Removed custom caching implementation for better maintainability and feature parity with LiteLLM

### Version 1.4.0 (November 27, 2025)

**Router Optimization & Robust Batching:**
- 🚀 **Latency-Based Routing**: Automatically routes traffic to the fastest provider (Groq, Cerebras, etc.) in real-time
- 🛡️ **Resilient Router Fallback**: Fixed critical bug where single-node failures (404/NotFound) stopped pipelines; now automatically retries on other healthy providers
- 📦 **Minified JSON Batching**: Optimized batch prompt payload (indent=None) to save tokens and context window
- 🔄 **Smart Auto-Retry**: Improved logic to only retry rows where *all* output columns failed, preserving valid partial data
- ⚡ **High-Concurrency defaults**: Optimized default settings for multi-provider pools (concurrency=10, batch_size=50)
- 📊 **Enhanced Progress Tracking**: Fixed UI glitches (freezing bars, 1% stuck) and improved per-provider cost attribution

### Version 1.3.4 (November 24, 2025)

**LiteLLM Native Integration:**
- 🚀 **Native LiteLLM Integration**: Replaced LlamaIndex wrappers with direct `litellm.acompletion` for 100+ provider support
- 📦 **Instructor for Structured Output**: Type-safe Pydantic models with auto-detection (JSON mode for Groq, function calling for OpenAI/Anthropic)
- 🔄 **Router for Load Balancing**: Built-in multi-provider failover and latency-based routing via LiteLLM Router
- 💾 **Redis Caching**: Native LiteLLM response caching to avoid duplicate API calls
- 📊 **Prefix Caching Detection**: Automatic logging of cached tokens (40-50% cost savings)
- ⚡ **Async-First Design**: Native async throughout with `litellm.acompletion`
- 🧹 **Code Reduction**: Removed 673 lines of wrapper code (-11%), cleaner architecture
- ✅ **All Tests Passing**: 461/461 unit tests, 103/103 integration tests

**Breaking Changes:**
- None! Fully backward compatible with existing code

### Version 1.2.1 (November 12, 2025)

**Previous Release:**
- Progress tracking enhancements
- Bug fixes and stability improvements
- Enhanced error handling

### Version 1.2.0 (November 9, 2025)

**New Features:**
- Enhanced API documentation with examples
- Fixed broken documentation references
- Improved code organization

### Version 1.1.0 (November 9, 2025)

**New Features:**
- Additional provider improvements
- Enhanced testing coverage
- Documentation updates

### Version 1.0.x (October 2025)

**Initial Release Features:**
- **Provider Presets**: Pre-configured LLMSpec objects for common providers (80% code reduction)
- **Simplified Configuration**: New `with_llm_spec()` method accepting LLMSpec objects
- **MLX Integration**: Local inference on Apple Silicon (M1/M2/M3/M4) - 100% free, private, offline
- **OpenAI-Compatible Provider**: Support for Ollama, vLLM, Together.AI, and custom APIs
- **Multi-Column Processing**: Generate multiple output columns with JSON parsing
- **Pipeline Composition**: Chain pipelines with dependencies between columns
- **CLI Provider Discovery**: `ondine list-providers` command to explore all providers
- **Auto-Retry for Multi-Column**: Automatic retry now checks all output columns for failures
- **Custom LLM Clients**: Extend `LLMClient` base class for exotic APIs

**Improvements:**
- Zero configuration errors with validated presets
- Enhanced error handling for multi-column outputs
- Better streaming implementation
- Improved documentation with provider comparison guide
- More examples (14+ example files including provider presets demo)

## Roadmap

### Recently Completed (v1.3.4 - November 24, 2025)

**LiteLLM Native Integration** (major refactor)
- ✅ **Native LiteLLM**: Direct `litellm.acompletion()` integration (removed 673 lines of wrappers)
- ✅ **Instructor**: Type-safe structured output with Pydantic (auto-retry on validation errors)
- ✅ **Router**: Load balancing + failover across providers
- ✅ **Redis Caching**: Response deduplication via `litellm.cache`
- ✅ **100+ Providers**: Expanded from 5 to 100+ supported providers
- ✅ **Async-First**: Native async throughout (true non-blocking I/O)
- ✅ **Prefix Caching**: Detection and logging (40-90% cost savings)
- ✅ **100% Tests**: 461 unit + 103 integration tests passing

### Completed (v1.3.0 - November 2025)

**Performance & Cost Optimizations**
- ✅ Multi-row batching (100× speedup)
- ✅ Prefix caching support (40-50% cost reduction)
- ✅ Flatten-then-concurrent pattern for true parallelism
- ✅ Cache hit detection and monitoring
- ✅ Shared context caching across pipeline stages
- ✅ Optimized prompt formatting (10× faster with itertuples)

### Upcoming Features

**Performance & Cost Optimizations**
- Smart model selection and cost comparison
- Automatic prompt optimization
- Dynamic batch size optimization based on context window

**New Capabilities**
- Enhanced streaming execution (async streaming)
- Multi-modal support (images, PDFs)
- **RAG integration** using LlamaIndex (vector stores, embeddings, retrieval)
- Distributed processing (Spark/Dask integration)

**Developer Experience**
- Web UI for pipeline management
- Enhanced Router strategies (cost-based routing)
- Redis caching analytics dashboard

## Star History

<a href="https://star-history.com/#ptimizeroracle/ondine&Date">
 <picture>
   <source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=ptimizeroracle/ondine&type=Date&theme=dark" />
   <source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=ptimizeroracle/ondine&type=Date" />
   <img alt="Star History Chart" src="https://api.star-history.com/svg?repos=ptimizeroracle/ondine&type=Date" />
 </picture>
</a>

---

Built with [LiteLLM](https://docs.litellm.ai/) and [Instructor](https://python.useinstructor.com/)

