Apache Airflow и XCom

XCom или Cross-Communication, это механизм Apache Airflow для передачи параметров из одного оператора в другой. Если просто, то это таблица в базе данных, хранящая значения, записанные операторами Airflow. У этой таблицы есть несколько столбцов:

  • key — ключ записи
  • value — значение, данные хранятся в двоичном формате. Для MySQL это тип BLOB, а для Postgres BYTEA.
  • timestamp — время создания записи в базе
  • execution_date — дата выполнения DAG
  • task_id — ID оператора, записавшего данные в XCom
  • dag_id — ID DAG

Несмотря на то, что BYTEA в Postgres может хранить до 1 Гб двоичных данных, сообщество не рекомендует передавать через XCom большие данные. Это связано с тем, что перед укладкой в таблицу они подвергаются сериализации (JSON/pickle), а при чтении происходит этап десериализации. Также не стоит забывать, что данные передаются по сети, а это значительно замедляет работу пайплайна. Цитата из официальной доки по PostgreSQL:

The BYTEA data type is not well suited for storing very large amounts of binary data. While a column of type BYTEA can hold up to 1 GB of binary data, it would require a huge amount of memory to process such a large value.

Зачем всё так сложно? Почему нельзя передавать аргументы как это происходит при написании обычного кода на Python?

Главная причина в том, что операторы могут исполняться в разных адресных пространствах (Local Executor) и даже на отдельных физических машинах (Celery/Dask Executors, Kubernetes Executor). В такой ситуации нужен механизм для передачи сообщений от одной машины к другой. Центральным хранилищем в этом случае является база данных Airflow (рекомендуется использовать PostgreSQL).

Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Приятного обучения - Apache Airflow 2.0: практический курс.

Как работать с XCom

По умолчанию если оператор в методе .execute возвращает значение, это значение автоматически попадает в XCom. Это поведение можно изменить, передав в аргумент do_xcom_push значение False:

from airflow.operators.bash import BashOperator

BashOperator(
    task_id='list_dir',
    bash_command='ls -la',
    do_xcom_push=False,
)

Если callable, переданный в аргументе python_callable у PythonOperator возвращает какое-либо значение, то это значение также автоматически будет записано в XCom.

Пример кода:

from airflow.operators.python import PythonOperator

def return_something():
    return 1 + 1  # автоматически запишется в XCom

PythonOperator(
    task_id='return_something',
    python_callable=return_something,
)

Помимо этого с XCom можно работать вручную, записывая или получая данные путём вызова методов xcom_push и xcom_pull у производных от TaskInstance (кстати, тоже таблица в БД).

TaskInstance доступен в контексте выполнения DAG, а также как переменная в шаблоне под именем ti или task_instance. Например, если вам необходимо прочитать значение предыдущего таска внутри функции PythonOperator, то необходимо из контекста получить TaskInstance и вызвать метод xcom_pull, передав ему название оператора, который уложил результат в XCom:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def print_xcom(**kwargs):
    ti = kwargs['ti']
    print('The value is: {}'.format(
        ti.xcom_pull(task_ids='hello_world')
    ))

with DAG(
    dag_id='xcom_dag',
    start_date=datetime(2021, 3, 1),
    schedule_interval='@once',
) as dag:

    cmd = BashOperator(
        task_id='hello_world',
        bash_command='echo "Hello world"',
    )

    printer = PythonOperator(
        task_id='printer',
        python_callable=print_xcom,
    )

    cmd >> printer

Все переменные из контекста DagRun передаются в python_callable как именованные аргументы. Также контекст внутри функции можно получить путём вызова функции get_current_context, это удобно при использовании TaskFlow API:

from airflow.operators.python import get_current_context

@task
def download_file():
    context = get_current_context()
    return download_dataset(context['execution_date'].strftime('%Y-%m'))

Может возникнуть вопрос, а что если одновременно выполняются несколько DagRun, не получится ли так, что мы прочитаем "чужое" значение в XCom? Нет. Записи привязываются к dag_id, task_id и execution_date. У, скажем, 3-х одновременно выполняющихся DagRun будут разные значения execution_date.

Просмотр XCom доступен через веб-интерфейс Apache Airflow в разделе Admin → XComs:



На скрине пример запуска DAG чей код был выше.

Обратите внимание на колонку Key, в неё по умолчанию записывается значение return_value, что, видимо, является индикатором того, что оператор в методе execute вернул значение. При самостоятельной записи в XCom можно указать свой key:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def push_cmd(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(value='cat /etc/passwd | wc -l', key='passwd_len')

def pull_result(**kwargs):
    ti = kwargs['ti']
    print(ti.xcom_pull(task_ids='bash_executor'))

with DAG(
    dag_id='xcom_pull_dag',
    start_date=datetime(2021, 3, 1),
    schedule_interval='@once',
) as dag:

    push_cmd_task = PythonOperator(
        task_id='push_cmd',
        python_callable=push_cmd,
    )

    cmd = BashOperator(
        task_id='bash_executor',
        bash_command='{{ ti.xcom_pull(task_ids="push_cmd", key="passwd_len") }}',
    )

    pull_result_task = PythonOperator(
        task_id='pull_result',
        python_callable=pull_result,
    )

    push_cmd_task >> cmd >> pull_result_task

Также обратите внимание, что метод xcom_pull может принимать список в аргументе task_ids. Туда можно передать несколько task_id по которым хочется получить значения. Взгляните на следующий граф:



Каждый push таск укладывает в XCom значение, а calculator таск собирает их все и суммирует. Результат записывается обратно в XCom через возвращаемое значение в функции. Вот код этого DAGа:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def calculator_func(**kwargs):
    ti = kwargs['ti']
    tasks = [f'push_{i}' for i in range(1, 10)]
    values = ti.xcom_pull(task_ids=tasks)
    return sum(values)

with DAG(
    dag_id='xcom_multiple_tasks',
    start_date=datetime(2021, 3, 1),
    schedule_interval='@once',
) as dag:

    tasks = []

    for i in range(1, 10):
        task = PythonOperator(
            task_id=f'push_{i}',
            python_callable=lambda i=i: i,
        )

        tasks.append(task)

    calculator = PythonOperator(
        task_id='calculator',
        python_callable=calculator_func,
    )

    calculator.set_upstream(tasks)

А вот так это выглядит в веб-интерфейсе:

На этом, пожалуй, всё. Главное помните, что не стоит передавать через XCom большие объекты, например, файлы или DataFrame объекты какого-нибудь pandas.

Примеры всех DAG можно найти в репозитории.