Metadata-Version: 2.4
Name: philiprehberger-data-pipeline
Version: 0.2.5
Summary: Composable data transformation pipeline with lazy evaluation
Project-URL: Homepage, https://github.com/philiprehberger/py-data-pipeline#readme
Project-URL: Repository, https://github.com/philiprehberger/py-data-pipeline
Project-URL: Issues, https://github.com/philiprehberger/py-data-pipeline/issues
Project-URL: Changelog, https://github.com/philiprehberger/py-data-pipeline/blob/main/CHANGELOG.md
Author: Philip Rehberger
License-Expression: MIT
License-File: LICENSE
Keywords: data-processing,etl,pipeline,transform,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown

# philiprehberger-data-pipeline

[![Tests](https://github.com/philiprehberger/py-data-pipeline/actions/workflows/publish.yml/badge.svg)](https://github.com/philiprehberger/py-data-pipeline/actions/workflows/publish.yml)
[![PyPI version](https://img.shields.io/pypi/v/philiprehberger-data-pipeline.svg)](https://pypi.org/project/philiprehberger-data-pipeline/)
[![License](https://img.shields.io/github/license/philiprehberger/py-data-pipeline)](LICENSE)

Composable data transformation pipeline with lazy evaluation.

## Installation

```bash
pip install philiprehberger-data-pipeline
```

## Usage

```python
from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
```
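For intuition, the chain above behaves like this plain-Python sketch built from generator stages (an illustration of the semantics only, not the library's actual internals):

```python
data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

# filter -> map as lazy generator stages; nothing runs until consumed.
active = (r for r in data if r["status"] == "active")
trimmed = ({**r, "name": r["name"].strip()} for r in active)

# unique_by: keep the first record seen for each email.
seen, deduped = set(), []
for r in trimmed:
    if r["email"] not in seen:
        seen.add(r["email"])
        deduped.append(r)

# collect() corresponds to materializing the final list.
result = sorted(deduped, key=lambda r: r["name"])
```

Because the stages are generators, intermediate lists are never built; only `collect()` (here, the final `sorted` call) forces evaluation.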

### Reusable Pipelines

```python
clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
```
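One way to picture `Pipeline.define()`: the definition records stages without any data, and each `run()` applies them to a fresh input. A rough standard-library analogue (names here are illustrative, not the library's API):

```python
from functools import reduce

def define(*stages):
    """Compose per-collection stage functions into one reusable runner."""
    def run(data):
        # Feed the data through each stage in order.
        return reduce(lambda acc, stage: stage(acc), stages, data)
    return run

clean = define(
    lambda rows: [r for r in rows if r.get("email")],
    lambda rows: [{**r, "email": r["email"].lower()} for r in rows],
)

cleaned = clean([{"email": "A@X.COM"}, {"name": "no-email"}])
# → [{'email': 'a@x.com'}]
```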

### Aggregations

```python
p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
```
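The aggregations correspond to these plain-Python equivalents (a sketch with sample data, since `sales_data` is not defined above):

```python
from collections import defaultdict

sales_data = [
    {"category": "books", "amount": 10.0},
    {"category": "books", "amount": 20.0},
    {"category": "games", "amount": 5.0},
]

total = sum(r["amount"] for r in sales_data)   # 35.0
average = total / len(sales_data)              # ≈ 11.67

# group_by: map each key value to the list of matching records.
grouped = defaultdict(list)
for r in sales_data:
    grouped[r["category"]].append(r)
```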

### Export

```python
Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
```
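For reference, writing a list of flat dicts to CSV and JSON with the standard library looks like this (deriving the CSV header from the first row's keys is a common convention; the library's exact behavior may differ):

```python
import csv
import json
import os
import tempfile

rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
out_dir = tempfile.mkdtemp()

# CSV: header taken from the first row's keys (assumption for this sketch).
csv_path = os.path.join(out_dir, "output.csv")
with open(csv_path, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)

# JSON: the whole collection as one array.
json_path = os.path.join(out_dir, "output.json")
with open(json_path, "w") as f:
    json.dump(rows, f, indent=2)
```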

## API

| Function / Class | Description |
|------------------|-------------|
| `Pipeline(data)` | Composable, lazy data transformation pipeline |
| `.filter(fn)` | Keep items for which `fn` returns `True` |
| `.map(fn)` | Transform each item |
| `.flat_map(fn)` | Transform and flatten |
| `.sort_by(key)` | Sort by key (string or callable) |
| `.unique_by(key)` | Remove duplicates by key |
| `.take(n)` | Take first n items |
| `.skip(n)` | Skip first n items |
| `.chunk(size)` | Split into chunks |
| `.flatten()` | Flatten one level of nesting |
| `.collect()` | Execute and return list |
| `.first()` | Return first item |
| `.count()` | Count items |
| `.sum(key)` | Sum values |
| `.avg(key)` | Average values |
| `.min(key)` | Find minimum value |
| `.max(key)` | Find maximum value |
| `.reduce(fn, initial)` | Reduce to single value |
| `.group_by(key)` | Group into dict |
| `.to_csv(path)` | Export as CSV |
| `.to_json(path)` | Export as JSON |
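The slicing and restructuring operators in the table map onto familiar list idioms. A plain-Python sketch of the intended semantics (not the library's implementation, which is lazy):

```python
items = list(range(10))

take = items[:3]        # .take(3)  → [0, 1, 2]
skip = items[3:]        # .skip(3)  → [3, 4, ..., 9]

# .chunk(4): split into consecutive groups of at most 4.
chunks = [items[i:i + 4] for i in range(0, len(items), 4)]
# → [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# .flatten(): remove one level of nesting.
nested = [[1, 2], [3], [4, 5]]
flat = [x for sub in nested for x in sub]  # → [1, 2, 3, 4, 5]
```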

## Development

```bash
pip install -e .
python -m pytest tests/ -v
```

## License

MIT
