Apache Airflow и XCom
XCom или Cross-Communication, это механизм Apache Airflow для передачи параметров из одного оператора в другой. Если просто, то это таблица в базе данных, хранящая значения, записанные операторами Airflow. У этой таблицы есть несколько столбцов:
- key — ключ записи
- value — значение, данные хранятся в двоичном формате. Для MySQL это тип BLOB, а для Postgres BYTEA.
- timestamp — время создания записи в базе
- execution_date — дата выполнения DAG
- task_id — ID оператора, записавшего данные в XCom
- dag_id — ID DAG
Несмотря на то, что BYTEA в Postgres может хранить до 1 Гб двоичных данных, сообщество не рекомендует передавать через XCom большие данные. Это связано с тем, что перед укладкой в таблицу они подвергаются сериализации (JSON/pickle), а при чтении происходит этап десериализации. Также не стоит забывать, что данные передаются по сети, а это значительно замедляет работу пайплайна. Цитата из официальной доки по PostgreSQL:
The BYTEA data type is not well suited for storing very large amounts of binary data. While a column of type BYTEA can hold up to 1 GB of binary data, it would require a huge amount of memory to process such a large value.
Зачем всё так сложно? Почему нельзя передавать аргументы как это происходит при написании обычного кода на Python?
Главная причина в том, что операторы могут исполняться в разных адресных пространствах (Local Executor) и даже на отдельных физических машинах (Celery/Dask Executors, Kubernetes Executor). В такой ситуации нужен механизм для передачи сообщений от одной машины к другой. Центральным хранилищем в этом случае является база данных Airflow (рекомендуется использовать PostgreSQL).
Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Приятного обучения - Apache Airflow 2.0: практический курс.
Как работать с XCom
По умолчанию если оператор в методе .execute
возвращает значение, это значение автоматически попадает в XCom. Это поведение можно изменить, передав в аргумент do_xcom_push
значение False
:
from airflow.operators.bash import BashOperator
BashOperator(
task_id='list_dir',
bash_command='ls -la',
do_xcom_push=False,
)
Если callable, переданный в аргументе python_callable
у PythonOperator возвращает какое-либо значение, то это значение также автоматически будет записано в XCom.
Пример кода:
from airflow.operators.python import PythonOperator
def return_something():
return 1 + 1 # автоматически запишется в XCom
PythonOperator(
task_id='return_something',
python_callable=return_something,
)
Помимо этого с XCom можно работать вручную, записывая или получая данные путём вызова методов xcom_push
и xcom_pull
у производных от TaskInstance (кстати, тоже таблица в БД).
TaskInstance доступен в контексте выполнения DAG, а также как переменная в шаблоне под именем ti
или task_instance
. Например, если вам необходимо прочитать значение предыдущего таска внутри функции PythonOperator, то необходимо из контекста получить TaskInstance и вызвать метод xcom_pull
, передав ему название оператора, который уложил результат в XCom:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
def print_xcom(**kwargs):
ti = kwargs['ti']
print('The value is: {}'.format(
ti.xcom_pull(task_ids='hello_world')
))
with DAG(
dag_id='xcom_dag',
start_date=datetime(2021, 3, 1),
schedule_interval='@once',
) as dag:
cmd = BashOperator(
task_id='hello_world',
bash_command='echo "Hello world"',
)
printer = PythonOperator(
task_id='printer',
python_callable=print_xcom,
)
cmd >> printer
Все переменные из контекста DagRun передаются в python_callable
как именованные аргументы. Также контекст внутри функции можно получить путём вызова функции get_current_context
, это удобно при использовании TaskFlow API:
from airflow.operators.python import get_current_context
@task
def download_file():
context = get_current_context()
return download_dataset(context['execution_date'].strftime('%Y-%m'))
Может возникнуть вопрос, а что если одновременно выполняются несколько DagRun, не получится ли так, что мы прочитаем "чужое" значение в XCom? Нет. Записи привязываются к dag_id, task_id и execution_date. У, скажем, 3-х одновременно выполняющихся DagRun будут разные значения execution_date.
Просмотр XCom доступен через веб-интерфейс Apache Airflow в разделе Admin → XComs:
На скрине пример запуска DAG чей код был выше.
Обратите внимание на колонку Key, в неё по умолчанию записывается значение return_value, что, видимо, является индикатором того, что оператор в методе execute
вернул значение. При самостоятельной записи в XCom можно указать свой key:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
def push_cmd(**kwargs):
ti = kwargs['ti']
ti.xcom_push(value='cat /etc/passwd | wc -l', key='passwd_len')
def pull_result(**kwargs):
ti = kwargs['ti']
print(ti.xcom_pull(task_ids='bash_executor'))
with DAG(
dag_id='xcom_pull_dag',
start_date=datetime(2021, 3, 1),
schedule_interval='@once',
) as dag:
push_cmd_task = PythonOperator(
task_id='push_cmd',
python_callable=push_cmd,
)
cmd = BashOperator(
task_id='bash_executor',
bash_command='{{ ti.xcom_pull(task_ids="push_cmd", key="passwd_len") }}',
)
pull_result_task = PythonOperator(
task_id='pull_result',
python_callable=pull_result,
)
push_cmd_task >> cmd >> pull_result_task
Также обратите внимание, что метод xcom_pull
может принимать список в аргументе task_ids. Туда можно передать несколько task_id по которым хочется получить значения. Взгляните на следующий граф:
Каждый push таск укладывает в XCom значение, а calculator таск собирает их все и суммирует. Результат записывается обратно в XCom через возвращаемое значение в функции. Вот код этого DAGа:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
def calculator_func(**kwargs):
ti = kwargs['ti']
tasks = [f'push_{i}' for i in range(1, 10)]
values = ti.xcom_pull(task_ids=tasks)
return sum(values)
with DAG(
dag_id='xcom_multiple_tasks',
start_date=datetime(2021, 3, 1),
schedule_interval='@once',
) as dag:
tasks = []
for i in range(1, 10):
task = PythonOperator(
task_id=f'push_{i}',
python_callable=lambda i=i: i,
)
tasks.append(task)
calculator = PythonOperator(
task_id='calculator',
python_callable=calculator_func,
)
calculator.set_upstream(tasks)
А вот так это выглядит в веб-интерфейсе:
На этом, пожалуй, всё. Главное помните, что не стоит передавать через XCom большие объекты, например, файлы или DataFrame объекты какого-нибудь pandas.
Примеры всех DAG можно найти в репозитории.