Metadata-Version: 2.4
Name: antaris-pipeline
Version: 4.9.15
Summary: Unified orchestration pipeline for Antaris Analytics Suite
Author-email: Antaris Analytics <dev@antarisanalytics.com>
License: Apache-2.0
Project-URL: Homepage, https://antarisanalytics.ai
Project-URL: Documentation, https://docs.antarisanalytics.ai
Project-URL: Repository, https://github.com/antaris-analytics/antaris-pipeline
Project-URL: Issues, https://github.com/antaris-analytics/antaris-pipeline/issues
Keywords: ai,agents,pipeline,orchestration,telemetrics
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: antaris-memory>=2.0.0
Requires-Dist: antaris-router>=3.0.0
Requires-Dist: antaris-guard>=2.0.0
Requires-Dist: antaris-context>=2.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: click>=8.0.0
Requires-Dist: rich>=13.0.0
Requires-Dist: asyncio-dgram>=2.1.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: pre-commit>=3.0.0; extra == "dev"
Provides-Extra: telemetrics
Requires-Dist: clickhouse-driver>=0.2.6; extra == "telemetrics"
Requires-Dist: uvicorn>=0.20.0; extra == "telemetrics"
Requires-Dist: fastapi>=0.100.0; extra == "telemetrics"
Requires-Dist: websockets>=11.0.0; extra == "telemetrics"
Dynamic: license-file

# antaris-pipeline

**Agent pipeline orchestration for the Antaris Analytics Suite**

A lightweight coordination layer for memory, routing, safety, and context across agent lifecycles.

```bash
pip install antaris-pipeline
```

## Architecture

antaris-pipeline orchestrates five Antaris packages:
- **antaris-memory**: Persistent conversation memory with temporal decay
- **antaris-guard**: Input/output safety scanning and policy enforcement  
- **antaris-context**: Dynamic context window management and compression
- **antaris-router**: Model selection based on task classification
- **antaris-contracts**: Schema validation and structured outputs

Two coordination patterns:
1. **AgentPipeline**: Simple pre_turn/post_turn workflow for direct integration
2. **AntarisPipeline**: Full pipeline orchestration with cross-package optimization

## Quick Start

### AgentPipeline (Recommended)

```python
from antaris_pipeline import AgentPipeline

pipeline = AgentPipeline(memory=True, guard=True, context=True)

def handle_turn(user_input: str) -> str:
    # Before LLM call
    pre_result = pipeline.pre_turn(user_input)
    if pre_result.blocked:
        return pre_result.block_reason

    # Prepend recalled context if available
    prompt = pre_result.context + user_input if pre_result.context else user_input
    response = your_llm_call(prompt)

    # After LLM call
    post_result = pipeline.post_turn(user_input, response)
    print(f"Stored {post_result.stored_memories} memories")
    return response

handle_turn("What is quantum computing?")
```

### Full Pipeline Orchestration

```python
from antaris_pipeline import create_pipeline, PipelineConfig

config = PipelineConfig(
    memory_enabled=True,
    guard_enabled=True,
    context_enabled=True,
    router_enabled=True
)

pipeline = create_pipeline(
    storage_path="./pipeline_storage",
    pipeline_config=config
)

# Process with full orchestration (run inside an async function)
result = await pipeline.process("Explain neural networks", your_model_function)
print(f"Response: {result.output}")
print(f"Tokens: {result.metrics.total_tokens}")
```

## Core Features

### Session Lifecycle Management

```python
pipeline = AgentPipeline(memory=True)

# Start new session
pipeline.session_start("user_123", session_metadata={"context": "support"})

# Process turns
for user_input in conversation:
    pre_result = pipeline.pre_turn(user_input)
    # ... LLM processing ...
    post_result = pipeline.post_turn(user_input, llm_response)

# End session (automatic memory consolidation)
pipeline.session_end()
```

### Pre-turn Processing

AgentPipeline.pre_turn() handles:
1. **Guard input check**: Scans for policy violations, PII, harmful content
2. **Memory recall**: Retrieves relevant conversation history  
3. **Context building**: Assembles enriched prompt with recalled memories

```python
pre_result = pipeline.pre_turn("Remember what I told you about my project?")

# Check results
print(f"Blocked: {pre_result.blocked}")
print(f"Context length: {len(pre_result.context or '')}")
print(f"Memories recalled: {pre_result.memory_count}")
print(f"Warnings: {pre_result.warnings}")
```

### Post-turn Processing  

AgentPipeline.post_turn() handles:
1. **Guard output check**: Validates LLM response safety
2. **Memory ingestion**: Stores conversation turn for future recall

```python
post_result = pipeline.post_turn(user_input, llm_response)

# Check results
print(f"Output blocked: {post_result.blocked_output}")
print(f"Safe replacement: {post_result.safe_replacement}")
print(f"Memories stored: {post_result.stored_memories}")
```

### Compaction Recovery

When OpenClaw compacts agent context, the pipeline preserves essential state:

```python
# Before compaction: pipeline stores handoff notes
handoff_data = pipeline.prepare_compaction()

# After compaction: pipeline restores context from notes
pipeline.recover_from_compaction(handoff_data)
```

The bridge handles this automatically in OpenClaw environments.

### Pipeline Telemetry *(since v4.2.0)*

`PipelineTelemetry` is a structured dataclass attached to every pipeline run, providing per-stage timing breakdowns to help identify bottlenecks.

```python
from antaris_pipeline import PipelineTelemetry

# After pipeline run:
telemetry = agent.last_telemetry
print(telemetry.summary())
# "Pipeline: 145ms total | recall=12ms guard=5ms llm=120ms | 342 tokens"

stage, ms = telemetry.slowest_stage()
print(f"Bottleneck: {stage} at {ms:.1f}ms")
```

`PipelineTelemetry` fields:
- `stages` — `dict[str, float]` mapping stage name → elapsed milliseconds
- `total_ms` — total wall-clock time for the full pipeline run
- `token_count` — tokens processed (input + output combined)
- `summary()` — returns a one-line human-readable performance report
- `slowest_stage()` → `tuple[str, float]` — name and duration of the slowest stage

Access the telemetry object from the result of any `pre_turn` / `post_turn` call or via `agent.last_telemetry` after a full pipeline run.
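
The field layout above can be mimicked with a small stand-in dataclass. This is an illustrative sketch built only on the standard library, not the shipped `PipelineTelemetry` implementation:

```python
from dataclasses import dataclass, field


@dataclass
class TelemetrySketch:
    # stage name -> elapsed milliseconds
    stages: dict[str, float] = field(default_factory=dict)
    total_ms: float = 0.0
    token_count: int = 0

    def summary(self) -> str:
        # One-line report in the same spirit as PipelineTelemetry.summary()
        parts = " ".join(f"{name}={ms:.0f}ms" for name, ms in self.stages.items())
        return f"Pipeline: {self.total_ms:.0f}ms total | {parts} | {self.token_count} tokens"

    def slowest_stage(self) -> tuple[str, float]:
        # Name and duration of the stage with the largest elapsed time
        return max(self.stages.items(), key=lambda kv: kv[1])


t = TelemetrySketch(stages={"recall": 12.0, "guard": 5.0, "llm": 120.0},
                    total_ms=145.0, token_count=342)
print(t.summary())
print(t.slowest_stage())  # ('llm', 120.0)
```

`slowest_stage()` is just a `max` over the `stages` dict, which is why it stays cheap even with many stages.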

### Telemetry and Observability

```python
from antaris_pipeline import TelemetricsCollector

collector = TelemetricsCollector(
    session_id="prod_session_001",
    output_dir="./telemetry_logs"
)

pipeline = AgentPipeline(
    memory=True, 
    guard=True,
    telemetrics_collector=collector
)

# All pipeline operations automatically emit telemetry events
# Logs stored as JSONL: ./telemetry_logs/telemetrics_prod_session_001.jsonl
```

Telemetry captures:
- Performance metrics (latency, tokens, costs)
- Security events (blocks, policy violations)
- Memory operations (storage, retrieval, compaction)
- Error conditions and warnings
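
Because the logs are plain JSONL, they can be aggregated with nothing but the standard library. A hedged sketch (the `event` and `latency_ms` field names here are assumptions for illustration, not the documented record schema):

```python
import json
from collections import Counter


def summarize_jsonl(lines):
    """Count event types and sum latency across telemetry records."""
    counts, total_latency = Counter(), 0.0
    for line in lines:
        record = json.loads(line)
        counts[record.get("event", "unknown")] += 1
        total_latency += record.get("latency_ms", 0.0)
    return counts, total_latency


sample = [
    '{"event": "guard_block", "latency_ms": 4.2}',
    '{"event": "memory_store", "latency_ms": 2.1}',
    '{"event": "memory_store", "latency_ms": 1.8}',
]
counts, latency = summarize_jsonl(sample)
print(counts.most_common(1))  # [('memory_store', 2)]
```

In practice you would pass `open(path)` instead of `sample`, since a file object iterates line by line.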

### Configuration Profiles

```python
from antaris_pipeline import PipelineConfig, ProfileType

# Balanced: moderate safety, performance, and cost
config = PipelineConfig.from_profile(ProfileType.BALANCED)

# Strict safety: maximum security scanning
config = PipelineConfig.from_profile(ProfileType.STRICT_SAFETY)  

# Cost optimized: minimal processing, fast models
config = PipelineConfig.from_profile(ProfileType.COST_OPTIMIZED)

# Performance: minimal latency, premium models
config = PipelineConfig.from_profile(ProfileType.PERFORMANCE)

# Debug: full logging, telemetry, dry-run mode
config = PipelineConfig.from_profile(ProfileType.DEBUG)
```

### Dry-run Mode

Test pipeline behavior without API costs:

```python
config = PipelineConfig(dry_run=True)
pipeline = create_pipeline(pipeline_config=config)

# Simulates processing without actual LLM calls
simulation = pipeline.dry_run("Test input")
print(f"Estimated latency: {simulation['total_estimated_time_ms']}ms")
print(f"Would recall: {simulation['memory']['would_retrieve']} memories")
```

### Component Access

Access individual components for fine-grained control:

```python
pipeline = AgentPipeline(memory=True, guard=True)

# Direct memory access
memories = pipeline.memory_system.search("quantum computing", limit=5)

# Direct guard access  
scan_result = pipeline.guard_system.scan_input("Test message")

# Component status
status = pipeline.get_component_status()
print(f"Memory: {status['memory']['status']}")
print(f"Guard: {status['guard']['policy_count']} policies loaded")
```

## OpenClaw Bridge Integration

For OpenClaw agents, antaris-pipeline provides a bridge protocol that handles stdin/stdout NDJSON communication.

Bridge commands:
- `config-check`: Validate pipeline configuration
- `session-start`: Initialize new session with metadata
- `pre-turn`: Process input before LLM call
- `post-turn`: Process output after LLM response  
- `compaction-recovery`: Restore state after context compaction
- `memory-search`: Direct memory query operations

The bridge runs as a persistent subprocess, maintaining pipeline state across turns.

Example bridge usage:
```bash
echo '{"cmd": "pre-turn", "text": "Hello", "memory_path": "./mem"}' | python3 pipeline_bridge.py
# {"success": true, "context": "Relevant context...", "blocked": false}
```
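
From Python, the NDJSON exchange amounts to writing one JSON object per line and reading one line back. A minimal framing sketch (the command and response keys mirror the example above; treat them as assumptions rather than a full protocol reference):

```python
import json


def encode_command(cmd: str, **fields) -> bytes:
    """Serialize a bridge command as one NDJSON line."""
    return (json.dumps({"cmd": cmd, **fields}) + "\n").encode("utf-8")


def decode_response(line: bytes) -> dict:
    """Parse one NDJSON response line from the bridge."""
    return json.loads(line.decode("utf-8"))


msg = encode_command("pre-turn", text="Hello", memory_path="./mem")
# In practice, write `msg` to the bridge subprocess's stdin and read one
# line back from its stdout (e.g. via subprocess.Popen with pipes).
reply = decode_response(b'{"success": true, "blocked": false}\n')
print(reply["success"])  # True
```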

## Configuration

### PipelineConfig Options

```python
from antaris_pipeline import PipelineConfig, MemoryConfig, GuardConfig

config = PipelineConfig(
    # Component toggles
    memory_enabled=True,
    guard_enabled=True, 
    context_enabled=True,
    router_enabled=False,
    
    # Memory settings
    memory_config=MemoryConfig(
        max_memory_mb=1024,
        decay_half_life_hours=168.0,  # 1 week
        min_relevance=0.3
    ),
    
    # Guard settings  
    guard_config=GuardConfig(
        input_scanning=True,
        output_scanning=True,
        policy_strictness=0.7,
        max_scan_time_ms=1000
    ),
    
    # Performance settings
    dry_run=False,
    enable_telemetrics=True,
    max_concurrent_operations=10
)
```

### YAML Configuration

```yaml
# pipeline_config.yaml
profile: balanced

memory:
  enabled: true
  storage_path: "./memory_store"
  max_memory_mb: 2048
  decay_half_life_hours: 168.0
  min_relevance: 0.3

guard:
  enabled: true
  input_scanning: true
  output_scanning: true
  policy_strictness: 0.7

context:
  enabled: true
  max_tokens: 8000
  compression_enabled: true

telemetrics:
  enabled: true
  session_id: "production_v1"
  output_dir: "./logs"
```

```python
from antaris_pipeline import PipelineConfig

config = PipelineConfig.from_yaml("pipeline_config.yaml")
pipeline = create_pipeline(pipeline_config=config)
```

## Command Line Interface

```bash
# Validate configuration
antaris-pipeline config --profile balanced --output config.yaml

# Process single input (dry-run)
antaris-pipeline process "Hello world" --dry-run

# Analyze telemetry logs
antaris-pipeline telemetrics summary ./logs/telemetrics_session123.jsonl

# Start telemetry dashboard  
antaris-pipeline serve --port 8080
```

## Performance Characteristics

Memory operation latencies (single-threaded, M2 MacBook Pro):
- Store single memory: 1-5ms
- Search 1000 memories: 15-45ms  
- Compaction (10k memories): 200-500ms

Guard scan latencies:
- Input scan (typical): 10-50ms
- Output scan (typical): 15-75ms
- Policy compilation: 100-300ms (cached)

Context processing:
- Token counting: 1-5ms
- Compression (8k → 4k): 50-200ms

Pipeline coordination overhead: 5-15ms per turn.
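
These figures are hardware-dependent; a quick way to reproduce them in your own environment is a `perf_counter` harness around each stage (a generic sketch, not a benchmark tool shipped with the package):

```python
import time


def time_stage(fn, *args, repeats: int = 100) -> float:
    """Return the mean wall-clock time of fn in milliseconds."""
    start = time.perf_counter()
    for _ in range(repeats):
        fn(*args)
    return (time.perf_counter() - start) / repeats * 1000.0


# Measure any callable, e.g. pipeline.pre_turn or a stand-in workload:
ms = time_stage(lambda: sum(range(1000)))
print(f"mean: {ms:.3f}ms")
```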

## Error Handling

Pipeline operations return structured results with success indicators:

```python
pre_result = pipeline.pre_turn("Test input")

if not pre_result.success:
    print("Pre-turn failed:")
    for warning in pre_result.warnings:
        print(f"  Warning: {warning}")
    for issue in pre_result.guard_issues:
        print(f"  Guard: {issue}")

# Graceful degradation: proceed without recalled context
if pre_result.memory_count == 0:
    print("No memories recalled, proceeding without context")
```

Component failures are isolated. If memory fails, guard and context continue. If guard fails, processing continues with warnings logged.
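
The isolation described above is essentially a per-component try/except around each stage. A simplified sketch of the pattern (hypothetical names; the pipeline's internals may differ):

```python
import logging

logger = logging.getLogger("pipeline")


def run_stage(name, fn, fallback=None):
    """Run one pipeline stage, logging failures instead of raising."""
    try:
        return fn()
    except Exception as exc:  # isolate the failure to this stage
        logger.warning("stage %s failed: %s", name, exc)
        return fallback


def failing_recall():
    raise IOError("storage unavailable")


# Memory failing does not stop guard from running:
context = run_stage("memory", failing_recall, fallback="")
verdict = run_stage("guard", lambda: {"blocked": False})
print(repr(context), verdict)  # '' {'blocked': False}
```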

## Dependencies

The core orchestration logic uses only the Python standard library:
- `json` for serialization
- `pathlib` for file operations
- `logging` for error reporting
- `dataclasses` for structured types
- `typing` for type annotations

The distribution itself, however, declares the coordinated Antaris packages (antaris-memory, antaris-router, antaris-guard, antaris-context) plus `pydantic`, `click`, `rich`, and `asyncio-dgram` as runtime dependencies; each coordinated package has its own dependency requirements.

## Testing

```bash
# Install with dev dependencies
pip install "antaris-pipeline[dev]"

# Run tests
pytest tests/

# With coverage
pytest --cov=antaris_pipeline tests/

# Test specific components
pytest tests/test_agent_pipeline.py -v
```

## Thread Safety

AgentPipeline is thread-safe for read operations (memory search, guard scans). Write operations (memory storage, session state) use file locking to prevent corruption.

For high-concurrency applications, use separate pipeline instances per worker thread.
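
One common way to keep a separate instance per worker thread is `threading.local`. Sketched here with a placeholder factory standing in for `AgentPipeline(...)` (this helper is hypothetical, not part of the package API):

```python
import threading

_local = threading.local()


def get_pipeline(factory):
    """Return this thread's pipeline, creating it on first use."""
    if not hasattr(_local, "pipeline"):
        _local.pipeline = factory()
    return _local.pipeline


# Each thread gets its own object:
pipelines = []

def worker():
    pipelines.append(get_pipeline(object))  # object() stands in for AgentPipeline(...)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(pipelines[0] is pipelines[1])  # False
```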

## License

Apache 2.0

## Related Packages

- [antaris-memory](https://pypi.org/project/antaris-memory/) - Persistent conversation memory
- [antaris-guard](https://pypi.org/project/antaris-guard/) - Input/output safety scanning  
- [antaris-context](https://pypi.org/project/antaris-context/) - Context window management
- [antaris-router](https://pypi.org/project/antaris-router/) - Adaptive model routing
- [antaris-contracts](https://pypi.org/project/antaris-contracts/) - Schema validation
