TaskFlow API в Apache Airflow 2.0

Декабрьский релиз Apache Airflow 2.0 принёс много нововведений в инструмент. А самое, пожалуй, заметное из них это TaskFlow API. В этой заметке я подробно разберу что это такое и как стало красиво и удобно описывать Python операторы, используя обычные функции и декоратор @task.

PythonOperator

PythonOperator это один из самых популярных операторов для создания тасков в Apache Airflow. Он позволяет выполнять любой код на Python. Нужно всего лишь передать функцию в аргумент python_callable при инициализации класса.

from airflow.operators.python import PythonOperator

def do_something(**context):
    pass

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

Если функция явно делает return, то значение автоматически записывается в XCom. В тех случаях, когда необходимо получить записанное значение из другой функции, нужно явно обращаться к XCom:

from airflow.operators.python import PythonOperator

def do_something(**context):
    return 'important_string'

def do_something2(**context):
    ti = context['ti']
    value_from_do_something = ti.xcom_pull(task_ids='do_something')  # вернёт important_string

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

operator2 = PythonOperator(
    task_id='do_something2',
    python_callable=do_something2,
)

Такой код зачастую выглядит небрежно, теряется бизнес-логика функций. В Apache Airflow 2 решили эту проблему, разработчики постарались добавить синтаксического сахара и упростить взаимодействие между Python операторами. Теперь передача значений из одной функции в другую выглядит как привычный цепочный вызов функций, а вся магия взаимодействия с XCom спрятана под капотом.

читать дальше

Apache Airflow и XCom

XCom или Cross-Communication, это механизм Apache Airflow для передачи параметров из одного оператора в другой. Если просто, то это таблица в базе данных, хранящая значения, записанные операторами Airflow. У этой таблицы есть несколько столбцов:

  • key — ключ записи
  • value — значение, данные хранятся в двоичном формате. Для MySQL это тип BLOB, а для Postgres BYTEA.
  • timestamp — время создания записи в базе
  • execution_date — дата выполнения DAG
  • task_id — ID оператора, записавшего данные в XCom
  • dag_id — ID DAG

Несмотря на то, что BYTEA в Postgres может хранить до 1 Гб двоичных данных, сообщество не рекомендует передавать через XCom большие данные. Это связано с тем, что перед укладкой в таблицу они подвергаются сериализации (JSON/pickle), а при чтении происходит этап десериализации. Также не стоит забывать, что данные передаются по сети, а это значительно замедляет работу пайплайна. Цитата из официальной доки по PostgreSQL:

The BYTEA data type is not well suited for storing very large amounts of binary data. While a column of type BYTEA can hold up to 1 GB of binary data, it would require a huge amount of memory to process such a large value.

Зачем всё так сложно? Почему нельзя передавать аргументы как это происходит при написании обычного кода на Python?

Главная причина в том, что операторы могут исполняться в разных адресных пространствах (Local Executor) и даже на отдельных физических машинах (Celery/Dask Executors, Kubernetes Executor). В такой ситуации нужен механизм для передачи сообщений от одной машины к другой. Центральным хранилищем в этом случае является база данных Airflow (рекомендуется использовать PostgreSQL).

читать дальше

Курс Apache Airflow 2.0

UPDATE: Курс доступен на платформе StartDataJourney, разработанной мною же. Сейчас есть возможность приобрести его с 15% скидкой по промокоду EARLYBIRD, ввести промокод можно на этапе оформления заказа. Приятного обучения - Apache Airflow 2.0: практический курс.

Наверняка вы читали мой пост про введение в Apache Airflow. Многое с тех пор изменилось в инструменте, в декабре 2020 года вышла новая версия Apache Airflow 2.0. В ней появилось множество интересных фишечек:

  • TaskFlow API
  • полноценный REST API
  • обновлённый UI, он теперь выглядит свежим
  • отказоустойчивый планировщик, отныне он не является точкой отказа
  • серьёзные улучшения по производительности Airflow
  • Task Group на замену SubDAGs
  • умные сенсоры

Сейчас Apache Airflow чуть ли не главный инструмент в арсенале современного дата инженера. В описании почти любой вакансии на должность дата инженера требуется навык работы с ним.

В связи с этим я решил создать практический курс по работе с Apache Airflow версии 2 и выше. В нём я постараюсь раскрыть весь инструмент, рассказать про основные компоненты, подводные камни. Безусловно не обойдётся и без практических примеров. Я разработаю как можно больше реальных примеров построения дата пайплайнов, затрону самые популярные операторы, работу с облаками и деплой в продакшен на базе Celery и Kubernetes.

Формат курса будет смешанный, но доминировать будет текст. Там где целесообразно продемонстрировать функционал или пример через видео будет видеоряд, но я сторонник текстовых курсов как с точки зрения потребителя так и создателя. У меня уже был опыт создания подобного формата курса про менее известный инструмент Luigi.

читать дальше

Введение в logging на Python

В стандартной библиотеке Python есть замечательный пакет для логирования — logging. В сети бытует мнение, что он сложный и настраивать его сплошная боль. В этой статье я попробую убедить вас в обратном. Мы разберём что из себя представляет этот пакет, изучим основные компоненты и закрепим материал практическим примером.

Зачем нужны логи?

Логи это рентген снимок выполнения вашей программы. Чем детальнее лог, тем проще разобраться в нестандартных ситуациях, которые могут приключиться с вашим скриптом. Наиболее популярным примером логов служат access логи веб-сервера, например, Apache httpd или nginx. Пример куска access лога моего блога:

92.63.107.227 - - [04/Nov/2020:06:30:48 +0000] "GET /ru/hosted-open-vpn-server/ HTTP/1.1" 301 169 "-" "python-requests/2.11.1" "-"
92.63.107.227 - - [04/Nov/2020:06:30:49 +0000] "GET /ru/data-engineering-course/ HTTP/1.1" 301 169 "-" "python-requests/2.11.1" "-"
213.180.203.50 - - [04/Nov/2020:06:36:07 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (compatible; YandexMetrika/2.0; +http://yandex.com/bots yabs01)" "-"
114.119.160.75 - - [04/Nov/2020:06:36:41 +0000] "GET /robots.txt HTTP/1.1" 301 169 "-" "(compatible;PetalBot;+https://aspiegel.com/petalbot)" "10.179.80.67"
90.180.35.207 - - [04/Nov/2020:06:47:11 +0000] "GET / HTTP/1.0" 301 169 "-" "-" "-"
46.246.122.77 - - [04/Nov/2020:06:53:22 +0000] "GET / HTTP/1.1" 301 169 "<http://khashtamov.com>" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36" "-"
66.249.76.16 - - [04/Nov/2020:06:53:30 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" "-"
66.102.9.118 - - [04/Nov/2020:07:11:19 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.75 Safari/537.36 Google Favicon" "46.159.204.234"
71.6.167.142 - - [04/Nov/2020:07:11:55 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36" "-"

Помимо access логов веб-сервер также пишет error лог, там хранится информация обо всех ошибках при обработке HTTP запросов. Также и в ваших скриптах, логи могут делиться на информационные — вывод текущего состояния выполнения, отладочной информации, и на логи с ошибками — вывод исключений, ошибок с дополнительной информацией для отладки, содержащей контекст).

logging и Python

Точкой входа в работу с логированием в Python является библиотека logging. На первый взгляд может показаться, что библиотека сложная и запутанная, но потратив некоторое время на её изучение, можно убедиться в обратном. Для меня logging это классический пример дизайна ООП, где композиция преобладает над наследованием, поэтому в исходном коде библиотеки можно встретить множество функциональных классов. Цель этого туториала разобрать по косточкам каждый класс и воссоединить их в единый механизм логирования в Python. Начнём-с.читать дальше

Как настроить свой VPN сервер

Правительства разных стран так или иначе пытаются запретить доступ к определённым ресурсам в Интернете. В России "заблокирован" Telegram и LinkedIn, в Казахстане уже не первый год невозможно посетить сайт Медузы, под раздачу даже попал официальный сайт gRPC, в Украине на фоне политических проблем запретили доступ к Яндексу.

От мала до велика знают как обойти эти блокировки, никого уже не удивишь словом VPN. Спроси у продвинутого юзера про VPN и он лихо назовёт парочку приложений для своего любимого смартфона в названии которых точно будет VPN 😎 Но мало кто задумывается, что публичные впн-сервисы могут также таить угрозу. Например, они без труда могут перехватывать весь трафик между вашим телефоном (клиентом) и сервисом на который вы обращаетесь (сервером). Таким сервисом может быть в том числе и банковское приложение, интернет-банкинг. Согласитесь, неприятно будет в один прекрасный день увидеть 0 на балансе вашего личного счёта.

Чтобы чувствовать себя в безопасности при использовании VPN я рекомендую настроить свой собственный VPN сервер. Для этого не нужно уметь программировать и даже иметь навыки администрирования Linux. Всё что необходимо — это иметь аккаунт на облачном хостинге DigitalOcean и следовать инструкциям этого руководства.

Если у вас до сих пор нет аккаунта в DigitalOcean, то создать его можно по моей ссылке. Зарегистрировавшись по ней вы получите $100 на ваш аккаунт, которыми сможете воспользоваться в течение 2-х месяцев! То есть как минимум у вас будет 2 месяца бесплатного использования личного VPN сервиса! читать дальше

Обзор Python 3.9

Недавно в сети стала доступна для установки альфа-версия Python 3.9. Релиз планируется на октябрь 2020 года, но уже сейчас можно взглянуть, а что же он нам новенького готовит.

Установить альфа версию Python 3.9 можно с официального сайта.

Built-in Generic Types, PEP 585

Начиная с версии 3.9 появилась возможность использовать привычные для нас built-in коллекции в качестве аннотаций с указанием типа содержимого этих коллекций. Напомню, что ранее для таких целей использовались объекты List, Dict из модуля typing. Вот как это теперь выглядит:

# как было до 3.9
from typing import List

def func(payload: List[str]) -> str:
    return 'Python < 3.9'

# как можно начиная с 3.9
def func(payload: list[str]) -> str:
    return 'Python 3.9'

Операторы объединения и обновления словаря, PEP 584

В Python 3.9 появилось 2 новых оператора | и |=, применяемых для работы со словарями.

payload_1 = {'a': 1, 'b': 2, 'c': 3}
payload_2 = {'d': 4, 'e': 5, 'f': 6}

# объединение двух словарей в 1
payload_1 | payload_2 # -> {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}

# обновление словаря payload_1 содержимым словаря payload_2
payload_1 |= payload_2
print(payload_1)
0: {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}

Теперь и тип dict обзавёлся своим оператором объединения | как и его собрат тип set. Вкусовщина конечно, но мне ближе старые привычные способы "распаковки":

payload_1 = {'a': 1, 'b': 2, 'c': 3}
payload_2 = {'d': 4, 'e': 5, 'f': 6}

{**payload_1, **payload_2}

Статусы HTTP

В модуль http были добавлены новые HTTP статусы:

import http

http.HTTPStatus.EARLY_HINTS  # 103
http.HTTPStatus.IM_A_TEAPOT  # 418
http.HTTPStatus.TOO_EARLY    # 425

Если вам интересно, что означает статус I am teapot, то прочитать можно здесь. В остальном, "долгожданный" апдейт 🤣

Обновления в math

В модуле math появилось несколько интересных апдейтов, а именно:

  • функция нахождение наибольшего общего делителя теперь может принимать неограниченное количество аргументов, ранее она принимала только 2
import math

math.gcd(10, 15, 20, 100)
  • появилась функция нахождения наименьшего общего кратного, math.lcm

removeprefix() & removesuffix(), PEP 616

У строк появились 2 новых методы с говорящим за себя названием:

string = 'prefixText'
string.removeprefix('prefix')
0: 'Text'
string.removesuffix('Text')
1: 'prefix'

Метод removeprefix удаляет подстроку в начале строки, а removesuffix удаляет подстроку в конце строки.

Обновлённый парсер кода, PEP 617

Начиная с Python 3.9 для парсинга используется PEG, ранее использовали LL. Выигрыша в производительности нет, но благодаря PEG в будущем возможно вводить более гибкие конструкции в язык. Модуль ast уже использует PEG для разбора исходного кода скрипта. Одним из инициаторов этого PEP был наш любимый Гвидо, у него в блоге есть целая серия постов на эту тему.

читать дальше

Оконные функции SQL

Оконные функции SQL это, пожалуй, самая мистическая часть SQL для многих веб-разработчиков. Нередко встретишь и тех, кто и вовсе никогда о них не слышал. Да что греха таить, я сам продолжительное время не знал об их существовании, решая задачи далеко не самым оптимальным способом.

Оконные функции это функции применяемые к набору строк так или иначе связанных с текущей строкой. Наверняка всем известны классические агрегатные функции вроде AVG, SUM, COUNT, используемые при группировке данных. В результате группировки количество строк уменьшается, оконные функции напротив никак не влияют на количество строк в результате их применения, оно остаётся прежним.

Привычные нам агрегатные функции также могут быть использованы в качестве оконных функций, нужно лишь добавить выражение определения "окна". Область применения оконных функций чаще всего связана с аналитическими запросами, анализом данных.

Из чего состоит оконная функция

<название функции>(<выражение>) OVER (
    <окно>
  <сортировка>
  <границы окна>
)

Лучше всего понять как работают оконные функции на практике. Представим, что у нас есть таблица с зарплатами сотрудников по департаментам. Вот как она выглядит:

читать дальше

Введение в Data Engineering: дата-пайплайны. Курс.

data engineering

Свершилось! Я закончил создание курса по построению дата-пайплайнов на Python, используя Luigi — Введение в Data Engineering: дата-пайплайны. Курс размещён на самописной, его стоимость составляет всего 590 рублей.

О чем он? В октябре 2017 года здесь была размещена статья про замечательный инструмент Luigi — Строим Data Pipeline на Python и Luigi. На тот момент это был первый материал на русском языке. Статья ничто иное как базовое введение в инструмент, плюс небольшая мотивация почему он лучше чем кастомные скрипты на коленке. С тех пор я активно использую Luigi в своей работе, и сейчас у нас в облаке AWS крутится более 1000+ дата-пайплайнов, написанных на нём. О выборе в пользую Luigi я ни разу не пожалел, даже несмотря на то, что инструменты вроде Apache Airflow комплексно выглядят круче и масштабно. Сила Luigi в простоте. А простота зачастую ключ к успешному построению надёжных и быстрых систем. Очень сложно понять проблему, когда она зарыта под тонной кода с множеством зависимостей. Да-да, речь о монстре вроде Airflow. Я ни сколько не умаляю комплексные workflow менеджеры, но к выбору того или иного инструмента нужно подходить исходя из потребностей, которые хочется удовлетворить.

Если вы data scientist, data engineer или backend-разработчик, который часто сталкивается с задачами по обработке, анализу и хранению данных, пожалуйста, обратите внимание на Luigi. Это тёмная лошадка, которая может значительно упростить вашу жизнь, а также правильно структурировать ваш код для удобства его дальнейшего сопровождения и развития.

Курс я постарался сделать максимально практичным, получился микс из текста и видео. В нём я разбираю 5 практических примеров (планирую добавить ещё как минимум 2): от записи Hello World до оповещения об ошибках в пайплайнах в Telegram через бота. Не обошел стороной и тему деплоя. В ней я затронул сборку Docker контейнера, а также уникальный материал про построение serverless дата-пайплана на Amazon Web Services через Docker, Fargate, Cloud Map. Такой дата-пайплайн, во-первых, будет максимально дешевым, т.к. оплата идёт только за время выполнения кода. А во-вторых, масштабируемым — вам не нужно настраивать и поднимать сервера, чтобы увеличить количество воркеров.

Ознакомиться с содержимым, а также купить можно по ссылке — Введение в data engineering: дата-пайплайны.

читать дальше

Строим Data Lake на Amazon Web Services

С развитием мобильных устройств, дешевого и доступного мобильного Интернета, объём генерируемых данных пользователями значительно увеличился. IoT устройства уже реалии нашего времени, а не удел фантастов прошлого века. Большая часть имеющихся данных была произведена в течение последнего десятилетия, мне страшно представить что будет в следующие 10 лет.

Инфографика ниже показывает масштабы этой дата-эпидемии.

читать дальше

Введение в Apache Airflow

Также по теме Airflow:

Apache Airflow — это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера. Если смотреть открытые вакансии на позицию data engineer, то нередко встретишь опыт работы с Airflow как одно из требований к позиции.

Я разработал практический курс по Apache Airflow 2.0, он доступен на платформе StartDataJourney, создана она также мною. Сейчас есть возможность приобрести его с 15% скидкой по промокоду EARLYBIRD, действует до конца апреля 2021 года. Ввести промокод можно на этапе оформления заказа. Приятного обучения - Apache Airflow 2.0: практический курс.

Airflow был разработан в 2014 году в компании Airbnb, автор Maxime Beauchemin. Позже инструмент был передан под опеку в организацию Apache, а в январе 2019 получил статус Top-Level проекта. В этой статье я расскажу про установку, настройку и запуск первого дата пайплайна средствами Apache Airflow. К слову, в 2017 году я уже писал про не менее классный и простой инструмент Luigi от компании Spotify. По своей сути эти два инструмента похожи — оба предназначены для запуска цепочек задач (дата пайплайнов), но есть у них и ряд различий о которых я говорил во время своего выступления на PyCON Russia 2019:

В этой статье я постараюсь рассказать о необходимом минимуме для работы с Airflow. Для начала давайте рассмотрим основные сущности инструмента.

читать дальше