Metadata-Version: 2.4
Name: planasonix-airflow
Version: 1.0.3
Summary: Apache Airflow provider for Planasonix data integration platform
Home-page: https://github.com/planasonix/planasonix-airflow
Author: Planasonix
Author-email: Planasonix <support@planasonix.com>
License-Expression: Apache-2.0
Project-URL: Homepage, https://planasonix.com
Project-URL: Documentation, https://docs.planasonix.com/integrations/airflow
Project-URL: Repository, https://github.com/planasonix/planasonix-airflow
Project-URL: Issues, https://github.com/planasonix/planasonix-airflow/issues
Keywords: airflow,planasonix,etl,data-integration,orchestration
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Framework :: Apache Airflow
Classifier: Framework :: Apache Airflow :: Provider
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: apache-airflow>=2.5.0
Requires-Dist: requests>=2.28.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.20.0; extra == "dev"
Requires-Dist: responses>=0.22.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Dynamic: author
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-python

# Planasonix Airflow Provider

Apache Airflow provider package for the [Planasonix](https://planasonix.com) data integration platform.

Orchestrate your data syncs directly from Airflow DAGs with full visibility into job status, progress, and results.

## Installation

```bash
pip install planasonix-airflow
```

Or with Poetry:

```bash
poetry add planasonix-airflow
```

## Requirements

- Python 3.8+
- Apache Airflow 2.5+
- Planasonix Professional or Enterprise subscription

## Quick Start

### 1. Create an API Key

1. Log into Planasonix at https://app.planasonix.com
2. Go to Settings → API Keys
3. Create a new key with `sync:trigger` and `sync:read` permissions
4. Copy the key (starts with `flx_`)

### 2. Configure Airflow Connection

**Option A: Via Airflow UI**

1. Go to Admin → Connections
2. Add a new connection:
   - Connection Id: `planalytix_default`
   - Connection Type: `Planasonix`
   - Host: `https://api.planasonix.com`
   - Password: Your API key (`flx_xxxx...`)

**Option B: Via Environment Variable**

```bash
export AIRFLOW_CONN_PLANALYTIX_DEFAULT='planalytix://unused:flx_your_api_key@api.planasonix.com'
```
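
If the API key contains characters that are special in a URI (such as `/` or `@`), it needs to be percent-encoded before it goes into the connection string. The sketch below builds the URI from Option B and splits it back into its parts using only the standard library. `build_conn_uri` is a hypothetical helper, not part of the provider, and the example assumes the key travels in the password slot as shown above.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(api_key: str, host: str = "api.planasonix.com") -> str:
    """Build a connection URI with the API key percent-encoded in the password slot."""
    # safe="" also encodes "/" so that it cannot be mistaken for a path separator.
    return f"planalytix://unused:{quote(api_key, safe='')}@{host}"


uri = build_conn_uri("flx_your_api_key")
parts = urlsplit(uri)

print(uri)
print(parts.scheme)             # becomes the connection type
print(parts.hostname)           # API host
print(unquote(parts.password))  # API key, decoded back to its original form
```

The printed URI is the value to export as `AIRFLOW_CONN_PLANALYTIX_DEFAULT`.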

### 3. Create Your DAG

```python
from datetime import datetime
from airflow import DAG
from planalytix_provider.operators.sync import PlanalytixSyncOperator

with DAG(
    dag_id="sync_salesforce_daily",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # `schedule` supersedes the deprecated `schedule_interval` (Airflow 2.4+)
    catchup=False,
) as dag:
    
    sync_salesforce = PlanalytixSyncOperator(
        task_id="sync_salesforce",
        connection_id="conn_abc123",  # Your Planasonix connection ID
        sync_type="incremental",
        wait_for_completion=True,
        poll_interval=30,
        timeout=3600,
    )
```

## Components

### PlanalytixHook

Low-level hook for direct API access:

```python
from planalytix_provider.hooks.planalytix import PlanalytixHook

hook = PlanalytixHook()

# Trigger a sync
job = hook.trigger_sync(
    connection_id="conn_abc123",
    sync_type="incremental",
)

# Check status
status = hook.get_job(job["job_id"])

# Get results
results = hook.get_job_results(job["job_id"])
```

### PlanalytixSyncOperator

Triggers a sync and optionally waits for it to complete:

```python
from planalytix_provider.operators.sync import PlanalytixSyncOperator

# Simple sync with waiting
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    sync_type="incremental",  # or "full"
    wait_for_completion=True,
    poll_interval=30,
    timeout=3600,
)

# Sync specific streams only
sync_partial = PlanalytixSyncOperator(
    task_id="sync_orders",
    connection_id="conn_abc123",
    streams=["orders", "order_items"],
    sync_type="incremental",
)

# High-priority sync (Enterprise only)
sync_priority = PlanalytixSyncOperator(
    task_id="sync_urgent",
    connection_id="conn_abc123",
    priority="high",
    wait_for_completion=True,
)

# Fire and forget (no waiting)
trigger_only = PlanalytixSyncOperator(
    task_id="trigger_sync",
    connection_id="conn_abc123",
    wait_for_completion=False,
)
```
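
To make `poll_interval` and `timeout` more concrete, the waiting behaviour can be pictured as a simple polling loop. This is an illustrative sketch only, not the provider's implementation: `wait_for_job` and `get_status` are hypothetical names, and the non-terminal status `"running"` is an assumption. The terminal statuses are the ones listed under XCom Values.

```python
import time
from typing import Callable

# Terminal statuses as listed in the XCom table of this README.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_interval: float = 30,
    timeout: float = 3600,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until a terminal status is seen or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout} seconds")
        sleep(poll_interval)


# Demo with a fake status source and no real sleeping.
fake_statuses = iter(["running", "running", "completed"])
print(wait_for_job(lambda: next(fake_statuses), sleep=lambda _seconds: None))
```

A shorter `poll_interval` notices completion sooner at the cost of more API calls, while `timeout` bounds how long the task holds its worker slot.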

### PlanalytixSyncSensor

Wait for a job triggered elsewhere:

```python
from planalytix_provider.sensors.sync import PlanalytixSyncSensor

# Wait for job from upstream task
wait_for_job = PlanalytixSyncSensor(
    task_id="wait_for_sync",
    job_id="{{ ti.xcom_pull(task_ids='trigger_sync', key='job_id') }}",
    poke_interval=30,
    timeout=3600,
    mode="reschedule",  # Free worker while waiting
)
```

## XCom Values

The operator pushes these values to XCom:

| Key | Description |
|-----|-------------|
| `job_id` | The Planasonix job ID |
| `connection_id` | The connection that was synced |
| `job_status` | Final status (completed, failed, cancelled) |
| `job_results` | Full results object (if completed) |

Access in downstream tasks:

```python
def process_results(**context):
    job_id = context["ti"].xcom_pull(task_ids="sync_task", key="job_id")
    results = context["ti"].xcom_pull(task_ids="sync_task", key="job_results")
    
    if results:
        rows_synced = results.get("summary", {}).get("total_rows_synced", 0)
        print(f"Synced {rows_synced} rows")
```

## Webhook Integration

For real-time notifications, configure webhooks in Planasonix:

1. Go to Settings → Webhooks
2. Add your endpoint URL
3. Select events: `job.completed`, `job.failed`

Your endpoint receives a JSON payload like this:

```json
{
  "id": "evt_abc123",
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "job_id": "job_xyz789",
  "data": {
    "status": "completed",
    "summary": {
      "total_rows_synced": 48291,
      "duration_seconds": 245
    }
  }
}
```
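
A receiving endpoint might unpack that payload along the following lines. This is a minimal sketch using only the standard library. `summarize_event` is a hypothetical helper, and the shape of a `job.failed` payload is assumed to carry the same top-level `type` and `job_id` fields as the example above.

```python
import json


def summarize_event(raw_body: str) -> str:
    """Turn a webhook request body into a one-line, human-readable summary."""
    event = json.loads(raw_body)
    job_id = event["job_id"]
    event_type = event["type"]

    if event_type == "job.completed":
        # Use .get() so a missing summary does not raise.
        summary = event.get("data", {}).get("summary", {})
        rows = summary.get("total_rows_synced", 0)
        seconds = summary.get("duration_seconds", 0)
        return f"{job_id} completed: {rows} rows in {seconds}s"
    if event_type == "job.failed":
        return f"{job_id} failed"
    return f"{job_id}: unhandled event type {event_type}"


body = json.dumps({
    "id": "evt_abc123",
    "type": "job.completed",
    "timestamp": "2024-01-15T10:30:00Z",
    "job_id": "job_xyz789",
    "data": {
        "status": "completed",
        "summary": {"total_rows_synced": 48291, "duration_seconds": 245},
    },
})
print(summarize_event(body))
```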

## Idempotency

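The operator automatically generates an idempotency key from:
- DAG ID
- Task ID
- Run ID

A retried task keeps the same DAG ID, task ID, and run ID, so it reuses the same key and does not start a duplicate sync. You can also supply your own key: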

```python
# Idempotency is automatic, but you can override:
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    idempotency_key="my-custom-key-{{ ds }}",
)
```
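
One way to picture how such a key could be derived is to hash the three identifiers together. The sketch below is an assumption for illustration: the key format the provider actually uses is not documented here, and `make_idempotency_key` is a hypothetical name.

```python
import hashlib


def make_idempotency_key(dag_id: str, task_id: str, run_id: str) -> str:
    """Derive a stable key from the identifiers that stay constant across retries."""
    # The try number is deliberately left out, so every retry maps to the same key.
    raw = "\n".join([dag_id, task_id, run_id])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


first_try = make_idempotency_key("sync_salesforce_daily", "sync_salesforce", "scheduled__2024-01-15")
retry = make_idempotency_key("sync_salesforce_daily", "sync_salesforce", "scheduled__2024-01-15")
next_run = make_idempotency_key("sync_salesforce_daily", "sync_salesforce", "scheduled__2024-01-16")

print(first_try == retry)     # a retry of the same run reuses the key
print(first_try == next_run)  # a new DAG run gets a different key
```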

## Error Handling

```python
# Fail task on sync failure (default)
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    fail_on_error=True,  # Default
)

# Continue on failure (for non-critical syncs)
sync_optional = PlanalytixSyncOperator(
    task_id="sync_optional_data",
    connection_id="conn_xyz",
    fail_on_error=False,
)
```

## Tier Features

| Feature | Professional | Enterprise |
|---------|-------------|------------|
| Trigger syncs | ✅ | ✅ |
| Wait for completion | ✅ | ✅ |
| Webhook notifications | ✅ | ✅ |
| Priority queuing | ❌ | ✅ |
| AI event visibility | ❌ | ✅ |
| Deferrable operators | ❌ | ✅ (coming soon) |

## Troubleshooting

### "No API key found"

Ensure your Airflow connection has the API key in the password field or in extras as `api_key`.
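
Airflow 2.3+ also accepts a JSON-serialized connection in the `AIRFLOW_CONN_*` environment variable, which is a convenient way to pass the key through extras. The sketch below generates such a value. The `conn_type` value `planalytix` is an assumption based on the URI scheme used in Quick Start, and `connection_json` is a hypothetical helper.

```python
import json


def connection_json(api_key: str, host: str = "https://api.planasonix.com") -> str:
    """Serialize a connection definition that carries the API key in extras."""
    return json.dumps({
        "conn_type": "planalytix",  # assumed to match the URI scheme from Quick Start
        "host": host,
        "extra": {"api_key": api_key},
    })


value = connection_json("flx_your_api_key")
print(f"export AIRFLOW_CONN_PLANALYTIX_DEFAULT='{value}'")
```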

### "Connection not found"

Verify the `connection_id` matches a connection in your Planasonix account. You can find connection IDs in the Planasonix UI under Connections.

### "Orchestration API requires Professional tier"

The orchestration API is available on Professional and Enterprise tiers. Upgrade at https://planasonix.com/pricing

### Sync timeout

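Increase the `timeout` parameter, or set `wait_for_completion=False` and wait with a separate `PlanalytixSyncSensor`. Running the sensor in `mode="reschedule"` frees the worker slot between checks.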

## Support

- Documentation: https://docs.planasonix.com/integrations/airflow
- Issues: https://github.com/planasonix/planasonix-airflow/issues
- Email: support@planasonix.com

## License

Apache License 2.0
