Airflow: Operators, hooks and sensors

Apache Airflow is an open-source workflow management system (WMS) widely used to orchestrate data pipelines. It lets you sequence, coordinate, schedule, and manage complex pipelines that draw on diverse data sources. The data sets those pipelines deliver are ready for consumption by business intelligence applications, data science, or machine learning models that support big data applications. You author workflows as Directed Acyclic Graphs (DAGs) of tasks. Because workflows are defined as code, you can schedule them and visualize each pipeline's dependencies, progress, and logs.

SageDataTriggerSyncOperator

Use the SageDataTriggerSyncOperator to trigger a sync for an existing integration task in SageData.

Using the Operator

The SageDataTriggerSyncOperator requires integration_task_id: the UUID (universally unique identifier) that SageData assigns to a synchronization job between a data source and a destination. To find the UUID of an integration task, open "Integrations" and click the integration in your list of configured integrations:

[Screenshot: SageData list of integrations]

Then look at the browser's address bar:

[Screenshot: SageData integration UUID in the address bar]

The part of the URL after /parameters/ is the integration_task_id (uuid).
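
If you want to extract the identifier from a copied URL programmatically, here is a minimal sketch that uses only the Python standard library. It assumes, as described above, that the UUID is the path segment immediately after /parameters/. The helper name integration_task_id_from_url and the example URL are hypothetical.

```python
import uuid
from urllib.parse import urlsplit


def integration_task_id_from_url(url: str) -> str:
    """Return the integration_task_id (UUID) that follows /parameters/ in a URL.

    Hypothetical helper: assumes the UUID is the path segment right after /parameters/.
    """
    path = urlsplit(url).path
    marker = "/parameters/"
    if marker not in path:
        raise ValueError(f"no {marker!r} segment in URL: {url}")
    # Keep only the segment after the marker, dropping anything that follows it.
    segment = path.split(marker, 1)[1].split("/", 1)[0]
    # uuid.UUID rejects malformed IDs (ValueError); str() normalizes to lowercase.
    return str(uuid.UUID(segment))


# Hypothetical URL, for illustration only.
print(integration_task_id_from_url(
    "https://app.example.com/integrations/parameters/123E4567-E89B-12D3-A456-426614174000"
))
# prints: 123e4567-e89b-12d3-a456-426614174000
```

Validating the segment with uuid.UUID catches a truncated or mistyped ID before it reaches the DAG.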

You also need the sagedata_conn_id parameter. It names the Airflow connection used to reach your SageData account, and that connection holds your SageData API access credentials.

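To define the connection, open the Airflow UI, go to Admin -> Connections, and add a new connection with the Connection Type set to HTTP. The Connection Id you enter is the value to pass as sagedata_conn_id. Fill in the remaining fields as follows: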

Host: api.sagedata.co

Schema: v1

Password: <your SageData API key>
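
As an alternative to the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (the connection ID in upper case). The variable's value is a connection URI of the form conn-type://login:password@host/schema. The sketch below builds that variable from the fields above. The connection ID sagedata_default, the helper name sagedata_conn_env, and the key my-api-key are hypothetical placeholders.

```python
from urllib.parse import quote


def sagedata_conn_env(conn_id: str, api_key: str,
                      host: str = "api.sagedata.co", schema: str = "v1") -> tuple[str, str]:
    """Return (variable name, URI) for defining the SageData connection via an env var.

    Follows Airflow's URI convention conn-type://login:password@host/schema.
    The login is left empty because only the password (API key) is needed here.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # safe="" also encodes "/" so characters in the key cannot break the URI.
    uri = f"http://:{quote(api_key, safe='')}@{host}/{schema}"
    return name, uri


name, uri = sagedata_conn_env("sagedata_default", "my-api-key")  # placeholder key
print(f"export {name}='{uri}'")
# prints: export AIRFLOW_CONN_SAGEDATA_DEFAULT='http://:my-api-key@api.sagedata.co/v1'
```

With this variable set, you would pass sagedata_conn_id="sagedata_default". Connections defined through environment variables do not appear in the Airflow UI's connection list.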

Triggering a synchronization job

The Operator can trigger a synchronization job in two ways. The first is synchronous: the Operator triggers the SageData job and then manages the job's status itself.

The second is asynchronous: with asynchronous=True, the Operator only triggers the job. To wait for the job to finish, pass the same integration_task_id to a SageDataSyncSensor.
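
In both modes, something has to check the job until it reaches a final state. That is the Operator itself in synchronous mode, or presumably the sensor's pokes in asynchronous mode. The stdlib-only sketch below shows that trigger-then-poll pattern in isolation. It is not the SageData plugin's implementation. The get_status callable stands in for whatever call reports the job status, and the status names RUNNING, SUCCEEDED and FAILED are hypothetical.

```python
import time
from typing import Callable


def wait_for_sync(
    get_status: Callable[[], str],
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until the job succeeds, fails, or the timeout elapses.

    Status names are hypothetical placeholders, not SageData's actual values.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "SUCCEEDED":
            return status
        if status == "FAILED":
            raise RuntimeError("SageData sync failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sync still {status!r} after {timeout} s")
        sleep(poke_interval)  # wait before asking again, like a sensor's poke interval


# Simulated job that reports RUNNING twice before succeeding.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_sync(lambda: next(statuses), poke_interval=0))
# prints: SUCCEEDED
```

The trade-off is where the waiting happens. The synchronous mode keeps the whole loop inside one task. The asynchronous mode hands the waiting to a separate sensor task, which Airflow schedules and retries independently of the trigger.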

An example of the synchronous mode:

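# Triggers the sync and tracks its status within this task
sync_source_destination = SageDataTriggerSyncOperator(
    task_id='sagedata_run_sync_example',
    integration_task_id=<...>,  # UUID of the SageData integration task
    sagedata_conn_id=<...>,  # ID of the Airflow connection to SageData
)
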
An example of the asynchronous mode:

# Only triggers the sync; the sensor below waits for it to finish
async_source_destination = SageDataTriggerSyncOperator(
    task_id='sagedata_async_example',
    integration_task_id=<...>,
    asynchronous=True,
)
# Use the same integration_task_id as the triggering task
sagedata_sensor = SageDataSyncSensor(
    task_id='sagedata_sensor_source_dest_example',
    integration_task_id=<...>,
)
async_source_destination >> sagedata_sensor

SageDataStopSyncOperator

Use the SageDataStopSyncOperator to stop a sync for an existing integration task in SageData.

stop_source_destination = SageDataStopSyncOperator(
    task_id='sagedata_stop_sync_example',
    integration_task_id=<...>,
    sagedata_conn_id=<...>,
)

Example DAG

This example DAG demonstrates the SageDataTriggerSyncOperator in both synchronous and asynchronous mode.

from datetime import datetime, timedelta

from airflow import DAG

from plugins.operators.sagedata import SageDataTriggerSyncOperator
from plugins.sagedata.sensors.sagedata import SageDataSyncSensor

with DAG(
    dag_id='example_sagedata_operator',
    schedule_interval=None,  # run only when triggered manually
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    # Passed to every task that accepts it; set this to your SageData connection ID
    default_args={"sagedata_conn_id": ""},
    catchup=False,
) as dag:
    # [START howto_operator_sagedata_synchronous]
    # Triggers integration task 'XXX' and tracks its status within this task
    sync_source_destination = SageDataTriggerSyncOperator(
        task_id='sagedata_sync_source_dest_example',
        integration_task_id='XXX',
    )
    # [END howto_operator_sagedata_synchronous]

    # [START howto_operator_sagedata_asynchronous]
    # Only triggers integration task 'YYY'; the sensor waits for it to finish
    async_source_destination = SageDataTriggerSyncOperator(
        task_id='sagedata_async_source_dest_example',
        integration_task_id='YYY',
        asynchronous=True,
    )

    sagedata_sensor = SageDataSyncSensor(
        task_id='sagedata_sensor_source_dest_example',
        integration_task_id='YYY',
    )
    # [END howto_operator_sagedata_asynchronous]

    async_source_destination >> sagedata_sensor