Apache Airflow is an open-source workflow management system (WMS) widely used to orchestrate data pipelines. Airflow lets you sequence, coordinate, schedule, and manage complex pipelines that pull data from diverse sources. The data sets those pipelines deliver are ready for consumption by business intelligence applications, data science work, or machine learning models that support big data applications. Workflows are authored as code, as Directed Acyclic Graphs (DAGs) of tasks, which Airflow then schedules. Its web UI lets you visualize each pipeline's dependencies, progress, logs, and more.
SageDataTriggerSyncOperator
Use the SageDataTriggerSyncOperator to trigger a sync for an existing integration task in SageData.
Using the Operator
The SageDataTriggerSyncOperator requires the integration_task_id, the UUID (universally unique identifier) that SageData assigns to a synchronization job between a data source and a destination. To find it, open "Integrations", select the integration from your list of configured integrations, and then look at the browser's address bar.
The part of the URL after /parameters/ is the integration_task_id (UUID).
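If you collect integration task IDs programmatically (for example, from URLs copied out of the browser), a small helper can pull out the segment after /parameters/ and check that it really is a UUID before it goes into a DAG. This is a sketch using only the Python standard library; the function name integration_task_id_from_url and the sample URL are hypothetical and assume nothing about SageData's URL layout beyond the /parameters/ segment described above.

```python
import uuid
from urllib.parse import urlsplit


def integration_task_id_from_url(url: str) -> str:
    """Return the UUID that follows '/parameters/' in an integration URL (hypothetical helper)."""
    path = urlsplit(url).path  # drops scheme, host, query string and fragment
    marker = "/parameters/"
    if marker not in path:
        raise ValueError(f"no '{marker}' segment in {url!r}")
    # Keep only the first path segment after the marker.
    candidate = path.split(marker, 1)[1].split("/", 1)[0]
    # uuid.UUID raises ValueError if the segment is not a well-formed UUID.
    return str(uuid.UUID(candidate))


# Made-up URL for illustration only.
url = "https://example.invalid/integrations/parameters/3f2b6c1e-8a4d-4b7e-9c2a-1d5e6f7a8b9c"
print(integration_task_id_from_url(url))  # 3f2b6c1e-8a4d-4b7e-9c2a-1d5e6f7a8b9c
```

Failing fast on a malformed ID here is cheaper than discovering it when the DAG runs.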
You will also need the sagedata_conn_id parameter, which specifies the Airflow connection used to reach your SageData account. sagedata_conn_id is bound to your API access credentials on the SageData platform.
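To define the connection referenced by sagedata_conn_id, open the Airflow web UI, go to Admin -> Connections, and add a connection with Connection Type HTTP using the following details. The Connection Id you choose is the value to pass as sagedata_conn_id.
Host: api.sagedata.co
Schema: v1
Password: <'api_key'>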
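As an alternative to the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (the conn id in upper case) whose value is a connection URI of the form conn-type://login:password@host/schema. The sketch below builds that URI for the HTTP connection described above, percent-encoding the API key so characters such as / or @ cannot break it. The conn id sagedata_default and both helper names are illustrative assumptions; check the connection documentation for your Airflow version before relying on this format.

```python
from urllib.parse import quote


def sagedata_connection_uri(api_key: str, host: str = "api.sagedata.co", schema: str = "v1") -> str:
    """Build an Airflow connection URI with conn type 'http' for the SageData API."""
    # Login is left empty; the API key fills the password slot and is
    # percent-encoded (safe='') so '/', '@' or ':' in the key stay intact.
    return f"http://:{quote(api_key, safe='')}@{host}/{schema}"


def connection_env_var(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for a given conn_id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Hypothetical conn id and key, for illustration only.
print(f"{connection_env_var('sagedata_default')}='{sagedata_connection_uri('my-api-key')}'")
# AIRFLOW_CONN_SAGEDATA_DEFAULT='http://:my-api-key@api.sagedata.co/v1'
```

With that variable exported in the scheduler's and workers' environment, sagedata_conn_id would be set to 'sagedata_default'.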
Trigger a synchronization job
In Airflow, the operator can trigger a synchronization job in two ways. The first is synchronous: the operator triggers the SageData job and then monitors the job's status itself.
The second is asynchronous: set the flag asynchronous=True and the operator only triggers the job. To wait for the job to finish, pass the same integration_task_id to a SageData sensor (SageDataSyncSensor in the example DAG below).
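Roughly speaking, both modes trigger the job and then poll its status until it reaches a terminal state; the difference is whether the operator does the polling itself (synchronous) or hands it to a separate sensor task (asynchronous). The loop below is a plain-Python sketch of that polling step, not the SageData implementation: get_status, the status strings "succeeded" and "failed", and the default intervals are hypothetical placeholders.

```python
import time
from typing import Callable


def wait_for_sync(
    get_status: Callable[[], str],
    poke_interval: float = 30.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_status() until the job finishes (hypothetical status values)."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "succeeded":
            return status
        if status == "failed":
            raise RuntimeError("SageData sync failed")
        # Any other status is treated as "still running".
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sync still {status!r} after {timeout} seconds")
        time.sleep(poke_interval)


# Simulated job that is running twice before it succeeds.
statuses = iter(["running", "running", "succeeded"])
print(wait_for_sync(lambda: next(statuses), poke_interval=0.01))  # succeeded
```

Moving this loop into a sensor (the asynchronous mode) means the trigger task finishes immediately, and the waiting happens in its own task with its own retries and timeout.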
Synchronous example:
sync_source_destination = SageDataTriggerSyncOperator(
    task_id='sagedata_run_sync_example',
    integration_task_id=<...>,
    sagedata_conn_id=<...>
)
Asynchronous example:
async_source_destination = SageDataTriggerSyncOperator(
    task_id='sagedata_async_example',
    integration_task_id=<...>,
    asynchronous=True,
)

sagedata_sensor = SageDataSyncSensor(
    task_id='sagedata_sensor_source_dest_example',
    integration_task_id=<...>,
)

async_source_destination >> sagedata_sensor
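The line async_source_destination >> sagedata_sensor declares the sensor as downstream of the trigger task, so under Airflow's default trigger rule it only starts after the trigger succeeds. Because the workflow is a directed acyclic graph, a valid run order always exists. As an illustration outside Airflow, Python's standard-library graphlib (Python 3.9+) computes that order from the same dependency; this models the graph only and is not how Airflow's scheduler works internally.

```python
from graphlib import CycleError, TopologicalSorter

# Each task_id maps to the set of task_ids that must finish before it runs.
# This mirrors `async_source_destination >> sagedata_sensor` above.
dependencies = {
    "sagedata_async_example": set(),
    "sagedata_sensor_source_dest_example": {"sagedata_async_example"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['sagedata_async_example', 'sagedata_sensor_source_dest_example']

# Adding the reverse edge creates a cycle, which a DAG cannot contain.
cyclic = {**dependencies, "sagedata_async_example": {"sagedata_sensor_source_dest_example"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("cycle detected:", err.args[1])
```

Airflow performs an equivalent cycle check when it parses a DAG file, which is why a circular >> chain fails at load time rather than at run time.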
SageDataStopSyncOperator
Use the SageDataStopSyncOperator to stop a sync for an existing integration task in SageData.
stop_source_destination = SageDataStopSyncOperator(
    task_id='sagedata_stop_sync_example',
    integration_task_id=<...>,
    sagedata_conn_id=<...>
)
Example DAG
This example DAG demonstrates the SageDataTriggerSyncOperator in both synchronous and asynchronous modes.
from datetime import datetime, timedelta

from airflow import DAG
from plugins.operators.sagedata import SageDataTriggerSyncOperator
from plugins.sagedata.sensors.sagedata import SageDataSyncSensor

with DAG(
    dag_id='example_sagedata_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    # Set to the Connection Id of your SageData HTTP connection.
    default_args={"sagedata_conn_id": ""},
    catchup=False,
) as dag:

    # [START howto_operator_sagedata_synchronous]
    # Triggers the sync and waits for it in the same task.
    sync_source_destination = SageDataTriggerSyncOperator(
        task_id='sagedata_sync_source_dest_example',
        integration_task_id='XXX',  # replace with your integration task UUID
    )
    # [END howto_operator_sagedata_synchronous]

    # [START howto_operator_sagedata_asynchronous]
    # Only triggers the sync; the sensor below waits for it to finish.
    async_source_destination = SageDataTriggerSyncOperator(
        task_id='sagedata_async_source_dest_example',
        integration_task_id='YYY',  # replace with your integration task UUID
        asynchronous=True,
    )

    sagedata_sensor = SageDataSyncSensor(
        task_id='sagedata_sensor_source_dest_example',
        integration_task_id='YYY',  # must match the triggered integration task
    )
    # [END howto_operator_sagedata_asynchronous]

    async_source_destination >> sagedata_sensor