TaskFlow API в Apache Airflow 2.0

Декабрьский релиз Apache Airflow 2.0 принёс много нововведений в инструмент. А самое, пожалуй, заметное из них это TaskFlow API. В этой заметке я подробно разберу что это такое и как стало красиво и удобно описывать Python операторы, используя обычные функции и декоратор @task.

PythonOperator

PythonOperator это один из самых популярных операторов для создания тасков в Apache Airflow. Он позволяет выполнять любой код на Python. Нужно всего лишь передать функцию в аргумент python_callable при инициализации класса.

from airflow.operators.python import PythonOperator

def do_something(**context):
    pass

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

Если функция явно делает return, то значение автоматически записывается в XCom. В тех случаях, когда необходимо получить записанное значение из другой функции, нужно явно обращаться к XCom:

from airflow.operators.python import PythonOperator

def do_something(**context):
    return 'important_string'

def do_something2(**context):
    ti = context['ti']
    value_from_do_something = ti.xcom_pull(task_ids='do_something')  # вернёт important_string

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

operator2 = PythonOperator(
    task_id='do_something2',
    python_callable=do_something2,
)

Такой код зачастую выглядит небрежно, теряется бизнес-логика функций. В Apache Airflow 2 решили эту проблему, разработчики постарались добавить синтаксического сахара и упростить взаимодействие между Python операторами. Теперь передача значений из одной функции в другую выглядит как привычный цепочный вызов функций, а вся магия взаимодействия с XCom спрятана под капотом.

TaskFlow API

TaskFlow API появился в Apache Airflow 2.0. Эта фича принесла с собой парочку новых декораторов и классов. Теперь чтобы превратить функцию в PythonOperator, её нужно обернуть в декоратор @task.

Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Приятного обучения - Apache Airflow 2.0: практический курс.

from airflow.decorators import task

@task
def do_something():
    return 1

С этим декоратором не нужно явно создавать инстанс PythonOperator и присваивать ему python_callable. В качестве task_id у оператора будет название функции, по желанию можно задать task_id в декораторе.

А теперь давайте разберёмся как работать с TaskFlow API на практическом примере. Представим, что перед нами стоит задача — необходимо скачать датасет и подсчитать количество строк в нём. Все данные нужно записать в XCom. В качестве датасета я возьму публичный файл с информацией о пассажирах на Титанике. Его можно скачать по адресу https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv.

Код дата пайплайна без TaskFlow API:

import os
import datetime as dt

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

def download_titanic_dataset():
    url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
    response = requests.get(url, stream=True)
    response.raise_for_status()
    filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
    with open(filepath, 'w', encoding='utf-8') as f:
        for chunk in response.iter_lines():
            f.write('{}\\n'.format(chunk.decode('utf-8')))
    return filepath

def get_number_of_lines(file_path):
    lines = 0
    with open(file_path) as f:
        for line in f:
            if line:
                lines += 1
    return lines

def get_number_of_lines_wrapper(**context):
    ti = context['ti']
    file_path = ti.xcom_pull(task_ids='download_task')
    return get_number_of_lines(file_path)

with DAG(
        dag_id='titanic_old_style_dag',
        start_date=dt.datetime(2021, 3, 1),
        schedule_interval='@once'
) as dag:
    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download_titanic_dataset,
    )

    get_lines_task = PythonOperator(
        task_id='get_lines',
        python_callable=get_number_of_lines_wrapper
    )

    download_task >> get_lines_task

Функция get_number_of_lines_wrapper это обёртка над get_number_of_lines, которая отвечает за извлечение пути до файла из XCom и вызов оригинальной функции. Последняя не должна знать как получить этот путь, её задача — открыть файл и посчитать количество строк. Вариант со строкой-шаблоном в качестве аргумента вместо функции-обёртки можно посмотреть у меня в репозитории. Сам путь до файла записывается в XCom, когда функция download_titanic_dataset возвращает значение.

А теперь давайте взглянем на код с использованием TaskFlow API:

import os
import datetime as dt

import requests
from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id='titanic_dag',
    start_date=dt.datetime(2021, 3, 1),
    schedule_interval='@once'
) as dag:

    @task
    def download_titanic_dataset():
        url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
        response = requests.get(url, stream=True)
        response.raise_for_status()
        filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
        with open(filepath, 'w', encoding='utf-8') as f:
            for chunk in response.iter_lines():
                f.write('{}\\n'.format(chunk.decode('utf-8')))
        return filepath

    @task
    def get_number_of_lines(file_path):
        lines = 0
        with open(file_path) as f:
            for line in f:
                if line:
                    lines += 1
        return lines

    get_number_of_lines(download_titanic_dataset())

Код выглядит значительно чище и читается легко. Во-первых, оригинальные функции остались без изменений. Во-вторых, путь до файла не нужно явно получать из XCom как это делается вне TaskFlow API. Также обратите внимание на то как я вызываю функции:

get_number_of_lines(download_titanic_dataset())

Такой цепочный вызов под капотом формирует зависимость между операторами. Как это работает?

XComArg

В Airflow 2 появился новый класс XComArg. Он представляет из себя сущность для взаимодействия с XCom, также поддерживает методы определения зависимостей: set_upstream, set_downstream и >> и << соответственно. При вызове функции, обёрнутой в декоратор @task, возвращается инстанс класса XComArg:

>> print(type(download_titanic_dataset()))
<class 'airflow.models.xcom_arg.XComArg'>

Строковое представление объекта возвращает jinja-шаблон:

>> print(str(download_titanic_dataset()))
{{ task_instance.xcom_pull(task_ids='download_titanic_dataset', dag_id='titanic_dag', key='return_value') }}

Под капотом декоратор @task создаёт PythonOperator. Если декорируемая функция принимает аргументы, то они прокидываются в этот PythonOperator. Поэтому когда происходит подобный вызов:

get_number_of_lines(download_titanic_dataset())

PythonOperator, созданный для функции get_number_of_lines, получает XComArg, который позже превращается в значение, полученное из XCom. Зависимости строятся похожим образом. Если у оператора в аргументах есть инстанс XComArg, то строится зависимость через вызов set_xcomargs_dependencies. В нашем случае зависимостью для оператора get_number_of_lines будет оператор download_titanic_dataset. Код также можно переписать в такой вид:

 file_path = download_titanic_dataset()
 number_of_lines = get_number_of_lines(file_path)

А что если в DAGе хочется использовать оператор, отличный от PythonOperator? Легко! Вспомним, что XComArg поддерживает методы определения зависимостей. Представим, что нам захотелось перед оператором download_titanic_dataset добавить проверку наличия файла на удалённом хосте. Для этого будем использовать SimpleHttpOperator с HTTP методом HEAD. Предварительно необходимо завести Connection на URL, указанный для загрузки.

Код:

import os
import datetime as dt

import requests
from airflow import DAG
from airflow.decorators import task
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(
    dag_id='titanic_dag',
    start_date=dt.datetime(2021, 3, 1),
    schedule_interval='@once'
) as dag:

    check_if_file_exists = SimpleHttpOperator(
        method='HEAD',
        task_id='check_file_existence',
        http_conn_id='web_stanford_http_id',
        endpoint='/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv',
    )

    @task
    def download_titanic_dataset():
        url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
        response = requests.get(url, stream=True)
        response.raise_for_status()
        filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
        with open(filepath, 'w', encoding='utf-8') as f:
            for chunk in response.iter_lines():
                f.write('{}\\n'.format(chunk.decode('utf-8')))
        return filepath

    @task
    def get_number_of_lines(file_path):
        lines = 0
        with open(file_path) as f:
            for line in f:
                if line:
                    lines += 1
        return lines

    file_path = download_titanic_dataset()
    number_of_lines = get_number_of_lines(file_path)

    check_if_file_exists >> file_path

    # check_if_file_exists >> file_path >> number_of_lines  # так тоже можно

В Web UI граф выглядит так:

Советы

Может показаться, что теперь вызов функций и передача аргументов точно такие же как и при написании обычных python-скриптов, но это не так. Не забывайте, что операторы могут выполняться в разном адресном пространстве и даже на разных физических машинах, поэтому передача тяжелых объектов может значительно замедлить работу Airflow, т.к. обмен данными происходит через то же XCom у которого есть свои ограничения (только теперь это неявно). Более подробно о том как работает XCom в Airflow можно прочитать в моей заметке Apache Airflow и XCom. Но есть и хорошие новости. С выходом Apache Airflow 2 стало возможно написать свой бэкенд для XCom, как это сделать я расскажу в следующей статье.

Примеры к статьей можной найти в репозитории.