Using Airflow xComm custom backend to share data

What is xComs

Airflow XComs enables users to share data between tasks. The problem with the native Airflow xComs is that it is designed to share small amounts of data.
It is a good principle to use Airflow as a data orchestrator. As opposed to using it as a data manipulator that ingests and transforms the data. Transformations generally must be offloaded to the Data Warehouse.
In the few use cases where Airflow must function as a data processor, having a custom xComms backend can be a great feature. Custom XCom backend can help you with

Why use custom xComs

  • Custom XCom can provide more storage than the metadata database behind the standard xCom.
  • If you would like to have custom retention, deletion, and backup policies for XComs data.
  • With a custom XCom backend, you don’t need to worry about cleaning, managing and maintaining metadata db
  • With a custom XCom backend, you can set up custom serialization and deserialization methods. Airflow’s default approach of JSON serialization puts restrictions on the types of data you may send over XComs.
  • Accessing XCom data easily wherever you choose to store it without having to access the metadata database.
While there are a few online tutorials available, for example this one from Astronomer, none of them actually worked for us well in handling all data types.
 
We wanted to be able to send the following data via custom xComms:
Data Frames
  • Plain CSVs
  • Dictionaries
  • Lists

We will use this tutorial to illustrate how to set up xComm for Airflow using Docker container as well as custom scripts

Required Knowledge

You should be at least familiar with and have the basic coding skills in

  • Airflow
  • Docker
  • yml 
  • Python

But if you follow these instructions, you should be generally fine. In this tutorial, we will skip some of the pre-setup instructions that can be viewed here:

Xcom Backend Tutorial by Astronomer

Assumed Set-Up

It is already assumed that you managed to:

  • Set up Docker container with Airflow
  • Running Airflow 2.5.0
  • Created S3 bucket for xComm Data Storage
  • Created role with Credentials to access the S3 bucket
  • Added the credentials to your Airflow via Admin -> Connections
  • Have a .env file with environmental variables that are loaded with the Docker container set up

Instructions

Let’s dive into an actual setup of the backend. 

1. Create an include directory in your Airflow default directory. In that directory you will need to create a file for the custom xComm backend code. In our case it is called s3_xcom_backend.py

 

Inside that file you can post the ready made code listed bellow.

import uuid
import pandas as pd

from typing import Any
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import pickle

class S3XComBackend(BaseXCom):
PREFIX = "xcom_s3://"
BUCKET_NAME = "{BUCKET_NAME}"

@staticmethod
def serialize_value(value: Any):
print(f"DEBUG: Custom xComm beckend object of type {type(value)} with content: {value}")
if isinstance(value, pd.DataFrame):
print(f"DEBUG: serializing value of type {type(value)} with content: {value}")
hook = S3Hook()
key = "data_csv_" + str(uuid.uuid4())
filename = f"{key}.csv"

value.to_csv(filename)
hook.load_file(
filename=filename,
key=key,
bucket_name=S3XComBackend.BUCKET_NAME,
replace=True
)
value = S3XComBackend.PREFIX + key
return BaseXCom.serialize_value(value)

if isinstance(value, list) or isinstance(value, dict):
print(f"DEBUG: serializing value of type {type(value)} with content: {value}")
hook = S3Hook()
key = "data_dat_" + str(uuid.uuid4())
filename = f"{key}.dat"

with open(filename, 'wb') as f:
pickle.dump(value, f)

hook.load_file(
filename=filename,
key=key,
bucket_name=S3XComBackend.BUCKET_NAME,
replace=True
)
value = S3XComBackend.PREFIX + key
return BaseXCom.serialize_value(value)

@staticmethod
def deserialize_value(result) -> Any:
result = BaseXCom.deserialize_value(result)
print("BaseXCOM deserialized result:", result)
if isinstance(result, str) and result.startswith(S3XComBackend.PREFIX):
hook = S3Hook()
key = result.replace(S3XComBackend.PREFIX, "")
filename = hook.download_file(
key=key,
bucket_name=S3XComBackend.BUCKET_NAME,
preserve_file_name=True,
local_path="/tmp"
)

print("filename:", filename)
# get the file format to distinguish between different types of data
fmt = filename.split('/')[3].split('_')[1]

print("fmt:", fmt)

if fmt == "csv":
result = pd.read_csv(filename)
return result

if fmt == "dat":
with open(filename, 'rb') as f:
result = pickle.load(f)
return result

Let us look at the code. There is a class called CustomXComBackend. This class has two methods:

.serialize_value() handles the serialization of the value that is pushed to XCom from an Airflow task,

and

.deserialize_value() handles the deserialization of thedata from the XCom backend back into xComm value(s)

The .serialize_value() method:

  • Creates the connection to the S3 bucket.
  • Creates a unique filename using the uuid package.
  • Employs the run_id and task_id obtained from the Airflow context to create the key under which the serialized value will be saved in the storage.
  • Writes the value that is being pushed to XCom to the object storage using serialization that is capable of handling not only data frames but also dictionaries and plain text values.
  • Creates a unique reference_string that is written to the Airflow metadata database as a regular XCom.

The .deserialize_value() method:

    • Recovers the reference_string for a given item (result) from the Airflow metadata database using regular XCom.
    • Downloads the serialised file at the key contained in the reference_string.
    • Deserializes the information from the JSON file.

Modifying Docker

Now that you have created your custom xComm class with 2 functions, we need to let docker know where this file is located and load it in Airflow.

In your docker-compose.yaml you need to do 2 things:
1. Add the following line in the environment section

AIRFLOW__CORE__XCOM_BACKEND=include.xcom_backend_json.CustomXComBackendJSON

2. Add ./include:/opt/airflow/include in the volumes and airflow init sections

You are now ready to re-launch your docker container with the new settings to use your custom xCom backend. Do so using docker-compose down and docker-compose up 

DAG for Custom XComs

Now that you are all set with the backend, you can start writing DAGs that will share the data between the tasks, using the S3 as data staging environment.

Here is an example of a DAG that can use custom xComm backend:

from datetime import datetime, timedelta
from airflow.decorators import task
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.models import Variable

BASE_DIR = Variable.get("BASE_DIR")

DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 12, 16),
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1)
}

def get_data(query, **context):
"""
Querying the data in postgres database. This function only queries the columns from the table that are in the config file
:param query: string
:param context:
:return: list of documents
"""
pg_hook = PostgresHook(postgres_conn_id='DB_CONN')
df = pg_hook.get_pandas_df(sql=query)
df = df.reset_index() # reset the index
return df


@task
def retrieve_data(record):
print(record)

with DAG(
dag_id='TestDag',
default_args=DEFAULT_ARGS,
schedule_interval='@daily',
start_date=datetime(year=2022, month=2, day=1),
catchup=False
) as dag:
task_get_data = PythonOperator(
task_id='Get_Data',
python_callable=get_data,
provide_context=True,
op_kwargs={
'query': 'SELECT * FROM MY_TABLE'
}
)

task_retrieve_data = retrieve_data.partial().expand(record=task_get_data.output)

task_get_data >> task_retrieve_data

The above mentioned DAG has 2 tasks:

1. Task to query data from a database table and save it xComm S3 destination

2. Task to retrieve the queried data from xComm  S3 destination and cycle through the result row by row

 

Closing Thoughts

Congratulations! You have just learned how to use custom xComm backed to store data and pass it between tasks. If you need more help with Airflow to Data Management, reach out to SageData team

Comments are closed.