TaskFlow API в Apache Airflow 2.0
Декабрьский релиз Apache Airflow 2.0 принёс много нововведений в инструмент. А самое, пожалуй, заметное из них это TaskFlow API. В этой заметке я подробно разберу что это такое и как стало красиво и удобно описывать Python операторы, используя обычные функции и декоратор @task
.
PythonOperator
PythonOperator это один из самых популярных операторов для создания тасков в Apache Airflow. Он позволяет выполнять любой код на Python. Нужно всего лишь передать функцию в аргумент python_callable
при инициализации класса.
from airflow.operators.python import PythonOperator
def do_something(**context):
pass
operator = PythonOperator(
task_id='do_something',
python_callable=do_something,
)
Если функция явно делает return, то значение автоматически записывается в XCom. В тех случаях, когда необходимо получить записанное значение из другой функции, нужно явно обращаться к XCom:
from airflow.operators.python import PythonOperator
def do_something(**context):
return 'important_string'
def do_something2(**context):
ti = context['ti']
value_from_do_something = ti.xcom_pull(task_ids='do_something') # вернёт important_string
operator = PythonOperator(
task_id='do_something',
python_callable=do_something,
)
operator2 = PythonOperator(
task_id='do_something2',
python_callable=do_something2,
)
Такой код зачастую выглядит небрежно, теряется бизнес-логика функций. В Apache Airflow 2 решили эту проблему, разработчики постарались добавить синтаксического сахара и упростить взаимодействие между Python операторами. Теперь передача значений из одной функции в другую выглядит как привычный цепочный вызов функций, а вся магия взаимодействия с XCom спрятана под капотом.
TaskFlow API
TaskFlow API появился в Apache Airflow 2.0. Эта фича принесла с собой парочку новых декораторов и классов. Теперь чтобы превратить функцию в PythonOperator, её нужно обернуть в декоратор @task
.
Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Приятного обучения - Apache Airflow 2.0: практический курс.
from airflow.decorators import task
@task
def do_something():
return 1
С этим декоратором не нужно явно создавать инстанс PythonOperator и присваивать ему python_callable
. В качестве task_id
у оператора будет название функции, по желанию можно задать task_id
в декораторе.
А теперь давайте разберёмся как работать с TaskFlow API на практическом примере. Представим, что перед нами стоит задача — необходимо скачать датасет и подсчитать количество строк в нём. Все данные нужно записать в XCom. В качестве датасета я возьму публичный файл с информацией о пассажирах на Титанике. Его можно скачать по адресу https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv.
Код дата пайплайна без TaskFlow API:
import os
import datetime as dt
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
def download_titanic_dataset():
url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
response = requests.get(url, stream=True)
response.raise_for_status()
filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
with open(filepath, 'w', encoding='utf-8') as f:
for chunk in response.iter_lines():
f.write('{}\\n'.format(chunk.decode('utf-8')))
return filepath
def get_number_of_lines(file_path):
lines = 0
with open(file_path) as f:
for line in f:
if line:
lines += 1
return lines
def get_number_of_lines_wrapper(**context):
ti = context['ti']
file_path = ti.xcom_pull(task_ids='download_task')
return get_number_of_lines(file_path)
with DAG(
dag_id='titanic_old_style_dag',
start_date=dt.datetime(2021, 3, 1),
schedule_interval='@once'
) as dag:
download_task = PythonOperator(
task_id='download_task',
python_callable=download_titanic_dataset,
)
get_lines_task = PythonOperator(
task_id='get_lines',
python_callable=get_number_of_lines_wrapper
)
download_task >> get_lines_task
Функция get_number_of_lines_wrapper
это обёртка над get_number_of_lines
, которая отвечает за извлечение пути до файла из XCom и вызов оригинальной функции. Последняя не должна знать как получить этот путь, её задача — открыть файл и посчитать количество строк. Вариант со строкой-шаблоном в качестве аргумента вместо функции-обёртки можно посмотреть у меня в репозитории. Сам путь до файла записывается в XCom, когда функция download_titanic_dataset
возвращает значение.
А теперь давайте взглянем на код с использованием TaskFlow API:
import os
import datetime as dt
import requests
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id='titanic_dag',
start_date=dt.datetime(2021, 3, 1),
schedule_interval='@once'
) as dag:
@task
def download_titanic_dataset():
url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
response = requests.get(url, stream=True)
response.raise_for_status()
filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
with open(filepath, 'w', encoding='utf-8') as f:
for chunk in response.iter_lines():
f.write('{}\\n'.format(chunk.decode('utf-8')))
return filepath
@task
def get_number_of_lines(file_path):
lines = 0
with open(file_path) as f:
for line in f:
if line:
lines += 1
return lines
get_number_of_lines(download_titanic_dataset())
Код выглядит значительно чище и читается легко. Во-первых, оригинальные функции остались без изменений. Во-вторых, путь до файла не нужно явно получать из XCom как это делается вне TaskFlow API. Также обратите внимание на то как я вызываю функции:
get_number_of_lines(download_titanic_dataset())
Такой цепочный вызов под капотом формирует зависимость между операторами. Как это работает?
XComArg
В Airflow 2 появился новый класс XComArg. Он представляет из себя сущность для взаимодействия с XCom, также поддерживает методы определения зависимостей: set_upstream
, set_downstream
и >>
и <<
соответственно. При вызове функции, обёрнутой в декоратор @task
, возвращается инстанс класса XComArg
:
>> print(type(download_titanic_dataset()))
<class 'airflow.models.xcom_arg.XComArg'>
Строковое представление объекта возвращает jinja-шаблон:
>> print(str(download_titanic_dataset()))
{{ task_instance.xcom_pull(task_ids='download_titanic_dataset', dag_id='titanic_dag', key='return_value') }}
Под капотом декоратор @task
создаёт PythonOperator. Если декорируемая функция принимает аргументы, то они прокидываются в этот PythonOperator. Поэтому когда происходит подобный вызов:
get_number_of_lines(download_titanic_dataset())
PythonOperator, созданный для функции get_number_of_lines
, получает XComArg, который позже превращается в значение, полученное из XCom. Зависимости строятся похожим образом. Если у оператора в аргументах есть инстанс XComArg, то строится зависимость через вызов set_xcomargs_dependencies. В нашем случае зависимостью для оператора get_number_of_lines
будет оператор download_titanic_dataset
. Код также можно переписать в такой вид:
file_path = download_titanic_dataset()
number_of_lines = get_number_of_lines(file_path)
А что если в DAGе хочется использовать оператор, отличный от PythonOperator? Легко! Вспомним, что XComArg поддерживает методы определения зависимостей. Представим, что нам захотелось перед оператором download_titanic_dataset
добавить проверку наличия файла на удалённом хосте. Для этого будем использовать SimpleHttpOperator с HTTP методом HEAD. Предварительно необходимо завести Connection на URL, указанный для загрузки.
Код:
import os
import datetime as dt
import requests
from airflow import DAG
from airflow.decorators import task
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(
dag_id='titanic_dag',
start_date=dt.datetime(2021, 3, 1),
schedule_interval='@once'
) as dag:
check_if_file_exists = SimpleHttpOperator(
method='HEAD',
task_id='check_file_existence',
http_conn_id='web_stanford_http_id',
endpoint='/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv',
)
@task
def download_titanic_dataset():
url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv'
response = requests.get(url, stream=True)
response.raise_for_status()
filepath = os.path.join(os.path.expanduser('~'), 'titanic.csv')
with open(filepath, 'w', encoding='utf-8') as f:
for chunk in response.iter_lines():
f.write('{}\\n'.format(chunk.decode('utf-8')))
return filepath
@task
def get_number_of_lines(file_path):
lines = 0
with open(file_path) as f:
for line in f:
if line:
lines += 1
return lines
file_path = download_titanic_dataset()
number_of_lines = get_number_of_lines(file_path)
check_if_file_exists >> file_path
# check_if_file_exists >> file_path >> number_of_lines # так тоже можно
В Web UI граф выглядит так:
Советы
Может показаться, что теперь вызов функций и передача аргументов точно такие же как и при написании обычных python-скриптов, но это не так. Не забывайте, что операторы могут выполняться в разном адресном пространстве и даже на разных физических машинах, поэтому передача тяжелых объектов может значительно замедлить работу Airflow, т.к. обмен данными происходит через то же XCom у которого есть свои ограничения (только теперь это неявно). Более подробно о том как работает XCom в Airflow можно прочитать в моей заметке Apache Airflow и XCom. Но есть и хорошие новости. С выходом Apache Airflow 2 стало возможно написать свой бэкенд для XCom, как это сделать я расскажу в следующей статье.
Примеры к статьей можной найти в репозитории.