
Components

flowtask.components

AbstractFlow

Bases: ABC

run abstractmethod async

run()

Execute the component's code.

start abstractmethod async

start(**kwargs)

Start Method called on every component.

FlowComponent

FlowComponent(job=None, *args, **kwargs)

Bases: FuncSupport, MaskSupport, LogSupport, ResultSupport, StatSupport, LocaleSupport, AbstractFlow

Abstract

Overview:

    Helper for building components that consume REST APIs

.. table:: Properties
    :widths: auto

+--------------+----------+--------------------------------------+
| Name         | Required | Summary                              |
+--------------+----------+--------------------------------------+
| method       |   Yes    | Component for Data Integrator        |
+--------------+----------+--------------------------------------+
| attributes   |   Yes    | Attribute: barcode                   |
+--------------+----------+--------------------------------------+


close abstractmethod async

close()

close.

Close (if needed) component requirements.

conditions_replacement

conditions_replacement(obj)

conditions_replacement.

Replace occurrences of Conditions in a string. Args: obj (Any): Any kind of object.

Returns:

    Any: Object with replaced conditions.

get_filename

get_filename()

get_filename. Detect if the file exists.

run abstractmethod async

run()

run.

Run operations declared inside Component.

start abstractmethod async

start(**kwargs)

start.

Initialize (if needed) a task

var_replacement

var_replacement(obj)

var_replacement.

Replace occurrences of Variables in a string. Args: obj (Any): Any kind of object.

Returns:

    Any: Object with replaced variables.

GroupComponent

GroupComponent(loop=None, job=None, stat=None, component_list=None, **kwargs)

Bases: FlowComponent

GroupComponent

Overview

This component executes a group of other FlowTask components sequentially as a single unit.
It allows chaining multiple tasks together and provides error handling for various scenarios.

.. table:: Properties :widths: auto

+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| Name                   | Required | Description                                                                                                    |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| component_list (list)  |   Yes    | List of dictionaries defining the components to be executed in the group. Each dictionary                      |
|                        |          | should contain the following keys:                                                                             |
|                        |          |   - "component": The FlowTask component class to be used.                                                      |
|                        |          |   - "params": A dictionary containing parameters to be passed to the component.                                |
|                        |          | (Optional)                                                                                                     |
|                        |          |   - "conditions": A dictionary containing conditions that must be met before running the component. (Optional) |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| stat (Callable)        |    No    | Optional callback function for step-level monitoring and statistics collection.                                |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| skipError              |    No    | Defines the behavior when a component within the group raises an error.                                        |
|                        |          | Valid options are:                                                                                             |
|                        |          |   SkipErrors: Skip (the component continues its execution).                                                   |
|                        |          |   SkipErrors: Raise (the error is raised and execution is interrupted).                                       |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+

Return

The component modifies the data received from the previous component and returns the final output after
all components in the group have been executed.
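
Example (a hedged sketch: the component names and parameters inside `component_list` are illustrative placeholders, not taken from a real task):

```yaml
GroupComponent:
  component_list:
  - component: DownloadFromFTP   # hypothetical component name
    params:
      directory: /tmp/flowtask/
  - component: OpenWithPandas    # hypothetical component name
  skipError: skip
```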

UserComponent

UserComponent(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, FuncSupport, MaskSupport, LogSupport, ResultSupport, StatSupport, LocaleSupport, PandasDataframe, AbstractFlow

UserComponent Abstract Base Component for User-defined Components.

close abstractmethod async

close()

close. Close (if needed) a task

run abstractmethod async

run()

run. Run the operations declared inside the component.

session async

session(url, method='get', headers=None, auth=None, data=None)

session. Connect to an HTTP source using aiohttp.

start abstractmethod async

start(**kwargs)

start. Initialize (if needed) a task

ASPX

ASPX

ASPX(job=None, *args, **kwargs)

Bases: FlowComponent, HTTPService

ASPX

Overview

The ASPX class is designed for interacting with ASPX-based web applications, particularly those requiring authentication.
It inherits from the FlowComponent and HTTPService classes, providing a structured way to manage sessions, handle state views,
and perform HTTP requests with ASPX web forms. This class is useful for automating interactions with ASPX web pages, including login
and data retrieval operations.

.. table:: Properties :widths: auto

+-------------------------+----------+--------------------------------------------------------------------------------------+
| Name                    | Required | Description                                                                          |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| _credentials            |   Yes    | A dictionary containing username and password for authentication.                   |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| _views                  |   No     | A dictionary storing state views extracted from ASPX pages.                          |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| _client                 |   Yes    | An instance of `httpx.AsyncClient` used for making asynchronous HTTP requests.       |
+-------------------------+----------+--------------------------------------------------------------------------------------+

Return

The methods in this class facilitate the interaction with ASPX-based web applications, including login handling,
session management, and state view management. The class also allows for abstract extension, enabling customization
for specific ASPX web forms and interactions.
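
Example (a hedged sketch: the credential keys mirror the `_credentials` dictionary described above, and the values are placeholders):

```yaml
ASPX:
  credentials:
    username: ASPX_USERNAME   # placeholder value
    password: ASPX_PASSWORD   # placeholder value
```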
check_keys_exists
check_keys_exists(*args, **kwargs)

Validate that specified keys are present in the kwargs dict

run abstractmethod async
run()

Extend this method: Call super() after your code to make sure the client session is closed.

AddDataset

AddDataset

AddDataset(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, TemplateSupport, FlowComponent

AddDataset Component

Overview

This component joins two pandas DataFrames based on specified criteria.
It supports various join types and handles cases where one of the DataFrames might be empty.

.. table:: Properties
    :widths: auto

+---------------+----------+-----------------------------------------------------------------------------------------------------+
| Name          | Required | Summary                                                                                             |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| fields        |   Yes    | List of field names to retrieve from the second dataset.                                            |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| dataset       |   Yes    | Name of the second dataset to retrieve.                                                             |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| datasource    |   No     | Source of the second dataset ("datasets" or "vision") (default: "datasets").                        |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| join          |   No     | List of columns to use for joining the DataFrames.                                                  |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| type          |   No     | Type of join to perform (left, inner) (default: left).                                              |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| no_copy       |   No     | If True, modifies the original DataFrames instead of creating a copy (default: False).              |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| distinct      |   No     | If True, retrieves distinct rows based on join columns (default: False).                            |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| operator      |   No     | Operator to use for joining rows (currently only "and" is supported).                               |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| join_with     |   No     | List of columns for a series of left joins (if the main DataFrame has unmatched rows).              |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| datatypes     |   No     | Dictionary specifying data types for columns in the second dataset.                                 |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| infer_types   |   No     | If True, attempts to infer better data types for object columns (default: False).                   |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| to_string     |   No     | If True, attempts to convert object columns to strings during data type conversion (default: True). |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| as_objects    |   No     | If True, creates the resulting DataFrame with all columns as objects (default: False).              |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| drop_empty    |   No     | If True, drops columns with only missing values after the join (default: False).                    |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| dropna        |   No     | List of columns used to remove rows with missing values after the join.                             |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| clean_strings |   No     | If True, replaces missing values in object/string columns with empty strings (default: False).      |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| Group         |   No     | Usually used with FormData.                                                                         |
+---------------+----------+-----------------------------------------------------------------------------------------------------+
| skipError     |   No     | Expected values: skip (log errors) or enforce.                                                      |
+---------------+----------+-----------------------------------------------------------------------------------------------------+

    Returns the joined DataFrame and a metric ("JOINED_ROWS")
    representing the number of rows in the result.



Example:

```yaml
AddDataset:
  datasource: banco_chile
  dataset: vw_form_metadata
  distinct: true
  type: left
  fields:
  - formid
  - form_name
  - column_name
  - description as question
  join:
  - formid
  - column_name
```
start async
start(**kwargs)

Obtain Pandas Dataframe.

Amazon

Scraping a web page using Selenium + ChromeDriver + BeautifulSoup.

    Example:

    ```yaml
    Amazon:
      type: product_info
      use_proxies: true
      paid_proxy: true
    ```

Amazon

Amazon(loop=None, job=None, stat=None, **kwargs)

Bases: ReviewScrapper

Amazon.

Combining an API key and web scraping, this component extracts Amazon product information (reviews, etc.).

product_info async
product_info()

product_info.

Product Information.

reviews async
reviews()

reviews.

Target Product Reviews.

AutoTask

AutoTask

AutoTask(*args, **kwargs)

Bases: HTTPService, FlowComponent

AutoTask Component

Overview

This component retrieves data from AutoTask using the Autotask REST API.
It supports filtering data based on a query or specific IDs, handles pagination, and converts picklist values to human-readable labels.

.. table:: Properties
:widths: auto

+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| Name                 | Required | Summary                                                                                                                                           |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| credentials          |   Yes    | Dictionary containing API credentials: "API_INTEGRATION_CODE", "USERNAME", and "SECRET". Credentials can be retrieved from environment variables. |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| zone                 |   Yes    | AutoTask zone (e.g., "na").                                                                                                                       |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| entity               |   Yes    | AutoTask entity to query (e.g., "tickets").                                                                                                       |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| query_json           |   No     | JSON object representing the AutoTask query filter (defaults to retrieving all items). Refer to the AutoTask documentation for query syntax.      |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| id_column_name       |   No     | Name of the column in the output DataFrame that should represent the AutoTask record ID (defaults to "id").                                      |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| ids                  |   No     | List of AutoTask record IDs to retrieve (overrides the query_json filter if provided).                                                           |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| picklist_fields      |   No     | List of picklist fields in the entity to be converted to human-readable labels.                                                                  |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| user_defined_fields  |   No     | List of user-defined fields in the entity to extract (requires the "userDefinedFields" field in the response).                                   |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| fillna_values        |   No     | Default value to replace missing data (defaults to None).                                                                                        |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| map_field_type       |   No     | Dictionary mapping field names to desired data types in the output DataFrame.                                                                    |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| force_create_columns |   No     | If True, ensures "IncludeFields" columns always exist, even if empty. Must be used with "IncludeFields" in "query_json".                         |
+----------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------+

Returns a pandas DataFrame containing the retrieved AutoTask data and additional columns for picklist labels (if applicable).

Example:

```yaml
AutoTask:
  skipError: skip
  credentials:
    API_INTEGRATION_CODE: AUTOTASK_API_INTEGRATION_CODE
    USERNAME: AUTOTASK_USERNAME
    SECRET: AUTOTASK_SECRET
  entity: TicketNotes
  zone: webservices14
  id_column_name: ticket_note_id
  picklist_fields:
  - noteType
  - publish
  ids: []
  query_json:
    IncludeFields:
    - id
    - createDateTime
    - createdByContactID
    - creatorResourceID
    - description
    - impersonatorCreatorResourceID
    - impersonatorUpdaterResourceID
    - lastActivityDate
    - noteType
    - publish
    - ticketID
    - title
    Filter:
    - op: gte
      field: lastActivityDate
      value: '{two_days_ago}'
  masks:
    '{two_days_ago}':
    - date_diff
    - value: current_date
      diff: 48
      mode: hours
      mask: '%Y-%m-%d %H:%M:%S'
```
extract_udf staticmethod
extract_udf(row)

Extracts dictionary values into columns

Azure

Azure

Azure(loop=None, job=None, stat=None, **kwargs)

Bases: AzureClient, HTTPService, FlowComponent

Azure Component.

Overview

This component interacts with Azure services using the Azure SDK for Python.
It requires valid Azure credentials to establish a connection.

.. table:: Properties
:widths: auto

+---------------+----------+----------------------------------------------------------------------------------------+
| Name          | Required | Summary                                                                                |
+---------------+----------+----------------------------------------------------------------------------------------+
| credentials   |   Yes    | Dictionary containing Azure credentials: "client_id", "tenant_id", and                 |
|               |          | "client_secret". Credentials can be retrieved from environment variables.              |
+---------------+----------+----------------------------------------------------------------------------------------+
| as_dataframe  |   No     | Specifies if the response should be converted to a pandas DataFrame (default: False).  |
+---------------+----------+----------------------------------------------------------------------------------------+

This component does not return any data directly. It interacts with
Azure services based on the configuration and potentially triggers
downstream components in a task.
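
Example (a hedged sketch: the credential keys mirror the table above, and the values follow the environment-variable naming pattern used by other components):

```yaml
Azure:
  credentials:
    client_id: AZURE_CLIENT_ID          # environment-variable name (assumption)
    tenant_id: AZURE_TENANT_ID          # environment-variable name (assumption)
    client_secret: AZURE_CLIENT_SECRET  # environment-variable name (assumption)
  as_dataframe: true
```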
close async
close(timeout=5)

close. Closing the connection.

open async
open(host, port, credentials, **kwargs)

open. Starts (open) a connection to external resource.

start async
start(**kwargs)

Start.

Processing variables and credentials.

AzureUsers

AzureUsers

AzureUsers(loop=None, job=None, stat=None, **kwargs)

Bases: Azure

AzureUsers Component

Overview

This component retrieves a list of users from Azure Active Directory (AAD) using the Microsoft Graph API.

.. table:: Properties
    :widths: auto

+--------------+----------+----------------------------------------------------------------------------------------------+
| Name         | Required | Summary                                                                                      |
+--------------+----------+----------------------------------------------------------------------------------------------+
| credentials  |   Yes    | Dictionary placeholder for Azure AD application credentials.                                 |
+--------------+----------+----------------------------------------------------------------------------------------------+
| most_recent  |   No     | ISO 8601-formatted date and time string to filter users created on or after that date/time. |
+--------------+----------+----------------------------------------------------------------------------------------------+

Returns

A pandas DataFrame containing the retrieved user information. Each row represents an Azure user, and columns might include properties like:

- `objectId`: Unique identifier for the user in AAD.
- `displayName`: User's display name.
- `userPrincipalName`: User's email address (usually the primary login).
- ... (other user properties depending on the AAD response)

Error Handling

- The component raises a `ComponentError` with a detailed message in case of errors during authentication, API calls, or data processing. This error will be surfaced within your Flowtask workflow.


Example:

```yaml
AzureUsers:
  recent: '2023-10-01'
```
run async
run()

Run Azure Connection for getting Users Info.

BaseAction

BaseAction

BaseAction(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, ABC

BaseAction Component

Overview
    Basic component for making RESTful queries to URLs.
    This component serves as a foundation for building more specific action components.
    It allows you to define methods (functions) that can be executed asynchronously.

    .. table:: Properties
        :widths: auto

+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+
| Name                 | Required | Summary                                                                                                                    |
+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+
| loop (optional)      | No       | Event loop to use for asynchronous operations (defaults to the current event loop).                                       |
+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+
| job (optional)       | No       | Reference to a job object for logging and tracking purposes.                                                               |
+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+
| stat (optional)      | No       | Reference to a stat object for custom metrics collection.                                                                  |
+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+
| method (from kwargs) | Yes      | Name of the method (function) within the component to be executed. Specified as a keyword argument during initialization. |
+----------------------+----------+----------------------------------------------------------------------------------------------------------------------------+

**Returns**

The output data can vary depending on the implemented method. It can be a list, dictionary, or any data structure returned by the executed method.

**Error Handling**

- `DataNotFound`: This exception is raised if the executed method doesn't return any data.
- `ComponentError`: This exception is raised for any other errors encountered during execution.
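
Example (a hedged sketch: `get_payload` is a hypothetical method name; it must match a method defined on the concrete action component):

```yaml
BaseAction:
  method: get_payload   # hypothetical method name
```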

BaseLoop

BaseLoop

BaseLoop(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

BaseLoop Interface.

Structural base for all Iterators (Switch, Loop, IF).

create_component
create_component(target, value=None, stat=None, **params)

create_component.

Create a new component instance.

start async
start(**kwargs)

start.

Initialize (if needed) a task

BestBuy

BestBuy

BestBuy(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

BestBuy.

Combining an API key and web scraping, this component extracts Best Buy information (stores, products, product availability, etc.).

Example:

```yaml
BestBuy:
  type: availability
  product_info: false
  brand: Bose
```
availability async
availability()

availability.

Best Buy Product Availability.

chunkify
chunkify(lst, n)

Split list lst into chunks of size n.

product async
product()

product.

Best Buy Product Information.

products async
products()

Fetch all products from the Best Buy API by paginating through all pages.

Returns:

    list: A combined list of all products from all pages.

reviews async
reviews()

reviews.

Best Buy Product Reviews.

stores async
stores()

Fetch all stores from the Best Buy API by paginating through all pages.

Returns:

    list: A combined list of all stores from all pages.

bad_gateway_exception

bad_gateway_exception(exc)

Check if the exception is a 502 Bad Gateway error.

CSVToGCS

CSVToGCS

CSVToGCS(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

CSVToGCS.

This component uploads a CSV file from the local filesystem to a specific Google Cloud Storage (GCS) bucket. Optionally, it can create the bucket if it does not exist.
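
Example (a hedged sketch: the `filename` and `bucket` property names are assumptions for illustration; this reference does not list the component's properties):

```yaml
CSVToGCS:
  filename: /tmp/flowtask/report.csv  # hypothetical property name
  bucket: my-flowtask-bucket          # hypothetical property name
```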

close async
close()

Closes the AsyncDB connection.

run async
run()

Uploads the CSV file to GCS and returns bucket_uri and object_uri.

start async
start(**kwargs)

Initializes the component by configuring the AsyncDB connection.

CompanyScraper

parsers

base
ScrapperBase
ScrapperBase(*args, **kwargs)

Bases: SeleniumService, HTTPService

ScrapperBase Model.

Defines how scrapers should work.

explorium
ExploriumScrapper
ExploriumScrapper(*args, **kwargs)

Bases: ScrapperBase

ExploriumScrapper Model.

scrapping async
scrapping(document, idx, row)

Scrape company information from Explorium. Updates the existing row with new data from Explorium.

leadiq
LeadiqScrapper
LeadiqScrapper(*args, **kwargs)

Bases: ScrapperBase

LeadiqScrapper Model.

scrapping async
scrapping(document, idx, row)

Scrape company information from LeadIQ. Updates the existing row with new data from LeadIQ.

rocket
RocketReachScrapper
RocketReachScrapper(*args, **kwargs)

Bases: ScrapperBase

RocketReachScrapper Model.

scrapping async
scrapping(document, idx, row)

Scrape company information from RocketReach. Updates the existing row with new data from RocketReach.

siccode
SicCodeScrapper
SicCodeScrapper(*args, **kwargs)

Bases: ScrapperBase

SicCodeScrapper Model.

scrapping async
scrapping(document, idx, row)

Scrapes company information from siccode.com and updates the row.

visualvisitor
VisualVisitorScrapper
VisualVisitorScrapper(*args, **kwargs)

Bases: ScrapperBase

VisualVisitorScrapper Model.

scrapping async
scrapping(document, idx, row)

Scrape company information from VisualVisitor. Updates the existing row with new data from VisualVisitor.

zoominfo
ZoomInfoScrapper
ZoomInfoScrapper(*args, **kwargs)

Bases: ScrapperBase

ZoomInfo Model.

scrapping async
scrapping(document, idx, row)

Scrape company information from Zoominfo. Updates the existing row with new data from Zoominfo.

scrapper

CompanyScraper
CompanyScraper(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

Company Scraper Component

Overview:

This component scrapes company information from different sources using HTTPService. It can receive URLs from a previous component (like GoogleSearch) and extract specific company information.

.. table:: Properties
    :widths: auto

+------------------+----------+-------------------------------------------------------------------------------+
| Name             | Required | Description                                                                   |
+------------------+----------+-------------------------------------------------------------------------------+
| url_column (str) |   Yes    | Name of the column containing URLs to scrape (default: 'search_url').         |
+------------------+----------+-------------------------------------------------------------------------------+
| wait_for (tuple) |   No     | Element to wait for before scraping (default: ('class', 'company-overview')). |
+------------------+----------+-------------------------------------------------------------------------------+

Return:

The component adds new columns to the DataFrame with company information:

- headquarters
- phone_number
- website
- stock_symbol
- naics_code
- employee_count
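
Example (a hedged sketch using the two documented properties; the values mirror the documented defaults):

```yaml
CompanyScraper:
  url_column: search_url
  wait_for:
  - class
  - company-overview
```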

close async
close()

Clean up resources.

extract_company_info
extract_company_info(soup, search_term, search_url)

Extract company information from the page.

run async
run()

Execute scraping for each URL in the DataFrame.

scrape_url async
scrape_url(idx, url)

Scrape company information from URL.

search_in_ddg async
search_in_ddg(search_term, company_name, scrapper, backend='html', region='wt-wt')

Search for a term in DuckDuckGo.

split_parts
split_parts(task_list, num_parts=5)

Split task list into parts for concurrent processing.

start async
start(**kwargs)

Initialize the component and validate required parameters.

CopyTo

CopyTo

CopyTo(loop=None, job=None, stat=None, **kwargs)

Bases: QSSupport, FlowComponent

CopyTo.

Abstract Class for Copying (saving) a Pandas Dataframe onto a Resource (example: Copy to PostgreSQL).

close async
close()

Close the connection to Database.

run async
run()

Run Copy into table functionality.

start async
start(**kwargs)

Obtain Pandas Dataframe.

CopyToBigQuery

CopyToBigQuery

CopyToBigQuery(loop=None, job=None, stat=None, **kwargs)

Bases: CopyTo, PandasDataframe

CopyToBigQuery.

Overview

This component allows copying data into a BigQuery table,
using the write functionality from the AsyncDB BigQuery driver.

.. table:: Properties :widths: auto

+--------------+----------+-----------+--------------------------------------------+
| Name         | Required | Summary                                                |
+--------------+----------+-----------+--------------------------------------------+
| tablename    |   Yes    | Name of the table in                                   |
|              |          | BigQuery                                               |
+--------------+----------+-----------+--------------------------------------------+
| schema       |   Yes    | Name of the dataset                                    |
|              |          | where the table is located                             |
+--------------+----------+-----------+--------------------------------------------+
| truncate     |   Yes    | This option indicates if the component should empty    |
|              |          | the table before copying the new data. If set to       |
|              |          | true, the table will be truncated before saving data.  |
+--------------+----------+-----------+--------------------------------------------+
| use_buffer   |   No     | When activated, this option allows optimizing the      |
|              |          | performance of the task when dealing with large        |
|              |          | volumes of data.                                       |
+--------------+----------+-----------+--------------------------------------------+
| credentials  |   No     | Path to BigQuery credentials JSON file                 |
|              |          |                                                        |
+--------------+----------+-----------+--------------------------------------------+
| project_id   |   No     | Google Cloud Project ID                                |
|              |          |                                                        |
+--------------+----------+-----------+--------------------------------------------+


Example:

```yaml
CopyToBigQuery:
  schema: hisense
  tablename: product_availability_all
```
default_connection
default_connection()

default_connection.

Default Connection to BigQuery.

CopyToMongoDB

CopyToMongoDB

CopyToMongoDB(loop=None, job=None, stat=None, **kwargs)

Bases: CopyTo, PandasDataframe

CopyToMongo.

Overview

This component allows copying data into a MongoDB collection, using the write functionality from the AsyncDB MongoDB driver.

.. table:: Properties :widths: auto

+--------------+----------+--------------------------------------------------------------+
| Name         | Required | Summary                                                      |
+--------------+----------+--------------------------------------------------------------+
| tablename    |   Yes    | Name of the collection in MongoDB.                           |
+--------------+----------+--------------------------------------------------------------+
| schema       |   Yes    | Name of the database where the collection is located.        |
+--------------+----------+--------------------------------------------------------------+
| truncate     |   Yes    | If true, the collection will be emptied before copying new   |
|              |          | data.                                                        |
+--------------+----------+--------------------------------------------------------------+
| use_buffer   |   No     | When activated, optimizes performance for large volumes of   |
|              |          | data.                                                        |
+--------------+----------+--------------------------------------------------------------+
| key_field    |   No     | Field to use as unique identifier for upsert operations.     |
+--------------+----------+--------------------------------------------------------------+

Example:

```yaml
CopyToMongoDB:
  schema: hisense
  tablename: product_availability
  dbtype: documentdb
```
default_connection
default_connection()

default_connection. Default Connection to MongoDB.

get_connection
get_connection(driver='mongo', dsn=None, params=None, **kwargs)

Useful for internal connections of QS.

CopyToPg

CopyToPg

CopyToPg(loop=None, job=None, stat=None, **kwargs)

Bases: CopyTo

CopyToPg

This component allows copying data into a PostgreSQL table using the copy_to_table functionality.
TODO: Design an Upsert feature with CopyToPg.

.. table:: Properties :widths: auto

+----------------+----------+----------------------------------------------------------------------------------+
| Name           | Required | Summary                                                                          |
+----------------+----------+----------------------------------------------------------------------------------+
| schema         |   Yes    | Name of the schema where the table resides.                                      |
+----------------+----------+----------------------------------------------------------------------------------+
| tablename      |   Yes    | Name of the table to insert data into.                                           |
+----------------+----------+----------------------------------------------------------------------------------+
| truncate       |   No     | Boolean flag indicating whether to truncate the table before inserting.          |
|                |          | Defaults to False.                                                               |
+----------------+----------+----------------------------------------------------------------------------------+
| use_chunks     |   No     | Boolean flag indicating whether to insert data in chunks (for large datasets).   |
|                |          | Defaults to False.                                                               |
|                |          | Requires specifying a chunksize property for chunk size determination.           |
+----------------+----------+----------------------------------------------------------------------------------+
| chunksize      |   No     | Integer value specifying the size of each data chunk when use_chunks is True.    |
|                |          | Defaults to None (chunk size will be calculated based on CPU cores).             |
+----------------+----------+----------------------------------------------------------------------------------+
| use_buffer     |   No     | Boolean flag indicating whether to use a buffer for data insertion (optional).   |
|                |          | Defaults to False.                                                               |
|                |          | Using a buffer can improve performance for large datasets.                       |
+----------------+----------+----------------------------------------------------------------------------------+
| array_columns  |   No     | List of column names containing JSON arrays. These columns will be formatted     |
|                |          | appropriately before insertion.                                                  |
|                |          | Requires use_buffer to be True.                                                  |
+----------------+----------+----------------------------------------------------------------------------------+
| use_quoting    |   No     | Boolean flag indicating whether to use quoting for CSV data insertion (optional).|
|                |          | Defaults to False.                                                               |
|                |          | Using quoting can be helpful for data containing special characters.             |
+----------------+----------+----------------------------------------------------------------------------------+
| datasource     |   No     | Using a Datasource instead of manual credentials.                                |
+----------------+----------+----------------------------------------------------------------------------------+
| credentials    |   No     | Supporting manual PostgreSQL credentials.                                        |
+----------------+----------+----------------------------------------------------------------------------------+

Returns a dictionary containing metrics about the copy operation:
 * ROWS_SAVED (int): The number of rows successfully inserted into the target table.
 * NUM_ROWS (int): The total number of rows processed from the input data.
 * NUM_COLUMNS (int): The number of columns found in the input data.
 * (optional): Other metrics specific to the implementation.
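
Example (a hedged sketch; the schema and table names are placeholders):

```yaml
CopyToPg:
  schema: public        # placeholder schema name
  tablename: sales_data # placeholder table name
  truncate: true
  use_chunks: true
  chunksize: 10000
```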

CopyToRethink

CopyToRethink

CopyToRethink(loop=None, job=None, stat=None, **kwargs)

Bases: CopyTo, PandasDataframe

CopyToRethink.

Overview

This component allows copying data into a RethinkDB table,
using the write functionality of the RethinkDB driver.

.. table:: Properties :widths: auto

+--------------+----------+---------------------------------------------------------+
| Name         | Required | Summary                                                 |
+--------------+----------+---------------------------------------------------------+
| tablename    |   Yes    | Name of the table in the database.                      |
+--------------+----------+---------------------------------------------------------+
| schema       |   Yes    | Name of the schema where the table is located           |
|              |          | (alias: database).                                      |
+--------------+----------+---------------------------------------------------------+
| truncate     |   Yes    | This option indicates if the component should empty the |
|              |          | table before copying the new data. If set to true, the  |
|              |          | table will be truncated before saving the new data.     |
+--------------+----------+---------------------------------------------------------+
| use_buffer   |   No     | When activated, this option allows optimizing the       |
|              |          | performance of the task when dealing with large volumes |
|              |          | of data.                                                |
+--------------+----------+---------------------------------------------------------+
| credentials  |   No     | Supporting manual RethinkDB credentials.                |
+--------------+----------+---------------------------------------------------------+
| datasource   |   No     | Using a Datasource instead of manual credentials.       |
+--------------+----------+---------------------------------------------------------+

Example:

```yaml
CopyToRethink:
  tablename: product_availability
  schema: bose
```
default_connection
default_connection()

default_connection.

Default Connection to RethinkDB.

Costco

Costco Product Reviews Scraper using Bazaarvoice API.

Example configuration:

```yaml
Costco:
  type: reviews
  use_proxies: true
  paid_proxy: false
  api_token: xxx
```

Costco

Costco(loop=None, job=None, stat=None, **kwargs)

Bases: ReviewScrapper

Costco Product Reviews Scraper.

This component extracts product reviews from Costco using the Bazaarvoice API. It expects a DataFrame with product URLs or product IDs.

extract_product_id
extract_product_id(product_url)

Extract product ID from Costco URL.

Parameters:

    product_url (str, required): Costco product URL.

Returns:

    str: Product ID as string.

Example

URL: https://www.costco.com/kidkraft-atrium-breeze-wooden-outdoor-playhouse-with-sunroom--play-kitchen.product.4000317158.html
Returns: 4000317158

product_details async
product_details()

Extract product details (placeholder for future implementation).

product_info async
product_info()

Extract product information (placeholder for future implementation).

reviews async
reviews()

Extract reviews for all products in the DataFrame.

CreateGCSBucket

CreateGCSBucket

CreateGCSBucket(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

CreateGCSBucket.

This component creates a bucket in Google Cloud Storage (GCS).

Properties:

  • bucket_name: Unique name for the GCS bucket. (Required)
  • location: Geographic region where the bucket will be created. (Optional, defaults to 'US')
  • storage_class: Storage class for the bucket. (Optional, defaults to 'STANDARD')
  • overwrite: Allows proceeding if the bucket already exists. (Optional, defaults to 'False')
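
Example (a hedged sketch; the bucket name is a placeholder):

```yaml
CreateGCSBucket:
  bucket_name: my-flowtask-bucket  # placeholder bucket name
  location: US
  storage_class: STANDARD
  overwrite: false
```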
close async
close()

Closes the AsyncDB connection.

run async
run()

Creates the bucket in GCS.

start async
start(**kwargs)

Initializes the component by configuring the AsyncDB connection.

CreateReport

CreateReport.

Component for creating rich reports and sending them via Notify.

CreateReport

CreateReport.

Using a Jinja2 template to create a report and, optionally, send it via email.

    Example:

    ```yaml
    CreateReport:
      template_file: echelon_program_overview_raw.html
      create_pdf: true
      masks:
        '{today}':
        - today
        - mask: '%m/%d/%Y'
        '{created}':
        - today
        - mask: '%Y-%m-%d %H:%M:%s'
        '{firstdate}':
        - date_diff
        - value: today
          diff: 8
          mode: days
          mask: '%b %d, %Y'
        '{lastdate}':
        - yesterday
        - mask: '%b %d, %Y'
      send:
        via: email
        list: echelon_program_overview
        message:
          subject: 'Echelon Kohl''s VIBA Report for: ({today})'
        arguments:
          today_report: '{today}'
          generated_at: '{created}'
          firstdate: '{firstdate}'
          lastdate: '{lastdate}'
    ```
CreateReport
CreateReport(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, DBSupport

CreateReport

Overview

The CreateReport class is a component for creating rich reports and sending them via the Notify service. It uses
template handling and chart creation to generate the content of the reports and sends them to a list of recipients.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _data            |   Yes    | A dictionary containing the input data for the report.                                          |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _parser          |   Yes    | The template parser for generating the report content.                                          |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _chartparser     |   Yes    | The template parser for generating charts.                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| template_file    |   Yes    | The file name of the template to use for the report.                                            |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| recipients       |   Yes    | A list of recipients to send the report to.                                                     |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| send             |   Yes    | A dictionary containing the sending options and configurations.                                 |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| message          |   No     | The message content for the report.                                                             |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Returns:
    The input data after sending the report.

charts

Chart utilities using PyEcharts.

base

Base Class for all Chart types.

baseChart
baseChart(title, data, **kwargs)

Bases: ABC

image
image()

Returns the Base64 version of the image.

utils

Utilities and Functions.

get_json_data async
get_json_data(url)

Get JSON data from a remote source.

CustomerSatisfaction

CustomerSatisfaction

CustomerSatisfaction(loop=None, job=None, stat=None, **kwargs)

Bases: ParrotBot, FlowComponent

CustomerSatisfaction

Overview

The CustomerSatisfaction class is a component for interacting with an AI agent to perform customer satisfaction analysis.
It extends the FlowComponent class.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| output_column    |   Yes    | Column for saving the Customer Satisfaction information.                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

A Pandas Dataframe with the Customer Satisfaction statistics.

Example:

```yaml
CustomerSatisfaction:
  llm:
    name: VertexLLM
    model_name: gemini-1.5-pro
    temperature: 0.4
  review_column: review
  product_column: product_name
  columns:
  - product_id
  - product_name
  - product_model
  - product_type
  - product_variant
  - account_name
  - picture_url
  - url
  output_column: customer_satisfaction
```

DataInput

DataInput

DataInput(loop=None, job=None, stat=None, **kwargs)

Bases: DbClient, TemplateSupport, FlowComponent

DataInput

Class to execute queries against a database and retrieve results using asyncDB.

Inherits from both DbClient (for database connection management) and FlowComponent (for component lifecycle management).

.. table:: Properties :widths: auto

+---------------+----------+------------------------------------------------------------------------------+
| Name          | Required | Description                                                                  |
+---------------+----------+------------------------------------------------------------------------------+
| driver        |   Yes    | asyncDB driver to use (defaults to "pg" for PostgreSQL).                     |
|               |          | Can be overridden by a "driver" key in the "credentials" dictionary.         |
+---------------+----------+------------------------------------------------------------------------------+
| credentials   |   Yes    | Dictionary containing database connection credentials (user, password, etc.).|
+---------------+----------+------------------------------------------------------------------------------+
| query         |   Yes    | SQL query or queries to be executed. Can be provided in different formats:   |
|               |          |  * String (single query)                                                     |
|               |          |  * List (multiple queries)                                                   |
|               |          |  * Dictionary (named queries) - key-value pairs where key is the query name  |
|               |          |    and value is the query string.                                            |
|               |          | If not provided, no queries will be executed.                                |
+---------------+----------+------------------------------------------------------------------------------+
| file          |   Yes    | Path to a file containing a single or multiple SQL queries (alternative to   |
|               |          | `query`).                                                                    |
|               |          | If provided along with `query`, `file` takes precedence.                     |
+---------------+----------+------------------------------------------------------------------------------+
| as_dataframe  |    No    | Boolean flag indicating whether to convert query results to DataFrames.      |
|               |          | Defaults to False (returns raw results).                                     |
+---------------+----------+------------------------------------------------------------------------------+

.. returns::

The return value depends on the number of queries executed and the as_dataframe property:

* Single query:
    * DataFrame (if as_dataframe is True): a pandas DataFrame containing the query results.
    * Raw result object (default): a raw result object specific to the database driver.
* Multiple queries:
    * List[DataFrame] (if as_dataframe is True): a list of DataFrames, each corresponding to a query result.
    * List[raw result object] (default): a list of raw result objects, each representing a query result.

Returns None if no queries are provided.

Example:

```yaml
DataInput:
  query: SELECT * FROM xfinity.product_types
  as_string: true
```

DateList

DateList

DateList(loop=None, job=None, stat=None, **kwargs)

Bases: IteratorBase

DateList.

Overview

Runs as many times as there are dates generated by this component.

.. table:: Properties :widths: auto

+--------------+----------+--------------------------------------------------------------+
| Name         | Required | Summary                                                      |
+--------------+----------+--------------------------------------------------------------+
| range        |   Yes    | Start and end date range parameter.                          |
+--------------+----------+--------------------------------------------------------------+
| inc          |   Yes    | Increases the counter value N times.                         |
+--------------+----------+--------------------------------------------------------------+
| format       |   Yes    | Date format to generate.                                     |
+--------------+----------+--------------------------------------------------------------+
| vars         |   Yes    | Generate variables from the start date produced in each      |
|              |          | iteration.                                                   |
+--------------+----------+--------------------------------------------------------------+

Return the list of arbitrary days

Example:

```yaml
DateList:
  range:
    start: 01-12-2021
    end: 24-12-2021
    format: '%d-%m-%Y'
  inc: 1
  format: '%Y-%m-%dT%H:%M:%S'
  vars:
    firstdate: '{date}'
    lastdate:
    - to_midnight
    - mask: '%Y-%m-%d %H:%M:%S'
  iterate: true
```
createJob
createJob(target, params, dt, d)

Create the Job Component.

run async
run()

Async Run Method.

setAttributes
setAttributes(dt)

Define the result attributes on the component.

start async
start(**kwargs)

Get Range of Dates.

DbClient

DbClient

DbClient(*args, **kwargs)

Bases: DBInterface

DbClient.

Abstract base class for database clients using AsyncDB.

Provides common methods for connecting to a database, executing queries, and handling results.

Inherits from the DBInterface interface.

.. note:: This class is intended to be subclassed by specific database client implementations.

.. table:: Properties :widths: auto

+---------------+----------+---------------------------------------------------------------------------------+
| Name          | Required | Description                                                                     |
+---------------+----------+---------------------------------------------------------------------------------+
| driver        |   Yes    | Database driver to use (e.g., "pg" for PostgreSQL).                             |
+---------------+----------+---------------------------------------------------------------------------------+
| credentials   |   Yes    | Dictionary containing database connection credentials (user, password, etc.).   |
|               |          | Defined by the _credentials class attribute.                                    |
+---------------+----------+---------------------------------------------------------------------------------+
| raw_result    |   No     | Boolean flag indicating whether to return raw query results or convert them to  |
|               |          | dictionaries (defaults to False).                                               |
+---------------+----------+---------------------------------------------------------------------------------+
| infer_types   |   No     | Boolean flag indicating whether to infer data types for pandas DataFrames       |
|               |          | created from query results (defaults to False).                                 |
+---------------+----------+---------------------------------------------------------------------------------+
| as_dataframe  |   No     | Boolean flag indicating whether to convert query results to a pandas DataFrame  |
|               |          | (defaults to True).                                                             |
+---------------+----------+---------------------------------------------------------------------------------+
| as_string     |   No     | Boolean flag indicating whether to convert object columns in DataFrames to      |
|               |          | strings (defaults to False). Converting to strings requires type inference      |
|               |          | (set infer_types to True if used).                                              |
+---------------+----------+---------------------------------------------------------------------------------+

.. returns:: varies

The return value of the _query method depends on the raw_result and as_dataframe properties:

* If raw_result is True: returns the raw result object specific to the database driver.
* If raw_result is False and as_dataframe is True: returns a pandas DataFrame containing the query results.
* If raw_result is False and as_dataframe is False: returns a list of dictionaries representing the query results.

Raises exceptions for errors during execution (e.g., DataNotFound, ComponentError).

DialPad

DialPad

DialPad(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, HTTPService

DialPad

Overview

The DialPad class is a component for interacting with the DialPad API. It extends the FlowComponent and HTTPService
classes, providing methods for authentication, fetching statistics, and handling API responses.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| accept           |   No     | The accepted content type for API responses, defaults to "application/json".                     |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| download         |   No     | The download flag indicating if a file download is required.                                     |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _credentials     |   Yes    | A dictionary containing the API key for authentication.                                          |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _base_url        |   Yes    | The base URL for the DialPad API.                                                                |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| auth             |   Yes    | The authentication header for API requests.                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the interaction with the DialPad API, including initialization, fetching statistics,
processing results, and handling credentials.
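
Example (a minimal sketch; the credential key name `api_key` is an assumption based on the `_credentials` description, and the value is illustrative):

```yaml
DialPad:
  credentials:
    api_key: DIALPAD_API_KEY
```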

DocumentDBQuery

DocumentDBQuery

DocumentDBQuery(loop=None, job=None, stat=None, **kwargs)

Bases: DocumentDBSupport, TemplateSupport, FlowComponent

DocumentDBQuery.

DocumentDBQuery is a component that interacts with a DocumentDB database using pymongo.

Returns a JSON-object representation of the query result.

Example:

```yaml
DocumentDBQuery:
  schema: networkninja
  tablename: batches
  query:
    data.metadata.type: form_data
    data.metadata.timestamp:
      $gte: 1743670800.0
      $lte: 1743681600.999999
```
start async
start(**kwargs)

start.

Start the DocumentDBQuery component.

DownloadFrom

DownloadFromBase

DownloadFromBase(loop=None, job=None, stat=None, **kwargs)

Bases: PandasDataframe, FlowComponent

DownloadFromBase

Abstract base class for downloading files from various sources.

Inherits from FlowComponent and PandasDataframe to provide common functionalities for component management and data handling.

This class utilizes aiohttp for asynchronous HTTP requests and offers support for authentication, SSL connections, and basic file management.

.. note:: This class is intended to be subclassed for specific download implementations.

.. table:: Properties :widths: auto

+----------------------+----------+-----------+-------------------------------------------------------------------------+
| Name                 | Required | Summary                                                                             |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| credentials          |    Yes   | Dictionary containing expected username and password for authentication             |
|                      |          | (default: {"username": str, "password": str}).                                      |
+----------------------+----------+-------------------------------------------------------------------------------------+
| no_host              |    No    | Boolean flag indicating whether to skip defining host and port (default: False).    |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| overwrite            |    No    | Boolean flag indicating whether to overwrite existing files (default: True).        |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| create_destination   |    No    | Boolean flag indicating whether to create the destination directory                 |
|                      |          | if it doesn't exist (default: True).                                               |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| rename               |    No    | String defining a new filename for the downloaded file.                             |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| file                 |   Yes    | Access the file to download through a URL, using the required user credentials      |
|                      |          | (username and password).                                                             |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| download             |   Yes    | File destination and directory                                                      |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| source               |   Yes    | Origin of the file to download (the location where the file resides).               |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| destination          |   Yes    | Destination where the file will be saved.                                            |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| ssl                  |    No    | Boolean flag indicating whether to use SSL connection (default: False).             |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| ssl_cafile           |    No    | Path to the CA certificate file for SSL verification.                               |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| ssl_certs            |    No    | List of certificate chains for SSL verification.                                    |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| host                 |    No    | Hostname for the download source (default: "localhost").                            |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| port                 |    No    | Port number for the download source (default: 22).                                  |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| timeout              |    No    | Timeout value in seconds for HTTP requests (default: 30).                           |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| url                  |    No    | URL of the download source (populated within the class).                            |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
| headers              |   Yes    | Dictionary containing HTTP headers for the request.                                 |
+----------------------+----------+-----------+-------------------------------------------------------------------------+
http_session async
http_session(url=None, method='get', data=None, data_format='json')

http_session. Connect to an HTTP source using aiohttp.

start async
start(**kwargs)

Start.

Processing variables and credentials.

DownloadFromD2L

DownloadFromD2L

DownloadFromD2L(loop=None, job=None, stat=None, **kwargs)

Bases: D2LClient, DownloadFromBase

DownloadFromD2L

Overview

Download Data from D2L.

Properties (inherited from D2LClient and DownloadFromBase)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+----------------------------------------------------------------------+
| Name               | Required | Summary                                                                          |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with the D2L site (user and password);       |
|                    |          | get credentials from environment if null.                                        |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| filename           |   Yes    | The filename to use for the downloaded file.                                     |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| action             |   No     | Select 'download' or 'awards' (default: 'download').                             |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| schema             |   No     | The ID of the Schema to download. Required if action is 'download'               |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| org_units          |   No     | A list of Organization Unit IDs. Required if action is 'awards'.                  |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| column             |   No     | A column name from which to extract the list of Organization Units. Required if  |
|                    |          | action is 'awards' and depends on a Pandas DataFrame.                            |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination directory if it        |
|                    |          | doesn't exist (default: True).                                                   |
+--------------------+----------+-----------+----------------------------------------------------------------------+

Saves the downloaded files to the new destination.



Example:

```yaml
DownloadFromD2L:
  domain: POLESTAR_DOMAIN
  schema: c0b0740f-896e-4afa-bfd9-81d8e43006d9
  credentials:
    username: POLESTAR_USERNAME
    password: POLESTAR_PASSWORD
  destination:
    directory: /home/ubuntu/symbits/polestar/files/organizational_unit_ancestors/
    filename: organizational_unit_ancestors_{yesterday}.zip
    overwrite: true
  masks:
    yesterday:
    - yesterday
    - mask: '%Y-%m-%d'
```

DownloadFromFTP

DownloadFromFTP Download files from a remote FTP server.

  Example:

  ```yaml
  DownloadFromFTP:
    host: trendmicro_host
    port: trendmicro_port
    use_tls: false
    file:
      pattern: '{monday}.*-Limited852StoreReport.{saturday}.csv'
      values:
        monday:
        - date_dow
        - day_of_week: monday
          mask: '%Y%m%d'
        saturday:
        - date_diff_dow
        - diff: 7
          day_of_week: saturday
          mask: '%Y-%m-%d'
      directory: /Limited852Report
    credentials:
      user: trendmicro_user
      password: trendmicro_password
    download:
      directory: /home/ubuntu/symbits/trendmicro/files/market_share/
  ```

DownloadFromFTP

DownloadFromFTP(credentials=None, host=None, port=None, **kwargs)

Bases: FTPClient, DownloadFromBase

DownloadFromFTP

Overview

Downloads files from a remote FTP server using the functionality from DownloadFrom.

Properties

.. table:: Properties :widths: auto

+--------------+----------+---------------------------------------------------------------------+
| Name         | Required | Summary                                                             |
+--------------+----------+---------------------------------------------------------------------+
| credentials  |   Yes    | Dictionary containing username and password for FTP authentication |
|              |          | (default: {"user": str, "password": str}).                          |
+--------------+----------+---------------------------------------------------------------------+
| whole_dir    |   Yes    | Boolean flag indicating whether to download the entire directory   |
|              |          | or specific files (default: False).                                 |
+--------------+----------+---------------------------------------------------------------------+
| use_tls      |   Yes    | Boolean indicating whether to use the TLS protocol.                 |
+--------------+----------+---------------------------------------------------------------------+

Saves the downloaded files to the new destination.

DownloadFromIMAP

DownloadFromIMAP

DownloadFromIMAP(loop=None, job=None, stat=None, **kwargs)

Bases: IMAPClient, DownloadFromBase

DownloadFromIMAP.

Overview

    Download emails from an IMAP mailbox using the functionality from DownloadFrom.

.. table:: Properties
    :widths: auto

    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | Name              | Required | Summary                                                                                                 |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | credentials       |   Yes    | Credentials to access the IMAP mailbox.                                                                 |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | mailbox           |   Yes    | The IMAP mailbox name (default: "INBOX").                                                               |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | search_terms      |   Yes    | Dictionary containing search criteria in IMAP format.                                                   |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | attachments       |   Yes    | Dictionary specifying download configuration for attachments:                                           |
    |                   |          |   - directory (str): Path to save downloaded attachments.                                               |
    |                   |          |   - filename (str, optional): Filename pattern for selection (fnmatch).                                 |
    |                   |          |   - pattern (str, optional): Regular expression pattern for selection.                                  |
    |                   |          |   - expected_mime (str, optional): Expected MIME type filter.                                           |
    |                   |          |   - rename (str, optional): Template string for renaming attachments (uses "{filename}").               |
    |                   |          |   - download_existing (bool, optional): Skip existing files (default: True).                            |
    |                   |          |   - create_destination (bool, optional): Create download directory if it doesn't exist (default: True). |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | download_existing |    No    | Flag indicating whether to skip downloading existing files.                                             |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | results           |   Yes    | Dictionary containing download results:                                                                 |
    |                   |          |   - attachments: List of downloaded attachment file paths.                                              |
    |                   |          |   - messages: List of `MailMessage` objects representing downloaded emails.                             |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+
    | use_ssl           |   Yes    | Boolean indicating whether to use SSL/TLS for the connection.                                           |
    +-------------------+----------+-----------+---------------------------------------------------------------------------------------------+

    Saves the downloaded files to the new destination.


Example:

```yaml
DownloadFromIMAP:
  credentials:
    host: email_host
    port: email_port
    user: email_host_user
    password: email_host_password
    use_ssl: true
  search_terms:
    'ON': '{search_today}'
    SUBJECT: Custom Punch with Pay Codes - Excel
    FROM: eet_application@adp.com
  overwrite: true
  attachments:
    directory: /home/ubuntu/symbits/bose/files/worked_hours/
  masks:
    '{search_today}':
    - today
    - mask: '%d-%b-%Y'
```

DownloadFromS3

DownloadFromS3

DownloadFromS3(loop=None, job=None, stat=None, **kwargs)

Bases: Boto3Client, DownloadFromBase

DownloadFromS3.

Overview

Download a file from an Amazon S3 bucket using the functionality from DownloadFrom.

Properties

.. table:: Properties :widths: auto

+--------------------+----------+-----------+-----------------------------------------------------------------+
| Name               | Required | Summary                                                                     |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with S3 service (username and password) |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| bucket             |   Yes    | The name of the S3 bucket to download files from.                           |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| source_dir         |   No     | The directory path within the S3 bucket to download files from.             |
|                    |          | Defaults to the root directory (`/`).                                       |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| source             |   No     | A dictionary specifying the filename to download.                           |
|                    |          | If provided, takes precedence over `source_dir` and `_srcfiles`.            |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| _srcfiles          |   No     | A list of filenames to download from the S3 bucket.                         |
|                    |          | Used in conjunction with `source_dir`.                                      |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| rename             |   No     | A new filename to use for the downloaded file.                              |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| directory          |   Yes    | The local directory path to save the downloaded files.                      |
+--------------------+----------+-----------+-----------------------------------------------------------------+
| create_destination |   No     | A boolean flag indicating whether to create the destination directory       |
|                    |          | if it doesn't exist. Defaults to `True`.                                    |
+--------------------+----------+-----------+-----------------------------------------------------------------+

Saves the file to the new destination.

Methods

  • start()
  • close()
  • run()
  • s3_list(s3_client, suffix="")
  • save_attachment(self, filepath, content)
  • download_file(self, filename, obj)

Example:

```yaml
DownloadFromS3:
  credentials:
    use_credentials: false
    region_name: us-east-2
  bucket: placer-navigator-data
  source_dir: placer-analytics/bulk-export/monthly-weekly/2025-03-06/metadata/
  destination:
    directory: /nfs/symbits/placerai/2025-03-06/metadata/
  create_destination: true
```

DownloadFromSFTP

DownloadFromSFTP

DownloadFromSFTP(loop=None, job=None, stat=None, **kwargs)

Bases: SSHClient, DownloadFromBase

DownloadFromSFTP.

Overview

Download a file or directory from an SFTP server using the functionality from DownloadFrom.

Properties (inherited from DownloadFromBase and SSHClient)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+----------------------------------------------------------------------------+
| Name               | Required | Summary                                                                                |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with SFTP server (username and password)           |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| host               |   Yes    | The hostname or IP address of the SFTP server.                                         |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| port               |   No     | The port number of the SFTP server (default: 22).                                      |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| tunnel             |   No     | Dictionary defining a tunnel to use for the connection.                                |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| block_size         |   No     | Block size for file transfer (default: 4096 bytes).                                    |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| max_requests       |   No     | Maximum number of concurrent file transfers (default: 5).                              |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination directory                    |
|                    |          | if it doesn't exist (default: True).                                                   |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| source             |   Yes    | A dictionary specifying the source file or directory on the SFTP server.               |
|                    |          | Can include:                                                                           |
|                    |          |  whole_dir (Optional, bool): Whether to download the entire directory (default: False).|
|                    |          |  recursive (Optional, bool): Whether to download subdirectories recursively when       |
|                    |          |  `whole_dir` is True (default: False).                                                 |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| filename           |   No     | The filename to download from the SFTP server (if not using `source`).                 |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| mdate              |   No     | Modification date of the file to download (for filtering based on modification time).  |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| rename             |   No     | A new filename to use for the downloaded file.                                         |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| masks              |   No     | A dictionary mapping mask strings to replacement strings used for renaming files and   |
|                    |          | modification dates.                                                                    |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| overwrite          |   No     | Whether to overwrite existing files in the destination directory (default: False).     |
+--------------------+----------+-----------+----------------------------------------------------------------------------+
| remove             |   No     | Whether to remove the downloaded files from the SFTP server after                      |
|                    |          | successful download (default: False).                                                  |
+--------------------+----------+-----------+----------------------------------------------------------------------------+

Saves the downloaded files to the new destination.

Example:

```yaml
DownloadFromSFTP:
  file:
    pattern: Performance_Tracker/*
    mdate: '{today}'
  host: altice_ltm_sftp_host
  port: altice_ltm_sftp_port
  credentials:
    username: altice_ltm_sftp_username
    password: altice_ltm_sftp_password
    known_hosts: null
  directory: /home/ubuntu/altice/files/
  overwrite: true
  masks:
    '{today}':
    - yesterday
    - mask: '%Y-%m-%d'
```
run async
run()

Run the file download.

DownloadFromSharepoint

DownloadFromSharepoint

DownloadFromSharepoint(loop=None, job=None, stat=None, **kwargs)

Bases: SharepointClient, DownloadFromBase

DownloadFromSharepoint.

Overview

This SharePoint component downloads a file from the Microsoft SharePoint service.

Properties (inherited from DownloadFromBase and Sharepoint)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+----------------------------------------------------------------------+
| Name               | Required | Summary                                                                          |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with SharePoint site (username and password) |
|                    |          | get credentials from environment if null.                                        |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| site               |   Yes    | The URL of the SharePoint site.                                                  |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| verify             |   No     | Whether to verify the SSL certificate of the SharePoint site (default: True).    |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| timeout            |   No     | The timeout value for SharePoint operations (default: 30 seconds).               |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination directory if it        |
|                    |          | doesn't exist (default: True).                                                   |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| tenant             |   Yes    | The SharePoint tenant (the set of SharePoint site collections).                  |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| file_id            |   Yes    | Identifier of the file in SharePoint.                                            |
+--------------------+----------+-----------+----------------------------------------------------------------------+

Saves the downloaded files to the new destination.

Example:

```yaml
DownloadFromSharepoint:
  credentials:
    username: SHAREPOINT_TROCADV_USERNAME
    password: SHAREPOINT_TROCADV_PASSWORD
    tenant: symbits
    site: FlexRoc
  file:
    filename: Bose Schedule.xlsx
    directory: Shared Documents/General/Monthly Schedule & Reporting
  destination:
    directory: /home/ubuntu/symbits/bose/stores/
    filename: Bose Schedule - {today}.xlsx
  masks:
    '{today}':
    - today
    - mask: '%Y-%m-%d'
```

DownloadFromSmartSheet

DownloadFromSmartSheet Download an Excel file from SmartSheet.

  Example:

  ```yaml
  DownloadFromSmartSheet:
    comments: Download an SmartSheet Tab into an Excel file.
    file_id: '373896609326980'
    file_format: application/vnd.ms-excel
    destination:
      filename: WORP MPL 2022.xlsx
      directory: /home/ubuntu/symbits/worp/files/stores/
  ```

DownloadFromSmartSheet

DownloadFromSmartSheet(loop=None, job=None, stat=None, **kwargs)

Bases: SmartSheetClient, DownloadFromBase

DownloadFromSmartSheet

Overview

Download an Excel file or CSV file from SmartSheet.

Properties (inherited from DownloadFromBase)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+----------------------------------------------------------------------+
| Name               | Required | Summary                                                                          |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with the SmartSheet service (username and    |
|                    |          | password); get credentials from environment if null.                             |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination directory if it        |
|                    |          | doesn't exist (default: True).                                                   |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| api_key            |   No     | The SmartSheet API key (can be provided as an environment variable or directly   |
|                    |          | set as a property). If not provided, tries to use the `SMARTSHEET_API_KEY`       |
|                    |          | environment variable.                                                            |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| url                |   No     | Base URL for the SmartSheet Sheets API (default:                                 |
|                    |          | https://api.smartsheet.com/2.0/sheets/).                                         |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| file_id            |   Yes    | The ID of the SmartSheet file to download.                                       |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| file_format        |   No     | The desired file format for the downloaded data (default:                        |
|                    |          | "application/vnd.ms-excel"). Supported formats are:                              |
|                    |          | * "application/vnd.ms-excel" (Excel)                                             |
|                    |          | * "text/csv" (CSV)                                                               |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| filename           |   Yes    | The filename to use for the downloaded file.                                     |
+--------------------+----------+-----------+----------------------------------------------------------------------+

Saves the downloaded files to the new destination.

DownloadS3File

DownloadS3File

DownloadS3File(loop=None, job=None, stat=None, **kwargs)

Bases: Boto3Client, FlowComponent, PandasDataframe

Download a file from an S3 bucket.

Parameters:

  • loop (AbstractEventLoop, default: None): The event loop to use.
  • job (Callable, default: None): The job to run.
  • stat (Callable, default: None): The statistics to collect.
  • **kwargs (default: {}): Additional arguments.
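
Example (a hypothetical sketch: the `url` and `destination` property names are assumptions based on the `parse_s3_url` helper, and all values are illustrative):

```yaml
DownloadS3File:
  credentials:
    region_name: us-east-2
  url: s3://my-bucket/exports/2025-03-06/data.csv
  destination:
    directory: /tmp/s3-downloads/
```
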
parse_s3_url
parse_s3_url(url)

Parse an S3 URL into bucket and key.

Parameters:

  • url (str, required): The S3 URL to parse.

Returns:

  • tuple: A tuple containing the bucket name and the key.

run async
run()

run.

Download the file from S3.
start async
start(**kwargs)

start.

Initialize Task.

Dummy

Dummy

Dummy(job=None, *args, **kwargs)

Bases: FlowComponent

close async
close()

close.

Close (if needed) a task

run async
run()

run.

Run (if needed) a task

save_stats
save_stats()

Extension to save stats for this component

start async
start(**kwargs)

start.

Initialize (if needed) a task

Example:

```yaml
Dummy:
  message: 'Dummy Date: {firstdate} and {lastdate}'
  masks:
    firstdate:
    - date_diff_dow
    - day_of_week: monday
      diff: 8
      mask: '%Y-%m-%d'
    lastdate:
    - date_diff_dow
    - day_of_week: monday
      diff: 2
      mask: '%Y-%m-%d'
```

DuplicatePhoto

DuplicatePhoto

DuplicatePhoto(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

DuplicatePhoto.

Check if a photo is duplicated and add a column with the result. This component detects duplicate photos in a dataset using an image hash: a unique identifier for the image that is fast and efficient to compute. It saves detailed information about matches based on perceptual hash and vector similarity.

pgvector_init async
pgvector_init(conn)

Initialize pgvector extension in PostgreSQL.

run async
run()

Run the duplicate detection with enhanced information.

qid

qid(name)

Very small helper to quote SQL identifiers safely. Raises if name contains anything but letters, digits or '_'.

EmployeeEvaluation

EmployeeEvaluation

EmployeeEvaluation(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

EmployeeEvaluation

Overview

The EmployeeEvaluation class is a component for interacting with an AI agent that evaluates user chats.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| output_column    |   Yes    | Column for saving the Customer Satisfaction information.                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

A Pandas Dataframe with the EmployeeEvaluation statistics.
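
Example (a minimal sketch using the only documented property; the column name is illustrative):

```yaml
EmployeeEvaluation:
  output_column: customer_satisfaction
```
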
run async
run()

run

Overview

The run method is a method for running the EmployeeEvaluation component.

Return

A Pandas Dataframe with the Customer Satisfaction statistics.
start async
start(**kwargs)

start

Overview

The start method is a method for starting the EmployeeEvaluation component.

Return

True if the EmployeeEvaluation component started successfully.

ExecuteSQL

ExecuteSQL

ExecuteSQL(loop=None, job=None, stat=None, **kwargs)

Bases: QSSupport, TemplateSupport, FlowComponent

ExecuteSQL

Overview

Executes one or more SQL queries against a PostgreSQL database; it can also execute SQL statements from a file.

Properties (inherited from FlowComponent)

.. table:: Properties :widths: auto

+--------------+----------+-----------+-------------------------------------------------------+
| Name         | Required | Summary                                                           |
+--------------+----------+-----------+-------------------------------------------------------+
| skipError    |   No     | Whether to skip errors and continue execution (instead of aborting).|
+--------------+----------+-----------+-------------------------------------------------------+
| sql          |   No     | A raw SQL query string to execute.                                |
+--------------+----------+-----------+-------------------------------------------------------+
| file_sql     |   No     | A path (string) or list of paths (strings) to SQL files           |
|              |          | containing the queries to execute.                                |
+--------------+----------+-----------+-------------------------------------------------------+
| pattern      |   No     | A dictionary mapping variable names to functions that return      |
|              |          | the corresponding values to be used in the SQL query.             |
+--------------+----------+-----------+-------------------------------------------------------+
| use_template |   No     | Whether to treat the SQL string as a template and use the         |
|              |          | `_templateparser` component to render it (default: False).        |
+--------------+----------+-----------+-------------------------------------------------------+
| multi        |   No     | Whether to treat the `sql` property as a list of multiple         |
|              |          | queries to execute sequentially (default: False).                 |
+--------------+----------+-----------+-------------------------------------------------------+
| exec_timeout |   No     | The timeout value for executing a single SQL query                |
|              |          | (default: 3600 seconds).                                          |
+--------------+----------+-----------+-------------------------------------------------------+

Methods

  • open_sqlfile(self, file: PurePath, **kwargs) -> str: Opens an SQL file and returns its content.
  • get_connection(self, event_loop: asyncio.AbstractEventLoop): Creates a connection pool to the PostgreSQL database.
  • _execute(self, query, event_loop): Executes a single SQL query asynchronously.
  • execute_sql(self, query: str, event_loop: asyncio.AbstractEventLoop) -> str: Executes an SQL query and returns the result.

Notes

  • This component uses asynchronous functions for non-blocking I/O operations.
  • Error handling is implemented to catch exceptions during database connection, SQL execution, and file operations.
  • Supports loading SQL queries from files.
  • Supports using templates for dynamic SQL generation.
  • Supports executing multiple queries sequentially.

Example:

```yaml
ExecuteSQL:
  file_sql: fill_employees.sql
```
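
A sketch of running several raw queries sequentially via the documented `sql` and `multi` properties (table names are illustrative):

```yaml
ExecuteSQL:
  multi: true
  sql:
    - TRUNCATE TABLE staging.employees;
    - INSERT INTO staging.employees SELECT * FROM raw.employees;
```
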
close async
close()

Closing Database Connection.

run async
run()

Run Raw SQL functionality.

start async
start(**kwargs)

Start Component

ExtractHTML

ExtractHTML

ExtractHTML(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, PandasDataframe

ExtractHTML

Overview:
Extract HTML using XPath or BeautifulSoup CSS selectors.



Example:

```yaml
ExtractHTML:
  custom_parser: trustpilot_reviews
  as_dataframe: true
```
get_soup
get_soup(content, parser='html.parser')

Get a BeautifulSoup Object.

FileBase

FileBase

FileBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

FileBase

Overview

Abstract base class for file-based components.

Properties (inherited from FlowComponent)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+------------------------------------------------------------+
| Name               | Required | Summary                                                                |
+--------------------+----------+-----------+------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination              |
|                    |          | directory if it doesn't exist (default: True).                         |
+--------------------+----------+-----------+------------------------------------------------------------+
| directory          |   Yes    | The path to the directory containing the files.                        |
+--------------------+----------+-----------+------------------------------------------------------------+
| filename           |   No     | A filename (string), list of filenames (strings), or a                 |
|                    |          | glob pattern (string) to identify files.                               |
+--------------------+----------+-----------+------------------------------------------------------------+
| file               |   No     | A pattern or filename (string) to identify files.                      |
|                    |          | (This property takes precedence over 'filename' if both are specified.)|
+--------------------+----------+-----------+------------------------------------------------------------+
close abstractmethod async
close()

Method.

get_filelist
get_filelist()

Retrieves a list of files based on the component's configuration.

This method determines the list of files to process based on the component's attributes such as 'pattern', 'file', or 'filename'. It applies any masks or variables to the file patterns if specified.

Returns:

  • list[PurePath]: A list of PurePath objects representing the files to be processed. If no specific pattern or filename is set, it returns all files in the component's directory.

run abstractmethod async
run()

Run File checking.

start async
start(**kwargs)

Check for File and Directory information.

FileCopy

FileCopy

FileCopy(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase

FileCopy

Overview

Copies files from a source directory to a destination directory.

Properties (inherited from FileBase)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+-------------------------------------------------------------------+
| Name               | Required | Summary                                                                       |
+--------------------+----------+-----------+-------------------------------------------------------------------+
| create_destination |   No     | Boolean flag indicating whether to create the destination                     |
|                    |          | directory if it doesn't exist (default: True).                                |
+--------------------+----------+-----------+-------------------------------------------------------------------+
| source             |   Yes    | A dictionary specifying the source directory and filename.                    |
|                    |          | (e.g., {"directory": "/path/to/source", "filename": "myfile.txt"})            |
+--------------------+----------+-----------+-------------------------------------------------------------------+
| destination        |   Yes    | A dictionary specifying the destination directory and optionally filename.    |
|                    |          | (e.g., {"directory": "/path/to/destination", "filename": "renamed_file.txt"}) |
+--------------------+----------+-----------+-------------------------------------------------------------------+
| rename             |   No     | Boolean flag indicating whether the file has a new name.                      |
+--------------------+----------+-----------+-------------------------------------------------------------------+
| remove_source      |   No     | Boolean flag indicating whether to remove the source file(s) after            |
|                    |          | copying (default: False).                                                     |
+--------------------+----------+-----------+-------------------------------------------------------------------+



Example:

```yaml
FileCopy:
  source:
    filename: '*.xlsx'
    directory: /home/ubuntu/symbits/bose/stores/
  remove_source: true
  destination:
    directory: /home/ubuntu/symbits/bose/stores/backup/
```
close async
close()

Method.

run async
run()

Copy File(s).

start async
start(**kwargs)

Check for File and Directory information.

FileDelete

FileDelete

FileDelete(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase

FileDelete

Overview

This component removes all files in a directory.

.. table:: Properties :widths: auto

+--------------+----------+-----------------------------------------------------------------+
| Name         | Required | Summary                                                         |
+--------------+----------+-----------------------------------------------------------------+
| file         |   Yes    | A dictionary containing two keys, "pattern" and "value":        |
|              |          | "pattern" contains the path of the file on the server; if it    |
|              |          | contains the mask "{value}", then "value" is used to set the    |
|              |          | value of that mask.                                             |
+--------------+----------+-----------------------------------------------------------------+
| pattern      |   Yes    | Allows you to replace values (".xls", ".csv", ...).             |
+--------------+----------+-----------------------------------------------------------------+
| directory    |   Yes    | The directory containing the files to delete.                   |
+--------------+----------+-----------------------------------------------------------------+
| value        |   Yes    | Name of the function and the arguments it receives, e.g.,       |
|              |          | ["current_date", {"mask": "%Y%m%d"}] -> 20220909.               |
+--------------+----------+-----------------------------------------------------------------+
| dictionary   |   Yes    | Path where to validate whether the file exists.                 |
+--------------+----------+-----------------------------------------------------------------+

Example:

```yaml
FileDelete:
  file:
    pattern: '*.csv'
    value: ''
  directory: /home/ubuntu/symbits/bayardad/files/job_advertising/bulk/
```
close async
close()

Method.

run async
run()

Delete File(s).

FileExists

FileExists

FileExists(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase

FileExists Overview

This component validates whether a file or a list of files exists in a directory.

.. table:: Properties :widths: auto

+--------------+----------+-----------------------------------------------------------------+
| Name         | Required | Summary                                                         |
+--------------+----------+-----------------------------------------------------------------+
| file         |   Yes    | A dictionary containing two keys, "pattern" and "value":        |
|              |          | "pattern" contains the path of the file on the server; if it    |
|              |          | contains the mask "{value}", then "value" is used to set the    |
|              |          | value of that mask.                                             |
+--------------+----------+-----------------------------------------------------------------+
| pattern      |   Yes    | Allows you to replace values (".xls", ".csv").                  |
+--------------+----------+-----------------------------------------------------------------+
| value        |   No     | Name of the function and the arguments it receives, e.g.,       |
|              |          | ["current_date", {"mask": "%Y%m%d"}] -> 20220909.               |
+--------------+----------+-----------------------------------------------------------------+
| mask         |   No     | A mask applied with the today attribute, with date format       |
|              |          | '%Y-%m-%d'.                                                     |
+--------------+----------+-----------------------------------------------------------------+
| directory    |   Yes    | Path to validate if the file exists.                            |
+--------------+----------+-----------------------------------------------------------------+
| date         |   No     | File date.                                                      |
+--------------+----------+-----------------------------------------------------------------+
| diff         |   No     | Evaluate the difference in the file.                            |
+--------------+----------+-----------------------------------------------------------------+
| depens       |   No     | The file depends on a previous process.                         |
+--------------+----------+-----------------------------------------------------------------+
| filename     |   Yes    | Identify the file name.                                         |
+--------------+----------+-----------------------------------------------------------------+

Returns the validation of whether the file exists or not.

Example:

```yaml
FileExists:
  file:
    pattern: '{value}-adp_worker.csv'
    value:
    - today
    - mask: '%Y-%m-%d'
  directory: /home/ubuntu/symbits/walmart/files/adfs/
```
close async
close()

Method.

run async
run()

Run File checking.

FileIteratorDelete

FileIteratorDelete

FileIteratorDelete(loop=None, job=None, stat=None, **kwargs)

Bases: IteratorBase

FileIteratorDelete

Overview

The FileIteratorDelete class is a component for removing all files in a specified directory. It extends the
IteratorBase class and provides methods for directory validation, file listing based on patterns, and file deletion.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| directory        |   Yes    | The directory from which files will be deleted.                                                  |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _filenames       |   No     | A list to store filenames that match the pattern.                                                |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _path            |   No     | The Path object for the directory.                                                               |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| pattern          |   No     | The pattern to match files for deletion.                                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the deletion of files in a specified directory, including initialization, file listing,
and deletion.
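
Example (a minimal sketch based on the documented properties; the path and pattern are illustrative):

```yaml
FileIteratorDelete:
  directory: /home/ubuntu/files/temp/
  pattern: '*.csv'
```
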
start async
start(**kwargs)

Check if Directory exists.

FileList

FileList

FileList(loop=None, job=None, stat=None, **kwargs)

Bases: IteratorBase

FileList with optional Parallelization Support.

Overview

This component iterates through a specified directory and returns a list of files based on a provided pattern or individual files. It supports asynchronous processing and offers options for managing empty results and detailed error handling.

.. table:: Properties :widths: auto

+---------------------+----------+------------------------------------------------------------------------------------------------+
| Name                | Required | Description                                                                                    |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| directory (str)     |   Yes    | Path to the directory containing the files to be listed.                                       |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| pattern (str)       |   No     | Optional glob pattern for filtering files (overrides individual files if provided).           |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| filename (str)      |   No     | Name of the files.                                                                             |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| iterate (bool)      |   No     | Flag indicating whether to iterate through the files and process them sequentially            |
|                     |          | (defaults to True).                                                                            |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| generator (bool)    |   No     | Flag controlling the output format: True returns a generator, False (default) returns a list. |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| file (dict)         |   No     | A dictionary containing two keys, "pattern" and "value": "pattern" contains the path of the   |
|                     |          | file on the server; if it contains the mask "{value}", then "value" is used to set the value  |
|                     |          | of that mask.                                                                                  |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| parallelize         |   No     | If True, the iterator will process rows in parallel (default: False).                         |
+---------------------+----------+------------------------------------------------------------------------------------------------+
| num_threads         |   No     | Number of threads to use if parallelize is True (default: 10).                                |
+---------------------+----------+------------------------------------------------------------------------------------------------+

Returns the list of files in a directory.

Example:

```yaml
FileList:
  directory: /home/ubuntu/symbits/bayardad/files/job_advertising/bulk/
  pattern: '*.csv'
  iterate: true
```
start async
start(**kwargs)

Check if Directory exists.

FileOpen

FileOpen

FileOpen(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase

FileOpen.

**Overview**

This component opens one or more files asynchronously and returns their contents as streams.
It supports handling both individual filenames and lists of filenames.
It provides error handling for missing files or invalid file types.


.. table:: Properties
:widths: auto

+---------------------+----------+--------------------------------------------------------------------------------------------+
| Name                | Required | Description                                                                                |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| directory (str)     |   Yes    | Path to the directory containing the files to be listed.                                   |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| pattern (str)       |   No     | Optional glob pattern for filtering files (overrides individual files if provided).       |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| filename (str)      |   No     | Name of the files.                                                                         |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| file (dict)         |   No     | A dictionary containing two keys, "pattern" and "value": "pattern" contains the path of   |
|                     |          | the file on the server; if it contains the mask "{value}", then "value" is used to set    |
|                     |          | the value of that mask.                                                                    |
+---------------------+----------+--------------------------------------------------------------------------------------------+
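
Example (a minimal sketch based on the documented properties; the path and filename are illustrative):

```yaml
FileOpen:
  directory: /home/ubuntu/files/
  filename: report.txt
```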

run async
run()

Run File checking.

FileRead

FileRead

FileRead(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase, PandasDataframe

FileRead.

**Overview**

Reads a text file and returns its contents as a string (non-binary).


.. table:: Properties
:widths: auto

+---------------------+----------+--------------------------------------------------------------------------------------------+
| Name                | Required | Description                                                                                |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| directory (str)     |   Yes    | Path to the directory containing the files to be listed.                                   |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| pattern (str)       |   No     | Optional glob pattern for filtering files (overrides individual files if provided).       |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| filename (str)      |   No     | Name of the files.                                                                         |
+---------------------+----------+--------------------------------------------------------------------------------------------+
| file (dict)         |   No     | A dictionary containing two keys, "pattern" and "value": "pattern" contains the path of   |
|                     |          | the file on the server; if it contains the mask "{value}", then "value" is used to set    |
|                     |          | the value of that mask.                                                                    |
+---------------------+----------+--------------------------------------------------------------------------------------------+

Example:

```yaml
FileRead:
  file: recap_response_payloads.json
  directory: recaps/
  is_json: true
```
close async
close()

Close method.

run async
run()

Run File checking.

start async
start(**kwargs)

Check for File and Directory information.

FileRename

FileRename

FileRename(loop=None, job=None, stat=None, **kwargs)

Bases: FileBase

FileRename.

Overview

This component renames a file asynchronously based on provided source and destination paths. It supports handling missing files and offers optional behavior for ignoring missing source files. It performs basic validation to ensure the destination doesn't overwrite existing files.

.. table:: Properties :widths: auto

+------------------------+----------+---------------------------------------------------------------------------------------+
| Name                   | Required | Description                                                                           |
+------------------------+----------+---------------------------------------------------------------------------------------+
| directory (str)        |   Yes    | Path to the directory containing the source file.                                     |
+------------------------+----------+---------------------------------------------------------------------------------------+
| destination (str)      |   Yes    | New filename (with optional variable replacement using set_variables and              |
|                        |          | mask_replacement) for the file.                                                       |
+------------------------+----------+---------------------------------------------------------------------------------------+
| ignore_missing (bool)  |   No     | Flag indicating whether to ignore missing source files (defaults to False).           |
+------------------------+----------+---------------------------------------------------------------------------------------+
| source (str)           |   Yes    | Filename (with optional variable replacement using set_variables and                  |
|                        |          | mask_replacement) of the file to rename.                                              |
+------------------------+----------+---------------------------------------------------------------------------------------+

Example:

```yaml
FileRename:
  Group: ICIMSRename
  ignore_missing: true
  directory: /home/ubuntu/symbits/icims/files/forms/
  source: '{form_id}_{form_data_id}.txt'
  destination: '{form_id}_{form_data_id}_{associate_id} - {full_name}.txt'
```
close async
close()

Close method.

run async
run()

Rename File(s).

start async
start(**kwargs)

Check for File and Directory information.

FilterIf

FilterIf

FilterIf(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

FilterIf.

Overview

    The FilterIf is a component that applies specified filters to a Pandas DataFrame.
    If the condition is met, the row is kept; otherwise it is discarded.
    A subset of components is then executed against the result set (if any).

.. table:: Properties
:widths: auto

    +-----------------+----------+---------------------------------------------------------------------------+
    | Name            | Required | Summary                                                                   |
    +-----------------+----------+---------------------------------------------------------------------------+
    | operator        |   Yes    | Logical operator (e.g., `and`, `or`) used to combine filter conditions.   |
    +-----------------+----------+---------------------------------------------------------------------------+
    | conditions      |   Yes    | List of conditions with columns, values, and expressions for filtering.   |
    |                 |          | Format: `{ "column": <col_name>, "value": <val>, "expression": <expr> }`  |
    +-----------------+----------+---------------------------------------------------------------------------+
    | filter          |   Yes    | List of conditions with columns, values, and expressions for filtering.   |
    |                 |          | Format: `{ "column": <col_name>, "value": <val>, "expression": <expr> }`  |
    +-----------------+----------+---------------------------------------------------------------------------+
    | true_condition  |   Yes    | List of components to execute if the condition is met.                    |
    +-----------------+----------+---------------------------------------------------------------------------+
    | false_condition |   Yes    | List of components to execute if the condition is not met.                |
    +-----------------+----------+---------------------------------------------------------------------------+
    | passthrough     |   No     | If True, the component will pass through the data without filtering.      |
    +-----------------+----------+---------------------------------------------------------------------------+
Returns

    This component returns a filtered Pandas DataFrame based on the provided conditions.
    The component tracks metrics such as the initial and filtered row counts,
    and optionally limits the returned columns if specified.
    Additional debugging information can be output based on configuration.

Example:

```yaml
FilterIf:
  operator: "&"
  filter:
  - column: previous_form_id
    expression: not_null
  true_condition:
  - TransformRows:
      replace_columns: true
      fields:
        form_id: previous_form_id
  - ExecuteSQL:
      file_sql: delete_previous_form.sql
      use_template: true
      use_dataframe: true
```

FilterRows

FilterRows.

Component for filtering, dropping, cleaning Pandas Dataframes.

FilterRows

FilterRows
FilterRows(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

FilterRows

Overview

The FilterRows class is a component for removing or cleaning rows in a Pandas DataFrame based on specified criteria.
It supports various cleaning and filtering operations and allows for the saving of rejected rows to a file.

.. table:: Properties :widths: auto

+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| fields           |   Yes    | A dictionary defining the fields and corresponding filtering conditions to be applied.           |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| filter_conditions|   Yes    | A dictionary defining the filter conditions for transformations.                                 |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| _applied         |   No     | A list to store the applied filters.                                                             |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| multi            |   No     | A flag indicating if multiple DataFrame transformations are supported, defaults to False.        |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+

Return

The methods in this class manage the filtering of rows in a Pandas DataFrame, including initialization, execution,
and result handling.



Example:

```yaml
FilterRows:
  filter_conditions:
    clean_empty:
      columns:
      - updated
    drop_columns:
      columns:
      - legal_street_address_1
      - legal_street_address_2
      - work_location_address_1
      - work_location_address_2
      - birth_date
    suppress:
      columns:
      - payroll_id
      - reports_to_payroll_id
      pattern: (\.0)
  drop_empty: true
```
start async
start(**kwargs)

Obtain Pandas Dataframe.

functions

Functions for FilterRows.

filter_rows_by_column
filter_rows_by_column(df, column_source, pattern, column_destination)

Filter rows in a DataFrame by removing rows where column_destination values are found in column_source values.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to filter.

required
column_source str

The name of the column to extract values from.

required
pattern str

The regex pattern to match (for string columns) or None for integer columns.

required
column_destination str

The name of the column to check against.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame with rows removed where column_destination values are in column_source values.
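
A hedged sketch of how this function might be invoked, assuming `filter_rows_by_column` is addressable under `filter_conditions` like the other FilterRows functions (the column names are illustrative):

```yaml
FilterRows:
  filter_conditions:
    filter_rows_by_column:
      column_source: all_form_ids
      pattern: null
      column_destination: form_id
```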

GCSToBigQuery

GCSToBigQuery

GCSToBigQuery(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

GCSToBigQuery.

This component loads a CSV file from a specific GCS bucket into a BigQuery table.
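
Example (a hypothetical configuration sketch; these property names are not confirmed by this documentation, and the bucket and table identifiers are illustrative):

```yaml
GCSToBigQuery:
  bucket_name: my-gcs-bucket
  object_name: exports/daily.csv
  dataset: analytics
  table: daily_metrics
```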

close async
close()

Close the AsyncDB connection.

run async
run()

Execute the CSV load from GCS into BigQuery.

start async
start(**kwargs)

Initialize the component by configuring the AsyncDB connection.

GoogleA4

GoogleA4

GoogleA4(loop=None, job=None, stat=None, **kwargs)

Bases: QSBase

GoogleA4

Overview

The GoogleA4 class is a component for interacting with Google Analytics 4 (GA4) to fetch and transform report data.
It extends the QSBase class and provides methods for retrieving reports and transforming the data into a specified format.

.. table:: Properties :widths: auto

+--------------+----------+-----------------------------------------------------------------------+
| Name         | Required | Summary                                                               |
+--------------+----------+-----------------------------------------------------------------------+
| datalist     |   Yes    | Method for reports                                                    |
+--------------+----------+-----------------------------------------------------------------------+
| subtask      |   Yes    | Identifiers of property and metrics                                   |
+--------------+----------+-----------------------------------------------------------------------+
| type         |   Yes    | Defines the type of data handled by the component, set to "report".   |
+--------------+----------+-----------------------------------------------------------------------+
| _driver      |   Yes    | Specifies the driver used by the component, set to "ga".              |
+--------------+----------+-----------------------------------------------------------------------+
| _metrics     |   Yes    | A dictionary mapping GA4 metrics to their corresponding output names. |
+--------------+----------+-----------------------------------------------------------------------+
| _qs          |   Yes    | Instance of the QSBase class used to interact with the data source.   |
+--------------+----------+-----------------------------------------------------------------------+

Raises:
    DataNotFound: If no data is found.
    ComponentError: If any other error occurs during execution.

Return

The methods in this class return the requested report data from Google Analytics 4, formatted according to the specific requirements of the component.

Example:

```yaml
GoogleA4:
  type: report
  property_id: '306735132'
  pattern:
    start_date:
    - date_diff
    - value: now
      diff: 14
      mode: days
      mask: '%Y-%m-%d'
    end_date:
    - today
    - mask: '%Y-%m-%d'
  dimensions:
  - mobileDeviceBranding
  - mobileDeviceModel
  metric:
  - sessions
  - totalUsers
  - newUsers
  - engagedSessions
  - sessionsPerUser
  company_id: 52
  ga4_dimension: 10
```

GoogleGeoCoding

GoogleGeoCoding

GoogleGeoCoding(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Google GeoCoding

Overview

This component retrieves geographical coordinates (latitude and longitude) for a given set of addresses using Google Maps Geocoding API. It utilizes asynchronous processing to handle multiple requests concurrently and offers error handling for various scenarios.

.. table:: Properties :widths: auto

+-----------------------+----------+------------------------------------------------------------------------------------------------------+
| Name                  | Required | Description                                                                                          |
+-----------------------+----------+------------------------------------------------------------------------------------------------------+
| data (pd.DataFrame)   |   Yes    | Pandas DataFrame containing the addresses. Requires a column with the address information.           |
+-----------------------+----------+------------------------------------------------------------------------------------------------------+
| columns (list)        |   Yes    | List of column names in the DataFrame that contain the address components (e.g., ["street", "city"]).|
+-----------------------+----------+------------------------------------------------------------------------------------------------------+

Return

The component modifies the input DataFrame by adding new columns named
'latitude', 'longitude', 'formatted_address', 'place_id' and 'zipcode' containing the retrieved
geocoding information for each address. The original DataFrame is returned.


Example:

```yaml
GoogleGeoCoding:
  skipError: skip
  place_prefix: account_name
  use_find_place: true
  return_pluscode: true
  chunk_size: 50
  keywords:
  - electronics_store
  columns:
  - street_address
  - city
  - state_code
  - zipcode
```
chunkify
chunkify(lst, n)

Split list lst into chunks of size n.

find_place async
find_place(address, place_prefix=None, fields='name,place_id,plus_code,formatted_address,geometry,type', get_plus_code=True)

Searches for a place using the Google Places API.

Parameters:

Name Type Description Default
address str

The address to search for.

required
place_prefix str

Optional prefix added to the address before the search.

None
fields str

Comma-separated list of place fields to request.

'name,place_id,plus_code,formatted_address,geometry,type'
get_plus_code bool

Whether to also return the Google Plus Code.

True

Returns:

Type Description
dict

The Place ID of the first matching result, or None if no results are found.

GooglePlaces

GooglePlaces

GooglePlaces(loop=None, job=None, stat=None, **kwargs)

Bases: GoogleBase

index_get
index_get(array, *argv)

Checks if an index is available in the array and returns it.

Parameters:

Name Type Description Default
array

the data array

required
*argv

index integers

required

Returns:

Type Description

None if the index is not available, otherwise the value at that index.

rating_reviews async
rating_reviews(idx, row)

Getting Place Reviews using the Google Places API.

Parameters:

Name Type Description Default
idx int

row index

required
row Series

pandas row

required

Returns:

Type Description

Review Information.

Example:

```yaml
GooglePlaces:
  type: traffic
  paid_proxy: true
```
run async
run()

Run the Google Places API.

traffic async
traffic(idx, row)

get the current status of popular times (Traffic).

Parameters:

Name Type Description Default
idx int

row index

required
row Series

pandas row

required

Returns:

Type Description

Place information with Traffic.

GoogleSearch

GoogleSearch

GoogleSearch(loop=None, job=None, stat=None, **kwargs)

Bases: GoogleBase

Google Custom Search Component

Overview:

This component performs Google Custom Search queries using the Google Custom Search API. It can search for specific queries and return results including URLs, titles, and snippets. The component can receive search terms either from a previous component or a list of terms specified in the configuration.

.. table:: Properties :widths: auto

+-----------------------+----------+--------------------------------------------------------------------------+
| Name                  | Required | Description                                                              |
+-----------------------+----------+--------------------------------------------------------------------------+
| terms (list)          |   No     | List of search terms to use. Required if no previous component provided. |
+-----------------------+----------+--------------------------------------------------------------------------+
| column (str)          |   No     | Name of the column in the DataFrame when using a previous component.     |
+-----------------------+----------+--------------------------------------------------------------------------+
| site (str)            |   No     | Optional site restriction for the search (e.g., 'site:example.com').     |
+-----------------------+----------+--------------------------------------------------------------------------+
| max_results (int)     |   No     | Maximum number of results to return per search (default: 1).             |
+-----------------------+----------+--------------------------------------------------------------------------+

Return:

The component returns a DataFrame with columns: 'search_term', 'search_url', 'search_title', 'search_snippet' containing the search results.
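
Example (a minimal sketch using the documented properties; the search term and site values are illustrative):

```yaml
GoogleSearch:
  terms:
  - flowtask components
  site: 'site:example.com'
  max_results: 3
```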

close async
close()

Clean up resources.

get_ddg_service async
get_ddg_service()

Get the DuckDuckGo search service.

get_search_service
get_search_service()

Get the Google Custom Search service.

run async
run()

Execute searches for each query in the DataFrame.

search async
search(idx, row)

Perform search using selected engine and return results.

start async
start(**kwargs)

Initialize the component and validate required parameters.

HTTPClient

Basic HTTP Connection Request.

Example:

```yaml
HTTPClient:
  Group: ICIMSDownload
  url: https://api.icims.com/customers/{customer_id}/forms/{form_data_id}.txt
  customer_id: '5674'
  form_data_id: '1731'
  form_id: '1823'
  associate_id: '518491'
  auth_type: basic
  use_proxy: false
  use_async: true
  credentials:
    username: ICIMS_API_USERNAME
    password: ICIMS_API_PASSWORD
  as_dataframe: false
  download: true
  timeout: 360
  destination:
    directory: /home/ubuntu/symbits/icims/files/forms/{associate_id}/
    filename: '{form_id}_{filename}.html'
    overwrite: false
  no_errors:
    '403':
      errorMessage: This form has been disabled
      errorCode: 3
```

HTTPClient

HTTPClient(loop=None, job=None, stat=None, **kwargs)

Bases: DownloadFromBase, HTTPService

HTTPClient.

Overview

A general-purpose HTTP request component (with optional authentication, proxying, and file download support) that follows the standard component lifecycle:

.. table:: Properties :widths: auto

+-----------+----------+---------------------------------------------------------------------+
| Name      | Required | Summary                                                             |
+-----------+----------+---------------------------------------------------------------------+
| start     |   Yes    | Executed when the component is "initialized"; it MUST return TRUE   |
|           |          | if it does not fail.                                                |
+-----------+----------+---------------------------------------------------------------------+
| run       |   Yes    | The code the component executes; must return TRUE or a content      |
|           |          | if it does not fail, otherwise a failure is declared.               |
+-----------+----------+---------------------------------------------------------------------+
| close     |   Yes    | Used for any cleanup operation.                                     |
+-----------+----------+---------------------------------------------------------------------+

evaluate_error async
evaluate_error(response, message)

evaluate_error.

Check Response status and available payloads.

Args:
    response: the HTTP response object.
    message (str): the error message to evaluate.

Returns:

Name Type Description
tuple tuple

run async
run()

Executes the HTTPClient component, handling multiple URLs if provided.

:return: The result of the HTTP request(s).

start async
start(**kwargs)

Initializes the HTTPClient component, including fetching proxies if necessary.

:param kwargs: Additional keyword arguments. :return: True if initialization is successful.

var_replacement
var_replacement()

Replaces variables in the arguments with their corresponding values.

ICIMS

ICIMS

ICIMS(loop=None, job=None, stat=None, **kwargs)

Bases: QSBase

ICIMS

Overview

The ICIMS class is a specialized component for handling form data interactions within the ICIMS system.
It extends the QSBase class and provides methods for retrieving people data, individual person data,
lists of forms, and form data with specific metrics mappings.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                          |
+------------------+----------+--------------------------------------------------------------------------------------+
| type             |   Yes    | Defines the type of data handled by the component.                                   |
+------------------+----------+--------------------------------------------------------------------------------------+
| conditions       |    No    | If any condition is required to do the work.                                         |
+------------------+----------+--------------------------------------------------------------------------------------+

Methods

people

Retrieves a list of people data from the ICIMS system.

Raises:
    DataNotFound: If no data is found.
    ComponentError: If any other error occurs during execution.

person

Retrieves individual person data from the ICIMS system and adds a person_id to the data.

Raises:
    DataNotFound: If no data is found.
    ComponentError: If any other error occurs during execution.

forms_list

Retrieves a list of forms from the ICIMS system.

Raises:
    DataNotFound: If no data is found.
    ComponentError: If any other error occurs during execution.

form_data

Retrieves form data from the ICIMS system and maps internal metrics to output names.

Raises:
    DataNotFound: If no data is found.
    ComponentError: If any other error occurs during execution.

Return

The methods in this class return the requested data from the ICIMS system, formatted according to the specific requirements of each method.

Example:

```yaml
ICIMS:
  type: people
```

IF

IF

IF(loop=None, job=None, stat=None, **kwargs)

Bases: BaseLoop

IF Component

Executes one of two components based on a condition.

true_component: The component to execute if the condition evaluates to True. false_component: The component to execute if the condition evaluates to False.
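
Example (a hedged sketch; the `condition` property name and the nested component configurations are assumptions, not confirmed by this documentation):

```yaml
IF:
  condition: '{has_previous_form}'
  true_component:
    ExecuteSQL:
      file_sql: delete_previous_form.sql
      use_template: true
  false_component:
    TransformRows:
      replace_columns: true
```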

run async
run()

Executes the appropriate component based on the condition.

start async
start(**kwargs)

Initialize the component.

IcimsFolderCopy

IcimsFolderCopy

IcimsFolderCopy(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

IcimsFolderCopy.

Copies folders from one directory to another based on data retrieved from a PostgreSQL database. Supports three modes: copying all folders, copying based on the associate's name, or copying based on the associate's ID.

Properties:

+-------------------+----------+-------------------------------------------------+
| Name              | Required | Summary                                         |
+-------------------+----------+-------------------------------------------------+
| driver            |   Yes    | pg (default asyncdb PostgreSQL driver)          |
+-------------------+----------+-------------------------------------------------+
| source_directory  |   Yes    | Directory where folders are located             |
+-------------------+----------+-------------------------------------------------+
| destination_dir   |   Yes    | Directory where folders will be copied to       |
+-------------------+----------+-------------------------------------------------+
| by_name           |   No     | Person's name to filter by                      |
+-------------------+----------+-------------------------------------------------+
| by_associate_id   |   No     | Associate ID to filter by                       |
+-------------------+----------+-------------------------------------------------+
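
Example (a minimal sketch built from the properties above; the paths and the associate ID are illustrative):

```yaml
IcimsFolderCopy:
  driver: pg
  source_directory: /home/ubuntu/symbits/icims/files/forms/
  destination_dir: /home/ubuntu/symbits/icims/files/copies/
  by_associate_id: '518491'
```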

close async
close(connection=None)

Close the database connection.

copy_folders
copy_folders(folder_codes)

Copy folders and handle missing folders gracefully.

run async
run()

Execute the folder copying process based on database results using Pandas.

start async
start(**kwargs)

Prepare component for execution.

ImageFeatures

ImageFeatures

ImageFeatures(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

ImageFeatures is a component for extracting image features. It extends the FlowComponent class and implements a Plugin system for various image processing tasks.

Parameters:

Name Type Description Default
*args

Variable length argument list.

required
**kwargs

Arbitrary keyword arguments.

{}

Attributes:

Name Type Description
_model_name str

The name of the model used for feature extraction.

get_bio
get_bio(bio)

Return a BytesIO object for a payload.

run async
run()

run.

Execute the plugin list to extract image features.

start async
start(**kwargs)

start.

Initialize Task.


IteratorBase

IteratorBase

IteratorBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

IteratorBase

Overview

The IteratorBase class is an abstract component for handling iterative tasks. It extends the FlowComponent class
and provides methods for starting tasks, retrieving steps, and executing jobs asynchronously.

.. table:: Properties :widths: auto

+-----------------+----------+-------------------------------------------------------------------------------+
| Name            | Required | Description                                                                   |
+-----------------+----------+-------------------------------------------------------------------------------+
| iterate         |   No     | Boolean flag indicating if the component should                               |
|                 |          | iterate the components or return the list, defaults to False.                 |
+-----------------+----------+-------------------------------------------------------------------------------+

The methods in this class manage the execution of iterative tasks, including initialization, step retrieval,
job creation, and asynchronous execution.
start async
start(**kwargs)

start.

Initialize (if needed) a task

LangchainLoader

LangchainLoader

LangchainLoader(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

LangchainLoader.

Overview:

Gets a list of documents and converts them into Langchain Documents.

Example:

```yaml
LangchainLoader:
    path: /home/ubuntu/symbits/lg/bot/products_positive
    source_type: Product-Top-Reviews
    loader: HTMLLoader
    chunk_size: 2048
    elements:
    - div: .product
```
get_default_llm
get_default_llm()

Return a VertexLLM instance.


loaders

abstract
AbstractLoader
AbstractLoader(tokenizer=None, text_splitter=None, summarizer=None, markdown_splitter=None, source_type='file', doctype='document', device=None, cuda_number=0, llm=None, **kwargs)

Bases: ABC

Abstract class for Document loaders.

get_default_llm
get_default_llm()

Return a VertexLLM instance.

get_summary_from_text
get_summary_from_text(text, use_gpu=False)

Get a summary of a text.

load async
load(path)

Load data from a source and return it as a Langchain Document.

Parameters:

Name Type Description Default
path Union[str, PurePath, List[PurePath]]

The source of the data.

required

Returns:

Type Description
List[Document]

List[Document]: A list of Langchain Documents.

resolve_paths
resolve_paths(path)

Resolve the input path into a list of file paths. Handles lists, directories, glob patterns, and single file paths.

Parameters:

Name Type Description Default
path Union[str, PurePath, List[PurePath]]

Input path(s).

required

Returns:

Type Description
List[Path]

List[Path]: A list of resolved file paths.

basepdf
BasePDF
BasePDF(**kwargs)

Bases: AbstractLoader

Base Abstract loader for all PDF-file Loaders.

load async
load(path)

Load data from a source and return it as a Langchain Document.

Parameters:

Name Type Description Default
path Union[str, PurePath, List[PurePath]]

The source of the data.

required

Returns:

Type Description
List[Document]

List[Document]: A list of Langchain Documents.

docx
MSWordLoader
MSWordLoader(tokenizer=None, text_splitter=None, summarizer=None, markdown_splitter=None, source_type='file', doctype='document', device=None, cuda_number=0, llm=None, **kwargs)

Bases: AbstractLoader

Load Microsoft Docx as Langchain Documents.

extract_text
extract_text(path)

Extract text from a docx file.

Parameters:

Name Type Description Default
path Path

The source of the data.

required

Returns:

Name Type Description
str

The extracted text.

html
HTMLLoader
HTMLLoader(**kwargs)

Bases: AbstractLoader

Loader for HTML files to convert into Langchain Documents.

Processes HTML files, extracts relevant content, converts to Markdown, and associates metadata with each document.

pdfblocks
PDFBlocks
PDFBlocks(table_settings={}, **kwargs)

Bases: BasePDF

Load a PDF Table as Blocks of text.

get_markdown
get_markdown(df)

Convert a DataFrame to a Markdown string.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to convert.

required

Returns:

Name Type Description
str str

The Markdown string.

unique_columns
unique_columns(df)

Rename duplicate columns in the DataFrame to ensure they are unique.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame with potential duplicate column names.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame with unique column names.

pdfmark
PDFMarkdown
PDFMarkdown(**kwargs)

Bases: BasePDF

Loader for PDF files whose content is converted to Markdown.

pdftables
PDFTables
PDFTables(table_settings={}, **kwargs)

Bases: BasePDF

Loader for Tables present on PDF Files.

get_markdown
get_markdown(df)

Convert a DataFrame to a Markdown string.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to convert.

required

Returns:

Name Type Description
str str

The Markdown string.

unique_columns
unique_columns(df)

Rename duplicate columns in the DataFrame to ensure they are unique.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame with potential duplicate column names.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame with unique column names.

qa
QAFileLoader
QAFileLoader(columns=['Question', 'Answer'], **kwargs)

Bases: AbstractLoader

Questions-and-Answers file based on Excel, converted to Langchain Documents.
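
A hypothetical sketch wiring this loader through LangchainLoader (whether LangchainLoader forwards the `columns` argument to the loader is an assumption; the path is illustrative):

```yaml
LangchainLoader:
  path: /home/ubuntu/symbits/lg/bot/faq.xlsx
  loader: QAFileLoader
  columns:
  - Question
  - Answer
```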

load async
load(path)

Load data from a source and return it as a Langchain Document.

Parameters:

Name Type Description Default
path Path

The source of the data.

required

Returns:

Type Description
List[Document]

List[Document]: A list of Langchain Documents.

txt
TXTLoader
TXTLoader(tokenizer=None, text_splitter=None, summarizer=None, markdown_splitter=None, source_type='file', doctype='document', device=None, cuda_number=0, llm=None, **kwargs)

Bases: AbstractLoader

Loader for plain-text (TXT) files.

LeadIQ

LeadIQ

LeadIQ(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, HTTPService

LeadIQ API Component

Overview:

This component interacts with the LeadIQ GraphQL API to retrieve company and employee information. Supports different types of searches through the 'type' parameter.

.. table:: Properties :widths: auto

+-----------------------+----------+--------------------------------------------------------------------------+
| Name                  | Required | Description                                                              |
+-----------------------+----------+--------------------------------------------------------------------------+
| type                  |   Yes    | Type of search to perform: 'company', 'employees' or 'flat'.             |
+-----------------------+----------+--------------------------------------------------------------------------+
| column                |   No     | Name of the column containing company names (default: 'company_name').   |
+-----------------------+----------+--------------------------------------------------------------------------+
| companies             |   No     | List of company names to search (alternative to using DataFrame input).  |
+-----------------------+----------+--------------------------------------------------------------------------+

Returns:

Type Description

DataFrame containing the requested information based on the search type
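
Example (a minimal sketch using the documented properties; the company name is illustrative):

```yaml
LeadIQ:
  type: employees
  companies:
  - Acme Corporation
```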

close async
close()

Clean up resources.

get_leadiq_url
get_leadiq_url(resource, args=None)

Construct LeadIQ API URL with optional query parameters.

run async
run()

Execute searches based on the specified type.

search_company async
search_company(company_name)

Basic company search.

search_employees async
search_employees(company_name)

Search for employees at a company.

search_flat async
search_flat(company_name)

Flat search for employees at a company.

start async
start(**kwargs)

Initialize the component and validate inputs.

Loop

Loop

Loop(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Loop.

Overview

The Loop class is a FlowComponent that iterates over the next components and executes them in sequential order. It extends the FlowComponent class and provides methods for starting tasks, retrieving steps, and executing jobs asynchronously.

create_component
create_component(target, value=None, **params)

create_component.

Create a new component instance.

run async
run()

Async Run Method.

start async
start(**kwargs)

start.

Initialize (if needed) a task

Lowes

Scraping a Web Page Using Selenium + ChromeDriver + BeautifulSoup.

    Example:

    ```yaml
    Lowes:
      type: reviews
      use_proxies: true
      paid_proxy: true
      api_token: xxx
    ```

Lowes

Lowes(loop=None, job=None, stat=None, **kwargs)

Bases: ReviewScrapper

Lowes.

Combining an API key and web scraping, this component can extract Lowes information (reviews, etc.).

column_exists
column_exists(column, default_val=None)

Check if column exists in DataFrame, create if not.

product async
product()

Extract basic product information from Lowes using Selenium.

product_details async
product_details()

product_details.

Get Product Details from Lowes URL.

product_info async
product_info()

product_info.

Get Product Information from Lowes URL.

reviews async
reviews()

reviews.

Target Product Reviews.

top_reviewed async
top_reviewed()

Wrapper around top_reviewed_products to integrate it with the type system. Allows calling it as type='top_reviewed'.

top_reviewed_products async
top_reviewed_products(brand, top_n=3, **kwargs)

Searches products by brand and returns a DataFrame with the top N products with the most reviews. Includes: name, price, url, image, sku, review count, and rating. Uses the actual selectors from the Lowes HTML.

MS365Usage

MS365Usage

MS365Usage(loop=None, job=None, stat=None, **kwargs)

Bases: Azure

MS365Usage

Overview

The MS365Usage class is a component for retrieving Microsoft 365 usage reports via the Microsoft Graph API.
It extends the Azure class and supports various report types such as M365, Teams, SharePoint, OneDrive, and Yammer.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| report_type      |   No     | The type of report to retrieve, defaults to "M365".                                              |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| usage_method     |   No     | The usage method for the report, defaults to "UserDetail".                                       |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _report          |   Yes    | The specific report endpoint to use based on the report type and usage method.                   |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| period           |   No     | The period for the report, defaults to "D7".                                                     |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| format           |   No     | The format of the report, defaults to "text/csv".                                                |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| method           |   Yes    | The HTTP method to use for the API request, set to "GET".                                        |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| download         |   Yes    | Flag indicating if a file download is required, set to False.                                    |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| url              |   Yes    | The formatted URL for the API request.                                                           |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| app              |   Yes    | The MSAL app instance for authentication.                                                        |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| token_type       |   Yes    | The type of authentication token.                                                                |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| auth             |   Yes    | Dictionary containing the authentication details.                                                |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the retrieval and processing of Microsoft 365 usage reports, including initialization,
API request execution, and result handling.



Example:

```yaml
MS365Usage:
  period: D7
  report_type: Yammer
  usage_method: UserDetail
  credentials:
    client_id: OFFICE_365_REPORT_ID
    client_secret: OFFICE_365_REPORT_SECRET
    tenant_id: AZURE_ADFS_TENANT_ID
```
run async
run()

Run Azure Connection for getting Users Info.

MSTeamsMessages

MSTeamsMessages

MSTeamsMessages(loop=None, job=None, stat=None, **kwargs)

MarketClustering

MarketClustering

MarketClustering(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Offline clustering of stores using BallTree+DBSCAN (in miles or km), then generating a fixed number of ghost employees for each cluster, refining if store-to-ghost distance > threshold, and optionally checking daily route constraints.

Steps

1) Clustering with DBSCAN (haversine + approximate).
2) Create ghost employees at cluster centroid (random offset).
3) Remove 'unreachable' stores if no ghost employee can reach them within a threshold (e.g. 25 miles).
4) Check if a single ghost can cover up to max_stores_per_day in a route < day_hours or max_distance_by_day. If not, we mark that store as 'rejected' too.
5) Return two DataFrames: final assignment + rejected stores.

Parameters:

cluster_radius (default: 150.0)

    Purpose: Controls the search radius for the BallTree clustering algorithm.
    Usage: Converted to radians and used in tree.query_radius() to find nearby stores during cluster formation.
    Effect: Determines how far apart stores can be and still be considered for the same cluster during the initial clustering phase.
    Location: Used in the _create_cluster() method.

max_cluster_distance (default: 50.0)

    Purpose: Controls outlier detection within already-formed clusters.
    Usage: Used in _detect_outliers() to check if stores are too far from their cluster's centroid.
    Effect: Stores farther than this distance from their cluster center get marked as outliers.
    Location: Used in validation after clusters are formed.
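
Example (a hedged sketch: `cluster_radius` and `max_cluster_distance` are documented above, while the remaining property names are taken from the overview and the VRP defaults, so their exact spelling is an assumption):

```yaml
MarketClustering:
  cluster_radius: 150.0
  max_cluster_distance: 50.0
  max_stores_per_day: 3
  day_hours: 8
  max_distance_by_day: 150
```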
get_rejected_stores
get_rejected_stores()

Return the DataFrame of rejected stores (those removed from any final market).

load_graph_from_pbf
load_graph_from_pbf(pbf_path, bounding_box)

Load a road network graph from a PBF file for the specified bounding box.

Args:
    pbf_path (str): Path to the PBF file.
    bounding_box: Bounding box coordinates (north, south, east, west floats).

Returns:
    nx.MultiDiGraph: A road network graph for the bounding box.

run async
run()

1) Cluster with BallTree + K-Means validation.
2) Road-based validation: assign stores to ghost employees via VRP.
3) Remove any stores that cannot be assigned within constraints.
4) Re-assign rejected stores if possible.
5) Add cluster centroids to result DataFrame.
6) Return final assignment + rejected stores.

start async
start(**kwargs)

Validate input DataFrame and columns.

create_data_model

create_data_model(distance_matrix, num_vehicles, depot=0, max_distance=150, max_stores_per_vehicle=3)

Stores the data for the VRP problem.

print_routes

print_routes(routes, store_ids)

Prints the routes in a readable format.

solve_vrp

solve_vrp(data)

Solves the VRP problem using OR-Tools and returns the routes.

MergeFiles

MergeFiles

MergeFiles(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

MergeFiles

Overview

The MergeFiles class is a component for merging multiple files into a single file or dataframe. It supports various
file formats, including CSV, Excel, and HTML, and handles encoding detection and conversion as needed.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| filename         |   No     | The name of the merged output file.                                                              |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| file             |   No     | The file object to be merged.                                                                    |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| filepath         |   No     | The directory path for the merged output file.                                                   |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| ContentType      |   No     | The content type of the files being merged, defaults to "text/csv".                              |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| as_dataframe     |   No     | Boolean flag indicating if the result should be returned as a dataframe, defaults to False.      |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the merging of files, including initialization, execution, and result handling.



Example:

```yaml
MergeFiles:
  ContentType: application/vnd.ms-excel
  model: worked_hours
  pd_args:
    skiprows: 6
  as_dataframe: true
```
close async
close()

close.

Close method.

run async
run()

run.

Run the connection and merge all the files.

start async
start(**kwargs)

start. Start connection.

MilvusOutput

MilvusOutput

MilvusOutput(loop=None, job=None, stat=None, **kwargs)

Bases: MilvusStore, FlowComponent

Milvus Database Vectorstore Output.

Example:
```yaml
MilvusOutput:
  credentials:
  collection_name: lg_products
  db_name: lg
  embedding_model:
    model_name: thenlper/gte-base
    model_type: transformers
  vector_field: vector
  text_field: text
  pk: source_type
  consistency_level: Bounded
```
run async
run()

Saving Langchain Documents on a Milvus Database.

NearByStores

NearByStores

NearByStores(job=None, *args, **kwargs)

Bases: FlowComponent

NearByStores.

Overview

Calculates the nearest stores to an employee location.

Example:

```yaml
NearByStores:
  depends:
  - QueryToPandas_1
  - QueryToPandas_2
  radius: 50
```

NetworkNinja

NetworkNinja

NetworkNinja(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPService, FlowComponent

NetworkNinja.

Overview: Router for processing NetworkNinja Payloads.
Properties

+---------------+----------+------------------------------------------+
| Name          | Required | Description                              |
+---------------+----------+------------------------------------------+
| action        |   Yes    | Type of operation (get_batch, etc)       |
+---------------+----------+------------------------------------------+
| credentials   |   No     | API credentials (taken from config)      |
+---------------+----------+------------------------------------------+
| payload       |   No     | Additional payload parameters            |
+---------------+----------+------------------------------------------+

Supported Types
  • get_batch: Retrieves batch acceptance data
get_batch async
get_batch()

Handle get_batch operation type

Used to download a Batch from the NetworkNinja SQS Queue.

get_multi_batches async
get_multi_batches()

Get multiple batches at once.

report_batch async
report_batch(batch_id, report_code=200)

Handle report_batch operation type

Used to report a Batch to the NetworkNinja SQS Queue.

run async
run()

Run NetworkNinja Router.

models

abstract
AbstractPayload

Bases: BaseModel

Abstract Payload Model.

Common fields implemented by any Object in NetworkNinja Payloads.

ensure_timezone
ensure_timezone(dt)

Ensure a datetime has timezone information.

insert_record async
insert_record(conn, **kwargs)

Insert Record to Database.

on_sync async
on_sync(conn, upsert=True)

Sync Current Object with the Database.

save async
save(conn, pk=None, **kwargs)

Save the Object to the Database.

sync async
sync(**kwargs)

Sync the Object with the Database

update_many async
update_many(objects, primary_keys=None, **kwargs)

Upsert Several Records in Database.

upsert_record async
upsert_record(**kwargs)

Upsert Record to Database.

account
Account

Bases: AbstractPayload

{ "metadata": { "type": "retailer", "transaction_type": "UPSERT", "source": "MainEvent", "client": "global", "client_id": 1, "orgid": null, "timestamp": 1742240432.348096 }, "payload": { "account_id": 26, "account_name": "Brandsmart", "active": true } },

events
Event

Bases: AbstractPayload

Event Model.

EventPosition

Bases: BaseModel

Event Position Model.

EventPunch

Bases: AbstractPayload

Event Punch Model.

Example

{

    "cico_id": 4089,
    "event_id": 12272,
    "visitor_id": 18180,
    "latitude": "32.8066746",
    "longitude": "-97.426601",
    "cico_type": "out",
    "related_checkout_id": null,
    "total_hours": null,
    "client_id": 57,
    "orgid": 106,
    "visitor_name": "Margaret Ojeogwu",
    "timestamp_utc": "2025-03-22T01:17:36+00:00"
}
forms
Condition

Bases: BaseModel

Defines a Condition, a condition for a Logic Group.

Example

{
    "condition_id": 1835,
    "condition_logic": "EQUALS",
    "condition_comparison_value": "Regular",
    "condition_question_reference_id": 48,
    "condition_option_id": 4308
}

Form

Bases: AbstractPayload

Reference to a Form:

on_sync async
on_sync()

Sync the form with the database.

FormData

Bases: AbstractPayload

Defines a Form Data, a collection of responses to a Form.

Example

{ "form_data_id": 1, "formid": 1, "client_id": 59, "orgid": 77, "store_id": 1, "store_name": "Best Buy 4350", "user_id": 1, "user_name": "Arturo", "created_at": "2025-02-01T00:00:00-06:00", "updated_at": "2025-02-01T00:00:00-06:00", "form_responses": [ { "column_name": "8550", "data": "Arturo", "question_shown_to_user": true, "column_id": "150698" } ] }

save async
save(conn, pk=None, **kwargs)

Always do an UPSERT on Form Data:

FormDefinition

Bases: AbstractPayload

Defines a Form (recap) definition.

FormMetadata

Bases: AbstractPayload

Defines a Form Metadata, a single question from a Form.

Example

{ "column_name": "8452", "description": "Please provide a photo of the starting odometer reading", "is_active": true, "data_type": "FIELD_IMAGE_UPLOAD", "formid": 1, "form_name": "Territory Manager Visit Form TEST", "client_id": 59, "client_name": "TRENDMICRO", "orgid": 77 }

insert_record async
insert_record(conn, **kwargs)

Insert Record to Database.

FormResponse

Bases: BaseModel

Defines a Form Response, a response to a Form.

Example

{ "event_id": 10516, "column_name": 8550, "data": "Arturo", "question_shown_to_user": true, "column_id": "150698" }

LogicGroup

Bases: BaseModel

Defines a Logic Group, a group of questions in a Form.

Example

{
    "logic_group_id": 1706,
    "conditions": [
        {
            "condition_id": 1835,
            "condition_logic": "EQUALS",
            "condition_comparison_value": "Regular",
            "condition_question_reference_id": 48,
            "condition_option_id": 4308
        }
    ]
}

Question

Bases: BaseModel

Defines a Question, a single question in a Form.

Example

{
    "question_id": 48,
    "question_column_name": "8501",
    "question_description": "Purpose of Visit",
    "question_logic_groups": [],
    "validations": [
        {
            "validation_id": 43,
            "validation_type": "responseRequired",
            "validation_logic": null,
            "validation_comparison_value": null,
            "validation_question_reference_id": null
        }
    ]
}

QuestionBlock

Bases: BaseModel

Defines a Question Block, a collection of questions in a Form.

Example

{ "question_block_id": 9, "question_block_type": "simple", "question_block_logic_groups": [], "questions": [] }

Validation

Bases: BaseModel

Defines a Validation, a validation rule for a question.

Example

{
    "validation_id": 43,
    "validation_type": "responseRequired",
    "validation_logic": null,
    "validation_comparison_value": null,
    "validation_question_reference_id": null
}

organization
Organization
photos
Document

Bases: AbstractPayload

Document Model.

Represents a document in the system.

Example

{ "document_id": 1, "document_name": "Test Document", "document_path": "https://www.example.com/test.pdf", "description": "Test Document Description", "created_on": "2025-02-01T00:00:00-06:00", "store_number": "12345", "account_id": 1, "account_name": "Test Account", "question_name": "Test Question", "url_parts": "https://www.example.com/test.pdf" }

PhotoCategory

Bases: AbstractPayload

Photo Category Model.

Represents a photo category in the system.

Example

{ "category_id": 53, "name": "TEST Category", "client_id": 61, "enabled": false, "orgid": 71, "client_name": "TRO Walmart Assembly" }

project
Project

Bases: AbstractPayload

NN Projects related to a Client.

Example payload:

"payload": {
    "project_id": 595,
    "project_name": "Test Program",
    "is_active": true,
    "start_timestamp": "2025-02-01T00:00:00-06:00",
    "end_timestamp": "2025-02-28T00:00:00-06:00",
    "inserted_at": "2025-01-30T19:19:40-06:00",
    "updated_at": "2025-01-30T19:19:40-06:00",
    "description": "",
    "orgid": 69,
    "client_id": 56,
    "client_name": "EPSON"
}

store
CustomStoreField

Bases: BaseModel

Custom Field Model for Store.

Represents a custom field for a store.

Example

{ "custom_id": 33, "custom_name": "Store Name", "custom_value": "Best Buy 4350", "custom_orgid": null, "custom_client_id": 1

}

Store

Bases: AbstractPayload

Store Model.

Represents a store in the system.

Example

{ "store_name": "KILMARNOCK-4350", "store_address": "200 Old Fair Grounds Way", "city": "Kilmarnock", "zipcode": "22482", "phone_number": "804-435-6149", }

StoreGeography

Bases: AbstractPayload

Store Geography Model.

Represents a store's geographical information.

Example

{ "geoid": 479, "region": "Assembly - Region", "district": "Assembly - District", "market": "136", "company_id": 61, "orgid": 71, "client_id": 61, "client_name": "ASSEMBLY" }

StoreType

Bases: AbstractPayload

Store Type Model.

Represents a store type in the system.

Example

{ "store_type_id": 1, "store_type_name": "Retail", "store_type_description": "Retail Store" }

user
User

Bases: AbstractPayload

User Model.

Represents a user in the system.

Example

{ "user_id": 1, "username": "admin", "employee_number": 1234, "first_name": "John", "last_name": "Doe", "email": " "mobile_number": "123-456-7890", "role_id": 1, "role_name": "Admin", "address": "1234 Elm St", "city": "Springfield", "state_code": "IL", "zipcode": "62704", "latitude": 39.781721, "longitude": -89.650148, }

router

EmptyQueue

Bases: BaseException

Exception raised when the Queue is empty.

Example

```yaml
NetworkNinja:
  comment: Download Batch from NetworkNinja.
  action: get_batch
  avoid_acceptance: true
```


NextStopAgent

NextStop Agent.

Run queries using the NextStop Agent.

NextStopAgent

NextStopAgent(loop=None, job=None, stat=None, **kwargs)

Bases: AgentBase, FlowComponent

NextStopAgent.

Overview

The NextStopAgent class is a FlowComponent that integrates with the Parrot AI Agent framework to run queries using the NextStop Agent. It extends the AgentBase class and provides methods for creating and managing the agent's lifecycle within a FlowTask.

run async
run()

run.

Run the NextStop Agent with the provided data.

start async
start(**kwargs)

start.

Initialize (if needed) a task

NextStopResponse

Bases: BaseModel

NextStopResponse is a model that defines the structure of the response for the NextStop agent.

Odoo

Odoo

Odoo(*args, **kwargs)

Bases: HTTPService, FlowComponent

Odoo

Overview

    This component interacts with an Odoo server to perform various operations like `search_read` and `create`.

.. table:: Properties
:widths: auto


+------------------------------+----------+-----------+---------------------------------------------------------+
| Name                         | Required | Summary                                                             |
+------------------------------+----------+-----------+---------------------------------------------------------+
| credentials                  |   Yes    | A dictionary containing connection details to the Odoo server:      |
|                              |          | "HOST", "PORT", "DB", "USERNAME", "PASSWORD".                       |
+------------------------------+----------+-----------+---------------------------------------------------------+
| method                       |   Yes    | The Odoo method to be called (e.g., "search_read", "create").       |
+------------------------------+----------+-----------+---------------------------------------------------------+
| model                        |   Yes    | The Odoo model to be used for the method call (e.g., "res.partner").|
+------------------------------+----------+-----------+---------------------------------------------------------+
| domain                       |   No     | Domain filter for searching records, applicable for "search_read".  |
+------------------------------+----------+-----------+---------------------------------------------------------+
| fields                       |   No     | Fields to be retrieved, applicable for "search_read".               |
+------------------------------+----------+-----------+---------------------------------------------------------+
| values                       |   No     | Values to be used for creating records, applicable for "create".    |
+------------------------------+----------+-----------+---------------------------------------------------------+
| use_field_from_previous_step |   No     | Field from previous step to filter records in "search_read".        |
+------------------------------+----------+-----------+---------------------------------------------------------+

Returns

    This component returns a pandas DataFrame containing the results of the Odoo operation.


Example:

```yaml
Odoo:
  credentials:
    HOST: ODOO_HOST
    PORT: ODOO_PORT
    DB: ODOO_DB
    USERNAME: ODOO_USERNAME
    PASSWORD: ODOO_PASSWORD
  model: stock.warehouse
  method: search_read
  domain:
  - - - company_id.name
      - '='
      - Pokemon
  fields:
  - id
```
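For reference, this operation maps onto Odoo's standard XML-RPC external API. A minimal sketch of an equivalent raw search_read call (host, database, and credentials below are placeholder values; in the component they come from `credentials`):

```python
import xmlrpc.client

# Placeholder connection details for illustration only.
URL, DB, USER, PASSWORD = "https://odoo.example.com", "odoo_db", "user@example.com", "secret"

common = xmlrpc.client.ServerProxy(f"{URL}/xmlrpc/2/common")
uid = common.authenticate(DB, USER, PASSWORD, {})

models = xmlrpc.client.ServerProxy(f"{URL}/xmlrpc/2/object")
# search_read on stock.warehouse, mirroring the domain and fields of the YAML example.
rows = models.execute_kw(
    DB, uid, PASSWORD,
    "stock.warehouse", "search_read",
    [[["company_id.name", "=", "Pokemon"]]],
    {"fields": ["id"]},
)
```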

OdooInjector

OdooInjector

OdooInjector(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPService, FlowComponent

OdooInjector

Overview

    The OdooInjector class is a component for injecting data into an Odoo server using a provided API key.
    This component takes data from a Pandas DataFrame, formats it as payload, and sends it to an Odoo endpoint
    specified in the credentials, facilitating seamless integration with Odoo’s API.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+---------------------------------------------------------------+
    | Name           | Required | Summary                                                                |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | credentials    |   Yes    | A dictionary containing connection details for the Odoo server:        |
    |                |          | "HOST", "PORT", "APIKEY", and "INJECTOR_URL".                          |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | model          |   Yes    | The Odoo model into which data will be injected.                       |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | headers        |   No     | Optional headers to include with the API request. Defaults to API key.  |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | data           |   Yes    | The data to inject, formatted as a list of dictionaries from DataFrame. |
    +----------------+----------+-----------+---------------------------------------------------------------+

Returns

    This component returns a Boolean indicating whether the data injection was successful.
    In case of errors, detailed logging is provided, and an exception is raised with the error message.
    Additionally, the component tracks successful API interactions and logs any unsuccessful payload deliveries
    for debugging and tracking.


Example:

```yaml
OdooInjector:
  credentials:
    APIKEY: ODOO_APIKEY
    HOST: ODOO_HOST
    PORT: ODOO_PORT
    INJECTOR_URL: ODOO_INJECTOR_URL
  model: fsm.location
  chunk_size: 10
```
split_chunk_df
split_chunk_df(df, chunk_size)

Splits a DataFrame into chunks of a specified size.

Parameters:

  • df (pd.DataFrame): The DataFrame to be split.
  • chunk_size (int): The maximum number of rows per chunk.

Returns:

  • list: A list of DataFrame chunks. If the DataFrame is empty, returns an empty list.
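A minimal sketch of the chunking behavior described above, assuming a plain row-slice implementation:

```python
import pandas as pd

def split_chunk_df(df: pd.DataFrame, chunk_size: int) -> list:
    """Split a DataFrame into consecutive row chunks of at most chunk_size rows."""
    if df.empty:
        return []
    return [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
```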

OpenFromXML

OpenFromXML

OpenFromXML(job=None, *args, **kwargs)

Bases: FlowComponent

OpenFromXML

Overview

    This component opens an XML file and returns it as an XML etree Object.

.. table:: Properties
:widths: auto


+--------------+----------+-----------+-------------------------------------------------------+
| Name         | Required | Summary                                                           |
+--------------+----------+-----------+-------------------------------------------------------+
| directory    |   No     | The directory where the XML file is located.                      |
+--------------+----------+-----------+-------------------------------------------------------+
| filename     |   Yes    | The name of the XML file to be opened.                            |
+--------------+----------+-----------+-------------------------------------------------------+
| use_strings  |   No     | If True, the component will treat the input as a string containing|
|              |          | XML data.                                                         |
+--------------+----------+-----------+-------------------------------------------------------+
| as_nodes     |   No     | If set, specifies the node to be extracted from the XML tree.     |
+--------------+----------+-----------+-------------------------------------------------------+

Returns:

Type Description
  • An lxml.etree.ElementTree object if the entire XML tree is parsed.
  • A list of nodes (lxml.etree.Element) if as_nodes is set.
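
Example (illustrative values; the directory, filename, and node name are hypothetical):

```yaml
OpenFromXML:
  directory: /home/ubuntu/files/
  filename: catalog.xml
  as_nodes: item
```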
close async
close()

close. close method

run async
run()

run.

Open the XML file and return the object
start async
start(**kwargs)

start.

Get if directory exists

OpenWithBase

OpenWithBase

OpenWithBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

OpenWithBase

Overview

    Abstract Component for Opening Files into DataFrames.
    Supports various file types such as CSV, Excel, and XML.

.. table:: Properties
:widths: auto


+--------------+----------+-----------+-------------------------------------------------------+
| Name         | Required | Summary                                                           |
+--------------+----------+-----------+-------------------------------------------------------+
| directory    |   No     | The directory where the files are located.                        |
+--------------+----------+-----------+-------------------------------------------------------+
| filename     |   No     | The name of the file to be opened. Supports glob patterns.        |
+--------------+----------+-----------+-------------------------------------------------------+
| file         |   No     | A dictionary containing the file patterns to be used.             |
+--------------+----------+-----------+-------------------------------------------------------+
| mime         |   No     | The MIME type of the file. Default is "text/csv".                 |
+--------------+----------+-----------+-------------------------------------------------------+
| separator    |   No     | The delimiter to be used in CSV files. Default is ",".            |
+--------------+----------+-----------+-------------------------------------------------------+
| encoding     |   No     | The encoding of the file.                                         |
+--------------+----------+-----------+-------------------------------------------------------+
| datatypes    |   No     | Specifies the datatypes to be used for columns.                   |
+--------------+----------+-----------+-------------------------------------------------------+
| parse_dates  |   No     | Specifies columns to be parsed as dates.                          |
+--------------+----------+-----------+-------------------------------------------------------+
| filter_nan   |   No     | If True, filters out NaN values. Default is True.                 |
+--------------+----------+-----------+-------------------------------------------------------+
| na_values    |   No     | List of strings to recognize as NaN. Default is ["NULL", "TBD"].  |
+--------------+----------+-----------+-------------------------------------------------------+
| clean_nat    |   No     | If True, cleans Not-A-Time (NaT) values.                          |
+--------------+----------+-----------+-------------------------------------------------------+
| no_multi     |   No     | If True, disables multi-threading.                                |
+--------------+----------+-----------+-------------------------------------------------------+
| flavor       |   No     | Specifies the database flavor to be used for column information.  |
+--------------+----------+-----------+-------------------------------------------------------+
| force_map    |   No     | If True, forces the use of a mapping file.                        |
+--------------+----------+-----------+-------------------------------------------------------+
| skipcols     |   No     | List of columns to be skipped.                                    |
+--------------+----------+-----------+-------------------------------------------------------+
| pd_args      |   No     | Additional arguments to be passed to pandas read functions.       |
+--------------+----------+-----------+-------------------------------------------------------+

Returns

This component opens files and prepares them for further processing. The actual return type depends on the concrete
implementation, but typically it returns a list of filenames or file data.

OpenWithPandas

OpenWithPandas

OpenWithPandas(loop=None, job=None, stat=None, **kwargs)

Bases: OpenWithBase

OpenWithPandas

Overview

    Open a file and return a Dataframe type

.. table:: Properties
:widths: auto

+-------------+----------+---------------------------------------------------------------------+
| Name        | Required | Summary                                                             |
+-------------+----------+---------------------------------------------------------------------+
| model       |   Yes    | A JSON model describing the data to be opened, or the name of a     |
|             |          | DataModel (in development).                                         |
+-------------+----------+---------------------------------------------------------------------+
| map         |   Yes    | Map the columns against the model.                                  |
+-------------+----------+---------------------------------------------------------------------+
| tablename   |   Yes    | Join the data against the table in the Postgres database.           |
+-------------+----------+---------------------------------------------------------------------+
| use_map     |   Yes    | If true, a MAP file is used instead of a table in PostgreSQL.       |
+-------------+----------+---------------------------------------------------------------------+
| file_engine |   Yes    | Pandas engine to use for the Excel file type:                       |
|             |          |   * xlrd (legacy, .xls files)                                       |
|             |          |   * openpyxl (newer .xlsx files)                                    |
|             |          |   * pyxlsb (files with macros and functions)                        |
+-------------+----------+---------------------------------------------------------------------+
| dtypes      |   No     | Force the data type of a column, e.g. { order_date: datetime }.     |
+-------------+----------+---------------------------------------------------------------------+

Returns a pandas DataFrame.

Example:

```yaml
OpenWithPandas:
  mime: text/csv
  process: true
  separator: '|'
  drop_empty: true
  trim: true
  pk:
    columns:
    - associate_oid
    - associate_id
    append: false
    verify_integrity: true
  map:
    tablename: employees
    schema: bacardi
    map: employees
    replace: false
```

PDFGenerator

PDFGenerator

PDFGenerator(loop=None, job=None, stat=None, **kwargs)

Bases: TemplateSupport, FlowComponent

Generates a PDF document from a DataFrame.

close async
close()

Close the component.

run async
run()

Run the PDF generation process.

start async
start(**kwargs)

Verify if the component is ready to run.

PGPDecrypt

PGPDecrypt

PGPDecrypt(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

PGPDecrypt

Overview

 Decrypt a file encrypted with PGP.
TODO: Works with several files (not only one).

.. table:: Properties
:widths: auto

+--------------+----------+-------------------------------------------------------------------+
| Name         | Required | Summary                                                           |
+--------------+----------+-------------------------------------------------------------------+
| apply_mask   |   Yes    | This component uses a mask to identify specific bit patterns in   |
|              |          | a byte of data.                                                   |
+--------------+----------+-------------------------------------------------------------------+
| start        |   Yes    | Initializes the component, obtaining the data through the         |
|              |          | parameter type.                                                   |
+--------------+----------+-------------------------------------------------------------------+
| close        |   Yes    | Flushes any unwritten data and closes the file object.            |
+--------------+----------+-------------------------------------------------------------------+

Example:

```yaml
PGPDecrypt:
  filename: LETSTALKMOBILE_NDW_TRANSACTIONS_INFOQUEST_{today}.zip.gpg
  directory: /home/ubuntu/symbits/xfinity/files/ltm/transactions/
  decrypt:
    filename: LETSTALKMOBILE_NDW_TRANSACTIONS_INFOQUEST_{today}.zip
    directory: /home/ubuntu/symbits/xfinity/files/ltm/transactions/
  masks:
    '{today}':
    - today
    - mask: '%Y%m%d'
  delete_source: true
```

PandasIterator

PandasIterator

PandasIterator(loop=None, job=None, stat=None, **kwargs)

Bases: IteratorBase

PandasIterator

Overview

    This component converts data to a pandas DataFrame in an iterator and processes each row.

.. table:: Properties
:widths: auto


+--------------+----------+-----------+------------------------------------------------------------+
| Name         | Required | Summary                                                                |
+--------------+----------+-----------+------------------------------------------------------------+
| columns      |   Yes    | Names of the columns that we are going to extract.                     |
+--------------+----------+-----------+------------------------------------------------------------+
| vars         |   Yes    | This attribute organizes names of the columns organized by id.         |
+--------------+----------+-----------+------------------------------------------------------------+
| parallelize  |   No     | If True, the iterator will process rows in parallel. Default is False. |
+--------------+----------+-----------+------------------------------------------------------------+
| num_threads  |   No     | Number of threads to use if parallelize is True. Default is 10.        |
+--------------+----------+-----------+------------------------------------------------------------+

Returns
-------
This component returns the processed pandas DataFrame after iterating
through the rows and applying the specified jobs.


Example:

```yaml
PandasIterator:
  columns:
  - formid
  - orgid
  vars:
    form: '{orgid}/{formid}'
```
createJob
createJob(target, params, row)

Create the Job Component.

run async
run()

Async Run Method.

start async
start(**kwargs)

Obtain Pandas Dataframe.

PandasToFile

PandasToFile

PandasToFile(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

PandasToFile

Overview

    This component exports a pandas DataFrame to a file in CSV, Excel, or JSON format.

.. table:: Properties
:widths: auto


+------------------------+----------+-----------+-----------------------------------------------------------------+
| Name                   | Required | Summary                                                                     |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| filename               |   Yes    | The name of the file to save the DataFrame to.                              |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| directory              |   No     | The directory where the file will be saved. If not specified,               |
|                        |          | it will be derived from the filename.                                       |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| mime                   |   No     | The MIME type of the file. Supported types are "text/csv",                  |
|                        |          | "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",        |
|                        |          | "application/vnd.ms-excel", "application/json". Default is "text/csv".      |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| zerofill               |   No     | If True, fills NaN values with "0" in string columns. Default is False.     |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| quoting                |   No     | Specifies the quoting behavior for CSV files. Options are "all" (QUOTE_ALL),|
|                        |          | "string" (QUOTE_NONNUMERIC), and None (QUOTE_NONE). Default is None.        |
+------------------------+----------+-----------+-----------------------------------------------------------------+
| pd_args                |   No     | Additional arguments for pandas' to_csv, to_excel, or to_json methods.      |
|                        |          | Default is an empty dictionary.                                             |
+------------------------+----------+-----------+-----------------------------------------------------------------+
|  sep                   |   Yes    | Field separator to use when writing the file.                               |
+------------------------+----------+-----------+-----------------------------------------------------------------+

Returns

This component returns the filename of the saved file.



Example:

```yaml
PandasToFile:
  filename: /home/ubuntu/symbits/bose/files/report/troc_open_tickets_{today}.csv
  masks:
    '{today}':
    - today
    - mask: '%Y-%m-%d'
  mime: text/csv
  quoting: string
  pd_args:
    sep: ','
```
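The quoting options listed above map onto the constants of Python's csv module; a small sketch of that mapping:

```python
import csv

# How the `quoting` property translates to csv quoting constants.
QUOTING = {
    "all": csv.QUOTE_ALL,            # quote every field
    "string": csv.QUOTE_NONNUMERIC,  # quote non-numeric fields only
    None: csv.QUOTE_NONE,            # no quoting
}
```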

Paradox

Paradox

Paradox(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPService, CacheSupport, FlowComponent

Paradox Component

Overview

This component interacts with the Paradox API to perform various operations. The first step is to handle authentication and obtain an access token. The token is cached in Redis to avoid requesting a new one on each execution.

.. table:: Properties
:widths: auto

+------+----------+---------------------------------------------+
| Name | Required | Summary                                     |
+------+----------+---------------------------------------------+
| type |   Yes    | Type of operation to perform with the API   |
+------+----------+---------------------------------------------+

candidates async
candidates()

Retrieves candidates from Paradox API using efficient pandas operations. Uses pagination to fetch all available candidates up to the maximum offset. Includes a delay between requests to avoid API rate limits.

Kwargs

offset_start (int): Starting offset for pagination (default: 0)

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing candidate information

Raises:

Type Description
ComponentError

If the request fails or returns invalid data
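
A sketch of the pagination loop described above; the `fetch_page` callable and the one-second delay are illustrative assumptions, not the component's actual internals:

```python
import asyncio
import pandas as pd

async def candidates(fetch_page, offset_start: int = 0, delay: float = 1.0) -> pd.DataFrame:
    """Page through an API until an empty page is returned (fetch_page is hypothetical)."""
    frames, offset = [], offset_start
    while True:
        page = await fetch_page(offset=offset)  # returns a list of candidate dicts
        if not page:
            break
        frames.append(pd.DataFrame(page))
        offset += len(page)
        await asyncio.sleep(delay)  # delay between requests to avoid API rate limits
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
```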

close async
close()

Cleanup any resources

get_cached_token async
get_cached_token()

Retrieves the cached authentication token from Redis if it exists.
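
A minimal sketch of the Redis lookup, assuming redis-py's asyncio client and a hypothetical key name:

```python
import redis.asyncio as redis

async def get_cached_token(key: str = "paradox:access_token") -> str | None:
    """Return the cached token from Redis, or None if it is missing or expired."""
    client = redis.Redis()
    token = await client.get(key)  # bytes or None
    return token.decode() if token else None
```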

run async
run()

Execute the main component logic based on the specified type. Currently supports authentication as the initial implementation.

set_auth_headers
set_auth_headers(token)

Set authentication token and headers

start async
start(**kwargs)

Initialize the component and authenticate with the API. Handles authentication flow including token caching in Redis.

ParamIterator

ParamIterator

ParamIterator(loop=None, job=None, stat=None, **kwargs)

Bases: IteratorBase

ParamIterator.

Overview

This component iterates over a set of parameters and executes a job for each set of parameters.

.. table:: Properties
:widths: auto

+--------------+----------+---------------------------------------------------------------+
| Name         | Required | Summary                                                       |
+--------------+----------+---------------------------------------------------------------+
| init         |   Yes    | Initializes the component methods.                            |
+--------------+----------+---------------------------------------------------------------+
| start        |   Yes    | Validates that the file exists, then starts the function      |
|              |          | that gets the data.                                           |
+--------------+----------+---------------------------------------------------------------+
| close        |   Yes    | Closes the process.                                           |
+--------------+----------+---------------------------------------------------------------+
| create_job   |   Yes    | Creates the job component.                                    |
+--------------+----------+---------------------------------------------------------------+
| run          |   Yes    | Creates the job component, assigning parameters to it.        |
+--------------+----------+---------------------------------------------------------------+

Example:

```yaml
ParamIterator:
  params:
    formid:
    - 2552
    - 2567
    - 2569
```
start async
start(**kwargs)

Check if exists Parameters.

ParseHTML

ParseHTML

ParseHTML(job=None, *args, **kwargs)

Bases: FlowComponent

ParseHTML. Parse HTML Content using lxml etree and BeautifulSoup.

Example:

```yaml
ParseHTML:
  xml: true
```
get_soup
get_soup(content, parser='html.parser')

Get a BeautifulSoup Object.
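
A minimal sketch of what this helper amounts to, per the signature above:

```python
from bs4 import BeautifulSoup

def get_soup(content: str, parser: str = "html.parser") -> BeautifulSoup:
    # Wrap raw HTML content in a BeautifulSoup object using the chosen parser.
    return BeautifulSoup(content, parser)
```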

open_html async
open_html(filename)

Open the HTML file.

run async
run()

Open all Filenames and convert them into BeautifulSoup and etree objects.

PgVectorOutput

PgVectorOutput

PgVectorOutput(loop=None, job=None, stat=None, **kwargs)

Bases: CredentialsInterface, FlowComponent

Saving Langchain Documents on a Postgres Database using PgVector.

This component is designed to save documents into a PostgreSQL database using PgVector extension

Example:

```yaml
PgVectorOutput:
  credentials:
    dsn:
  table: lg_products
  schema: lg
  embedding_model:
    model: thenlper/gte-base
    model_type: transformers
  id_column: id
  vector_column: embedding
  pk: source_type
  create_table: true
  upsert: true
```
run async
run()

Saving Langchain Documents on a Postgres Database.

PlacerStores

PlacerStores

PlacerStores(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, tPandas

PlacerStores.

Overview

The PlacerStores is used to match PlacerAI stores with Stores tables at different schemas.

Properties

.. table:: Properties
:widths: auto

+----------------+----------+------+--------------------------------------------------+
| Name           | Required | Type | Description                                      |
+----------------+----------+------+--------------------------------------------------+
| location_field |   Yes    | str  | The name of the column to be used for matching. |
+----------------+----------+------+--------------------------------------------------+

Returns a new DataFrame with all stores matched using a fuzzy search.

find_best_match
find_best_match(address, choices, threshold=80, token_threshold=80)

Find the best fuzzy match for a given address.

Parameters:

  • address (str): The address to match.
  • choices (list): List of addresses to match against.
  • threshold (int): Minimum similarity score to consider a match.

Returns:

  • best_match (str) or None: The best matching address, or None if no match meets the threshold.
  • score (int): The similarity score of the best match.
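
A minimal sketch of this kind of fuzzy match, assuming the rapidfuzz library as the scoring backend (the actual implementation may differ):

```python
from rapidfuzz import fuzz, process

def find_best_match(address: str, choices: list, threshold: int = 80):
    """Return (best_match, score); best_match is None when the score is below threshold."""
    result = process.extractOne(address, choices, scorer=fuzz.token_sort_ratio)
    if result is None:
        return None, 0
    best, score, _ = result
    return (best, int(score)) if score >= threshold else (None, int(score))
```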

preprocess_address

preprocess_address(address)

Preprocess Address for Fuzzy String Matching.

Example:

```yaml
PlacerStores:
  location_field: find_address
```

Pokemon

Pokemon

Pokemon(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPService, FlowComponent

Pokémon Component

Overview

This component interacts with the Pokémon API to retrieve data about machines or their on-hand inventory. It supports several operations determined by the type parameter:

  • "machines": Retrieves a list of machines.
  • "inventory": Retrieves on-hand inventory data for specified machines.
  • sites: Retrieves the Pokemon sites
  • ****: Retrieves the Pokemon
  • warehouses: Retrieves the Pokemon warehouses

The component handles authentication, constructs the necessary requests, processes the data, and returns a pandas DataFrame suitable for further analysis in your data pipeline.

.. table:: Properties
:widths: auto

+--------------+----------+------------------------------------------------------------------------+
| Name         | Required | Summary                                                                |
+--------------+----------+------------------------------------------------------------------------+
| credentials  |   Yes    | Dictionary containing API credentials: "BASE_URL", "CLIENT_ID", and    |
|              |          | "CLIENT_SECRET". Credentials can be retrieved from environment         |
|              |          | variables.                                                             |
+--------------+----------+------------------------------------------------------------------------+
| type         |   Yes    | Type of operation to perform. Accepts "machines" to retrieve machine   |
|              |          | data or "inventory" to retrieve machine inventory data.                |
+--------------+----------+------------------------------------------------------------------------+
| ids          |   No     | List of machine IDs to retrieve inventory for when type is             |
|              |          | "inventory". Overrides IDs from the previous step if provided.         |
+--------------+----------+------------------------------------------------------------------------+
| data         |   No     | Data from the previous step, typically a pandas DataFrame containing   |
|              |          | machine IDs in a column named "machine_id". Used when type is          |
|              |          | "inventory".                                                           |
+--------------+----------+------------------------------------------------------------------------+

Returns

This component returns a pandas DataFrame containing the retrieved data from the Pokémon API. The structure of the DataFrame depends on the operation type:

  • For type = "machines": The DataFrame contains information about machines, with columns corresponding to the machine attributes provided by the API.
  • For type = "inventory": The DataFrame contains on-hand inventory details for each machine, including machineId and detailed slot information.

Example:

```yaml
Pokemon:
  credentials:
    BASE_URL: POKEMON_BASE_URL
    CLIENT_ID: POKEMON_CLIENT_ID
    CLIENT_SECRET: POKEMON_CLIENT_SECRET
  type: machines
```
get_machines_inventory_payload
get_machines_inventory_payload(machines)

Create payload following API specification

split_chunk_ids staticmethod
split_chunk_ids(items, chunk_size)

Splits a Series of IDs into chunks of a specified size.

Parameters:

  • items (pd.Series): A pandas Series containing the IDs to be split.
  • chunk_size (int): The maximum number of IDs per chunk.

Returns:

  • list: A list of NumPy arrays, each containing a chunk of IDs. If the Series is empty or all IDs are NaN, returns an empty list or a list containing an empty array.
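
A minimal sketch of this splitting logic, assuming a NumPy-based implementation:

```python
import numpy as np
import pandas as pd

def split_chunk_ids(items: pd.Series, chunk_size: int) -> list:
    """Drop NaNs, then split the remaining IDs into arrays of at most chunk_size."""
    ids = items.dropna().to_numpy()
    if ids.size == 0:
        return []
    # Split points every chunk_size elements; np.array_split accepts a list of indices.
    return np.array_split(ids, range(chunk_size, ids.size, chunk_size))
```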

PositiveBot

PositiveBot

PositiveBot(loop=None, job=None, stat=None, **kwargs)

Bases: ParrotBot, FlowComponent

PositiveBot.

Overview

The PositiveBot class is a component for interacting with an AI Agent to perform Customer Satisfaction Analysis.
It extends the FlowComponent class.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| output_column    |   Yes    | Column for saving the Customer Satisfaction information.                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

A Pandas Dataframe with the Customer Satisfaction statistics.

PowerPointSlide

PowerPointSlide

PowerPointSlide(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, PowerPointClient

PowerPointSlide

Overview

This component dynamically generates PowerPoint slides from a Pandas DataFrame and a .pptx template. It supports multiple custom modes (e.g., single, double, retailer_view, overview_3up, etc.) defined declaratively via a YAML configuration. Each mode specifies the layout and number of images per slide.

You can name your modes arbitrarily — e.g., single, double, quadruple, 1x1, retailer_block, etc. The logic does not assume or enforce fixed names. What matters is that each mode entry in mode_content defines a valid layout index and list of image placeholders.

The component intelligently splits grouped rows into chunks based on the selected mode’s image capacity. If there are leftover rows that don’t fill a full chunk, the component will automatically fall back to another mode with fewer images, based on availability.

Configuration Parameters (YAML):

.. table:: Properties
:widths: auto

+---------------------+----------+-----------------------------------------------------------------+
| Name                | Required | Summary                                                         |
+---------------------+----------+-----------------------------------------------------------------+
| template_path       |   Yes    | Absolute path to the .pptx PowerPoint template file.            |
+---------------------+----------+-----------------------------------------------------------------+
| output_file_path    |   Yes    | Output path where the final .pptx will be saved.                |
+---------------------+----------+-----------------------------------------------------------------+
| mode                |   Yes    | Primary mode to use (e.g., double, retailer_block, etc.).       |
|                     |          | Controls the layout and chunking logic for slides.              |
+---------------------+----------+-----------------------------------------------------------------+
| mode_content        |   Yes    | Either a dict of modes, or a dict with key file pointing to a   |
|                     |          | .yaml or .json file containing the full mode structure.         |
+---------------------+----------+-----------------------------------------------------------------+

Variable Masking

Text and image fields in the YAML configuration can embed dynamic variables using curly-brace syntax. These placeholders are automatically replaced at runtime using each row's values from the DataFrame.

Example: text: "{retailer} #{location_code}, {city}, {state_code}" → becomes: "Target #1234, Miami, FL"

Missing columns will leave the placeholder unresolved (e.g., {missing_key}).

Text and Image Rendering

Each images: block must define:
  • an image placeholder name
  • a scale_factor
  • a path (can include variable masks)
  • an optional nested text: block:
    - placeholder_id: text placeholder where the caption will be rendered
    - text: masked string
    - plus optional formatting: font_size, font_color, bold, etc.

Text content shared across the slide (like headers) is defined in text_content: and rendered using the first row in the chunk.

Grouping and Fallback Logic
  • If group_by is defined, rows will be grouped accordingly.
  • Each group is divided into chunks of size N, where N = number of images in the selected mode.
  • Leftover rows that don’t fill a full slide are rendered using another mode that supports exactly the number of remaining images.
  • The fallback is automatically selected by checking all other modes and picking the one with the largest number of images ≤ remaining, as sketched below.
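
A minimal sketch of that fallback selection, assuming each mode entry in mode_content carries an images list (function and variable names are illustrative):

```python
def pick_fallback_mode(mode_content: dict, remaining: int) -> str:
    """Pick the mode whose image count is largest while still <= remaining."""
    candidates = {
        name: len(cfg.get("images", []))
        for name, cfg in mode_content.items()
        if isinstance(cfg, dict) and 0 < len(cfg.get("images", [])) <= remaining
    }
    if not candidates:
        # Mirrors the documented ValueError when no layout fits the leftover rows.
        raise ValueError(f"No suitable fallback layout for {remaining} remaining records")
    return max(candidates, key=candidates.get)
```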

Example mode_content (YAML snippet):

```yaml
mode: double
mode_content:
  # file: "slides_mode_content.yaml" (Optional, excludes following mode declarations)
  single:
    default_master_index: 0
    default_layout_index: 1
    text_content:
     - "Text Placeholder 3":
        text: "{retailer} | "
     - "Text Placeholder 3":
        text: "{category}"
        font_size: 24
        font_color: "808080"
        bold: False
    images:
     - "Picture Placeholder 1":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 2"
          text: "{retailer} #{location_code}, {city}, {state_code}"

  double:
    group_by: ["retailer", "category"]
    default_master_index: 0
    default_layout_index: 2
    text_content:
     - "Text Placeholder 5":
        text: "{retailer} | "
     - "Text Placeholder 5":
        text: "{category}"
        font_size: 24
        font_color: "808080"
        bold: False
    images:
     - "Picture Placeholder 2":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 1"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 3":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 4"
          text: "{retailer} #{location_code}, {city}, {state_code}"

  triple:
    group_by: ["retailer", "category"]
    default_master_index: 0
    default_layout_index: 3
    text_content:
     - "Text Placeholder 5":
        text: "{retailer} | "
     - "Text Placeholder 5":
        text: "{category}"
        font_size: 24
        font_color: "808080"
        bold: False
    images:
     - "Picture Placeholder 2":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 1"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 3":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 4"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 6":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 7"
          text: "{retailer} #{location_code}, {city}, {state_code}"

  quadruple:
    group_by: ["retailer", "category"]
    default_master_index: 0
    default_layout_index: 4
    text_content:
     - "Text Placeholder 5":
        text: "{retailer} | "
     - "Text Placeholder 5":
        text: "{category}"
        font_size: 24
        font_color: "808080"
        bold: False
    images:
     - "Picture Placeholder 2":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 1"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 3":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 4"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 6":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 8"
          text: "{retailer} #{location_code}, {city}, {state_code}"
     - "Picture Placeholder 7":
        scale_factor: 0.32  # optional
        path: "{file_path}"
        # data: "{file_data}" (optional, excludes using path)
        text:
          placeholder_id: "Text Placeholder 9"
          text: "{retailer} #{location_code}, {city}, {state_code}"

  retailer_4up:
    default_layout_index: 4
    # ...
```

Returns:

Name Type Description
str

Absolute path to the saved PowerPoint presentation file.

Raises:

Type Description
DataNotFound

If the DataFrame input is empty or not provided.

ValueError

If no suitable fallback layout is available for remaining records in a group.

PrintMessage

PrintMessage

PrintMessage(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

PrintMessage

Overview

    This component prints a formatted message to the console with optional coloring and logging.

.. table:: Properties
:widths: auto


+-------------+----------+-----------+-------------------------------------------------------+
| Name        | Required | Summary                                                           |
+-------------+----------+-----------+-------------------------------------------------------+
| message     |   Yes    | The message to print, with optional variable substitution.        |
+-------------+----------+-----------+-------------------------------------------------------+
| color       |   No     | The color to use for the message. Overrides the level-based color.|
+-------------+----------+-----------+-------------------------------------------------------+
| level       |   No     | The log level of the message ("INFO", "DEBUG", "WARN", "ERROR",   |
|             |          | "CRITICAL"). Default is "INFO".                                   |
+-------------+----------+-----------+-------------------------------------------------------+
| condition   |   No     | A condition to evaluate before printing the message. The message  |
|             |          | is printed only if the condition is True.                         |
+-------------+----------+-----------+-------------------------------------------------------+
|  first      |   Yes    | First message                                                     |
+-------------+----------+-----------+-------------------------------------------------------+
|  last       |   Yes    | Last message                                                      |
+-------------+----------+-----------+-------------------------------------------------------+

Returns

This component returns the printed message.


Example:

```yaml
PrintMessage:
  message: 'End Form Metadata: {orgid}/{formid}'
  color: green
  level: WARN
```
close async
close()

Method.

run async
run()

Run Message.

start async
start(**kwargs)

Initialize the color setup.

ProductCompetitors

ProductCompetitors

ProductCompetitors(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

Product Competitors Scraper Component

Overview: Pluggable component for scraping product information from competitors (BestBuy and Lowes).

Properties:

  • url_column (str): Name of the column containing URLs to scrape (default: 'url')
  • account_name_column (str): Name of the column containing retailer name (default: 'account_name')
  • product_id_column (str): Name of the column containing product IDs (default: 'product_id')
  • competitors (list): List of competitor brands to search for (e.g. ['Insignia', 'TCL', 'LG', 'Sony', 'Samsung'])

close async
close()

Clean up resources.

run async
run()

Execute scraping for all URLs in the DataFrame.

start async
start(**kwargs)

Initialize the component and validate required parameters.

parsers

BestBuyScrapper
BestBuyScrapper(*args, **kwargs)

Bases: ProductCompetitorsBase

product_information async
product_information(response, idx, row)

Get the product information from BestBuy.

LowesScrapper
LowesScrapper(*args, **kwargs)

Bases: ProductCompetitorsBase

connect async
connect()

Creates the Driver and Connects to the Site.

disconnect async
disconnect()

Disconnects the Driver and closes the Connection.

product_information async
product_information(response, idx, row)

Get the product information from Lowes.

base
ProductCompetitorsBase
ProductCompetitorsBase(*args, **kwargs)

Bases: SeleniumService, HTTPService

ProductCompetitorsBase Model. Define how competitor product scrappers should work.

connect abstractmethod async
connect()

Creates the Driver and Connects to the Site.

disconnect abstractmethod async
disconnect()

Disconnects the Driver and closes the Connection.

set_empty_values
set_empty_values(row, brand)

Set empty values for all standard columns for a given brand

bestbuy
BestBuyScrapper
BestBuyScrapper(*args, **kwargs)

Bases: ProductCompetitorsBase

product_information async
product_information(response, idx, row)

Get the product information from BestBuy.

lowes
LowesScrapper
LowesScrapper(*args, **kwargs)

Bases: ProductCompetitorsBase

connect async
connect()

Creates the Driver and Connects to the Site.

disconnect async
disconnect()

Disconnects the Driver and closes the Connection.

product_information async
product_information(response, idx, row)

Get the product information from Lowes.

scrapper

ProductCompetitors
ProductCompetitors(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

Product Competitors Scraper Component

Overview: Pluggable component for scraping product information from competitors (BestBuy and Lowes).

Properties:

  • url_column (str): Name of the column containing URLs to scrape (default: 'url')
  • account_name_column (str): Name of the column containing retailer name (default: 'account_name')
  • product_id_column (str): Name of the column containing product IDs (default: 'product_id')
  • competitors (list): List of competitor brands to search for (e.g. ['Insignia', 'TCL', 'LG', 'Sony', 'Samsung'])

close async
close()

Clean up resources.

run async
run()

Execute scraping for all URLs in the DataFrame.

start async
start(**kwargs)

Initialize the component and validate required parameters.

ProductCompliant

ProductCompliant

ProductCompliant(loop=None, job=None, stat=None, **kwargs)

Bases: ParrotBot, FlowComponent

ProductCompliant

Overview

The ProductCompliant class is a component for interacting with an AI Agent to perform Customer Satisfaction Analysis.
It extends the FlowComponent class.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| output_column    |   Yes    | Column for saving the Customer Satisfaction information.                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

A Pandas Dataframe with the Customer Satisfaction statistics.

ProductInfo

parsers

base
ParserBase
ParserBase(*args, **kwargs)

Bases: HTTPService, SeleniumService

Base class for product information parsers.

Defines the interface and common functionality for all product parsers.

create_search_query
create_search_query(term)

Create a search query for the given term.

Parameters:

Name Type Description Default
term str

Search term (typically product model)

required

Returns:

Type Description
str

Formatted search query

extract_model_code
extract_model_code(url)

Extract model code from URL using the regex pattern if defined.

Parameters:

Name Type Description Default
url str

URL to extract model code from

required

Returns:

Type Description
Optional[str]

Extracted model code or None if not found or pattern not defined

get_product_urls
get_product_urls(search_results, max_urls=5)

Extract relevant product URLs from search results.

Parameters:

Name Type Description Default
search_results List[Dict[str, str]]

List of search result dictionaries

required
max_urls int

Maximum number of URLs to return

5

Returns:

Type Description
List[str]

List of product URLs

parse abstractmethod async
parse(url, search_term)

Parse product information from a URL.

Parameters:

Name Type Description Default
url str

URL to parse

required
search_term str

Original search term

required

Returns:

Type Description
Dict[str, Any]

Dictionary with extracted product information

brother
BrotherParser
BrotherParser(*args, **kwargs)

Bases: ParserBase

Parser for Brother product information.

Extracts product details from Brother's USA website using Selenium.

get_product_urls
get_product_urls(search_results, max_urls=5)

Extract relevant product URLs from search results.

Parameters:

Name Type Description Default
search_results List[Dict[str, str]]

List of search result dictionaries

required
max_urls int

Maximum number of URLs to return

5

Returns:

Type Description
List[str]

List of product URLs that match the Brother product pattern

parse async
parse(url, search_term, retailer=None)

Parse product information from a Brother URL using Selenium.

Parameters:

Name Type Description Default
url str

Brother product URL

required
search_term str

Original search term

required
retailer Optional[str]

Optional retailer information (not used for Brother)

None

Returns:

Type Description
Dict[str, Any]

Dictionary with product information

canon
CanonParser
CanonParser(*args, **kwargs)

Bases: ParserBase

Parser for Canon product information.

Extracts product details from Canon's USA and Canada websites using Selenium.

create_search_query
create_search_query(term)

Create region-specific search query.

Parameters:

Name Type Description Default
term str

Search term (typically product model)

required

Returns:

Type Description
str

Formatted search query for the appropriate region

determine_region
determine_region(retailer)

Determine region based on retailer information.

Parameters:

Name Type Description Default
retailer Optional[str]

Retailer string that may contain region information

required

Returns:

Type Description
str

'ca' for Canada, 'us' for United States (default)

get_product_urls
get_product_urls(search_results, max_urls=5)

Extract relevant product URLs from search results.

Parameters:

Name Type Description Default
search_results List[Dict[str, str]]

List of search result dictionaries

required
max_urls int

Maximum number of URLs to return

5

Returns:

Type Description
List[str]

List of product URLs that match the Canon product pattern

parse async
parse(url, search_term, retailer=None)

Parse product information from a Canon URL using Selenium.

Parameters:

Name Type Description Default
url str

Canon product URL

required
search_term str

Original search term

required
retailer Optional[str]

Optional retailer information to determine region

None

Returns:

Type Description
Dict[str, Any]

Dictionary with product information

epson
EpsonParser
EpsonParser(*args, **kwargs)

Bases: ParserBase

Parser for Epson product information.

Extracts product details from Epson's website.

extract_model_code
extract_model_code(url)

Extract model code from URL using the regex pattern and clean it.

Parameters:

Name Type Description Default
url str

URL to extract model code from

required

Returns:

Type Description
Optional[str]

Cleaned model code or None if not found

parse async
parse(url, search_term, retailer=None)

Parse product information from an Epson URL.

Parameters:

Name Type Description Default
url str

Epson product URL

required
search_term str

Original search term

required
retailer str

Optional retailer information

None

Returns:

Type Description
Dict[str, Any]

Dictionary with product information

hp
HPParser
HPParser(*args, **kwargs)

Bases: ParserBase

Parser for HP product information.

Extracts product details from HP's website using Selenium for dynamic content.

get_product_urls
get_product_urls(search_results, max_urls=5)

Extract relevant product URLs from search results.

Parameters:

Name Type Description Default
search_results List[Dict[str, str]]

List of search result dictionaries

required
max_urls int

Maximum number of URLs to return

5

Returns:

Type Description
List[str]

List of product URLs that match the HP product pattern

parse async
parse(url, search_term, retailer=None)

Parse product information from an HP URL using Selenium.

Parameters:

Name Type Description Default
url str

HP product URL

required
search_term str

Original search term

required

Returns:

Type Description
Dict[str, Any]

Dictionary with product information

samsung
SamsungParser
SamsungParser(*args, **kwargs)

Bases: ParserBase

Parser for Samsung product information.

Extracts product details from Samsung's website using Selenium.

get_product_urls
get_product_urls(search_results, max_urls=1)

Extract relevant product URLs from search results.

Parameters:

Name Type Description Default
search_results List[Dict[str, str]]

List of search result dictionaries

required
max_urls int

Maximum number of URLs to return (default: 1)

1

Returns:

Type Description
List[str]

List of product URLs that match the Samsung product pattern

parse async
parse(url, search_term, retailer=None)

Parse product information from a Samsung URL using Selenium.

Parameters:

Name Type Description Default
url str

Samsung product URL

required
search_term str

Original search term

required
retailer Optional[str]

Optional retailer information (not used for Samsung)

None

Returns:

Type Description
Dict[str, Any]

Dictionary with product information

scraper

ProductInfo
ProductInfo(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, HTTPService, SeleniumService

Product Information Scraper Component

This component extracts detailed product information by:

  1. Searching for products using search terms
  2. Extracting model codes from URLs
  3. Parsing product details from manufacturer websites

Configuration options:

  • search_column: Column name containing search terms (default: 'model')
  • parsers: List of parser names to use (default: ['epson'])
  • max_results: Maximum number of search results to process (default: 5)
  • concurrently: Process items concurrently (default: True)
  • task_parts: Number of parts to split concurrent tasks (default: 10)
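
An illustrative configuration built from the options above (the values are examples based on the stated defaults and parser list, not requirements):

```yaml
ProductInfo:
  search_column: model
  parsers:
  - epson
  - hp
  max_results: 5
  concurrently: true
  task_parts: 10
```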

close async
close()

Clean up resources.

run async
run()

Execute product info extraction for each row.

split_parts
split_parts(tasks, num_parts=5)

Split tasks into parts for concurrent processing.

start async
start(**kwargs)

Initialize component and validate requirements.

ProductPricing

ProductPricing

ProductPricing(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPService, FlowComponent

ProductPricing.

Overview

This component gets the prices for a list of products.

.. table:: Properties
:widths: auto

+--------------+----------+------------------------------------------------+
| Name         | Required | Summary                                        |
+--------------+----------+------------------------------------------------+
| column       |   Yes    | The DataFrame column used as the search term.  |
+--------------+----------+------------------------------------------------+
| price_column |   No     | Name of the price column.                      |
+--------------+----------+------------------------------------------------+

Returns the prices in a column of the DataFrame.

QS

QuerySource. QS is a new kind of component supporting the new sources for QuerySource and making transformations of data, returning a transformed Pandas DataFrame.

Example:

```yaml
QS:
  query: troc_mileage.tpl
  from_templates_dir: true
  conditions:
    tenant: bose
    firstdate: '{first}'
    lastdate: '{last}'
    forms:
    - 4184
    - 4149
    - 4177
    - 4152
    - 3900
    - 3931
    - 3959
  masks:
    first:
    - date_diff_dow
    - day_of_week: monday
      diff: 8
      mask: '%Y-%m-%d'
    last:
    - date_diff_dow
    - day_of_week: monday
      diff: 2
      mask: '%Y-%m-%d'
```

QS

QS(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, TemplateSupport, PandasDataframe

QS.

Overview

Calling Complex QuerySource operations from Flowtask.
This component supports QuerySource,
making transformations of data and returning a transformed Pandas DataFrame.
Component Syntax

"QS": { "query": "path to file", "conditions": { "firstdate": "", "lastdate": "", forms: [1, 2, 3, 4] } } or "QS": { "slug": "troc_mileage" }

.. table:: Properties
:widths: auto

+--------------+----------+--------------------------------------------------------------------+
| Name         | Required | Summary                                                            |
+--------------+----------+--------------------------------------------------------------------+
| slug         |   Yes    | The slug identifier for the query.                                 |
+--------------+----------+--------------------------------------------------------------------+
| query        |   No     | The query template file to use.                                    |
+--------------+----------+--------------------------------------------------------------------+
| conditions   |   No     | Conditions to apply to the query.                                  |
+--------------+----------+--------------------------------------------------------------------+
| map          |   No     | Dictionary for mapping or transforming the resulting DataFrame.    |
+--------------+----------+--------------------------------------------------------------------+
| infer_types  |   No     | If True, converts DataFrame columns to appropriate dtypes.         |
|              |          | Default is False.                                                  |
+--------------+----------+--------------------------------------------------------------------+
| to_string    |   No     | If True, converts DataFrame columns to string dtype.               |
|              |          | Default is True.                                                   |
+--------------+----------+--------------------------------------------------------------------+
| use_template |   No     | If True, use a query template for the query. Default is True.      |
+--------------+----------+--------------------------------------------------------------------+

Returns

This component returns a pandas DataFrame containing the queried and transformed data.

close async
close()

Closing QS Object.

QSBase

QuerySource. QSBase is a new kind of component supporting the new sources for QuerySource and making transformations of data, returning a transformed Pandas DataFrame.

QSBase

QSBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, PandasDataframe

QSBase

Overview

This component is a helper to build components extending from QuerySource,
providing functionality to query data sources and
convert the results into pandas DataFrames.

.. table:: Properties
:widths: auto

+--------------+----------+--------------------------------------------------------------------+
| Name         | Required | Summary                                                            |
+--------------+----------+--------------------------------------------------------------------+
| type         |   Yes    | The type of query or operation to perform.                         |
+--------------+----------+--------------------------------------------------------------------+
| pattern      |   No     | The pattern to use for setting attributes.                         |
+--------------+----------+--------------------------------------------------------------------+
| conditions   |   No     | Conditions to apply to the query.                                  |
+--------------+----------+--------------------------------------------------------------------+
| map          |   No     | Dictionary for mapping or transforming the resulting DataFrame.    |
+--------------+----------+--------------------------------------------------------------------+
| infer_types  |   No     | If True, converts DataFrame columns to appropriate dtypes.         |
|              |          | Default is False.                                                  |
+--------------+----------+--------------------------------------------------------------------+
| to_string    |   No     | If True, converts DataFrame columns to string dtype.               |
|              |          | Default is True.                                                   |
+--------------+----------+--------------------------------------------------------------------+

Returns

This component returns a pandas DataFrame containing the queried data. If multiple data sets are retrieved, it returns a dictionary of DataFrames.

close async
close()

Closing QS Object.

QueryIterator

QueryIterator

QueryIterator(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, TemplateSupport, IteratorBase

QueryIterator

Overview

This component creates a Pandas Iterator from a QuerySource query,
allowing for the iteration over data returned from a query.

.. table:: Properties :widths: auto

+--------------+----------+------------------------------------------------------------------+
| Name         | Required | Summary                                                          |
+--------------+----------+------------------------------------------------------------------+
| file_sql     |   No     | The SQL file to read the query from.                             |
+--------------+----------+------------------------------------------------------------------+
| query_slug   |   No     | The query slug to use for fetching data.                         |
+--------------+----------+------------------------------------------------------------------+
| query        |   No     | The raw SQL query string to execute.                             |
+--------------+----------+------------------------------------------------------------------+
| conditions   |   No     | Conditions to apply to the query.                                |
+--------------+----------+------------------------------------------------------------------+
| columns      |   No     | Specific columns to extract from the result.                     |
+--------------+----------+------------------------------------------------------------------+
| use_template |   No     | If True, use a query template for the query. Default is False.   |
+--------------+----------+------------------------------------------------------------------+
| drop_empty   |   No     | If True, drop empty rows and columns from the DataFrame.         |
+--------------+----------+------------------------------------------------------------------+
| dropna       |   No     | Subset of columns to check for NaN values before dropping rows.  |
+--------------+----------+------------------------------------------------------------------+

Returns

This component returns the last value generated by the iteration process. Typically, it returns a pandas DataFrame containing the query results.
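
Example:

A minimal hedged sketch of a QueryIterator configuration built from the properties above; the slug, condition, and column names are hypothetical:

```yaml
QueryIterator:
  query_slug: walmart_stores   # hypothetical named query
  conditions:
    store_state: FL            # hypothetical condition
  columns:
  - store_id
  - store_name
  drop_empty: true
```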

createJob
createJob(target, params, row)

Create the Job Component.

run async
run()

Async Run Method.

start async
start(**kwargs)

Getting kind of Query.

QueryToInsert

QueryToInsert

QueryToInsert(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

QueryToInsert.

Overview

This component inserts data into a database table within a given schema.

.. table:: Properties :widths: auto

+--------------+----------+------------------------------------------------------+
| Name         | Required | Summary                                              |
+--------------+----------+------------------------------------------------------+
| schema       |   Yes    | Name of the schema where the table is located        |
+--------------+----------+------------------------------------------------------+
| tablename    |   Yes    | Name of the table in the database                    |
+--------------+----------+------------------------------------------------------+
| action       |   Yes    | The action to execute; in this case, an insert       |
+--------------+----------+------------------------------------------------------+
| pk           |   Yes    | Primary key of the table in the database             |
+--------------+----------+------------------------------------------------------+
| directory    |   Yes    | Source directory where the file is located           |
+--------------+----------+------------------------------------------------------+
| filter       |   Yes    | Filter to apply to the data                          |
+--------------+----------+------------------------------------------------------+


Example:

```yaml
QueryToInsert:
  schema: public
  tablename: queries
  action: insert
  pk:
  - query_slug
  directory: /home/ubuntu/symbits/
  filter:
    query_slug: walmart_stores
```
close
close()

Method.

run async
run()

Async Method.

start async
start(**kwargs)

Start.

QueryToPandas

QueryToPandas

QueryToPandas(loop=None, job=None, stat=None, **kwargs)

Bases: TemplateSupport, FlowComponent, QSSupport

QueryToPandas.

Overview

This component fetches data using QuerySource and transforms it into a Pandas DataFrame.

.. table:: Properties :widths: auto

+---------------+----------+------------------------------------------------------------------------------+
| Name          | Required | Summary                                                                      |
+---------------+----------+------------------------------------------------------------------------------+
| query         |   Yes    | Represents an SQL query.                                                     |
+---------------+----------+------------------------------------------------------------------------------+
| query_slug    |   Yes    | Named queries that are saved in Navigator (QuerySource).                     |
+---------------+----------+------------------------------------------------------------------------------+
| as_dict       |   Yes    | True | False. If true, returns the data in JSON format instead of a          |
|               |          | DataFrame.                                                                   |
+---------------+----------+------------------------------------------------------------------------------+
| raw_result    |   Yes    | Returns the data in the NATIVE FORMAT of the database (for example, a pg     |
|               |          | RECORDSET).                                                                  |
+---------------+----------+------------------------------------------------------------------------------+
| file_sql      |   Yes    | The SQL comes from an SQL file.                                              |
+---------------+----------+------------------------------------------------------------------------------+
| use_template  |   Yes    | The SQL file is processed through a template replacement system.             |
+---------------+----------+------------------------------------------------------------------------------+
| infer_types   |   Yes    | Type inference: lets the component decide the data type of each column.      |
+---------------+----------+------------------------------------------------------------------------------+
| drop_empty    |   Yes    | False | True. Deletes (drops) any column that is empty.                      |
+---------------+----------+------------------------------------------------------------------------------+
| dropna        |   Yes    | False | True. Deletes all NA (Not a Number) values.                          |
+---------------+----------+------------------------------------------------------------------------------+
| fillna        |   Yes    | False | True. Fills all NAs in the dataframe with an EMPTY SPACE.            |
+---------------+----------+------------------------------------------------------------------------------+
| clean_strings |   Yes    | Fills NAs with an empty space, but only for fields of type string.           |
+---------------+----------+------------------------------------------------------------------------------+
| clean_dates   |   Yes    | Sets to NONE any date field that has a NaT (Not a Time).                     |
+---------------+----------+------------------------------------------------------------------------------+
| conditions    |   Yes    | Conditions to apply to filter the data.                                      |
+---------------+----------+------------------------------------------------------------------------------+
| formit        |   Yes    | Form id (purpose uncertain).                                                 |
+---------------+----------+------------------------------------------------------------------------------+
| orgid         |   Yes    | Organization id (purpose uncertain).                                         |
+---------------+----------+------------------------------------------------------------------------------+
| refresh       |   Yes    | Refreshes the data in QueryToPandas.                                         |
+---------------+----------+------------------------------------------------------------------------------+
| to_string     |   No     | Whether to convert all data to string type. Default is True.                 |
+---------------+----------+------------------------------------------------------------------------------+
| as_objects    |   No     | Whether to return the result as objects. Default is True.                    |
+---------------+----------+------------------------------------------------------------------------------+
| datatypes     |   No     | A dictionary specifying data types for columns.                              |
+---------------+----------+------------------------------------------------------------------------------+
| datasource    |   No     | The datasource to fetch the data from. Default is "db".                      |
+---------------+----------+------------------------------------------------------------------------------+

Returns

This component returns a Pandas DataFrame if the query is successfully executed and data is fetched, otherwise raises a ComponentError.

Example:

```yaml
QueryToPandas:
  query: SELECT formid, orgid FROM banco.forms WHERE enabled = true
```
close async
close()

Method.

RESTClient

RESTClient.

Basic component for making RESTful queries to URLs.

    Example:

    ```yaml
    RESTClient:
      url: https://api.upcdatabase.org/product/{barcode}
      barcode: '0111222333446'
      credentials:
        apikey: UPC_API_KEY
      as_dataframe: true
    ```

AbstractREST

AbstractREST(loop=None, job=None, stat=None, **kwargs)

Bases: RESTClient

AbstractREST. Abstract Method for RESTful Components.

RESTClient

RESTClient(loop=None, job=None, stat=None, **kwargs)

Bases: HTTPClient

RESTClient

Overview

The RESTClient class is a component for making RESTful queries to URLs. It extends the HTTPClient class and provides
functionality to send requests and process responses from REST APIs. It supports creating DataFrames from JSON responses
if specified.

.. table:: Properties :widths: auto

+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| _result          |   No     | The result of the REST query, can be a list or dictionary.                                        |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| accept           |   No     | The accepted response type, defaults to "application/json".                                       |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| url              |   Yes    | The URL to send the REST query to.                                                                |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| method           |   No     | The HTTP method to use for the request, defaults to the method specified in the class.            |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+

Return

The methods in this class manage the execution of RESTful queries and handle the response. It includes functionality to
convert JSON responses into DataFrames if specified.

RethinkDBQuery

RethinkDBQuery

RethinkDBQuery(loop=None, job=None, stat=None, **kwargs)

Bases: RethinkDBSupport, TemplateSupport, FlowComponent, PandasDataframe

RethinkDBQuery.

Class to execute queries against a RethinkDB database and retrieve results, using asyncDB as the backend.

RethinkDBQuery supports building queries by mapping RethinkDB methods as attributes. Methods such as "table", "filter", "order_by", "limit", and "pluck" are supported.

Example:

```yaml
RethinkDBQuery:
  table: stores_reviews
  schema: epson
  filter:
  - rating:
      gt: 4
  - rating:
      lt: 6
  order_by:
  - rating: desc
  limit: 50
  columns:
  - store_id
  - store_name
  - formatted_address
  - latitude
  - longitude
  - reviews
  - rating
  - user_ratings_total
  as_dataframe: true
```
close async
close()

Close the connection to the RethinkDB database.

run async
run()

Execute the RethinkDB query and retrieve the results.

RunSSH

RunSSH

RunSSH(loop=None, job=None, stat=None, **kwargs)

Bases: SSHClient, FlowComponent

RunSSH.

Run any arbitrary command on a remote SSH server.

start async
start(**kwargs)

Start.

Processing variables and credentials.
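
Example:

No property table is given for RunSSH, so the following sketch is illustrative only; every key shown (`host`, `credentials`, `command`) is a hypothetical name rather than a documented property:

```yaml
RunSSH:
  host: files.example.com       # hypothetical
  credentials:                  # hypothetical; credentials are processed in start()
    username: SSH_USERNAME
    password: SSH_PASSWORD
  command: 'ls -la /var/data'   # hypothetical: the arbitrary command to run
```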

RunShell

RunShell

RunShell(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

RunShell.

Overview

Execute a Command to run a task

.. table:: Properties :widths: auto

+--------------+----------+------------------------------------------------------+
| Name         | Required | Summary                                              |
+--------------+----------+------------------------------------------------------+
| name         |   Yes    | Name of the task                                     |
+--------------+----------+------------------------------------------------------+
| description  |   Yes    | Task description                                     |
+--------------+----------+------------------------------------------------------+
| steps        |   Yes    | Not assigned steps                                   |
+--------------+----------+------------------------------------------------------+
| runtask      |   Yes    | This method runs the task                            |
+--------------+----------+------------------------------------------------------+
| program      |   Yes    | Program name                                         |
+--------------+----------+------------------------------------------------------+
| task         |   Yes    | Assigns the run shell attribute                      |
+--------------+----------+------------------------------------------------------+


SalesForce

SalesForce

SalesForce(loop=None, job=None, stat=None, **kwargs)

Bases: QSBase

SalesForce Connector.

Sassie

Sassie

Sassie(loop=None, job=None, stat=None, **kwargs)

Bases: SassieClient, FlowComponent

Sassie

Overview

Get Data from Sassie API.

Properties (inherited from Sassie)

.. table:: Properties :widths: auto

+--------------------+----------+-----------+----------------------------------------------------------------------+
| Name               | Required | Summary                                                                          |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| domain             |   Yes    | Domain of Sassie API                                                             |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| credentials        |   Yes    | Credentials to establish connection with Polestar site (user and password)       |
|                    |          | get credentials from environment if null.                                        |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| data               |   No     | Type of data to retrieve (surveys, questions, jobs, waves, locations, clients)   |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| filter             |   No     | List of filters to apply to the results. Each filter must have:                  |
|                    |          | - column: The column name to filter on                                           |
|                    |          | - operator: One of: eq (equals), lt (less than), gt (greater than),              |
|                    |          |   lte (less than or equal), gte (greater than or equal), btw (between)           |
|                    |          | - value: The value to compare against                                            |
+--------------------+----------+-----------+----------------------------------------------------------------------+
| masks              |   No     | A dictionary mapping mask strings to replacement strings used for                |
|                    |          | replacing values in filters.                                                     |
+--------------------+----------+-----------+----------------------------------------------------------------------+




Example:

```yaml
Sassie:
  domain: SASSIE_PROD_URL
  data: locations
  credentials:
    client_id: SASSIE_CLIENT_ID
    client_secret: SASSIE_CLIENT_SECRET
  filter:
    - column: updated
      operator: eq
      value: '{today}'
  masks:
    '{today}':
      - today
      - mask: '%Y-%m-%d'
```

SaveImageBank

SaveImageBank

SaveImageBank(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

SaveImageBank.

Save images into a PostgreSQL table, with UPSERT and optional duplicate detection.

pgvector_init async
pgvector_init(conn)

Initialize pgvector extension in PostgreSQL.

run async
run()

Run the task.

qid

qid(name)

Very small helper to quote SQL identifiers safely. Raises if name contains anything but letters, digits or '_'.

SchedulingVisits

SchedulingVisits

SchedulingVisits(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Generating the Schedule of Employee Visits with Market Constraints and Visit Cadence.

Overview

The SchedulingVisits class is a Flowtask component for generating a schedule of employee visits based on a set of rules and constraints. This component can be used to optimize the order of visits, minimize travel time, and balance workloads across employees. The schedule is generated by solving a combinatorial optimization problem with support for custom objective functions and constraints.

Example of row consumed:

```text
associate_oid -> object -> G3Q86F5E1JXN1XVM
corporate_email -> object -> buko@trocglobal.com
employee_position -> object -> (3442724.8764311927, -10973885.176252203)
store_id -> object -> BBY0178
store_position -> object -> (3564143.804984759, -10887222.41833608)
market -> object -> Market1
visit_rule -> int64 -> 2
visit_frequency -> object -> Monthly
```

Example of Row Generated:


Example:

```yaml
SchedulingVisits:
  use_ghost_employees: true
  ghost_employees: 1
  ghost_employees_column: 'ghost_employees'  # Column name in dataframe for dynamic ghost employees count
  ghost_domain: 'company.com'  # Domain for ghost employee emails
  in_store_percentage: 0.6
  in_store_visit: 0.75
  max_visits_per_day: 4
  max_distance: 120
  year: 2024
  month: 12
  start_hour: 9  # Start working at 9:00 AM
  exception_dates:
  - '2024-12-25'
  exceptions_filename: /home/ubuntu/symbits/Scheduling-Visits-Exceptions.xlsx
```

Note:

- If 'ghost_employees_column' exists in the dataframe, it will use that value for each market
- If the column doesn't exist or has null values, it will fall back to the 'ghost_employees' parameter
- The 'ghost_domain' parameter allows you to customize the email domain for ghost employees
- Ghost employee emails will be generated as: ghost_1@domain, ghost_2@domain, etc.
- The 'start_hour' parameter sets when employees begin their workday (default: 9 AM)

When roundtrip=True:

- Reduces effective daily capacity (time/distance used for return trips)
- More realistic scheduling for depot-based operations
- Employees always end the day at their starting hub/depot
- May increase unscheduled stores due to return trip constraints

When to use roundtrip=True:

- Employees must return to a central depot/hub
- Vehicle check-in/check-out required
- Union/labor requirements for end-of-day location
- Security/safety requirements

When to use roundtrip=False (default):

- Employees can end the day anywhere
- Maximum coverage/efficiency desired
- Field-based operations without a central depot
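
As a sketch of the distinction above, a depot-based run would add the flag to the configuration shown earlier; treating `roundtrip` as a top-level parameter like the others is an assumption:

```yaml
SchedulingVisits:
  roundtrip: true        # employees must end the day at their starting hub/depot
  max_visits_per_day: 4
  max_distance: 120
```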

analyze_market_capacity
analyze_market_capacity(df)

Analyze each market's capacity requirements using actual assigned employees.

debug_scheduling_progress
debug_scheduling_progress(schedule_df, exception_df)

Enhanced debug method with duplicate store visit detection.

get_distance
get_distance(coord1, coord2)

Function to calculate distance between two points (latitude, longitude).

get_labor_days
get_labor_days(year=2024, month=9)

Function to get all workdays (Monday to Friday) in a given month.

get_scheduled_dates
get_scheduled_dates(cadence, visit_rule, visit_frequency, workdays, store_index)

Given the visit_rule and visit_frequency, return a list of scheduled dates for the visits.

get_travel_duration
get_travel_duration(origin, destination)

Helper function to get distance and duration between two points.

get_workdays
get_workdays(year, month, exception_dates=None)

Get all workdays (Monday to Friday) in a given month, excluding exception dates.

log_roundtrip_summary
log_roundtrip_summary(schedule_df)

Log summary statistics about roundtrip scheduling.

ScrapPage

Scraping a Web Page Using Selenium + ChromeDriver + BeautifulSoup.

    Example:

    ```yaml
    ScrapPage:
      url: https://www.consumeraffairs.com/insurance/assurant-phone-insurance.html?page=2#sort=recent
      rotate_ua: true
      use_selenium: true
      use_proxy: true
      paid_proxy: true
      as_mobile: false
      timeout: 60
      wait_until:
      - - class_name
        - js-rvw
      screenshot:
        filename: reviews_page.png
        directory: /home/ubuntu/symbits/ejemplo/screenshots/
      outputs:
      - scraped_content
      - screenshot_path
    ```

ScrapPage

ScrapPage(loop=None, job=None, stat=None, **kwargs)

Bases: SeleniumService, HTTPService, FlowComponent

ScrapPage. Scraping a Web Page using Selenium.

run async
run()

Run the Scraping Tool.

run_http async
run_http()

Run the Scraping Tool Using HTTPx.

run_selenium async
run_selenium()

Run the Scraping Tool Using Selenium.

ScrapSearch

ScrapSearch

ScrapSearch(**kwargs)

Bases: ScrapPage

Search for a product, retrieve the URL (based on rules), and scrape the page.

SendEmail

SendEmail

SendEmail(loop=None, job=None, stat=None, **kwargs)

Bases: TemplateSupport, CredentialsInterface, FlowComponent

SendEmail

Renders a Jinja template per dataframe row and emails the output to the address in the configured `to` column.

Properties

credentials / account : dict (required)
    SMTP credentials (hostname, port, username, password).
template : str (required)
    Path to a Jinja template (HTML or text).
subject : str (required)
    Jinja-templated subject, rendered per-row.
to : str (required)
    Dataframe column containing the recipient email address.
attachments : list[str] (optional)
    File paths or globs. Masks/globs supported via Flowtask masks.
masks : dict (optional)
    Flowtask masks (applied to subject, template path and attachments).

Input: pandas.DataFrame (or list[dict]); Output: original input (pass-through).
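
Example:

A minimal hedged sketch assembled from the properties above; the SMTP placeholders, template path, and column name are hypothetical:

```yaml
SendEmail:
  account:
    hostname: SMTP_HOSTNAME                       # hypothetical credential keys
    port: SMTP_PORT
    username: SMTP_USERNAME
    password: SMTP_PASSWORD
  template: templates/weekly_report.html          # hypothetical Jinja template path
  subject: 'Weekly report for {{ store_name }}'   # rendered per row
  to: corporate_email                             # DataFrame column with the recipient address
  attachments:
  - /home/ubuntu/symbits/reports/report_{today}.csv
  masks:
    '{today}':
    - today
    - mask: '%Y-%m-%d'
```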

expand_path

expand_path(filename)

Expand a (possibly masked) filename that may include globs (~, *, ?).

SendNotify

SendNotify

SendNotify(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, FlowComponent

SendNotify

Overview

    The SendNotify class is a component for sending notifications to a list of recipients via various
    channels (e.g., email) using the Notify component. It supports adding attachments, templating messages
    with masked variables, and utilizing custom credentials for authentication.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+----------------------------------------------------------------------+
    | Name           | Required | Summary                                                                          |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | via            |   Yes    | The method for sending the notification, e.g., "email".                          |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | account        |   Yes    | A dictionary with server credentials, including `host`, `port`,                  |
    |                |          | `username`, and `password`.                                                      |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | recipients     |   Yes    | List of dictionaries with target user details for notification.                  |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | list           |   No     | Optional mailing list name for retrieving recipients from the database.          |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | attachments    |   No     | List of file paths for attachments to include in the notification.               |
    +----------------+----------+-----------+----------------------------------------------------------------------+
    | message        |   Yes    | Dictionary with the notification message content, supporting template variables. |
    +----------------+----------+-----------+----------------------------------------------------------------------+

Returns

    This component returns the input data after sending the notification. Metrics are recorded for each
    successful send, with details on recipients and the message content. If any specified attachment file
    is missing, a `FileNotFound` exception is raised. If there are errors in setting up or sending the
    notification, a `ComponentError` is raised with descriptive messages.


Example:

```yaml
SendNotify:
  via: email
  account:
    hostname: NAVIGATOR_ALERT_EMAIL_HOSTNAME
    port: NAVIGATOR_ALERT_EMAIL_PORT
    password: NAVIGATOR_ALERT_EMAIL_PASSWORD
    username: NAVIGATOR_ALERT_EMAIL_USERNAME
  attachments:
  - /home/ubuntu/symbits/bose/files/report/troc_open_tickets_{today}.csv
  masks:
    '{today}':
    - today
    - mask: '%Y-%m-%d'
    '{yesterday}':
    - yesterday
    - mask: '%Y-%m-%d'
    '{human-today}':
    - today
    - mask: '%m/%d/%Y'
    '{human-yesterday}':
    - yesterday
    - mask: '%m/%d/%Y'
    '#today-timestamp#':
    - current_timestamp
    - tz: America/New_York
  recipients:
  - name: Carlos Rivero
    account:
      address: crivero@trocglobal.com
  - name: Arturo Martinez
    account:
      address: amartinez@trocglobal.com
  - name: Jhoanir Torres
    account:
      address: jhtorres@trocglobal.com
  - name: Steven Greenstein
    account:
      address: sgreenstein@trocglobal.com
  - name: Sabra Pierre
    account:
      address: spierre1@trocglobal.com
  - name: Daniel McGee
    account:
      address: dmcgee@trocglobal.com
  - name: Kile Harris
    account:
      address: kharris10@trocglobal.com
  - name: Gerardo Espinoza
    account:
      address: gespinoza@trocglobal.com
  - name: Christopher Harmon
    account:
      address: charmon@trocglobal.com
  message:
    subject: T-ROC BOSE Break&Fix Open Tickets ({human-today})
    message_content: Please find attached the report generated on {human-today}. This
      is an automated message - please do not reply directly to this email.
    template: email_custom_report.html
    clientName: Bose
    dateGenerated: '{human-today}'
    created_at: '#today-timestamp#'
```
run async
run()

Running the Notification over all recipients.

SentimentAnalysis

ModelPrediction

ModelPrediction(sentiment_model='tabularisai/robust-sentiment-analysis', emotions_model='bhadresh-savani/distilbert-base-uncased-emotion', classification='sentiment-analysis', levels=5, max_length=512, use_bertweet=False, use_bert=False, use_roberta=False)

ModelPrediction

Overview

Performs sentiment analysis and emotion detection on text using Hugging Face Transformers.

This class utilizes pre-trained models for sentiment analysis and emotion detection.
It supports different model architectures like BERT, BERTweet, and RoBERTa.
The class handles text chunking for inputs exceeding the maximum token length
and provides detailed sentiment and emotion scores along with predicted labels.

Attributes:

- sentiment_model (str): Name of the sentiment analysis model to use from Hugging Face.
- emotions_model (str): Name of the emotion detection model to use from Hugging Face.
- classification (str): Type of classification pipeline to use (e.g., 'sentiment-analysis').
- levels (int): Number of sentiment levels for sentiment analysis (2, 3, or 5).
- max_length (int): Maximum token length for input texts. Defaults to 512.
- use_bertweet (bool): If True, uses BERTweet model for sentiment analysis. Defaults to False.
- use_bert (bool): If True, uses BERT model for sentiment analysis. Defaults to False.
- use_roberta (bool): If True, uses RoBERTa model for sentiment analysis. Defaults to False.

Returns:

- DataFrame: A DataFrame with sentiment and emotion analysis results. Includes columns for sentiment scores, sentiment labels, emotion scores, and emotion labels.

Raises:

- ComponentError: If there is an issue during text processing or data handling.

Example:

```yaml
SentimentAnalysis:
  text_column: text
  sentiment_model: tabularisai/robust-sentiment-analysis
  sentiment_levels: 5
  emotions_model: bhadresh-savani/distilbert-base-uncased-emotion
```

Sets up the sentiment analysis and emotion detection models and tokenizers based on the provided configurations.

aggregate_sentiments
aggregate_sentiments(sentiments, levels)

Aggregates sentiment predictions from multiple texts to produce a single overall sentiment.

Calculates the average sentiment score across a list of sentiment predictions and determines the overall predicted sentiment based on these averages.

Parameters:

- sentiments (list): A list of dictionaries, each containing sentiment prediction results. Required.
- levels (int): The number of sentiment levels used in the analysis, determining the sentiment map. Required.

Returns:

- str: The aggregated predicted sentiment label (e.g., 'Positive', 'Negative', 'Neutral').

predict_emotion
predict_emotion(text)

Predicts the emotion of the input text.

Handles text chunking for long texts to ensure they fit within the model's token limit. Returns a dictionary containing emotion predictions.

Parameters:

- text (str): The input text to predict emotion for. Required.

Returns:

- dict: A dictionary containing emotion predictions, for example: {'emotions': [{'label': 'joy', 'score': 0.99}]}. Returns an empty dictionary if the input text is empty.

predict_sentiment
predict_sentiment(text)

Predicts the sentiment of the input text.

Utilizes the sentiment analysis pipeline to classify the text and returns sentiment scores and the predicted sentiment label. Handles text chunking for texts exceeding the maximum token length.

Parameters:

- text (str): The text to analyze for sentiment. Required.

Returns:

- dict: A dictionary containing sentiment analysis results. Includes 'score' (list of sentiment scores) and 'predicted_sentiment' (string label). Returns None if the input text is empty.

split_into_sentences
split_into_sentences(text)

Splits a text into sentences using NLTK's sentence tokenizer.

Leverages nltk.tokenize.sent_tokenize for robust sentence splitting, handling various sentence terminators and abbreviations.

Parameters:

- text (str): The input text to be split into sentences. Required.

Returns:

- list: A list of strings, where each string is a sentence from the input text.

SentimentAnalysis

SentimentAnalysis(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Applies sentiment analysis and emotion detection to a DataFrame of text data.

This component processes a DataFrame, applying Hugging Face Transformer models to analyze the sentiment and emotions expressed in a specified text column. It leverages the ModelPrediction class to perform the actual predictions and integrates these results back into the DataFrame.

Properties

- text_column (str): The name of the DataFrame column containing the text to analyze. Defaults to 'text'.
- sentiment_model (str): Model name for sentiment analysis. Defaults to 'tabularisai/robust-sentiment-analysis'.
- emotions_model (str): Model name for emotion detection. Defaults to 'cardiffnlp/twitter-roberta-base-emotion'.
- pipeline_classification (str): Classification type for the pipeline (e.g., 'sentiment-analysis'). Defaults to 'sentiment-analysis'.
- with_average (bool): Whether sentiment should be averaged across rows (if applicable). Defaults to True.
- sentiment_levels (int): Number of sentiment levels (2, 3, or 5). Default is 5.
- use_bert (bool): Use BERT model for sentiment analysis. Defaults to False.
- use_roberta (bool): Use RoBERTa model for sentiment analysis. Defaults to False.
- use_bertweet (bool): Use BERTweet model for sentiment analysis. Defaults to False.

Returns:

- DataFrame: The input DataFrame augmented with new columns for sentiment scores, predicted sentiment, emotion scores, and predicted emotion. Specifically, it adds the 'sentiment_scores', 'sentiment_score', 'emotions_score', 'predicted_emotion', and 'predicted_sentiment' columns.

Raises:

- ComponentError: If input data is not a Pandas DataFrame or if the text column is not found.

run async
run()

Executes the sentiment analysis and emotion detection process on the input DataFrame.

Uses a single shared predictor instance to process data in larger batches. After processing, it concatenates the results and extracts relevant prediction scores and labels.

Returns:

- pd.DataFrame: The DataFrame with added sentiment and emotion analysis results.

ServiceScrapper

ServiceScrapper

ServiceScrapper(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

Service Scraper Component

Overview:

Pluggable component for scraping several services and sites using different scrapers.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------+
| Name             | Required | Description                                                                    |
+------------------+----------+--------------------------------------------------------------------------------+
| url_column (str) |   Yes    | Name of the column containing URLs to scrape (default: 'search_url')           |
+------------------+----------+--------------------------------------------------------------------------------+
| wait_for (tuple) |   No     | Element to wait for before scraping (default: ('class', 'company-overview'))   |
+------------------+----------+--------------------------------------------------------------------------------+

Return:

- DataFrame with company information
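
Example:

A minimal hedged sketch using the two documented properties; expressing `wait_for` as a two-item list is an assumption:

```yaml
ServiceScrapper:
  url_column: search_url     # documented default
  wait_for:
  - class
  - company-overview         # documented default target
```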

close async
close()

Clean up resources.

run async
run()

Execute scraping for requested URL in the DataFrame.

start async
start(**kwargs)

Initialize the component and validate required parameters.

parsers

base
ScrapperBase
ScrapperBase(*args, **kwargs)

Bases: SeleniumService, HTTPService

ScrapperBase Model.

Defines how scrapers should work.

connect abstractmethod async
connect()

Creates the Driver and Connects to the Site.

disconnect abstractmethod async
disconnect()

Disconnects the Driver and closes the Connection.

start async
start()

Starts the navigation to the main site.

costco
CostcoScrapper
CostcoScrapper(*args, **kwargs)

Bases: ScrapperBase

connect async
connect()

Creates the Driver and Connects to the Site.

disconnect async
disconnect()

Disconnects the Driver and closes the Connection.

product_information async
product_information(response, idx, row)

Get the product information from Costco.

special_events async
special_events(response, idx, row)

Get the special events from Costco.

scrapper

ServiceScrapper
ServiceScrapper(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

Service Scraper Component

Overview:

Pluggable component for scraping several services and sites using different scrapers.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------+
| Name             | Required | Description                                                                    |
+------------------+----------+--------------------------------------------------------------------------------+
| url_column (str) |   Yes    | Name of the column containing URLs to scrape (default: 'search_url')           |
+------------------+----------+--------------------------------------------------------------------------------+
| wait_for (tuple) |   No     | Element to wait for before scraping (default: ('class', 'company-overview'))   |
+------------------+----------+--------------------------------------------------------------------------------+

Return:

- DataFrame with company information

close async
close()

Clean up resources.

run async
run()

Execute scraping for requested URL in the DataFrame.

start async
start(**kwargs)

Initialize the component and validate required parameters.

SetVariables

SetVariables

SetVariables(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

SetVariables

Overview

    The SetVariables class is a component for extracting values from data and setting them as variables
    for use in other components. This component can set variables based on specific column values in
    a DataFrame or by executing functions, with support for date formatting and value aggregation.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+---------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | vars           |   Yes    | Dictionary defining variables to set with options for format,             |
    |                |          | row selection, and data sources.                                          |
    +----------------+----------+-----------+---------------------------------------------------------------+

Returns

    This component returns the original data after setting variables based on the `vars` dictionary.
    Each variable is created from a specified column or function, and supports formatting options
    such as date, timestamp, epoch, or custom string formatting. Metrics are recorded for each variable
    set, and any issues with variable definitions or data retrieval raise a descriptive `ComponentError`.


Example:

```yaml
SetVariables:
  vars:
    max_date:
    - order_date
    - row: max
    min_date:
    - order_date
    - row: min
```

SubTask

SubTask

SubTask(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

SubTask

Overview

    The SubTask class is a component for executing a specified task as a sub-task within a workflow.
    It allows passing configurations and parameters, including conditional steps, to dynamically manage 
    and execute the named task in the specified program.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+---------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | task           |   Yes    | The name of the task to execute as a sub-task.                            |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | program        |   Yes    | The name of the program under which the task is defined.                  |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | ignore_steps   |   No     | List of steps to ignore during the sub-task execution.                    |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | run_only       |   No     | List of specific steps to run, ignoring others.                           |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | conditions     |   No     | Dictionary of conditions to apply to the sub-task execution.              |
    +----------------+----------+-----------+---------------------------------------------------------------+

Returns

    This component executes the specified task and returns the task’s output or state upon completion.
    If the task encounters an error or lacks data, an appropriate exception is raised. The component tracks 
    task and program details in the metrics, and logs the state of the task if debugging is enabled.


Example:

```yaml
SubTask:
  task: forms
  program: banco_chile
```

SuiteCRM

SuiteCRM

SuiteCRM(loop=None, job=None, stat=None, **kwargs)

Bases: QSBase

SuiteCRM

Overview

   This component captures the data from the SuiteCRM API to be
   processed and stored in Navigator.

.. table:: Properties :widths: auto

+--------------+----------+------------------------------------------------------+
| Name         | Required | Summary                                              |
+--------------+----------+------------------------------------------------------+
| type         |   Yes    | Type of query                                        |
+--------------+----------+------------------------------------------------------+
| username     |   Yes    | Credential username                                  |
+--------------+----------+------------------------------------------------------+
| password     |   Yes    | Credential password                                  |
+--------------+----------+------------------------------------------------------+
| main_url     |   Yes    | Base URL of the API                                  |
+--------------+----------+------------------------------------------------------+
| module_name  |   Yes    | Module to list                                       |
+--------------+----------+------------------------------------------------------+

Returns the entry list of the module.
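
Example:

A minimal hedged sketch built from the property table; the credential placeholders, URL, and module name are hypothetical:

```yaml
SuiteCRM:
  type: module_list                                          # hypothetical query type
  username: SUITECRM_USERNAME
  password: SUITECRM_PASSWORD
  main_url: https://crm.example.com/service/v4_1/rest.php    # hypothetical base URL
  module_name: Contacts                                      # hypothetical module
```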

Switch

Switch

Switch(loop=None, job=None, stat=None, **kwargs)

Bases: BaseLoop

Switch Component

Routes execution to a specific component based on user-defined conditions.

Defines a list of conditions and their corresponding components.

Each condition is a Python-style logical expression evaluated against self.data.

default: Specifies the fallback component if none of the conditions match.
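
Example:

A hedged sketch of a Switch configuration. The description above does not give a YAML example, so the key names (`conditions`, `condition`, `component`, `default`) and the component names are assumptions for illustration:

```yaml
Switch:
  conditions:
  - condition: "len(self.data) > 0"   # Python-style expression evaluated against self.data
    component: TableOutput            # hypothetical target component
  default: SendNotify                 # hypothetical fallback component
```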

run async
run()

Executes the appropriate component based on the conditions.

start async
start(**kwargs)

start.

Initialize (if needed) a task

TableBase

TableBase

TableBase(loop=None, job=None, stat=None, **kwargs)

Bases: TemplateSupport, FlowComponent

TableBase.

Abstract class for Using Pandas SQL features to manipulate data from databases.

close async
close()

Closing Operations.

TableDelete

TableDelete

TableDelete(loop=None, job=None, stat=None, **kwargs)

Bases: QSSupport, FlowComponent

TableDelete

Overview

The TableDelete class is a component for deleting rows from a SQL table based on specified primary keys. It uses
Pandas DataFrames to identify rows to delete and performs the deletion using SQLAlchemy and asyncpg.

.. table:: Properties :widths: auto

+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| flavor           |   No     | The database flavor, defaults to "postgresql".                                                    |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| pk               |   Yes    | A list of primary key columns to identify rows for deletion.                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| data             |   Yes    | The data containing rows to be deleted, can be a Pandas DataFrame or other supported format.      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| _engine          |   No     | The SQLAlchemy engine for database connections.                                                   |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| tablename        |   Yes    | The name of the table from which rows will be deleted.                                            |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| schema           |   No     | The schema name, defaults to an empty string.                                                     |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| _dsn             |   No     | The Data Source Name for the database connection.                                                 |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| multi            |   No     | A flag indicating if multiple DataFrame deletions are supported, defaults to False.               |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+

Return

The methods in this class manage the deletion of rows from a SQL table based on primary keys, including initialization,
execution, and result handling. It ensures proper handling of database connections and provides metrics on the deleted rows.



Example:

```yaml
TableDelete:
  tablename: totem_heartbeats
  flavor: postgres
  schema: banco_chile
  pk:
  - hearbeat_id
```
close async
close()

Closing Operations.

run async
run()

Run TableDelete.

start async
start(**kwargs)

Get Pandas Dataframe.

table_delete async
table_delete(elem, df)

Running the process of Upsert-delete.

TableInput

TableInput

TableInput(loop=None, job=None, stat=None, **kwargs)

Bases: TableBase

TableInput

Overview

    The TableInput class is a component for loading data from a SQL table or custom SQL query into a Pandas DataFrame.
    It supports large tables by utilizing Dask for partitioned loading, along with options for column trimming,
    primary key setting, and removal of empty rows or columns.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+-------------------------------------------------------------------+
    | Name           | Required | Summary                                                                       |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | tablename      |   Yes    | The name of the SQL table to load into a DataFrame.                           |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | schema         |   No     | Database schema where the table is located, if applicable.                    |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | query          |   No     | SQL query string to load data if `tablename` is not used.                     |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | chunksize      |   No     | Number of rows per chunk when reading data in chunks.                         |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | bigfile        |   No     | Boolean indicating if Dask should be used for large tables.                   |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | drop_empty     |   No     | Boolean to drop columns and rows with only null values.                       |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | trim           |   No     | Boolean to trim whitespace from string columns.                               |
    +----------------+----------+-----------+-------------------------------------------------------------------+
    | pk             |   No     | Dictionary specifying primary key columns and settings for DataFrame indexing.|
    +----------------+----------+-----------+-------------------------------------------------------------------+

Returns

    This component returns a DataFrame populated with data from the specified SQL table or query. If `drop_empty` is set,
    columns and rows with only null values are removed. Metrics such as row and column counts are recorded, along with
    information on the table or query used for data extraction. If no data is found, a `DataNotFound` exception is raised.


Example:

```yaml
TableInput:
  tablename: sales_raw
  schema: epson
  chunksize: 1000
  pk:
    columns:
    - sales_id
```

TableOutput

TableOutput

TableOutput(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, CredentialsInterface

TableOutput

Overview

The TableOutput class is a component for copying data to SQL tables using Pandas and SQLAlchemy features. It supports
various SQL flavors such as PostgreSQL, MySQL, and SQLAlchemy. The class handles data type detection, data transformation,
and the INSERT-UPDATE mechanism.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _pk              |   No     | A list of primary keys for the table.                                                            |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _fk              |   No     | The foreign key for the table.                                                                   |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| data             |   Yes    | The data to be copied to the table.                                                              |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _engine          |   Yes    | The database engine used for the SQL operations.                                                 |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _columns         |   No     | A list of columns in the table.                                                                  |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _schema          |   No     | The schema of the table.                                                                         |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _constraint      |   No     | A list of constraints for the table.                                                             |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _dsn             |   Yes    | The data source name (DSN) for the database connection.                                          |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| flavor           |   Yes    | The SQL flavor for the database, defaults to "postgresql".                                       |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| multi            |   No     | A flag indicating if multiple DataFrame transformations are supported, defaults to False.        |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Returns:
    DataFrame: The data that was copied to the table.


Example:

```yaml
TableOutput:
  tablename: business_hours
  flavor: postgres
  schema: banco_chile
  pk:
  - store_id
  - weekday
  if_exists: append
```
close async
close()

Closing Operations.

run async
run()

Run TableOutput.

TableSchema

TableSchema

TableSchema(loop=None, job=None, stat=None, **kwargs)

Bases: QSSupport, FlowComponent

TableSchema

Overview

    The TableSchema class is a component for reading a CSV file or DataFrame and creating a table schema based
    on data models. It supports various formatting and normalization options for column names, datatype inference,
    and automatic handling of primary keys. This component also supports normalization settings for column names,
    such as camelCase to snake_case conversion, illegal character removal, and customizable name replacements.

.. table:: Properties
:widths: auto

    +-------------------+----------+-----------+------------------------------------------------------------------+
    | Name              | Required | Summary                                                                      |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | filename          |   Yes    | The CSV file or DataFrame input to read and infer schema from.               |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | schema            |   No     | The database schema for the table.                                           |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | tablename         |   Yes    | The name of the table to be created based on the data model.                 |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | drop              |   No     | Boolean specifying if an existing table with the same name should be dropped.|
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | normalize_names   |   No     | Dictionary with options for column name normalization.                       |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | pk                |   No     | List of columns to define as primary keys.                                   |
    +-------------------+----------+-----------+------------------------------------------------------------------+
    | replace_names     |   No     | Dictionary of column name replacements for renaming specific columns.        |
    +-------------------+----------+-----------+------------------------------------------------------------------+

Returns

    This component returns the input data after creating a database table schema based on the data's inferred or
    specified structure. If the input is a file, it reads and processes the file; if a DataFrame, it directly
    processes the DataFrame. The component provides detailed metrics on column structure and row counts, as well as
    logging for SQL execution status and any schema creation errors.
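
Example:

A minimal hedged sketch assembled from the properties above; the file path, schema, and column names are hypothetical:

```yaml
TableSchema:
  filename: /home/ubuntu/symbits/sales_raw.csv   # hypothetical CSV input
  schema: epson                                  # hypothetical schema
  tablename: sales_raw
  drop: true
  pk:
  - sales_id
```
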
handle_sql_reserved_word
handle_sql_reserved_word(column_name)

Verify if the column name is a SQL reserved word and raise an error if it is.

Parameters:

- column_name (str): Column name to verify. Required.

Returns:

- str: The same column name if it is not a reserved word.

Raises:

- ComponentError: If the column name is a SQL reserved word.

Target

Scraping a Web Page Using Selenium + ChromeDriver + BeautifulSoup.

Target

Target(loop=None, job=None, stat=None, **kwargs)

Bases: ReviewScrapper

Target.

Combining an API key and web scraping, this component extracts Target information (reviews, etc.).

reviews async
reviews()

reviews.

Target Product Reviews.

ThumbnailGenerator

ThumbnailGenerator

ThumbnailGenerator(loop=None, job=None, stat=None, **kwargs)

Bases: Boto3Client, FlowComponent

ThumbnailGenerator.

Overview
This component generates thumbnails for images stored in a DataFrame. It takes an image column, resizes the images
to a specified size, and saves them in a specified directory with a given filename format. The generated thumbnail
paths are added to a new column in the DataFrame.
.. table:: Properties
:widths: auto
+---------------------------+----------+---------------------------------------------------------------------------+
| Name                      | Required | Summary                                                                   |
+---------------------------+----------+---------------------------------------------------------------------------+
| data_column               |   Yes    | The name of the column containing the image data.                         |
+---------------------------+----------+---------------------------------------------------------------------------+
| thumbnail_column          |   Yes    | The name of the column to store the generated thumbnail paths.            |
+---------------------------+----------+---------------------------------------------------------------------------+
| size                      |   Yes    | The size of the thumbnail. Can be a tuple (width, height) or a single     |
|                           |          | integer for a square thumbnail.                                           |
+---------------------------+----------+---------------------------------------------------------------------------+
| format                    |   Yes    | The format of the thumbnail (e.g., 'JPEG', 'PNG').                        |
+---------------------------+----------+---------------------------------------------------------------------------+
| directory                 |   No     | The directory where the thumbnails will be saved (default: ./thumbnails). |
+---------------------------+----------+---------------------------------------------------------------------------+
| filename                  |   Yes    | The filename template for the thumbnails. It can include placeholders     |
|                           |          | for DataFrame columns (e.g., '{column_name}.jpg').                        |
+---------------------------+----------+---------------------------------------------------------------------------+
| use_s3                    |   No     | Flag to save thumbnails in S3 instead of local disk.                      |
+---------------------------+----------+---------------------------------------------------------------------------+
| s3_config                 |   No     | S3 configuration when use_s3 is True (default: default).                  |
+---------------------------+----------+---------------------------------------------------------------------------+
| s3_prefix                 |   Yes*   | S3 prefix/path for thumbnails (required when use_s3=True).                |
+---------------------------+----------+---------------------------------------------------------------------------+
| url_thumbnail_column      |   No     | Column name to store the presigned URL of the thumbnail                   |
|                           |          | (default: url_thumbnail).                                                 |
+---------------------------+----------+---------------------------------------------------------------------------+
| thumbnail_directory_column|   No     | Column name to store the base directory/URL for thumbnails                |
|                           |          | (default: thumbnail_directory).                                           |
+---------------------------+----------+---------------------------------------------------------------------------+
Returns
This component returns a DataFrame with new columns containing the paths and URLs of the generated thumbnails.
Example:
```yaml
- ThumbnailGenerator:
    data_column: image
    thumbnail_column: thumbnail_photo
    size: (128, 128)
    format: JPEG
    filename: {photo_id}.jpg
    use_s3: true
    s3_prefix: /thumbnails/epson
```
open async
open(**kwargs)

Override open method to only process S3 when needed

ToPandas

ToPandas

ToPandas(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

Convert any input into a DataFrame.

close async
close()

Close the component.

run async
run()

Run the component.

start async
start(**kwargs)

Initialize the component.

TransformRows

TransformRows.

TransformRows allow making column transformation over a Pandas Dataframe.

TransformRows

TransformRows
TransformRows(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

TransformRows

Overview

The TransformRows class is a component for transforming, adding, or modifying rows in a Pandas DataFrame based on
specified criteria. It supports single and multiple DataFrame transformations, and various operations on columns.

.. table:: Properties
:widths: auto

+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| Name               | Required | Description                                                                                           |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| fields             |   No     | A dictionary defining the fields and corresponding transformations to be applied.                     |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| filter_conditions  |   No     | A dictionary defining the filter conditions for transformations.                                      |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| clean_notnull      |   No     | Boolean flag indicating if non-null values should be cleaned, defaults to True.                       |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| replace_columns    |   No     | Boolean flag indicating if columns should be replaced, defaults to False.                             |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| multi              |   No     | Boolean flag indicating if multiple DataFrame transformations should be supported, defaults to False. |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| function           |   No     | See the list of available functions in the functions.py file in this directory.                       |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+
| _applied           |   No     | List to store the applied transformations.                                                            |
+--------------------+----------+-------------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the transformation of DataFrames, including initialization, execution, and result handling.



Example:

```yaml
TransformRows:
  fields:
    display_name:
      value:
      - concat
      - columns:
        - first_name
        - last_name
    legal_address:
      value:
      - concat
      - columns:
        - legal_street_address_1
        - legal_street_address_2
    work_address:
      value:
      - concat
      - columns:
        - work_location_address_1
        - work_location_address_2
    first_name:
      value:
      - capitalize
    last_name:
      value:
      - capitalize
    warp_id:
      value:
      - nullif
      - chars:
        - '*'
    old_warp_id:
      value:
      - nullif
      - chars:
        - '*'
    worker_category_description:
      value:
      - case
      - column: benefits_eligibility_class_code
        condition: PART-TIME
        match: Part Time
        notmatch: Full Time
    file_number:
      value:
      - ereplace
      - columns:
        - position_id
        - payroll_group
        newvalue: ''
    original_hire_date:
      value:
      - convert_to_datetime
    hire_date:
      value:
      - convert_to_datetime
    start_date:
      value:
      - convert_to_datetime
    updated:
      value:
      - convert_to_datetime
    gender_code:
      value:
      - convert_to_string
    payroll_id:
      value:
      - convert_to_string
    reports_to_payroll_id:
      value:
      - convert_to_string
```
start async
start(**kwargs)

Obtain Pandas Dataframe.

functions

Functions.

Tree of TransformRows functions.

add_timestamp_to_time
add_timestamp_to_time(df, field, date, time)

Takes a pandas DataFrame and combines the values from a date column and a time column to create a new timestamp column.

:param df: pandas DataFrame to be modified.
:param field: Name of the new column to store the combined timestamp.
:param date: Name of the column in the df DataFrame containing date values.
:param time: Name of the column in the df DataFrame containing time values.
:return: Modified pandas DataFrame with the combined timestamp stored in a new column.

any_tuple_valid
any_tuple_valid(df, field, columns)

Adds a boolean column (named field) to df that is True when any tuple in columns has all of its columns neither NaN nor empty.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame.

required
field str

The name of the output column.

required
columns list of tuple of str

List of tuples, where each tuple contains column names that must be checked. Example: [("start_lat", "start_long"), ("end_lat", "end_long")]

required

Returns:

Type Description
DataFrame

pd.DataFrame: The original DataFrame with the new field column.
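
Example (a sketch, assuming the `fields`/`value` calling convention shown in the TransformRows example above; the column names are placeholders):

```yaml
TransformRows:
  fields:
    has_geoloc:
      value:
      - any_tuple_valid
      - columns:
        - [start_lat, start_long]
        - [end_lat, end_long]
```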

apply_function
apply_function(df, field, fname, column=None, **kwargs)

Apply any scalar function to a column in the DataFrame.

Parameters:
- df: pandas DataFrame
- field: The column where the result will be stored.
- fname: The name of the function to apply.
- column: The column to which the function is applied (if None, apply to the field column).
- **kwargs: Additional arguments to pass to the function.

bytesio_to_base64
bytesio_to_base64(df, field, column, as_string=False, as_image=True, image_mime='image/png')

Converts bytes in a DataFrame column to a Base64 encoded string.

:param df: The DataFrame containing the bytes column.
:param field: The name of the field to store the Base64 encoded string.
:param column: The name of the bytes column.
:param as_string: If True, converts the Base64 bytes to a string.
:return: The DataFrame with the Base64 encoded string.

calculate_distance
calculate_distance(df, field, columns, unit='km', chunk_size=1000)

Add a distance column to a dataframe.

Parameters:

Name Type Description Default
df DataFrame

pandas DataFrame with columns 'latitude', 'longitude', 'store_lat', 'store_lng'

required
columns List[tuple]

list of tuples with column names for coordinates - First tuple: [latitude1, longitude1] - Second tuple: [latitude2, longitude2]

required
unit str

unit of distance ('km' for kilometers, 'm' for meters, 'mi' for miles)

'km'
chunk_size int

number of rows to process at once for large datasets

1000

Returns:

Type Description
DataFrame

df with additional 'distance_km' column
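
Example (a sketch under the same TransformRows `fields` convention; the coordinate column names are placeholders):

```yaml
TransformRows:
  fields:
    distance:
      value:
      - calculate_distance
      - columns:
        - [latitude, longitude]
        - [store_lat, store_lng]
        unit: km
```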

convert_timezone
convert_timezone(df, field, *, column=None, from_tz='UTC', to_tz=None, tz_column=None, default_timezone='UTC')

Convert field to a target timezone.

Parameters

df : DataFrame
field : name of an existing datetime column
column : name of the output column (defaults to field)
from_tz : timezone used to localise naive timestamps
to_tz : target timezone (ignored if tz_column is given)
tz_column : optional column that contains a timezone per row
default_timezone : fallback when a row's tz_column is null/NaN

Returns:

Type Description
DataFrame

df with converted datetime column
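
Example (a sketch under the TransformRows `fields` convention; the column name and target timezone are placeholders):

```yaml
TransformRows:
  fields:
    updated:
      value:
      - convert_timezone
      - from_tz: UTC
        to_tz: America/New_York
```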

create_attachment_column
create_attachment_column(df, field, columns, colnames=None)

Create a column with a list of attachments from one or more path/URL columns.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
field str

Name of the new column to store the list of attachments.

required
columns List[str]

Column names to convert. You can pass either the exact column (e.g., "pdf_path_m0") or the base name (e.g., "pdf_path").

required
colnames Optional[Dict[str, str]]

Optional mapping of column names to attachment names. If not provided, the column names will be used as names.

None

Returns:

Type Description
DataFrame

The same DataFrame with field added.
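
Example (a sketch under the TransformRows `fields` convention; the path column and attachment name are placeholders):

```yaml
TransformRows:
  fields:
    attachments:
      value:
      - create_attachment_column
      - columns:
        - pdf_path
        colnames:
          pdf_path: Monthly Report
```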

day_of_week
day_of_week(df, field, column, locale='en_US.utf8')

Extracts the day of the week from a date column.

:param df: The DataFrame containing the date column.
:param field: The name of the field to store the day of the week.
:param column: The name of the date column.
:return: The DataFrame with the day of the week.

drop_timezone
drop_timezone(df, field, column=None)

Drop the timezone information from a datetime column.

Parameters:

Name Type Description Default
df DataFrame

pandas DataFrame with a datetime column

required
field str

name of the datetime column

required

Returns:

Type Description
DataFrame

df with timezone-free datetime column

duration
duration(df, field, columns, unit='s')

Converts a duration column to a specified unit.

:param df: The DataFrame containing the duration column.
:param field: The name of the field to store the converted duration.
:param columns: The name of the duration column.
:param unit: The unit to convert the duration to.
:return: The DataFrame with the converted duration.

extract_from_dictionary
extract_from_dictionary(df, field, column, key, conditions=None, as_timestamp=False)

Extracts a value from a JSON column in the DataFrame.

:param df: The DataFrame containing the JSON column.
:param field: The name of the field to store the extracted value.
:param column: The name of the JSON column.
:param key: The key to extract from the JSON object.
:param conditions: Optional dictionary of conditions to filter rows before extraction.
:param as_timestamp: If True, converts the extracted value to a timestamp.
:return: The DataFrame with the extracted value.

extract_from_object
extract_from_object(df, field, column, key, as_string=False, as_timestamp=False)

Extracts a value from an object column in the DataFrame.

:param df: The DataFrame containing the object column.
:param field: The name of the field to store the extracted value.
:param column: The name of the object column.
:param key: The key to extract from the object.
:param as_string: If True, converts the extracted value to a string.
:param as_timestamp: If True, converts the extracted value to a timestamp.
:return: The DataFrame with the extracted value.

fully_geoloc
fully_geoloc(df, field, columns, inverse=False)

Adds a boolean column (named field) to df that is True when, for each tuple in columns, all the involved columns are neither NaN nor empty.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame.

required
field str

The name of the output column.

required
columns list of tuple of str

List of tuples, where each tuple contains column names that must be valid (non-null and non-empty). Example: [("start_lat", "start_long"), ("end_lat", "end_long")]

required

Returns:

Type Description
DataFrame

pd.DataFrame: The original DataFrame with the new field column.

get_moment
get_moment(df, field, column, moments=None)

df: pandas DataFrame
field: name of the output column that stores the labels
column: name of the column to compare (e.g. "updated_hour")
moments: list of tuples [(label, (start, end)), ...], e.g. [("night", (0, 7)), ("morning", (7, 10)), ...]
returns: a Series of labels corresponding to each row

get_product
get_product(row, field, columns)

Retrieves product information from the Barcode Lookup API based on a barcode.

:param row: The DataFrame row containing the barcode.
:param field: The name of the field containing the barcode.
:param columns: The list of columns to extract from the API response.
:return: The DataFrame row with the product information.

haversine_distance
haversine_distance(lat1, lon1, lat2, lon2, unit='km')

Distance between two points on Earth, expressed in the given unit (kilometers by default).

path_to_url
path_to_url(df, field, column=None, base_path='files/', base_url='https://example.com/files/')

Converts a file path in a DataFrame column to a URL. Replaces the base path with the base URL.

:param df: The DataFrame containing the file path column.
:param field: The name of the field to store the URL.
:param column: The name of the file path column (defaults to field).
:param base_path: The base path to replace in the file path.
:param base_url: The base URL to use for the conversion.
:return: The DataFrame with the URL in the specified field.
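
Example (a sketch under the TransformRows `fields` convention; the paths and URLs are placeholders):

```yaml
TransformRows:
  fields:
    photo_url:
      value:
      - path_to_url
      - column: photo_path
        base_path: files/
        base_url: https://example.com/files/
```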

string_to_vector
string_to_vector(df, field)

Converts a string representation of a list into an actual list.

:param df: The DataFrame containing the string representation.
:param field: The name of the field to convert.
:return: The DataFrame with the converted field.

upc_to_product
upc_to_product(df, field, columns=['barcode_formats', 'mpn', 'asin', 'title', 'category', 'model', 'brand'])

Converts UPC codes in a DataFrame to product information using the Barcode Lookup API.

:param df: The DataFrame containing the UPC codes.
:param field: The name of the field containing the UPC codes.
:param columns: The list of columns to extract from the API response.
:return: The DataFrame with the product information.

TransposeRows

TransposeRows

TransposeRows(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

TransposeRows

Overview

 The TransposeRows class is a component for transposing specified rows in a DataFrame by converting row values
 into new columns based on pivot settings. This component supports options for preserving the original data,
 handling empty results, and custom column configurations for the transposition.

.. table:: Properties
:widths: auto

 +------------------+----------+---------------------------------------------------------------------------+
 | Name             | Required | Summary                                                                   |
 +------------------+----------+---------------------------------------------------------------------------+
 | pivot            |   Yes    | List of columns to use as the pivot index for transposition.              |
 +------------------+----------+---------------------------------------------------------------------------+
 | columns          |   Yes    | Dictionary mapping row values to their target column names.               |
 +------------------+----------+---------------------------------------------------------------------------+
 | preserve_original|   No     | Boolean indicating if the original rows should be preserved.              |
 +------------------+----------+---------------------------------------------------------------------------+
 | allow_empty      |   No     | Boolean indicating if empty columns should be allowed in the output.      |
 +------------------+----------+---------------------------------------------------------------------------+

Returns

 This component returns a DataFrame with specified rows transposed into columns according to the provided pivot
 and column configurations. If `preserve_original` is set to False, the original rows used in transposition
 are removed. Any errors in column mapping or pivoting are raised with descriptive error messages.


 Example:

 ```yaml
 TransposeRows:
   column: column_name
   value: data
   pivot:
   - formid
   - form_id
   - orgid
   preserve_original: true
   allow_empty: true
   columns:
     '000_001': ad_hoc
     '000_003': creation_timestamp
     '000_004': user_device
     '000_008': geoloc
     '000_009': visit_length
     '000_012': store_name
     '000_013': store_id
     '000_023': account_name
     '000_024': store_designation
     '000_026': region_name
     '000_029': store_timezone
     '000_037': visitor_username
     '000_038': visitor_name
     '000_039': visitor_email
     '000_045': visitor_role
     '000_055': updated_timestamp
     '000_065': activity_item_id
     '000_063': time_in
     '000_064': time_out
     '000_066': position_id
     '000_067': position_manager
     '000_070': visit_status
     '194834': retailer
     VisitDateLocal: visit_timestamp
 ```
row_to_column
row_to_column(df, row_to_pivot, new_name)

Add a pivoted column to the dataframe based on the given column name.

Parameters:
- df: The input dataframe.
- row_to_pivot: The column name to be pivoted.
- new_name: The name of the column to be transposed.

Returns:
- Dataframe with the new pivoted column.

UPCDatabase

UPCDatabase

UPCDatabase(loop=None, job=None, stat=None, **kwargs)

Bases: AbstractREST

UPCDatabase.

Querying UPC Database for Product information.

Example:

```yaml
UPCDatabase:
  method: currency
  base: USD
  credentials:
    apikey: UPC_API_KEY
```
currency async
currency()

currency.

Currency information and exchange rates supported by UPC

currency_history async
currency_history()

currency_history.

Retrieves the currency conversion rates for a specific date.

product async
product()

product.

Product information based on UPC barcode

search async
search()

search.

Search for a product based on item parameters.

UnGzip

UnGzip

UnGzip(loop=None, job=None, stat=None, **kwargs)

Bases: CompressSupport, FileCopy

UnGzip

Overview

    The UnGzip class is a component for decompressing Gzip (.gz) files, including compressed tarballs (e.g., .tar.gz, .tar.bz2, .tar.xz).
    This component extracts the specified Gzip or tarball file into a target directory and supports optional source file deletion
    after extraction.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | filename       |   Yes    | The path to the Gzip file to uncompress.                                  |
    +----------------+----------+---------------------------------------------------------------------------+
    | directory      |   Yes    | The target directory where files will be extracted.                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | delete_source  |   No     | Boolean indicating if the source file should be deleted post-extraction.  |
    +----------------+----------+---------------------------------------------------------------------------+
    | extract        |   No     | Dictionary specifying filenames to extract and/or output directory.       |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component extracts files from a specified Gzip or tarball archive into the designated directory
    and returns a list of paths to the extracted files. It tracks metrics for the output directory and the source
    Gzip file. If configured, the original compressed file is deleted after extraction. Errors encountered during
    extraction or directory creation are logged and raised as exceptions.


Example:

```yaml
UnGzip:
  source:
    directory: /home/ubuntu/symbits/mso/files/commissions_statements/pr/
    filename: STATEMENT_STATEMENT-*.CSV.gz
  destination:
    directory: /home/ubuntu/symbits/mso/files/commissions_statements/pr/
  delete_source: true
```

Uncompress

Uncompress

Uncompress(loop=None, job=None, stat=None, **kwargs)

Bases: CompressSupport, FlowComponent

Uncompress

Overview

    The Uncompress class is a component for decompressing files in various archive formats, including but not limited to:
    7z (.7z), ACE (.ace), ALZIP (.alz), AR (.a), ARC (.arc), ARJ (.arj), BZIP2 (.bz2), CAB (.cab), compress (.Z), 
    CPIO (.cpio), DEB (.deb), DMS (.dms), GZIP (.gz), LRZIP (.lrz), LZH (.lha, .lzh), LZIP (.lz), LZMA (.lzma), 
    LZOP (.lzo), RPM (.rpm), RAR (.rar), RZIP (.rz), TAR (.tar), XZ (.xz), ZIP (.zip, .jar), and ZOO (.zoo). 
    It extracts the specified compressed file into a target directory and can optionally delete the source file 
    upon successful extraction.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | filename       |   Yes    | The path to the compressed file to be decompressed.                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | directory      |   Yes    | The target directory where files will be extracted.                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | delete_source  |   No     | Boolean indicating if the source file should be deleted post-extraction.  |
    +----------------+----------+---------------------------------------------------------------------------+
    | extract        |   No     | Dictionary specifying filenames to extract and/or output directory.       |
    +----------------+----------+---------------------------------------------------------------------------+
    | password       |   No     | Optional password for encrypted files in supported formats.               |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component extracts files from the specified compressed archive into the designated directory and returns
    a list of paths to the extracted files. It tracks metrics for the output directory and compressed file name.
    If configured, the original compressed file is deleted after extraction. Errors related to file corruption 
    or extraction issues are logged and raised as exceptions.


Example:

```yaml
Uncompress:
  filename: organizational_unit_ancestors_{yesterday}.zip
  masks:
    yesterday:
    - yesterday
    - mask: '%Y-%m-%d'
  directory: /home/ubuntu/symbits/polestar/files/organizational_unit_ancestors/
  extract:
    filenames:
    - OrganizationalUnitAncestors.csv
    directory: /home/ubuntu/symbits/polestar/files/organizational_unit_ancestors/
  delete_source: false
```

UniqueRows

UniqueRows

UniqueRows(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

UniqueRows

Overview

    The UniqueRows class is a component for extracting unique rows from a Pandas DataFrame.
    It supports pre-sorting of rows, custom options for handling duplicates, and an option to save
    rejected rows that are duplicates.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | unique         |   Yes    | List of columns to use for identifying unique rows.                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | order          |   No     | Dictionary specifying columns and sort order (`asc` or `desc`).           |
    +----------------+----------+---------------------------------------------------------------------------+
    | keep           |   No     | Specifies which duplicates to keep: `first`, `last`, or `False`.          |
    +----------------+----------+---------------------------------------------------------------------------+
    | save_rejected  |   No     | Dictionary with filename to save rejected rows as CSV, if specified.      |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component returns a DataFrame containing only unique rows based on the specified columns.
    If sorting is defined in `order`, rows are pre-sorted before duplicates are removed. Metrics such as
    the number of rows passed and rejected are recorded. If `save_rejected` is specified, rejected rows
    are saved to a file. Any data errors encountered during execution are raised with detailed error messages.


Example:

```yaml
UniqueRows:
  unique:
  - store_id
```
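
A fuller sketch combining the optional properties described above (the sort column and output filename are placeholders):

```yaml
UniqueRows:
  unique:
  - store_id
  order:
    updated: desc
  keep: first
  save_rejected:
    filename: /home/ubuntu/symbits/example/files/rejected_rows.csv
```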
close
close()

Close.

run async
run()

Getting Unique rows from a Dataframe.

start async
start(**kwargs)

Start Component

Unzip

Unzip

Unzip(loop=None, job=None, stat=None, **kwargs)

Bases: CompressSupport, FileCopy

Unzip

Overview

    The Unzip class is a component for decompressing ZIP files in specified directories.
    It supports selecting specific files within the archive, applying directory masks, and
    optionally deleting the source ZIP file after extraction.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | filename       |   Yes    | The name of the ZIP file to decompress.                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | directory      |   Yes    | The target directory for decompression.                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | extract        |   No     | Dictionary specifying files to extract and/or target output directory.    |
    +----------------+----------+---------------------------------------------------------------------------+
    | delete_source  |   No     | Boolean indicating if the ZIP file should be deleted after extraction.    |
    +----------------+----------+---------------------------------------------------------------------------+
    | password       |   No     | Optional password for encrypted ZIP files.                                |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component extracts the specified files from a ZIP archive into the target directory and
    returns a list of extracted file paths. Metrics such as the output directory and ZIP file name
    are tracked, and any errors related to file extraction or directory creation are logged for
    debugging purposes. If specified, the original ZIP file is deleted after extraction.
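
Example (a sketch modeled on the UnGzip and Uncompress examples above; the file names and directories are placeholders):

```yaml
Unzip:
  filename: daily_export.zip
  directory: /home/ubuntu/symbits/example/files/
  extract:
    filenames:
    - export.csv
    directory: /home/ubuntu/symbits/example/files/
  delete_source: true
```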

UpdateOperationalVars

UpdateOperationalVars

Overview

 This component updates operational variables stored in the `process_variables` table within the `troc` schema.
 It supports dynamic variable replacement and mask application, making it versatile for runtime updates in task flows.

.. table:: Properties
:widths: auto

 +--------------+----------+---------------------------------------------------------------------------+
 | Name         | Required | Summary                                                                   |
 +--------------+----------+---------------------------------------------------------------------------+
 | name         |   Yes    | The name of the variable to update.                                       |
 +--------------+----------+---------------------------------------------------------------------------+
 | value        |   Yes    | The new value to assign to the variable after the update.                 |
 +--------------+----------+---------------------------------------------------------------------------+
 | masks        |   Yes    | Defines a section for dynamically setting the value of the variable.      |
 +--------------+----------+---------------------------------------------------------------------------+

Returns

 This component does not return data directly. Instead, it updates specified variables in the database and
 records metrics on the update status. Additionally, logging is provided for SQL execution, and potential errors
 are managed with detailed logging and exception handling for effective debugging.


 Example:

 ```yaml
 UpdateOperationalVars:
   name: EPSON_SALES
   value: '{saturday}'
   masks:
     '{saturday}':
     - date_diff_dow
     - diff: 2
       day_of_week: monday
       mask: '%Y-%m-%d'
 ```

UpdateOperationalVars

UpdateOperationalVars(loop=None, job=None, stat=None, **kwargs)

UploadTo

UploadToBase

UploadToBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

UploadToBase

Overview

The `UploadToBase` class is an abstract component designed to handle file uploads to various destinations,
including servers over HTTP/HTTPS. This class manages credentials, connection settings, SSL configurations,
and supports progress tracking during file uploads.

.. table:: Properties
:widths: auto

+-------------------------+----------+----------------------------------------------------------------------------------------+
| Name                    | Required | Description                                                                            |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| url                     |   Yes    | The URL to which files will be uploaded.                                               |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| credentials             |   Yes    | A dictionary containing the credentials necessary for authentication.                  |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| source_file             |   No     | The path to the source file to be uploaded.                                            |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| source_dir              |   No     | The directory containing the source files to be uploaded.                              |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| filename                |   No     | The destination filename for the uploaded file.                                        |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| create_destination      |   No     | A flag indicating whether to create the destination directory if it doesn't exist.     |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| ssl                     |   No     | A flag indicating whether to use SSL/TLS for the connection.                           |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| ssl_cafile              |   No     | The path to the CA file for SSL/TLS validation.                                        |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| ssl_certs               |   No     | A list of SSL certificates to be used for the connection.                              |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| host                    |   Yes    | The host address of the destination server.                                            |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| port                    |   Yes    | The port number of the destination server.                                             |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| overwrite               |   No     | A flag indicating whether to overwrite the file if it already exists.                  |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| rename                  |   No     | A flag indicating whether to rename the file if a file with the same name exists.      |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| timeout                 |   No     | The timeout value for the upload operation.                                            |
+-------------------------+----------+----------------------------------------------------------------------------------------+
| response_status         |   No     | A list of acceptable HTTP response statuses for a successful upload.                   |
+-------------------------+----------+----------------------------------------------------------------------------------------+

Return

The methods in this class manage the upload of files to specified destinations, including initialization,
execution, and result handling.
http_response async
http_response(response)

http_response.

Return the request response of the HTTP Session

Parameters:

Name Type Description Default
response Response

the Response of the HTTP Session.

required

Returns:

Name Type Description
Any

any processed data.

start async
start(**kwargs)

Start.

Processing variables and credentials.

upload_session async
upload_session(url, method='get', data=None, data_format='json')

upload_session. Connect to an HTTP source using aiohttp.

UploadToS3

UploadToS3

UploadToS3(loop=None, job=None, stat=None, **kwargs)

Bases: Boto3Client, UploadToBase

UploadToS3

Overview

The `UploadToS3` class is a specialized component that facilitates the uploading of files to an Amazon S3 bucket. 
This class extends both the `Boto3Client` and `UploadToBase` classes, providing integration with AWS S3 for file 
storage operations. The component supports the upload of individual files, multiple files, or entire directories.

.. table:: Properties
:widths: auto

+-------------------------+----------+--------------------------------------------------------------------------------------+
| Name                    | Required | Description                                                                          |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| bucket                  |   Yes    | The name of the S3 bucket to which files will be uploaded.                           |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| directory               |   Yes    | The S3 directory path where files will be uploaded.                                  |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| source_dir              |   Yes    | The local directory containing the files to be uploaded.                             |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| _filenames              |   Yes    | A list of filenames to be uploaded.                                                  |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| whole_dir               |   No     | A flag indicating whether to upload all files in the source directory.               |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| ContentType             |   No     | The MIME type of the files to be uploaded. Defaults to "binary/octet-stream".        |
+-------------------------+----------+--------------------------------------------------------------------------------------+
| credentials             |   Yes    | A dictionary containing the credentials necessary for AWS authentication.            |
+-------------------------+----------+--------------------------------------------------------------------------------------+

Return

The `run` method uploads files to the specified S3 bucket, returning a dictionary containing the list of successfully 
uploaded files and any errors encountered during the upload process.
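
Example (a sketch using the properties documented above; the bucket, paths, and credential key names are illustrative assumptions):

```yaml
UploadToS3:
  bucket: my-example-bucket
  directory: uploads/reports/
  source_dir: /home/ubuntu/symbits/example/files/
  whole_dir: true
  ContentType: text/csv
  credentials:
    aws_access_key_id: AWS_ACCESS_KEY
    aws_secret_access_key: AWS_SECRET_KEY
```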
run async
run()

Running Upload file to S3.

start async
start(**kwargs)

start Method.

UploadToSFTP

UploadToSFTP

UploadToSFTP(loop=None, job=None, stat=None, **kwargs)

Bases: SSHClient, UploadToBase

UploadToSFTP

Overview

    The UploadToSFTP class is a component for uploading files or entire directories to an SSH/SFTP server.
    It supports various configurations, including recursive directory uploads, customizable transfer settings,
    and real-time upload progress tracking.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | source         |   Yes    | A dictionary specifying the source directory, filename, and/or            |
    |                |          | recursive setting for selecting files to upload.                          |
    +----------------+----------+---------------------------------------------------------------------------+
    | destination    |   Yes    | A dictionary defining the target directory on the SFTP server.            |
    +----------------+----------+---------------------------------------------------------------------------+
    | whole_dir      |   No     | Boolean indicating if the entire source directory should be uploaded.     |
    +----------------+----------+---------------------------------------------------------------------------+
    | block_size     |   No     | Integer defining the block size for file transfer, defaults to 65356.     |
    +----------------+----------+---------------------------------------------------------------------------+
    | max_requests   |   No     | Integer setting the max number of parallel requests, defaults to 1.       |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component uploads files to the specified SFTP directory and returns a list of uploaded files on success.
    If no files are found or a connection error occurs, it raises a relevant exception. Metrics on the number of
    files uploaded are also recorded.


Example:

```yaml
UploadToSFTP:
  host: sftp.example.com
  port: 22
  credentials:
    username: sftpuser
    password: abcd1234
  destination:
    directory: /incoming/
```
run async
run()

Running Upload file.

start async
start(**kwargs)

start Method.

UploadToSharepoint

UploadToSharepoint

UploadToSharepoint(loop=None, job=None, stat=None, **kwargs)

Bases: SharepointClient, UploadToBase

UploadToSharepoint

Overview

    The UploadToSharepoint class is a component for uploading files or entire directories to a SharePoint site.
    It supports various configuration options for selecting files by name, extension, or pattern, and includes
    functionality for recursive directory searches.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | credentials    |   Yes    | A dictionary with SharePoint credentials: `username`, `password`,         |
    |                |          | `tenant`, and `site`.                                                     |
    +----------------+----------+---------------------------------------------------------------------------+
    | source         |   Yes    | A dictionary specifying the source directory, filename, and/or file       |
    |                |          | extension for selecting files to upload.                                  |
    +----------------+----------+---------------------------------------------------------------------------+
    | destination    |   Yes    | A dictionary defining the SharePoint destination directory.               |
    +----------------+----------+---------------------------------------------------------------------------+
    | whole_dir      |   No     | Boolean indicating if the entire source directory should be uploaded.     |
    +----------------+----------+---------------------------------------------------------------------------+
    | recursive      |   No     | Boolean specifying whether to search recursively within directories.      |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component uploads files to a specified SharePoint directory and returns the upload result status.
    It logs the upload activity and records metrics for the number of files uploaded. If no files are found
    or the configuration is incomplete, it raises an error. The upload can handle both individual files and
    entire folders, depending on configuration.


Example:

```yaml
UploadToSharepoint:
  credentials:
    username: sharepoint_username
    password: sharepoint_password
    tenant: symbits
    site: trocstorage
  destination:
    directory: Shared Documents/Optimum Sales Files
  masks:
    '{today}':
    - today
    - mask: '%Y%m%d%H%M%S'
```
run async
run()

Upload a File to Sharepoint

start async
start(**kwargs)

start Method.

UserFunc

UserFunc

UserFunc(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

UserFunc.

Overview

Run an arbitrary user function and return its result.

.. table:: Properties
:widths: auto

+--------------+----------+-------------------------------------------------------------------+
| Name         | Required | Summary                                                           |
+--------------+----------+-------------------------------------------------------------------+
| function     |   Yes    | Name of the user function to run.                                 |
+--------------+----------+-------------------------------------------------------------------+
| params       |   Yes    | Parameters passed to the function.                                |
+--------------+----------+-------------------------------------------------------------------+
| foo          |   Yes    | Variable name.                                                    |
+--------------+----------+-------------------------------------------------------------------+
| api_keys     |   Yes    | API key used for the query.                                       |
+--------------+----------+-------------------------------------------------------------------+

Returns the result of the executed user function.

close async
close()

Close Method.

run async
run()

Run Method.

start async
start(**kwargs)

Obtain Previous data.

getFunction

getFunction(program, function)

getFunction.

Example:

```yaml
UserFunc:
  function: scheduling_visits
  args:
    max_distance: 400
    max_stores: 5
    year: 2024
    month: 11
```

VivaTracker

VivaTracker

VivaTracker(job=None, *args, **kwargs)

Bases: ASPX

WSDLClient

WSDLClient

WSDLClient(job=None, *args, **kwargs)

Bases: FlowComponent

WSDLClient.

Client for WSDL SOAP Web Services using Zeep
close async
close()

Close Method.

start async
start(**kwargs)

Obtain Pandas Dataframe.

Walmart

Scraping a Web Page Using Selenium + ChromeDriver + BeautifulSoup.

Walmart

Walmart(loop=None, job=None, stat=None, **kwargs)

Bases: ReviewScrapper

Walmart.

Combining an API key and web scraping, this component extracts Walmart information (reviews, etc.).

reviews async
reviews()

reviews.

Walmart Product Reviews.

Workday

Location

Bases: BaseModel

Pydantic model for a Workday location record. raw_data holds the full SOAP response dict for any extra fields.

LocationType

LocationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Locations operation.

execute async
execute(**kwargs)

Execute the Get_Locations operation and return a pandas DataFrame.

Supported parameters:
- location_id: Specific location ID to fetch (uses Request_References)
- location_name: Filter by location name (uses Request_Criteria)
- location_type: Filter by location type
- location_usage: Filter by location usage
- inactive: Filter by inactive status (True/False)
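
Example (a sketch; `get_locations` as the Workday `type` value is an assumption inferred from the operation name, and the filter values are placeholders):

```yaml
Workday:
  type: get_locations
  location_type: Store
  inactive: false
```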

get_active_locations async
get_active_locations()

Convenience method to get all active locations.

get_location_by_id async
get_location_by_id(location_id)

Convenience method to get a specific location by ID.

get_location_by_name async
get_location_by_name(location_name)

Convenience method to get a specific location by name.

get_locations_by_type async
get_locations_by_type(location_type)

Convenience method to get locations by type.

Organization

Bases: BaseModel

Complete organization model based on actual Workday payload.

OrganizationType

OrganizationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Organizations operation.

execute async
execute(**kwargs)

Execute the Get_Organizations operation and return a pandas DataFrame.

Supported parameters:
- organization_id: Specific organization ID to fetch (uses Request_References)
- organization_id_type: Type of organization ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
- organization_type: Filter by organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
- include_inactive: Include inactive organizations (True/False)
- enable_transaction_log_lite: Enable transaction log lite (True/False)

get_active_organizations async
get_active_organizations()

Get only active organizations.

:return: DataFrame with active organizations data

get_all_organizations async
get_all_organizations(include_inactive=True)

Get all organizations (active and optionally inactive).

:param include_inactive: Whether to include inactive organizations
:return: DataFrame with all organizations data

get_companies async
get_companies()

Get only company organizations.

:return: DataFrame with company organizations data

get_cost_centers async
get_cost_centers()

Get only cost center organizations.

:return: DataFrame with cost center organizations data

get_organization_by_cost_center_id async
get_organization_by_cost_center_id(cost_center_id)

Get a specific organization by Cost Center Reference ID.

:param cost_center_id: The organization Cost Center Reference ID to fetch
:return: DataFrame with organization data

get_organization_by_id async
get_organization_by_id(organization_id, id_type='Organization_Reference_ID')

Get a specific organization by ID.

:param organization_id: The organization ID to fetch
:param id_type: Type of ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
:return: DataFrame with organization data

get_organization_by_wid async
get_organization_by_wid(wid)

Get a specific organization by WID.

:param wid: The organization WID to fetch
:return: DataFrame with organization data

get_organizations_by_type async
get_organizations_by_type(organization_type)

Get organizations filtered by type.

:param organization_type: Organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
:return: DataFrame with organizations data

get_supervisory_organizations async
get_supervisory_organizations()

Get only supervisory organizations.

:return: DataFrame with supervisory organizations data

TimeBlock

Bases: BaseModel

Pydantic model for a Workday calculated time block record. raw_data holds the full SOAP response dict for any extra fields.

TimeBlockType

TimeBlockType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Calculated_Time_Blocks operation.

execute async
execute(**kwargs)

Execute the Get_Calculated_Time_Blocks operation and return a pandas DataFrame.

Supported parameters:
- worker_id: Specific worker ID to fetch time blocks for
- start_date: Start date for date range filter (YYYY-MM-DD)
- end_date: End date for date range filter (YYYY-MM-DD)
- time_block_id: Specific time block ID to fetch
- status: Filter by status
- supervisory_org: Filter by supervisory organization
- include_deleted: Whether to include deleted time blocks (default: False)
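
Example (a sketch; `get_time_blocks` appears in the Workday component overview below, and the worker ID and dates are placeholders):

```yaml
Workday:
  type: get_time_blocks
  worker_id: "72037046323885"
  start_date: "2024-11-01"
  end_date: "2024-11-30"
```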

get_time_blocks_by_date_range async
get_time_blocks_by_date_range(start_date, end_date, status=None)

Convenience method to get time blocks for a date range.

get_time_blocks_by_worker async
get_time_blocks_by_worker(worker_id, start_date=None, end_date=None)

Convenience method to get time blocks for a specific worker.

TimeRequest

Bases: BaseModel

Pydantic model for a Workday time request record. raw_data holds the full SOAP response dict for any extra fields.

TimeRequestType

TimeRequestType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handles Get_Time_Requests operation for Workday Time Tracking API.

execute async
execute(**kwargs)

Execute the Get_Time_Requests operation.

get_time_request_by_id async
get_time_request_by_id(time_request_id)

Get a specific time request by ID.

get_time_requests_by_date_range async
get_time_requests_by_date_range(start_date, end_date)

Get all time requests within a date range.

get_time_requests_by_organization async
get_time_requests_by_organization(supervisory_organization_id, start_date=None, end_date=None)

Get time requests for a specific organization within an optional date range.

get_time_requests_by_worker async
get_time_requests_by_worker(worker_id, start_date=None, end_date=None)

Get time requests for a specific worker within an optional date range.

safe_serialize
safe_serialize(df)

Safely serialize the DataFrame for JSON output.

Workday

Workday(*, loop=None, job=None, stat=None, **kwargs)

Bases: SOAPClient, FlowComponent

Workday Component

Overview

The Workday class is a Flowtask component for the Workday SOAP API. It encapsulates all Workday-specific logic, including authentication, request/response handling, and data normalization.

Properties

type (str): Operation type to perform (e.g. 'get_workers', 'get_time_blocks')
worker_id (str): Optional worker ID to fetch a specific worker
use_storage (bool): Enable data storage functionality
storage_path (str): Path where to store the data files

Returns:

Type Description

Returns a pandas DataFrame with the requested data.

Examples:

```yaml
# Basic usage
Workday:
  type: get_workers
  worker_id: "72037046323885"

# With storage enabled
Workday:
  type: get_workers
  use_storage: true
  storage_path: "/data/workday"

# Location hierarchy assignments with storage
Workday:
  type: get_location_hierarchy_assignments
  use_storage: true
  storage_path: "/data/workday"

# Organizations with storage
Workday:
  type: get_organizations
  organization_type: "Cost_Center"
  use_storage: true
  storage_path: "/data/workday"
```
add_metric
add_metric(key, value)

Add a metric to track

close async
close()

Cleanup resources

run async
run(operation=None, **kwargs)

Execute the component's main operation

serialize_object
serialize_object(obj)

Helper to serialize SOAP objects

start async
start(**kwargs)

Initialize the component

Worker

Bases: BaseModel

Pydantic model for a Workday worker record. raw_data holds the full SOAP response dict for any extra fields.

WorkerType

WorkerType(component, max_retries=5, retry_delay=0.2)

Bases: WorkdayTypeBase

Handler for the Workday Get_Workers operation, batching pages so that no more than max_parallel requests run concurrently.

:param component: Component instance
:param max_retries: Maximum retry attempts (default: 5 for connection resilience)
:param retry_delay: Base delay between retries in seconds (default: 0.2, used as the base for exponential backoff)

execute async
execute(**kwargs)

Execute the Get_Workers operation and return a pandas DataFrame.

If worker_id is provided, fetches only that one; otherwise fetches all pages in batches of at most max_parallel concurrent requests.

models

Location

Bases: BaseModel

Pydantic model for a Workday location record. raw_data holds the full SOAP response dict for any extra fields.

Organization

Bases: BaseModel

Complete organization model based on actual Workday payload.

TimeBlock

Bases: BaseModel

Pydantic model for a Workday calculated time block record. raw_data holds the full SOAP response dict for any extra fields.

TimeRequest

Bases: BaseModel

Pydantic model for a Workday time request record. raw_data holds the full SOAP response dict for any extra fields.

Worker

Bases: BaseModel

Pydantic model for a Workday worker record. raw_data holds the full SOAP response dict for any extra fields.

location
Location

Bases: BaseModel

Pydantic model for a Workday location record. raw_data holds the full SOAP response dict for any extra fields.

location_hierarchy_assignments

Pydantic models for Location Hierarchy Organization Assignments.

LocationHierarchyAssignment

Bases: BaseModel

Model for location hierarchy organization assignment.

LocationHierarchyAssignmentsResponse

Bases: BaseModel

Model for the complete location hierarchy assignments response.

LocationHierarchyReference

Bases: BaseModel

Model for location hierarchy reference.

OrganizationAssignment

Bases: BaseModel

Model for organization assignment by type.

OrganizationReference

Bases: BaseModel

Model for organization reference in assignments.

OrganizationTypeReference

Bases: BaseModel

Model for organization type reference.

organizations
Organization

Bases: BaseModel

Complete organization model based on actual Workday payload.

time_block
TimeBlock

Bases: BaseModel

Pydantic model for a Workday calculated time block record. raw_data holds the full SOAP response dict for any extra fields.

time_request
TimeRequest

Bases: BaseModel

Pydantic model for a Workday time request record. raw_data holds the full SOAP response dict for any extra fields.

worker
ManagementChainLevel

Bases: BaseModel

Model for a single level in the management chain

Worker

Bases: BaseModel

Pydantic model for a Workday worker record. raw_data holds the full SOAP response dict for any extra fields.

parsers

parse_benefits_and_roles
parse_benefits_and_roles(worker_data)

Parse benefit enrollments, roles, and worker documents.

parse_business_site
parse_business_site(worker_data)

Parse business site summary data.

parse_compensation_data
parse_compensation_data(worker_data)

Parse the compensation details of the worker.

Extracts
  • wage (float)
  • compensation_effective_date (str)
  • compensation_guidelines (package / grade / profile IDs)
  • salary_and_hourly (list of elements)
  • compensation_summary (nested summary)
  • reason_references (mapping of reason type → ID)
parse_contact_data
parse_contact_data(worker_data)

Parse the contact information (email, address, phone) of the worker.

parse_employment_data
parse_employment_data(worker_data)

Parse employment-related details (position, hours, job profile).

parse_identification_data
parse_identification_data(worker_data)

Parse identification details (national ID, license, custom IDs).

parse_location_data
parse_location_data(location_data)

Parse the main location data from Workday response.

parse_management_chain_data
parse_management_chain_data(worker_data)

Parse management chain data from Worker_Management_Chain_Data.

parse_organization_data
parse_organization_data(org_data)

Parse organization data from Workday SOAP response.

:param org_data: Raw organization data from Workday
:return: Parsed Organization model

parse_payroll_and_tax_data
parse_payroll_and_tax_data(worker_data)

Parse payroll and tax related data from Position_Data.

parse_personal_data
parse_personal_data(worker_data)

Parse the personal information of the worker.

parse_position_management_chain_data
parse_position_management_chain_data(worker_data)

Parse management chain data from Position_Management_Chains_Data. This is different from Worker_Management_Chain_Data and contains the actual management chain.

parse_time_block_data
parse_time_block_data(time_block_data)

Parse the main time block data from Workday response.

parse_time_request_data
parse_time_request_data(time_request_data)

Parse time request data from the SOAP response.

parse_worker_organization_data
parse_worker_organization_data(worker_data)

Parse worker organization information from worker data

parse_worker_status
parse_worker_status(worker_data)

Parse worker status details (active, hire/termination dates, eligibility), ensuring it does not break if any _Reference is None.

location_hierarchy_assignments_parsers

Parsers for Location Hierarchy Organization Assignments data.

parse_location_hierarchy_assignment
parse_location_hierarchy_assignment(assignment_data)

Parse location hierarchy organization assignment data.

Parameters:

Name Type Description Default
assignment_data Dict[str, Any]

Raw assignment data from the API (already Location_Hierarchy_Organization_Assignments_Data)

required

Returns:

Type Description
LocationHierarchyAssignment

Parsed LocationHierarchyAssignment object

parse_location_hierarchy_assignments_data
parse_location_hierarchy_assignments_data(raw_data)

Main parser function for location hierarchy assignments data.

Parameters:

Name Type Description Default
raw_data Dict[str, Any]

Raw data from the API response

required

Returns:

Type Description
Dict[str, Any]

Dictionary with parsed assignments and metadata

parse_location_hierarchy_assignments_response
parse_location_hierarchy_assignments_response(response_data)

Parse the complete location hierarchy assignments response.

Parameters:

Name Type Description Default
response_data Dict[str, Any]

Raw response data from the API

required

Returns:

Type Description
List[LocationHierarchyAssignment]

List of parsed LocationHierarchyAssignment objects

parse_location_hierarchy_reference
parse_location_hierarchy_reference(reference_data)

Parse location hierarchy reference data.

Parameters:

Name Type Description Default
reference_data Dict[str, Any]

Raw location hierarchy reference data

required

Returns:

Type Description
LocationHierarchyReference

Parsed LocationHierarchyReference object

parse_organization_assignment
parse_organization_assignment(assignment_data)

Parse organization assignment by type data.

Parameters:

Name Type Description Default
assignment_data Dict[str, Any]

Raw organization assignment data

required

Returns:

Type Description
OrganizationAssignment

Parsed OrganizationAssignment object

parse_organization_reference
parse_organization_reference(org_data)

Parse organization reference data.

Parameters:

Name Type Description Default
org_data Dict[str, Any]

Raw organization reference data

required

Returns:

Type Description
OrganizationReference

Parsed OrganizationReference object

parse_organization_type_reference
parse_organization_type_reference(type_data)

Parse organization type reference data.

Parameters:

Name Type Description Default
type_data Dict[str, Any]

Raw organization type reference data

required

Returns:

Type Description
OrganizationTypeReference

Parsed OrganizationTypeReference object

parse_response_results
parse_response_results(response_data)

Parse response results (pagination info).

Parameters:

Name Type Description Default
response_data Dict[str, Any]

Raw response data from the API

required

Returns:

Type Description
Dict[str, Any]

Dictionary with pagination information

location_parsers
parse_location_data
parse_location_data(location_data)

Parse the main location data from Workday response.

organization_parsers
parse_organization_data
parse_organization_data(org_data)

Parse organization data from Workday SOAP response.

:param org_data: Raw organization data from Workday
:return: Parsed Organization model

parse_organizations_response
parse_organizations_response(response_data)

Parse the complete organizations response from Workday.

:param response_data: Raw response data from Workday
:return: List of parsed Organization models

time_block_parsers
parse_time_block_data
parse_time_block_data(time_block_data)

Parse the main time block data from Workday response.

time_request_parsers
parse_time_request_data
parse_time_request_data(time_request_data)

Parse time request data from the SOAP response.

worker_parsers
parse_benefits_and_roles
parse_benefits_and_roles(worker_data)

Parse benefit enrollments, roles, and worker documents.

parse_business_site
parse_business_site(worker_data)

Parse business site summary data.

parse_compensation_data
parse_compensation_data(worker_data)

Parse the compensation details of the worker.

Extracts
  • wage (float)
  • compensation_effective_date (str)
  • compensation_guidelines (package / grade / profile IDs)
  • salary_and_hourly (list of elements)
  • compensation_summary (nested summary)
  • reason_references (mapping of reason type → ID)
parse_contact_data
parse_contact_data(worker_data)

Parse the contact information (email, address, phone) of the worker.

parse_employment_data
parse_employment_data(worker_data)

Parse employment-related details (position, hours, job profile).

parse_identification_data
parse_identification_data(worker_data)

Parse identification details (national ID, license, custom IDs).

parse_management_chain_data
parse_management_chain_data(worker_data)

Parse management chain data from Worker_Management_Chain_Data.

parse_payroll_and_tax_data
parse_payroll_and_tax_data(worker_data)

Parse payroll and tax related data from Position_Data.

parse_personal_data
parse_personal_data(worker_data)

Parse the personal information of the worker.

parse_position_management_chain_data
parse_position_management_chain_data(worker_data)

Parse management chain data from Position_Management_Chains_Data. This is different from Worker_Management_Chain_Data and contains the actual management chain.

parse_worker_organization_data
parse_worker_organization_data(worker_data)

Parse worker organization information from worker data

parse_worker_status
parse_worker_status(worker_data)

Parse worker status details (active, hire/termination dates, eligibility), ensuring it does not break if any _Reference is None.

types

LocationType
LocationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Locations operation.

execute async
execute(**kwargs)

Execute the Get_Locations operation and return a pandas DataFrame.

Supported parameters:

- location_id: Specific location ID to fetch (uses Request_References)
- location_name: Filter by location name (uses Request_Criteria)
- location_type: Filter by location type
- location_usage: Filter by location usage
- inactive: Filter by inactive status (True/False)
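
For illustration, a minimal task sketch; it assumes the operation is exposed as type: get_locations and that these filters pass through as kwargs. Values are placeholders.

```yaml
# Sketch only: operation name and parameter pass-through are assumptions
Workday:
  type: get_locations
  location_type: "Store"   # placeholder location type
  inactive: false
```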

get_active_locations async
get_active_locations()

Convenience method to get all active locations.

get_location_by_id async
get_location_by_id(location_id)

Convenience method to get a specific location by ID.

get_location_by_name async
get_location_by_name(location_name)

Convenience method to get a specific location by name.

get_locations_by_type async
get_locations_by_type(location_type)

Convenience method to get locations by type.

OrganizationType
OrganizationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Organizations operation.

execute async
execute(**kwargs)

Execute the Get_Organizations operation and return a pandas DataFrame.

Supported parameters:

- organization_id: Specific organization ID to fetch (uses Request_References)
- organization_id_type: Type of organization ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
- organization_type: Filter by organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
- include_inactive: Include inactive organizations (True/False)
- enable_transaction_log_lite: Enable transaction log lite (True/False)
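
For illustration, a minimal task sketch fetching a single organization; it assumes these parameters pass through the Workday component as kwargs, and the ID is a placeholder.

```yaml
# Sketch only: parameter pass-through is an assumption; the ID is a placeholder
Workday:
  type: get_organizations
  organization_id: "CC_1001"
  organization_id_type: "Cost_Center_Reference_ID"
```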

get_active_organizations async
get_active_organizations()

Get only active organizations.

:return: DataFrame with active organizations data

get_all_organizations async
get_all_organizations(include_inactive=True)

Get all organizations (active and optionally inactive).

:param include_inactive: Whether to include inactive organizations
:return: DataFrame with all organizations data

get_companies async
get_companies()

Get only company organizations.

:return: DataFrame with company organizations data

get_cost_centers async
get_cost_centers()

Get only cost center organizations.

:return: DataFrame with cost center organizations data

get_organization_by_cost_center_id async
get_organization_by_cost_center_id(cost_center_id)

Get a specific organization by Cost Center Reference ID.

:param cost_center_id: The organization Cost Center Reference ID to fetch
:return: DataFrame with organization data

get_organization_by_id async
get_organization_by_id(organization_id, id_type='Organization_Reference_ID')

Get a specific organization by ID.

:param organization_id: The organization ID to fetch
:param id_type: Type of ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
:return: DataFrame with organization data

get_organization_by_wid async
get_organization_by_wid(wid)

Get a specific organization by WID.

:param wid: The organization WID to fetch
:return: DataFrame with organization data

get_organizations_by_type async
get_organizations_by_type(organization_type)

Get organizations filtered by type.

:param organization_type: Organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
:return: DataFrame with organizations data

get_supervisory_organizations async
get_supervisory_organizations()

Get only supervisory organizations.

:return: DataFrame with supervisory organizations data

TimeBlockType
TimeBlockType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Calculated_Time_Blocks operation.

execute async
execute(**kwargs)

Execute the Get_Calculated_Time_Blocks operation and return a pandas DataFrame.

Supported parameters:

- worker_id: Specific worker ID to fetch time blocks for
- start_date: Start date for date range filter (YYYY-MM-DD)
- end_date: End date for date range filter (YYYY-MM-DD)
- time_block_id: Specific time block ID to fetch
- status: Filter by status
- supervisory_org: Filter by supervisory organization
- include_deleted: Whether to include deleted time blocks (default: False)

get_time_blocks_by_date_range async
get_time_blocks_by_date_range(start_date, end_date, status=None)

Convenience method to get time blocks for a date range.

get_time_blocks_by_worker async
get_time_blocks_by_worker(worker_id, start_date=None, end_date=None)

Convenience method to get time blocks for a specific worker.

TimeRequestType
TimeRequestType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handles Get_Time_Requests operation for Workday Time Tracking API.

execute async
execute(**kwargs)

Execute the Get_Time_Requests operation.

get_time_request_by_id async
get_time_request_by_id(time_request_id)

Get a specific time request by ID.

get_time_requests_by_date_range async
get_time_requests_by_date_range(start_date, end_date)

Get all time requests within a date range.

get_time_requests_by_organization async
get_time_requests_by_organization(supervisory_organization_id, start_date=None, end_date=None)

Get time requests for a specific organization within an optional date range.

get_time_requests_by_worker async
get_time_requests_by_worker(worker_id, start_date=None, end_date=None)

Get time requests for a specific worker within an optional date range.

safe_serialize
safe_serialize(df)

Safely serialize the DataFrame for JSON output.

WorkerType
WorkerType(component, max_retries=5, retry_delay=0.2)

Bases: WorkdayTypeBase

Handler for the Workday Get_Workers operation, batching pages so that no more than max_parallel requests run concurrently.

:param component: Component instance
:param max_retries: Maximum retry attempts (default: 5 for connection resilience)
:param retry_delay: Base delay between retries in seconds (default: 0.2, used as the base for exponential backoff)

execute async
execute(**kwargs)

Execute the Get_Workers operation and return a pandas DataFrame.

If worker_id is provided, fetches only that one; otherwise fetches all pages in batches of at most max_parallel concurrent requests.

base
WorkdayTypeBase
WorkdayTypeBase(component, max_retries=3, retry_delay=0.5)

Bases: ABC

Base class for Workday operation types.

Provides
  • Default payload structure for all Workday operations.
  • Generic pagination logic with retries and logging.
  • Common SOAP response handling utilities.

:param component: Component instance (used for run, logger, metrics).
:param max_retries: Maximum number of retry attempts per page on failure.
:param retry_delay: Seconds to wait between retry attempts.

execute abstractmethod async
execute(**kwargs)

Execute the specific operation logic. Must be implemented by subclasses.

location_hierarchy_assignments

Get_Location_Hierarchy_Organization_Assignments operation handler.

This module handles the Get_Location_Hierarchy_Organization_Assignments operation which retrieves organization assignments for location hierarchies.

LocationHierarchyAssignmentsType
LocationHierarchyAssignmentsType(component, max_retries=5, retry_delay=2.0, **kwargs)

Bases: WorkdayTypeBase

Handler for Get_Location_Hierarchy_Organization_Assignments operation.

Retrieves organization assignments for location hierarchies.

:param component: Component instance
:param max_retries: Maximum retry attempts (default: 5 for connection resilience)
:param retry_delay: Delay between retries in seconds (default: 2.0 for API rate limiting)
:param kwargs: Additional parameters

execute async
execute(**kwargs)

Execute Get_Location_Hierarchy_Organization_Assignments operation.

Parameters:

Name Type Description Default
**kwargs

Additional parameters, including:

- location_hierarchy_ids: List of location hierarchy IDs to fetch
- location_hierarchy_id_type: Type of ID (WID, Organization_Reference_ID)
- as_of_effective_date: Date for effective data
- as_of_entry_datetime: Date/time for entry data
- page: Page number for pagination
- count: Number of results per page

{}

Returns:

Type Description
DataFrame

pandas DataFrame containing the location hierarchy organization assignments
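
For illustration, a minimal task sketch; it assumes these kwargs pass through the Workday component, and the hierarchy ID and date are placeholders.

```yaml
# Sketch only: parameter pass-through is an assumption; values are placeholders
Workday:
  type: get_location_hierarchy_assignments
  location_hierarchy_ids: ["LOC_HIER_US"]
  location_hierarchy_id_type: "Organization_Reference_ID"
  as_of_effective_date: "2024-06-01"
```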

get_all_assignments async
get_all_assignments(**kwargs)

Get all location hierarchy organization assignments.

Parameters:

Name Type Description Default
**kwargs

Additional parameters for filtering

{}

Returns:

Type Description
DataFrame

DataFrame with all location hierarchy organization assignments

get_assignments_by_location_hierarchy async
get_assignments_by_location_hierarchy(location_hierarchy_id, id_type='Organization_Reference_ID')

Get organization assignments for a specific location hierarchy.

Parameters:

Name Type Description Default
location_hierarchy_id str

The location hierarchy ID to fetch

required
id_type str

Type of ID (WID, Organization_Reference_ID)

'Organization_Reference_ID'

Returns:

Type Description
DataFrame

DataFrame with organization assignments for the location hierarchy

locations
LocationType
LocationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Locations operation.

execute async
execute(**kwargs)

Execute the Get_Locations operation and return a pandas DataFrame.

Supported parameters:

- location_id: Specific location ID to fetch (uses Request_References)
- location_name: Filter by location name (uses Request_Criteria)
- location_type: Filter by location type
- location_usage: Filter by location usage
- inactive: Filter by inactive status (True/False)

get_active_locations async
get_active_locations()

Convenience method to get all active locations.

get_location_by_id async
get_location_by_id(location_id)

Convenience method to get a specific location by ID.

get_location_by_name async
get_location_by_name(location_name)

Convenience method to get a specific location by name.

get_locations_by_type async
get_locations_by_type(location_type)

Convenience method to get locations by type.

organization_single

Get_Organization operation handler.

This module handles the Get_Organization operation which retrieves a specific organization by its ID (singular, not plural).

GetOrganization
GetOrganization(client, **kwargs)

Bases: WorkdayTypeBase

Handler for Get_Organization operation.

Retrieves a specific organization by its ID.

execute async
execute(organization_id, organization_id_type='Organization_Reference_ID', **kwargs)

Execute Get_Organization operation to retrieve a specific organization.

Parameters:

Name Type Description Default
organization_id str

The ID of the organization to retrieve

required
organization_id_type str

The type of ID (Organization_Reference_ID, WID, etc.)

'Organization_Reference_ID'
**kwargs

Additional parameters

{}

Returns:

Type Description
DataFrame

pandas DataFrame containing the organization data

get_organization_by_custom_id async
get_organization_by_custom_id(custom_id, **kwargs)

Get organization by Custom_Organization_Reference_ID.

Parameters:

Name Type Description Default
custom_id str

The Custom_Organization_Reference_ID

required
**kwargs

Additional parameters

{}

Returns:

Type Description
DataFrame

pandas DataFrame containing the organization data

get_organization_by_reference_id async
get_organization_by_reference_id(reference_id, **kwargs)

Get organization by Organization_Reference_ID.

Parameters:

Name Type Description Default
reference_id str

The Organization_Reference_ID

required
**kwargs

Additional parameters

{}

Returns:

Type Description
DataFrame

pandas DataFrame containing the organization data

get_organization_by_wid async
get_organization_by_wid(wid, **kwargs)

Get organization by WID.

Parameters:

Name Type Description Default
wid str

The WID of the organization

required
**kwargs

Additional parameters

{}

Returns:

Type Description
DataFrame

pandas DataFrame containing the organization data

organizations
OrganizationType
OrganizationType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Organizations operation.

execute async
execute(**kwargs)

Execute the Get_Organizations operation and return a pandas DataFrame.

Supported parameters:

- organization_id: Specific organization ID to fetch (uses Request_References)
- organization_id_type: Type of organization ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
- organization_type: Filter by organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
- include_inactive: Include inactive organizations (True/False)
- enable_transaction_log_lite: Enable transaction log lite (True/False)

get_active_organizations async
get_active_organizations()

Get only active organizations.

:return: DataFrame with active organizations data

get_all_organizations async
get_all_organizations(include_inactive=True)

Get all organizations (active and optionally inactive).

:param include_inactive: Whether to include inactive organizations
:return: DataFrame with all organizations data

get_companies async
get_companies()

Get only company organizations.

:return: DataFrame with company organizations data

get_cost_centers async
get_cost_centers()

Get only cost center organizations.

:return: DataFrame with cost center organizations data

get_organization_by_cost_center_id async
get_organization_by_cost_center_id(cost_center_id)

Get a specific organization by Cost Center Reference ID.

:param cost_center_id: The organization Cost Center Reference ID to fetch
:return: DataFrame with organization data

get_organization_by_id async
get_organization_by_id(organization_id, id_type='Organization_Reference_ID')

Get a specific organization by ID.

:param organization_id: The organization ID to fetch
:param id_type: Type of ID (WID, Organization_Reference_ID, Cost_Center_Reference_ID, etc.)
:return: DataFrame with organization data

get_organization_by_wid async
get_organization_by_wid(wid)

Get a specific organization by WID.

:param wid: The organization WID to fetch
:return: DataFrame with organization data

get_organizations_by_type async
get_organizations_by_type(organization_type)

Get organizations filtered by type.

:param organization_type: Organization type (Company, Cost Center, Custom, Matrix, Pay Group, Region, Retiree, Supervisory, etc.)
:return: DataFrame with organizations data

get_supervisory_organizations async
get_supervisory_organizations()

Get only supervisory organizations.

:return: DataFrame with supervisory organizations data

time_blocks
TimeBlockType
TimeBlockType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handler for the Workday Get_Calculated_Time_Blocks operation.

execute async
execute(**kwargs)

Execute the Get_Calculated_Time_Blocks operation and return a pandas DataFrame.

Supported parameters:

- worker_id: Specific worker ID to fetch time blocks for
- start_date: Start date for date range filter (YYYY-MM-DD)
- end_date: End date for date range filter (YYYY-MM-DD)
- time_block_id: Specific time block ID to fetch
- status: Filter by status
- supervisory_org: Filter by supervisory organization
- include_deleted: Whether to include deleted time blocks (default: False)

get_time_blocks_by_date_range async
get_time_blocks_by_date_range(start_date, end_date, status=None)

Convenience method to get time blocks for a date range.

get_time_blocks_by_worker async
get_time_blocks_by_worker(worker_id, start_date=None, end_date=None)

Convenience method to get time blocks for a specific worker.

time_requests
TimeRequestType
TimeRequestType(component, max_retries=3, retry_delay=0.5)

Bases: WorkdayTypeBase

Handles Get_Time_Requests operation for Workday Time Tracking API.

execute async
execute(**kwargs)

Execute the Get_Time_Requests operation.

get_time_request_by_id async
get_time_request_by_id(time_request_id)

Get a specific time request by ID.

get_time_requests_by_date_range async
get_time_requests_by_date_range(start_date, end_date)

Get all time requests within a date range.

get_time_requests_by_organization async
get_time_requests_by_organization(supervisory_organization_id, start_date=None, end_date=None)

Get time requests for a specific organization within an optional date range.

get_time_requests_by_worker async
get_time_requests_by_worker(worker_id, start_date=None, end_date=None)

Get time requests for a specific worker within an optional date range.

safe_serialize
safe_serialize(df)

Safely serialize the DataFrame for JSON output.

workers
WorkerType
WorkerType(component, max_retries=5, retry_delay=0.2)

Bases: WorkdayTypeBase

Handler for the Workday Get_Workers operation, batching pages so that no more than max_parallel requests run concurrently.

:param component: Component instance
:param max_retries: Maximum retry attempts (default: 5 for connection resilience)
:param retry_delay: Base delay between retries in seconds (default: 0.2, used as the base for exponential backoff)

execute async
execute(**kwargs)

Execute the Get_Workers operation and return a pandas DataFrame.

If worker_id is provided, fetches only that one; otherwise fetches all pages in batches of at most max_parallel concurrent requests.

utils

ensure_list
ensure_list(val)

Convert a potentially singular value to a list.

extract_by_type
extract_by_type(ids, desired_type)

Given a list of {'_value_1':…, 'type':…} dicts (or a single dict), return the _value_1 whose type matches desired_type, or None.

first
first(v)

Helper to get first item of a list or dict, or empty dict if neither.

safe_serialize
safe_serialize(val)

Serialize Decimal, list or dict into JSON-friendly string, or return empty string if None.

utils
ensure_list
ensure_list(val)

Convert a potentially singular value to a list.

extract_by_type
extract_by_type(ids, desired_type)

Given a list of {'_value_1':…, 'type':…} dicts (or a single dict), return the _value_1 whose type matches desired_type, or None.

extract_nested
extract_nested(data, path)

Helper to extract nested data from a dict given a list of keys.

first
first(v)

Helper to get first item of a list or dict, or empty dict if neither.

safe_serialize
safe_serialize(val)

Serialize Decimal, list or dict into JSON-friendly string, or return empty string if None.

workday

Workday
Workday(*, loop=None, job=None, stat=None, **kwargs)

Bases: SOAPClient, FlowComponent

Workday Component

Overview

The Workday class is a Flowtask component for the Workday SOAP API. It encapsulates all Workday-specific logic, including authentication, request/response handling, and data normalization.

Properties

type (str): Operation type to perform (e.g. 'get_workers', 'get_time_blocks')
worker_id (str): Optional worker ID to fetch a specific worker
use_storage (bool): Enable data storage functionality
storage_path (str): Path where to store the data files

Returns:

Type Description

Returns a pandas DataFrame with the requested data.

Examples:

```yaml
# Basic usage
Workday:
  type: get_workers
  worker_id: "72037046323885"

# With storage enabled
Workday:
  type: get_workers
  use_storage: true
  storage_path: "/data/workday"

# Location hierarchy assignments with storage
Workday:
  type: get_location_hierarchy_assignments
  use_storage: true
  storage_path: "/data/workday"

# Organizations with storage
Workday:
  type: get_organizations
  organization_type: "Cost_Center"
  use_storage: true
  storage_path: "/data/workday"
```
add_metric
add_metric(key, value)

Add a metric to track

close async
close()

Cleanup resources

run async
run(operation=None, **kwargs)

Execute the component's main operation

serialize_object
serialize_object(obj)

Helper to serialize SOAP objects

start async
start(**kwargs)

Initialize the component

XMLToPandas

XMLToPandas

XMLToPandas(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

XMLToPandas.

Transform an XML list structure to Pandas

close async
close()

close.

close method
run async
run()

run.

Transform the XML list into a Pandas DataFrame
start async
start(**kwargs)

start.

Check whether the directory exists

Zammad

Zammad

Zammad(loop=None, job=None, stat=None, **kwargs)

Bases: BaseAction, zammad

zammad.

Generic Interface for managing Zammad instances.

Example:

```yaml
Zammad:
  Group: SyncZammad
  comment: Create User by Sync AD into Zammad
  method: create_user
  args:
    firstname: '{firstname}'
    lastname: '{lastname}'
    email: '{email}'
    login: '{login}'
    organization: T-ROC GLOBAL
    roles:
    - Customer
```

ZoomInfoScraper

ZoomInfoScraper

ZoomInfoScraper(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, HTTPService, SeleniumService

ZoomInfo Scraper Component that can use either HTTP or Selenium for scraping.

Overview:

This component scrapes company information from ZoomInfo pages using HTTPService. It can receive URLs from a previous component (like GoogleSearch) and extract specific company information.

.. table:: Properties :widths: auto

+-----------------------+----------+--------------------------------------------------------------------------------+
| Name                  | Required | Description                                                                    |
+-----------------------+----------+--------------------------------------------------------------------------------+
| url_column (str)      |   Yes    | Name of the column containing URLs to scrape (default: 'search_url')           |
+-----------------------+----------+--------------------------------------------------------------------------------+
| wait_for (tuple)      |   No     | Element to wait for before scraping (default: ('class', 'company-overview'))   |
+-----------------------+----------+--------------------------------------------------------------------------------+

Return:

The component adds new columns to the DataFrame with company information:

- headquarters
- phone_number
- website
- stock_symbol
- naics_code
- employee_count
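
For illustration, a minimal task sketch using the two documented properties; the values shown simply repeat the documented defaults.

```yaml
# Sketch only: values repeat the documented defaults
ZoomInfoScraper:
  url_column: search_url
  wait_for: ["class", "company-overview"]
```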

close async
close()

Clean up resources.

extract_company_info
extract_company_info(soup, search_term, search_url)

Extract company information from the page.

run async
run()

Execute scraping for each URL in the DataFrame.

scrape_url async
scrape_url(idx, row)

Scrape a single ZoomInfo URL using either HTTP or Selenium.

split_parts
split_parts(task_list, num_parts=5)

Split task list into parts for concurrent processing.

start async
start(**kwargs)

Initialize the component and validate required parameters.

abstract

AbstractFlow

Bases: ABC

run abstractmethod async
run()

Execute the code for component.

start abstractmethod async
start(**kwargs)

Start Method called on every component.

flow

FlowComponent

FlowComponent(job=None, *args, **kwargs)

Bases: FuncSupport, MaskSupport, LogSupport, ResultSupport, StatSupport, LocaleSupport, AbstractFlow

Abstract

Overview:

    Helper for building components that consume REST APIs

.. table:: Properties

:widths: auto

+--------------+----------+--------------------------------------+
| Name         | Required | Summary                              |
+--------------+----------+--------------------------------------+
| method       | Yes      | Component for Data Integrator        |
+--------------+----------+--------------------------------------+
| attributes   | Yes      | Attribute: barcode                   |
+--------------+----------+--------------------------------------+

Return the list of arbitrary days

close abstractmethod async
close()

close.

Close (if needed) component requirements.

conditions_replacement
conditions_replacement(obj)

conditions_replacement.

Replace occurrences of conditions within a string.

Args:
    obj (Any): Any kind of object.

Returns:

Name Type Description
Any

Object with replaced conditions.

get_filename
get_filename()

get_filename. Detect whether the file exists.

run abstractmethod async
run()

run.

Run operations declared inside Component.

start abstractmethod async
start(**kwargs)

start.

Initialize (if needed) a task
var_replacement
var_replacement(obj)

var_replacement.

Replace occurrences of variables within a string.

Args:
    obj (Any): Any kind of object.

Returns:

Name Type Description
Any

Object with replaced variables.

google

GoogleBase

GoogleBase(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

GoogleBase.

Overview: A base class for Google API components.
chunkify
chunkify(lst, n)

Split list lst into chunks of size n.

column_exists
column_exists(column)

Returns True if the column exists in the DataFrame.

google_response_code async
google_response_code(response)

Check whether the query quota has been exceeded, or whether other errors occurred.

:param response: JSON response

run async
run()

Run the Google Places API.

group

GroupComponent

GroupComponent(loop=None, job=None, stat=None, component_list=None, **kwargs)

Bases: FlowComponent

GroupComponent

Overview

This component executes a group of other FlowTask components sequentially as a single unit.
It allows chaining multiple tasks together and provides error handling for various scenarios.

.. table:: Properties :widths: auto

+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| Name                   | Required | Description                                                                                                    |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| component_list (list)  |   Yes    | List of dictionaries defining the components to be executed in the group. Each dictionary                      |
|                        |          | should contain the following keys:                                                                             |
|                        |          |   - "component": The FlowTask component class to be used.                                                      |
|                        |          |   - "params": A dictionary containing parameters to be passed to the component.                                |
|                        |          | (Optional)                                                                                                     |
|                        |          |   - "conditions": A dictionary containing conditions that must be met before running the component. (Optional) |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| stat (Callable)        |    No    | Optional callback function for step-level monitoring and statistics collection.                                |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+
| skipError              |    No    | Defines the behavior when a component within the group raises an error.                                       |
|                        |          | Valid options are:                                                                                            |
|                        |          |   skip: the error is skipped and the group continues execution.                                               |
|                        |          |   raise: the error is raised and execution is interrupted.                                                    |
+------------------------+----------+----------------------------------------------------------------------------------------------------------------+

Return

The component modifies the data received from the previous component and returns the final output after
all components in the group have been executed.
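
For illustration, a minimal task sketch; the component names and parameters inside component_list are hypothetical and only show the documented dictionary keys.

```yaml
# Sketch only: component names and params are hypothetical
GroupComponent:
  component_list:
  - component: DownloadFromFTP     # hypothetical component
    params:
      directory: /tmp/files
  - component: OpenWithPandas      # hypothetical component
    params: {}
    conditions:
      refresh: true
```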

reviewscrap

ReviewScrapper

ReviewScrapper(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent, SeleniumService, HTTPService

chunkify
chunkify(lst, n)

Split list lst into chunks of size n.

bad_gateway_exception

bad_gateway_exception(exc)

Check if the exception is a 502 Bad Gateway error.

tAutoincrement

tAutoincrement

tAutoincrement(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, FlowComponent

tAutoincrement

Overview

The tAutoincrement component is designed to automatically increment values in a specific column of a dataset. This is particularly useful when you need to fill in missing (null) values in a column with unique, sequential integers starting from the maximum value currently present in the column.

Properties

.. table:: Properties :widths: auto

+------------------+----------+-----------+------------------------------------------------------------------------+
| Name             | Required | Type      | Description                                                            |
+------------------+----------+-----------+------------------------------------------------------------------------+
| datasource       | Yes      | str       | The datasource name (e.g., schema name) where the dataset is located.  |
+------------------+----------+-----------+------------------------------------------------------------------------+
| dataset          | Yes      | str       | The name of the dataset (e.g., table name) to work on.                 |
+------------------+----------+-----------+------------------------------------------------------------------------+
| column           | Yes      | str       | The name of the column in which values will be auto-incremented.       |
+------------------+----------+-----------+------------------------------------------------------------------------+

Return

Returns the dataframe with the given column and its auto-incremented sequence.



Example:

```yaml
tAutoincrement:
  skipError: skip
  datasource: pokemon
  dataset: districts
  column: district_id
  description: Auto-increment district_id for new districts
```
run async
run()

Run method to fetch the max value and auto-increment.

start async
start(**kwargs)

Start method.

tConcat

tConcat

tConcat(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tConcat

Overview

    The tConcat class is a component for merging (concatenating) two DataFrames along a specified axis.
    It supports handling multiple DataFrames and configurable options for the concatenation, with metrics
    tracking for input and output row counts.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+---------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | df1            |   Yes    | The first DataFrame to concatenate.                                       |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | df2            |   Yes    | The second DataFrame to concatenate.                                      |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | args           |   No     | Dictionary of arguments to pass to `pandas.concat`, such as `axis`.       |
    +----------------+----------+-----------+---------------------------------------------------------------+

Returns

    This component returns a concatenated DataFrame based on the specified axis and additional arguments.
    Metrics are recorded for the row counts of both input DataFrames and the final concatenated DataFrame.
    If either DataFrame is missing or empty, an error is raised with a descriptive message.


Example:

```yaml
tConcat:
  depends:
  - TransformRows_8
  - TransformRows_15
  args:
    axis: 0
```
start async
start(**kwargs)

Obtain Pandas DataFrame. TODO: iterate over all dataframes.

tCrosstab

tCrosstab

tCrosstab(loop=None, job=None, stat=None, **kwargs)

Bases: tPandas

tCrosstab

Overview

Creates a cross-tabulation (contingency table) from a DataFrame.

Properties

.. table:: Properties :widths: auto

+------------------+----------+-----------+------------------------------------------------------------------------+
| Name             | Required | Type      | Description                                                            |
+------------------+----------+-----------+------------------------------------------------------------------------+
| index            | Yes      | list      | List of columns to be used as index in the crosstab.                   |
+------------------+----------+-----------+------------------------------------------------------------------------+
| columns          | Yes      | list      | List of columns to be used as columns in the crosstab.                 |
+------------------+----------+-----------+------------------------------------------------------------------------+
| values           | No       | list      | List of columns to be used as values in the crosstab.                  |
+------------------+----------+-----------+------------------------------------------------------------------------+
| aggregate        | No       | str       | Aggregation function to use when values are provided.                  |
+------------------+----------+-----------+------------------------------------------------------------------------+
| totals           | No       | dict      | Dictionary with 'name' key to add totals row/column.                   |
+------------------+----------+-----------+------------------------------------------------------------------------+
| reset_index      | No       | bool      | Whether to reset the index after creating the crosstab.                |
+------------------+----------+-----------+------------------------------------------------------------------------+
| alpha_first      | No       | bool      | Whether to sort columns alphabetically if they start with letters.     |
+------------------+----------+-----------+------------------------------------------------------------------------+

Return

The DataFrame with the crosstab transformation applied.
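
For illustration, a minimal task sketch built from the documented properties; the column names are placeholders.

```yaml
# Sketch only: column names are placeholders
tCrosstab:
  index: ["region"]
  columns: ["product"]
  values: ["sales"]
  aggregate: sum
  totals:
    name: Total
  reset_index: true
```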

tExplode

tExplode

tExplode(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tExplode

Overview

    The tExplode class is a component for transforming a DataFrame by converting a column of lists or dictionaries
    into multiple rows. It supports options for dropping the original column after exploding, and for expanding
    nested dictionary structures into separate columns.

.. table:: Properties
:widths: auto

    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | Name             | Required | Summary                                                                                   |
    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | column           |   Yes    | The name of the column to explode into multiple rows.                                     |
    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | drop_original    |   No     | Boolean indicating if the original column should be dropped after exploding.              |
    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | explode_dataset  |   No     | Boolean specifying if nested dictionaries in the column should be expanded as new columns.|
    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | advanced_mode    |   No     | Boolean enabling enhanced features: preserve empty lists, propagate parent columns.      |
    +------------------+----------+-----------+-------------------------------------------------------------------------------+
    | propagate_columns|   No    | List of column names to propagate from parent to child rows (only in advanced_mode).   |
    +------------------+----------+-----------+-------------------------------------------------------------------------------+

Returns

    This component returns a DataFrame with the specified column exploded into multiple rows. If `explode_dataset` is
    set to True and the column contains dictionaries, these are expanded into new columns. Metrics on the row count
    after explosion are recorded, and any errors encountered during processing are logged and raised as exceptions.


Example:

```yaml
tExplode:
  column: reviews
  drop_original: false
  advanced_mode: true
  propagate_columns: ["id", "name"]
```

tFilter

tFilter

tFilter(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tFilter

Overview

    The tFilter class is a component that applies specified filters to a Pandas DataFrame.
    It allows filtering rows based on multiple conditions and expressions, enabling targeted
    data extraction within a task flow.

.. table:: Properties
:widths: auto

    +--------------+----------+-----------+---------------------------------------------------------------+
    | Name         | Required | Summary                                                                   |
    +--------------+----------+-----------+---------------------------------------------------------------+
    | operator     |   Yes    | Logical operator (e.g., `and`, `or`) used to combine filter conditions.   |
    +--------------+----------+-----------+---------------------------------------------------------------+
    | conditions   |   Yes    | List of conditions with columns, values, and expressions for filtering.   |
    |              |          | Format: `{ "column": <col_name>, "value": <val>, "expression": <expr> }`  |
    +--------------+----------+-----------+---------------------------------------------------------------+

Returns

    This component returns a filtered Pandas DataFrame based on the provided conditions. The component
    tracks metrics such as the initial and filtered row counts, and optionally limits the returned
    columns if specified. Additional debugging information can be output based on configuration.


Example:

```yaml
tFilter:
  operator: '&'
  filter:
  - column: ClientId
    value:
    - 11076
    expression: ==
```

tGroup

tGroup

tGroup(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tGroup

Overview

    The tGroup class is a component for performing a group-by operation on a DataFrame using specified columns.
    It returns unique combinations of the specified group-by columns, allowing data aggregation and summarization.

.. table:: Properties
:widths: auto

    +----------------+----------+-----------+---------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | group_by       |   Yes    | List of columns to group by.                                              |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | columns        |   No     | List of columns to retain in the result DataFrame. If None,               |
    |                |          | all columns in `group_by` are returned.                                   |
    +----------------+----------+-----------+---------------------------------------------------------------+
    | agg            |   No     | List of aggregation functions to apply to the grouped data.               |
    |                |          | Each aggregation should be a dictionary with the column name,             |
    |                |          | aggregation function, and an optional alias.                              |
    +----------------+----------+-----------+---------------------------------------------------------------+
Returns

    This component returns a DataFrame with unique rows based on the specified `group_by` columns. If `columns`
    is defined, only those columns are included in the result. The component provides debugging information on
    column data types if enabled, and any errors during grouping are logged and raised as exceptions.

Example:

```
- tGroup:
    group_by:
    - store_id
    - formatted_address
    - state_code
    - latitude
    - longitude
    - store_name
    - city
```

- Aggregation Example:
```
- tGroup:
    group_by:
    - store_id
    - formatted_address
    - state_code
    - latitude
    - longitude
    - store_name
    - city
    agg:
    - store_id: distinct
      alias: unique_store_ids
    - latitude: mean
      alias: avg_latitude
    - longitude: mean
      alias: avg_longitude
    - state_code: count
```

tJoin

tJoin

tJoin(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tJoin

Overview

The tJoin class is a component for joining two Pandas DataFrames based on specified join conditions. It supports various join types
(such as left, right, inner, and outer joins) and handles different scenarios like missing data, custom join conditions, and multi-source joins.

.. table:: Properties :widths: auto

+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| df1              |   Yes    | The left DataFrame to join.                                                                       |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| df2              |   Yes    | The right DataFrame to join.                                                                      |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| type             |   No     | "left"    | The type of join to perform. Supported values are "left", "right", "inner",           |
|                  |          |           | "outer", and "anti-join". When "anti-join" is used, it returns the difference         |
|                  |          |           | of B - A, i.e., all rows present in df1 but not in df2.                               |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| depends          |   Yes    | A list of dependencies defining the sources for the join.                                         |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| operator         |   No     | The logical operator to use for join conditions, defaults to "and".                               |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| fk               |   No     | The foreign key or list of keys to use for joining DataFrames.                                    |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| no_copy          |   No     | A flag indicating if copies of the DataFrames should not be made, defaults to True.               |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+
| join_with        |   No     | A list of additional keys to use for join conditions.                                             |
+------------------+----------+-----------+--------------------------------------------------------------------------------------+

Return

The methods in this class manage the joining of two Pandas DataFrames, including initialization, execution, and result handling.
It ensures proper handling of temporary columns and provides metrics on the joined rows.



Example:

```yaml
tJoin:
  depends:
  - TransformRows_2
  - QueryToPandas_3
  type: left
  fk:
  - store_number
  args:
    validate: many_to_many
```
start async
start(**kwargs)

Obtain Pandas DataFrame.

tMap

tMap.

Making Column Transformations using a Mapping JSON file.

functions

tMap Transformations functions based on Series.

concat
concat(df, columns, sep=' ')

Concatenates the values of the specified columns in the given DataFrame.

:param df: The input DataFrame
:param columns: The list of columns to concatenate
:param sep: The separator to use between the concatenated values (default is a space)
:return: A Series with the concatenated values

to_integer
to_integer(series, **kwargs)

Converts a pandas Series to an integer type, handling errors by coercing invalid values to NaN.

:param series: The pandas Series to be converted.
:param kwargs: Additional keyword arguments.
:return: The converted pandas Series with integer type.

to_string
to_string(series, remove_nan=False, **kwargs)

to_string.

Convert a Pandas column (Series) to string.

Args:
    series (pandas.Series): Column Series to be converted.
    remove_nan (bool, optional): Remove NaN (Not a Number) values from the column. Defaults to False.

Returns:

Type Description
Series

pandas.Series: a new Series is returned with string values.

tMap

tMap
tMap(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tMap

Overview

The tMap class is a component for transforming and mapping data in a Pandas DataFrame. It supports various column name
transformations, data type conversions, and function applications to columns. It extends the FlowComponent class and
provides methods for column information retrieval, data transformation, and function execution.

.. table:: Properties :widths: auto

+------------------+----------+--------------------------------------------------------------------------------------------------+
| Name             | Required | Description                                                                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| tablename        |   No     | The name of the table to retrieve column information from.                                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| schema           |   No     | The schema of the table to retrieve column information from.                                    |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| model            |   No     | The model to use for data transformation.                                                       |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _modelinfo       |   No     | A dictionary containing the model information.                                                  |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| map              |   No     | The map file to use for column transformations.                                                 |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| _mapping         |   No     | A dictionary containing the column mappings.                                                    |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| force_map        |   No     | A flag indicating if the map file should be forced, defaults to False.                          |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| replace_columns  |   No     | A flag indicating if columns should be replaced, defaults to True.                              |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| drop_missing     |   No     | A flag indicating if missing columns should be dropped, defaults to False.                      |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| column_info      |   Yes    | Retrieves column information via a SQL statement.                                               |
+------------------+----------+--------------------------------------------------------------------------------------------------+
| clean_names      |   Yes    | Removes duplicate column names from the data.                                                   |
+------------------+----------+--------------------------------------------------------------------------------------------------+

Return

The methods in this class manage the transformation and mapping of data in a Pandas DataFrame, including initialization,
column information retrieval, data transformation, and function execution.



Example:

```yaml
tMap:
  schema: bose
  map: products_by_store
  drop_missing: false
```
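
Conceptually, the column-mapping part reduces to a pandas rename; a minimal sketch with an invented mapping (the real map file also drives type conversions and function application):

```python
import pandas as pd

# Hypothetical mapping, source column -> destination column.
mapping = {"store_nbr": "store_number", "prod_desc": "product_name"}

df = pd.DataFrame({"store_nbr": [1, 2], "prod_desc": ["tv", "radio"]})
df = df.rename(columns=mapping)
```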
close async
close()

close.

close method
run async
run()

run.

Iterates over all DataFrames.

tMelt

tMelt

tMelt(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tMelt

Overview

    The tMelt class is a component for transforming a DataFrame from a wide format to a long format using the
    Pandas `melt` function. It reshapes data by unpivoting columns, making it easier to analyze or process data
    with a simpler, long-form structure.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | index          |   Yes    | Column(s) to use as identifier variables for melting.                     |
    +----------------+----------+---------------------------------------------------------------------------+
    | name           |   No     | Name to use for the "variable" column in the result DataFrame.            |
    |                |          | Defaults to "name".                                                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | value          |   No     | Name to use for the "value" column in the result DataFrame.               |
    |                |          | Defaults to "value".                                                      |
    +----------------+----------+---------------------------------------------------------------------------+
    | values         |   No     | List of columns to unpivot. If None, all remaining columns are used.      |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component returns a DataFrame in long format where specified columns are unpivoted to create two
    new columns: one for variable names (`name`) and one for values (`value`). Metrics on the row and column
    counts of the transformed DataFrame are recorded. Any errors during transformation are logged and raised
    with descriptive error messages.


Example:

```yaml
tMelt:
  index:
  - AL No.
  - Store Format
  name: product_name
  value: displays_quantity
```
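
This maps directly onto `pandas.melt`; a sketch of the equivalent call for the example above, with invented sample data:

```python
import pandas as pd

df = pd.DataFrame({
    "AL No.": [101, 102],
    "Store Format": ["Mall", "Strip"],
    "Soundbar": [3, 1],
    "Headphones": [5, 2],
})

# index -> id_vars, name -> var_name, value -> value_name
long_df = pd.melt(
    df,
    id_vars=["AL No.", "Store Format"],
    var_name="product_name",
    value_name="displays_quantity",
)
```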

tMerge

tMerge

tMerge(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tMerge

Overview

    The tMerge class is a component for merging two DataFrames (or named Series objects) using a database-style join.
    It supports different join types such as 'inner', 'outer', 'left', 'right', and 'cross', allowing flexible merging
    configurations for complex data workflows.

.. table:: Properties
:widths: auto

    +------------------+----------+---------------------------------------------------------------------------+
    | Name             | Required | Summary                                                                   |
    +------------------+----------+---------------------------------------------------------------------------+
    | df1              |   Yes    | The left DataFrame to join.                                               |
    +------------------+----------+---------------------------------------------------------------------------+
    | df2              |   Yes    | The right DataFrame to join.                                              |
    +------------------+----------+---------------------------------------------------------------------------+
    | type             |   No     | The type of join to perform (e.g., 'inner', 'outer'). Defaults to 'cross'.|
    +------------------+----------+---------------------------------------------------------------------------+
    | pd_args          |   No     | Additional arguments for the Pandas merge function, if any.               |
    +------------------+----------+---------------------------------------------------------------------------+

Returns

    This component returns a DataFrame created by merging `df1` and `df2` based on the specified join type and arguments.
    It records metrics for the resulting DataFrame’s row and column counts. Any errors during merging are raised
    with detailed error messages, and additional debug information is available if debugging mode is enabled.


Example:

```yaml
tMerge:
  depends:
  - QueryToPandas_1
  - QueryToPandas_2
  type: cross
```
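
The example above corresponds to a pandas cross join, which pairs every row of the first frame with every row of the second; sample frames are invented:

```python
import pandas as pd

df1 = pd.DataFrame({"store": [1, 2]})
df2 = pd.DataFrame({"week": ["W1", "W2"]})

merged = df1.merge(df2, how="cross")  # 4 rows: every store paired with every week
```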

tOrder

tOrder

tOrder(loop=None, job=None, stat=None, **kwargs)

Bases: tPandas

tOrder

Overview

The tOrder class is a component designed to order a Pandas DataFrame by a specified column. It allows sorting the DataFrame either in ascending or descending order based on the specified column.

Properties

.. table:: Properties
:widths: auto

+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| Name             | Required | Type      | Description                                                                       |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| columns          |   Yes    | str       | The name of the column to sort the DataFrame by.                                  |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| ascending        |   No     | bool      | Specifies whether to sort the DataFrame in ascending order. Defaults to True.     |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+

Return

The DataFrame sorted by the column(s) given in `columns`, in either ascending or descending order.

Example:

```yaml
tOrder:
  columns:
  - district_id
  ascending: true
```
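
Ordering reduces to pandas `sort_values`; a sketch of the equivalent call for the example above, with invented data:

```python
import pandas as pd

df = pd.DataFrame({"district_id": [3, 1, 2], "sales": [10, 30, 20]})

# columns -> by, ascending -> ascending
ordered = df.sort_values(by=["district_id"], ascending=True)
```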

tPandas

tPandas

tPandas(loop=None, job=None, stat=None, **kwargs)

Bases: FlowComponent

tPandas

Overview

    The tPandas class is an abstract interface for performing various data transformations on Pandas DataFrames.
    It provides foundational methods and structure for components that need to apply transformations, merges, or other
    DataFrame operations within a task.

    This interface provides methods to initialize, transform, and debug Pandas DataFrame operations.
    Concrete implementations using `tPandas` can define specific transformations. On execution, metrics
    for rows and columns are recorded, and any transformation errors or data mismatches are raised as exceptions
    with detailed error messages for effective debugging.
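
As an illustration only, a hypothetical concrete transform in the spirit of a tPandas subclass; the real base class supplies task plumbing, metrics, and error handling, and its hook names may differ:

```python
import asyncio
import pandas as pd

class tDropDuplicates:
    # Hypothetical component: take a DataFrame, return the transformed DataFrame.
    async def run(self, df: pd.DataFrame) -> pd.DataFrame:
        result = df.drop_duplicates()
        # Stand-in for the row/column metrics the framework records on execution.
        print(f"rows={len(result)}, columns={len(result.columns)}")
        return result

df = pd.DataFrame({"a": [1, 1, 2]})
deduped = asyncio.run(tDropDuplicates().run(df))
```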

tPivot

tPivot

tPivot(loop=None, job=None, stat=None, **kwargs)

Bases: tPandas

tPivot

Overview

Pivots a DataFrame, transposing a column's values into new columns.

Properties

.. table:: Properties
:widths: auto

+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| Name             | Required | Type      | Description                                                                       |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| columns          |   Yes    | list      | The list of columns to be pivoted.                                                |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| index            |   No     | list      | List of columns to be preserved; defaults to all columns except "values".         |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+
| values           |   Yes    | str       | Column(s) whose values populate the pivoted column(s).                            |
+------------------+----------+-----------+-----------------------------------------------------------------------------------+

Return

The DataFrame pivoted by `columns`, populated with the values from `values`.
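
In pandas terms this is a pivot; a sketch with invented data:

```python
import pandas as pd

df = pd.DataFrame({
    "store": [1, 1, 2, 2],
    "metric": ["sales", "visits", "sales", "visits"],
    "value": [100, 40, 80, 35],
})

# columns -> columns, index -> index, values -> values
wide = df.pivot(index="store", columns="metric", values="value").reset_index()
```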

tPluckCols

tPluckCols

tPluckCols(loop=None, job=None, stat=None, **kwargs)

Bases: tPandas

tPluckCols

Overview

    The tPluckCols class is a component for selecting a specific subset of columns from a Pandas DataFrame.
    It provides a streamlined way to filter columns, allowing only specified columns to be retained in the output.

.. table:: Properties
:widths: auto

    +-------------+----------+---------------------------------------------------------------------------+
    | Name        | Required | Summary                                                                   |
    +-------------+----------+---------------------------------------------------------------------------+
    | columns     |   Yes    | A list of column names to retain in the DataFrame.                        |
    +-------------+----------+---------------------------------------------------------------------------+

Returns

    This component returns a Pandas DataFrame containing only the specified columns listed in `columns`.
    If no columns are provided, it raises an error. The resulting DataFrame is a copy to ensure
    any modifications do not affect the original DataFrame.


Example:

```yaml
tPluckCols:
  depends:
  - TransformRows_11
  columns:
  - location_code
  - ss_market
  - store_id
  - store_number
  - company_id
  - pt_ft
  - is_covered
  - endcap
  - rev_band
  - comparison_store
```
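
The behaviour is a plain column selection plus a copy; a sketch with invented data:

```python
import pandas as pd

df = pd.DataFrame({"location_code": ["A1"], "store_id": [10], "extra": ["dropped"]})

# Keep only the listed columns; .copy() protects the original DataFrame.
plucked = df[["location_code", "store_id"]].copy()
```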
start async
start(**kwargs)

Obtain the Pandas DataFrame.

tUnnest

tUnnest

tUnnest(loop=None, job=None, stat=None, **kwargs)

Bases: tPandas

tUnnest

Overview

    The tUnnest class is a component for splitting a column in a DataFrame into multiple rows, based on a specified
    separator. This component supports options to drop the original column after splitting and to define a new column
    for the split values.

.. table:: Properties
:widths: auto

    +----------------+----------+---------------------------------------------------------------------------+
    | Name           | Required | Summary                                                                   |
    +----------------+----------+---------------------------------------------------------------------------+
    | source_column  |   Yes    | The name of the column to split into multiple rows.                       |
    +----------------+----------+---------------------------------------------------------------------------+
    | destination    |   No     | The name of the column to store the split values. Defaults to source.     |
    +----------------+----------+---------------------------------------------------------------------------+
    | drop_source    |   No     | Boolean indicating if the original column should be dropped after split.  |
    +----------------+----------+---------------------------------------------------------------------------+
    | separator      |   No     | The separator used to split the values. Defaults to ", ".                 |
    +----------------+----------+---------------------------------------------------------------------------+

Returns

    This component returns a DataFrame where the specified `source_column` is split into multiple rows based on the
    `separator`. If `drop_source` is set to True, the original column is removed after the split. Errors related to
    column splitting are logged and raised as exceptions.


Example:

```yaml
tUnnest:
  source_column: warehouse_store_ids
  destination: store_id
  drop_source: true
```
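
In pandas this is a split followed by an explode; a sketch of the example above with invented data:

```python
import pandas as pd

df = pd.DataFrame({"warehouse_store_ids": ["10, 11, 12"], "region": ["east"]})

# source_column is split on the separator, then exploded to one row per value.
df["store_id"] = df["warehouse_store_ids"].str.split(", ")
df = df.explode("store_id")
df = df.drop(columns=["warehouse_store_ids"])  # drop_source: true
```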

user

UserComponent

UserComponent(loop=None, job=None, stat=None, **kwargs)

Bases: DBSupport, FuncSupport, MaskSupport, LogSupport, ResultSupport, StatSupport, LocaleSupport, PandasDataframe, AbstractFlow

UserComponent Abstract Base Component for User-defined Components.

close abstractmethod async
close()

close. Close (if needed) a task

run abstractmethod async
run()

run. Run (if needed) a task

session async
session(url, method='get', headers=None, auth=None, data=None)

session. Connect to an HTTP source using aiohttp
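
A minimal sketch of such a helper with aiohttp; the real method's return type and error handling may differ:

```python
import aiohttp

async def session(url, method="get", headers=None, auth=None, data=None):
    # Open a short-lived client session and issue the request.
    async with aiohttp.ClientSession(headers=headers, auth=auth) as client:
        async with client.request(method.upper(), url, data=data) as response:
            response.raise_for_status()
            return await response.text()
```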

start abstractmethod async
start(**kwargs)

start. Initialize (if needed) a task