Skip to content

Examples

Real-world examples of FlowTask workflows and configurations.

Basic Examples

Data Processing Pipeline

# Workflow: read the daily sales CSV and write a date-stamped Excel report.
name: Sales Data Processing
description: Process daily sales CSV and generate Excel report
steps:
  - OpenWithPandas:
      mime: "text/csv"
      filename: daily_sales.csv
      directory: "/data/input"
      trim: true  # strip leading/trailing whitespace from cell values

  - TransformRows:
      operation: "calculate_totals"
      columns:
        - amount
        - tax
        - total

  - PandasToFile:
      # {today} is expanded by the mask below, e.g. sales_report_20240131.xlsx
      filename: "/data/output/sales_report_{today}.xlsx"
      # .xlsx uses the OOXML spreadsheet MIME type;
      # "application/vnd.ms-excel" is the legacy .xls type.
      mime: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
      masks:
        today:
          - today
          - mask: "%Y%m%d"

Web Scraping to Database

# Workflow: scrape company listings from the web and upsert them into Postgres.
name: Company Information Scraper
description: Scrape company data and save to database
steps:
  - CompanyScraper:
      use_proxies: true
      paid_proxy: false  # rotate free proxies only
      sources:
        - "https://example.com/companies"
      selectors:  # CSS selectors for the fields to extract from each page
        name: ".company-name"
        industry: ".industry"
        revenue: ".revenue"

  - DatabaseOutput:
      table: "companies"
      schema: "public"
      credentials:
        # Never hard-code credentials in task files; reference an
        # environment variable, as the other examples in this page do.
        dsn: "${DATABASE_URL}"
      upsert: true
      conflict_columns: ["name"]  # rows with an existing name are updated in place

Advanced Examples

Multi-Source Data Integration

# Workflow: merge CRM and support-system customer records into a single
# profile table stored with vector embeddings.
name: Customer 360 Integration
description: Combine customer data from multiple sources
environment: production
timeout: 7200  # presumably seconds — confirm unit against FlowTask docs

steps:
  # Load from CRM (only rows updated in the last 24 hours)
  - DatabaseConnector:
      query: "SELECT * FROM customers WHERE updated_at >= NOW() - INTERVAL '1 day'"
      credentials:
        dsn: "${CRM_DATABASE_URL}"  # resolved from the environment at run time
      as_dataframe: true

  # Load from Support System
  - HTTPConnector:
      url: "${SUPPORT_API_URL}/customers"
      headers:
        Authorization: "Bearer ${SUPPORT_API_TOKEN}"
        Content-Type: "application/json"
      method: "GET"
      as_dataframe: true

  # Merge datasets: left join keeps every CRM customer, even those
  # without a matching support-system record
  - JoinData:
      left_data: "{{ steps[0].output }}"
      right_data: "{{ steps[1].output }}"
      left_on: "customer_id"
      right_on: "id"
      how: "left"

  # Generate customer profiles
  - TransformRows:
      operation: "create_profile"
      profile_fields:
        - demographics
        - purchase_history
        - support_tickets

  # Save to data warehouse with sentence-transformer embeddings.
  # NOTE(review): assumes PgVectorOutput creates the target table on
  # first run (create: true) — confirm against component docs.
  - PgVectorOutput:
      table: "customer_profiles"
      schema: "analytics"
      credentials:
        dsn: "${WAREHOUSE_DATABASE_URL}"
      embedding_model:
        model: "sentence-transformers/all-MiniLM-L6-v2"
        model_type: "transformers"
      create_table:
        create: true
        use_uuid: true  # generate UUID primary keys for new rows

Automated Report Generation

# Workflow: pull 12 months of transaction aggregates, chart them,
# render a PDF report, and email it to stakeholders.
name: Monthly Financial Report
description: Generate PDF financial reports with charts
steps:
  # Extract financial data: monthly totals per account type, last 12 months
  - DatabaseConnector:
      query: |
        SELECT
          DATE_TRUNC('month', transaction_date) as month,
          SUM(amount) as total_amount,
          COUNT(*) as transaction_count,
          account_type
        FROM transactions
        WHERE transaction_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '12 months')
        GROUP BY month, account_type
        ORDER BY month, account_type
      credentials:
        dsn: "${FINANCE_DB_URL}"

  # Create visualizations
  - ChartGenerator:
      chart_type: "line"
      x_column: "month"
      y_column: "total_amount"
      group_by: "account_type"  # one line series per account type
      title: "Monthly Revenue by Account Type"
      output_file: "/tmp/revenue_chart.png"

  # Generate PDF report.
  # NOTE(review): output_file has no extension — presumably PDFGenerator
  # appends .pdf; confirm against component docs.
  - PDFGenerator:
      template: "financial_report.html"
      output_file: "financial_report_{month}_{year}"
      directory: "/reports/monthly"
      config:
        css:
          - "reports.css"
          - "charts.css"
      masks:  # {month} and {year} expand from today's date
        month:
          - today
          - mask: "%m"
        year:
          - today
          - mask: "%Y"

  # Email to stakeholders, attaching the PDF produced by the third step
  # (steps[2], zero-based)
  - EmailSender:
      to:
        - "cfo@company.com"
        - "finance-team@company.com"
      subject: "Monthly Financial Report - {month}/{year}"
      template: "financial_report_email.html"
      attachments:
        - "{{ steps[2].output }}"
      credentials:
        smtp_host: "${SMTP_HOST}"
        smtp_port: 587  # SMTP submission port (STARTTLS)
        username: "${SMTP_USERNAME}"
        password: "${SMTP_PASSWORD}"

Event-Driven Workflow

# Workflow: webhook-triggered order processing — validate the payload,
# check and update inventory, invoice, confirm by email, and audit.
name: Order Processing Workflow
description: Process new orders automatically via webhook
trigger:
  type: webhook
  path: "/hooks/new-order"  # POSTing to this path starts a run

steps:
  # Validate order data against a minimal schema before doing any work
  - DataValidator:
      schema:
        customer_id:
          type: integer
          required: true
        items:
          type: array
          required: true
        total_amount:
          type: number
          required: true
          minimum: 0  # reject negative order totals

  # Check inventory.
  # NOTE(review): the product-id list is interpolated directly into the
  # SQL string. Safe only if the ids are validated integers; prefer a
  # parameterized query if the component supports one.
  - DatabaseConnector:
      query: |
        SELECT product_id, available_quantity
        FROM inventory
        WHERE product_id IN ({{ order.items | map('product_id') | join(',') }})
      credentials:
        dsn: "${INVENTORY_DB_URL}"

  # Update inventory.
  # NOTE(review): same template-into-SQL caveat as above; it is also not
  # visible here how {{ item }} iterates over order.items — confirm.
  - DatabaseOutput:
      query: |
        UPDATE inventory
        SET available_quantity = available_quantity - {{ item.quantity }}
        WHERE product_id = {{ item.product_id }}
      table: "inventory"
      credentials:
        dsn: "${INVENTORY_DB_URL}"

  # Generate invoice (output path buckets invoices by year/month)
  - PDFGenerator:
      template: "invoice.html"
      output_file: "invoice_{order_id}"
      directory: "/invoices/{year}/{month}"

  # Send confirmation email with the invoice produced by the fourth step
  # (steps[3], zero-based)
  - EmailSender:
      to: "{{ order.customer_email }}"
      subject: "Order Confirmation - #{{ order.id }}"
      template: "order_confirmation.html"
      attachments:
        - "{{ steps[3].output }}"

  # Log to audit trail
  - DatabaseOutput:
      table: "order_audit"
      data:
        order_id: "{{ order.id }}"
        action: "processed"
        timestamp: "{{ now() }}"
        workflow_id: "{{ workflow.id }}"

Task Organization Examples

Multi-Environment Setup

tasks/
└── programs/
    ├── sales/
    │   └── tasks/
    │       ├── daily_report.yaml
    │       ├── weekly_summary.yaml
    │       └── monthly_analysis.yaml
    ├── marketing/
    │   └── tasks/
    │       ├── lead_scoring.yaml
    │       └── campaign_analysis.yaml
    └── finance/
        └── tasks/
            ├── reconciliation.yaml
            └── reporting.yaml

Environment-Specific Configuration

# Run with development environment
ENV=dev task --program=sales --task=daily_report

# Run with production environment
ENV=production task --program=sales --task=daily_report

Environment files:

# .env.dev
DATABASE_URL=postgresql://localhost/sales_dev
API_BASE_URL=https://api-dev.company.com
LOG_LEVEL=DEBUG

# .env.production
DATABASE_URL=postgresql://prod-db:5432/sales_prod
API_BASE_URL=https://api.company.com
LOG_LEVEL=INFO

Programmatic Examples

Python Integration

import asyncio
from flowtask import Task
from flowtask.scheduler import TaskScheduler

async def main():
    """Run the daily sales report once, then keep it on a daily schedule."""
    # One-off execution of the report task
    report = Task(program='sales', task='daily_report')
    outcome = await report.run()
    print(f"Task completed: {outcome}")

    # Recurring execution: register the same task with the scheduler,
    # firing at 09:00 every day
    runner = TaskScheduler()
    await runner.schedule_task(
        program='sales',
        task='daily_report',
        cron='0 9 * * *',
    )

    # Hand control to the scheduler loop
    await runner.start()

if __name__ == '__main__':
    asyncio.run(main())

Custom Component Development

from flowtask.components.flow import FlowComponent
from flowtask.interfaces import HTTPService, PandasDataframe

class CustomAPIComponent(FlowComponent, HTTPService, PandasDataframe):
    """
    Fetch data from a bearer-token-protected HTTP API, optionally
    returning it as a pandas DataFrame.

    Example:
        ```yaml
        CustomAPIComponent:
            api_endpoint: "https://api.example.com/data"
            api_key: "${API_KEY}"
            as_dataframe: true
            timeout: 60
        ```
    """

    def __init__(self, **kwargs):
        # Capture our own settings before delegating the rest to the bases.
        self.api_endpoint = kwargs.get('api_endpoint')
        self.api_key = kwargs.get('api_key')
        super().__init__(**kwargs)

    async def start(self):
        # Fail fast on missing configuration before any request is made.
        if not self.api_endpoint:
            raise ValueError("api_endpoint is required")
        return True

    async def run(self):
        auth = {"Authorization": f"Bearer {self.api_key}"}
        reply = await self.request('GET', self.api_endpoint, headers=auth)

        if not self.as_dataframe:
            return reply.json()
        return self.to_dataframe(reply.json())

Integration Examples

Docker Deployment

# Runtime image for FlowTask.
FROM python:3.9-slim

WORKDIR /app

# Install dependencies first so this layer stays cached while app code changes;
# --no-cache-dir keeps pip's download cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 5000

CMD ["python", "run.py"]
# docker-compose.yml
# NOTE: the top-level `version` key is obsolete in the Compose
# Specification and is ignored (with a warning) by modern `docker compose`.
services:
  flowtask:
    build: .
    ports:
      - "5000:5000"  # quoted to avoid YAML sexagesimal parsing of port maps
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/flowtask
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: postgres:17
    environment:
      POSTGRES_DB: flowtask
      POSTGRES_USER: postgres
      # Example-only password; use a secrets mechanism or env file in
      # real deployments.
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine

volumes:
  postgres_data:

These examples demonstrate FlowTask's flexibility and power in handling various data processing scenarios. Adapt them to your specific use cases and requirements.