Недавно товарищ поделился со мной ссылкой на статью про FastAPI и кооперативную мультипоточность. В ней автор, во-первых, ссылается на исследование другого автора про сравнение производительности между синхронными и асинхронными Python фреймворками. А во-вторых, приводит личный пример падения производительности приложения и как итог значительного увеличение задержки ответов от сервера.
Суть его проблемы в том, что в одном из API Endpoint выполняется CPU-intensive задача, а именно генерация списка из Pydantic-моделей, который позже конвертируется в JSON. Сериализация/десериализация огромного количества объектов недешевая процедура, а тем более для pydantic-моделей, где есть дополнительные накладные расходы. Т.е. эта задача попадает под классический CPU-bound пример. Но нужно понимать, что любой CPU-intensive код полностью блокирует eventloop, и переключение между корутинами прекращается до полной его разблокировки. Автор статьи предлагает использовать несколько воркеров при запуске uvicorn и надеяться, что нагрузка не прилетит сразу на все. Другой вариант это запускать тяжелые задачи в отдельном пуле потоков, но я не понимаю каким образом это решает проблему CPU-intensive задач в связи с присутствием общего GIL (Global Interpreter Lock). В конце концов чтобы улучшить производительность приложения, он рекомендует забрать у FastAPI конвертацию Pydantic модели в json и сделать это самостоятельно, используя метод .json() у pydantic-объекта, выплюнув в итоге голый Response:
Было:
@app.get("/clients", response_model=ClientsResponse)
def clients():
return ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(40_000)
]
)
Стало:
@app.get("/clients")
def clients():
return Response(
content=ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(40_000)
]
).json(),
media_type="application/json",
)
Производительность стала лучше, но проблема блокировки eventloop не ушла. Потому что CPU-intensive задача всё ещё выполняется в рамках того же процесса что и eventloop. Как сделать лучше? Вынести “тяжелую” задачу в отдельный процесс. В рамках работы с asyncio можно создавать процесс и ждать выполнения задачи асинхронно. Сделать этого можно через Executor-классы из модуля concurrent.futures и метод run_in_executor у eventloop’а. Для CPU-intensive задач подойдёт ProcessPoolExecutor.
Вот как бы это выглядело:
from concurrent.futures import ProcessPoolExecutor
def generate_client_ids(n: int) -> Response:
return Response(
content=ClientsResponse(
items=[
Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
for i in range(n)
]
).json(),
media_type='application/json',
)
@app.get('/clients')
async def clients():
with ProcessPoolExecutor() as executor:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, generate_client_ids, 50_000)
return result
В этом случае не произойдёт ни одного таймаута или обрыва соединения, eventloop не блокируется и может переключаться между корутинами пока ожидает ответ от сабпроцесса. По-хорошему стоит вынести пул в общий доступ, чтобы не создавать его каждый раз при выполнении функции clients, но это уже другая история.
Внимательный читатель может заметить, что автор статьи использует синхронные функции (без ключевого слова async), такие функции будут запускаться через ThreadPool самим FastAPI, но сути это не меняет. Чтобы улучшить производительность и избежать таймаутов всё равно нужно запустить CPU-intensive задачу через ProcessPoolExecutor:
@app.get('/clients')
def clients(n: int) -> Response:
with ProcessPoolExecutor() as pool:
for f in as_completed([pool.submit(generate_client_ids, 40_000)]):
return f.result()
Итог
если вы работаете в асинхронном режиме избегайте блокирующих операций (длительные CPU-intensive задачи)
если избежать не получается, то выносите их в отдельные процессы и общайтесь с ними асинхронно: очереди, ProcessPoolExecutor и т.д.
если ваше приложение в основном состоит из CPU-intensive задач, то возможно лучше использовать обычный синхронный фреймворк (Django, Flask, Bottle, Falcon), а все “тяжелые” операции проводить через очереди типа Celery, RQ и т.д.
Декабрьский релиз Apache Airflow 2.0 принёс много нововведений в инструмент. А самое, пожалуй, заметное из них это TaskFlow API. В этой заметке я подробно разберу что это такое и как стало красиво и удобно описывать Python операторы, используя обычные функции и декоратор @task.
PythonOperator
PythonOperator это один из самых популярных операторов для создания тасков в Apache Airflow. Он позволяет выполнять любой код на Python. Нужно всего лишь передать функцию в аргумент python_callable при инициализации класса.
Если функция явно делает return, то значение автоматически записывается в XCom. В тех случаях, когда необходимо получить записанное значение из другой функции, нужно явно обращаться к XCom:
Такой код зачастую выглядит небрежно, теряется бизнес-логика функций. В Apache Airflow 2 решили эту проблему, разработчики постарались добавить синтаксического сахара и упростить взаимодействие между Python операторами. Теперь передача значений из одной функции в другую выглядит как привычный цепочный вызов функций, а вся магия взаимодействия с XCom спрятана под капотом.
XCom или Cross-Communication, это механизм Apache Airflow для передачи параметров из одного оператора в другой. Если просто, то это таблица в базе данных, хранящая значения, записанные операторами Airflow. У этой таблицы есть несколько столбцов:
key — ключ записи
value — значение, данные хранятся в двоичном формате. Для MySQL это тип BLOB, а для Postgres BYTEA.
timestamp — время создания записи в базе
execution_date — дата выполнения DAG
task_id — ID оператора, записавшего данные в XCom
dag_id — ID DAG
Несмотря на то, что BYTEA в Postgres может хранить до 1 Гб двоичных данных, сообщество не рекомендует передавать через XCom большие данные. Это связано с тем, что перед укладкой в таблицу они подвергаются сериализации (JSON/pickle), а при чтении происходит этап десериализации. Также не стоит забывать, что данные передаются по сети, а это значительно замедляет работу пайплайна. Цитата из официальной доки по PostgreSQL:
The BYTEA data type is not well suited for storing very large amounts of binary data. While a column of type BYTEA can hold up to 1 GB of binary data, it would require a huge amount of memory to process such a large value.
Зачем всё так сложно? Почему нельзя передавать аргументы как это происходит при написании обычного кода на Python?
Главная причина в том, что операторы могут исполняться в разных адресных пространствах (Local Executor) и даже на отдельных физических машинах (Celery/Dask Executors, Kubernetes Executor). В такой ситуации нужен механизм для передачи сообщений от одной машины к другой. Центральным хранилищем в этом случае является база данных Airflow (рекомендуется использовать PostgreSQL).
Наверняка вы читали мой пост про введение в Apache Airflow. Многое с тех пор изменилось в инструменте, в декабре 2020 года вышла новая версия Apache Airflow 2.0. В ней появилось множество интересных фишечек:
TaskFlow API
полноценный REST API
обновлённый UI, он теперь выглядит свежим
отказоустойчивый планировщик, отныне он не является точкой отказа
серьёзные улучшения по производительности Airflow
Task Group на замену SubDAGs
умные сенсоры
Сейчас Apache Airflow чуть ли не главный инструмент в арсенале современного дата инженера. В описании почти любой вакансии на должность дата инженера требуется навык работы с ним.
В связи с этим я решил создать практический курс по работе с Apache Airflow версии 2 и выше. В нём я постараюсь раскрыть весь инструмент, рассказать про основные компоненты, подводные камни. Безусловно не обойдётся и без практических примеров. Я разработаю как можно больше реальных примеров построения дата пайплайнов, затрону самые популярные операторы, работу с облаками и деплой в продакшен на базе Celery и Kubernetes.
Формат курса будет смешанный, но доминировать будет текст. Там где целесообразно продемонстрировать функционал или пример через видео будет видеоряд, но я сторонник текстовых курсов как с точки зрения потребителя так и создателя. У меня уже был опыт создания подобного формата курса про менее известный инструмент Luigi.
В стандартной библиотеке Python есть замечательный пакет для логирования — logging. В сети бытует мнение, что он сложный и настраивать его сплошная боль. В этой статье я попробую убедить вас в обратном. Мы разберём что из себя представляет этот пакет, изучим основные компоненты и закрепим материал практическим примером.
Зачем нужны логи?
Логи это рентген снимок выполнения вашей программы. Чем детальнее лог, тем проще разобраться в нестандартных ситуациях, которые могут приключиться с вашим скриптом. Наиболее популярным примером логов служат access логи веб-сервера, например, Apache httpd или nginx. Пример куска access лога моего блога:
Помимо access логов веб-сервер также пишет error лог, там хранится информация обо всех ошибках при обработке HTTP запросов. Также и в ваших скриптах, логи могут делиться на информационные — вывод текущего состояния выполнения, отладочной информации, и на логи с ошибками — вывод исключений, ошибок с дополнительной информацией для отладки, содержащей контекст).
logging и Python
Точкой входа в работу с логированием в Python является библиотека logging. На первый взгляд может показаться, что библиотека сложная и запутанная, но потратив некоторое время на её изучение, можно убедиться в обратном. Для меня logging это классический пример дизайна ООП, где композиция преобладает над наследованием, поэтому в исходном коде библиотеки можно встретить множество функциональных классов. Цель этого туториала разобрать по косточкам каждый класс и воссоединить их в единый механизм логирования в Python. Начнём-с.читать дальше
Правительства разных стран так или иначе пытаются запретить доступ к определённым ресурсам в Интернете. В России "заблокирован" Telegram и LinkedIn, в Казахстане уже не первый год невозможно посетить сайт Медузы, под раздачу даже попал официальный сайт gRPC, в Украине на фоне политических проблем запретили доступ к Яндексу.
От мала до велика знают как обойти эти блокировки, никого уже не удивишь словом VPN. Спроси у продвинутого юзера про VPN и он лихо назовёт парочку приложений для своего любимого смартфона в названии которых точно будет VPN 😎 Но мало кто задумывается, что публичные впн-сервисы могут также таить угрозу. Например, они без труда могут перехватывать весь трафик между вашим телефоном (клиентом) и сервисом на который вы обращаетесь (сервером). Таким сервисом может быть в том числе и банковское приложение, интернет-банкинг. Согласитесь, неприятно будет в один прекрасный день увидеть 0 на балансе вашего личного счёта.
Чтобы чувствовать себя в безопасности при использовании VPN я рекомендую настроить свой собственный VPN сервер. Для этого не нужно уметь программировать и даже иметь навыки администрирования Linux. Всё что необходимо — это иметь аккаунт на облачном хостинге DigitalOcean и следовать инструкциям этого руководства.
Если у вас до сих пор нет аккаунта в DigitalOcean, то создать его можно по моей ссылке. Зарегистрировавшись по ней вы получите $100 на ваш аккаунт, которыми сможете воспользоваться в течение 2-х месяцев! То есть как минимум у вас будет 2 месяца бесплатного использования личного VPN сервиса! читать дальше
Недавно в сети стала доступна для установки альфа-версия Python 3.9. Релиз планируется на октябрь 2020 года, но уже сейчас можно взглянуть, а что же он нам новенького готовит.
Начиная с версии 3.9 появилась возможность использовать привычные для нас built-in коллекции в качестве аннотаций с указанием типа содержимого этих коллекций. Напомню, что ранее для таких целей использовались объекты List, Dict из модуля typing. Вот как это теперь выглядит:
# как было до 3.9
from typing import List
def func(payload: List[str]) -> str:
return 'Python < 3.9'
# как можно начиная с 3.9
def func(payload: list[str]) -> str:
return 'Python 3.9'
Операторы объединения и обновления словаря, PEP 584
В Python 3.9 появилось 2 новых оператора | и |=, применяемых для работы со словарями.
Теперь и тип dict обзавёлся своим оператором объединения | как и его собрат тип set. Вкусовщина конечно, но мне ближе старые привычные способы "распаковки":
Метод removeprefix удаляет подстроку в начале строки, а removesuffix удаляет подстроку в конце строки.
Обновлённый парсер кода, PEP 617
Начиная с Python 3.9 для парсинга используется PEG, ранее использовали LL. Выигрыша в производительности нет, но благодаря PEG в будущем возможно вводить более гибкие конструкции в язык. Модуль ast уже использует PEG для разбора исходного кода скрипта. Одним из инициаторов этого PEP был наш любимый Гвидо, у него в блоге есть целая серия постов на эту тему.
Оконные функции SQL это, пожалуй, самая мистическая часть SQL для многих веб-разработчиков. Нередко встретишь и тех, кто и вовсе никогда о них не слышал. Да что греха таить, я сам продолжительное время не знал об их существовании, решая задачи далеко не самым оптимальным способом.
Оконные функции это функции применяемые к набору строк так или иначе связанных с текущей строкой. Наверняка всем известны классические агрегатные функции вроде AVG, SUM, COUNT, используемые при группировке данных. В результате группировки количество строк уменьшается, оконные функции напротив никак не влияют на количество строк в результате их применения, оно остаётся прежним.
Привычные нам агрегатные функции также могут быть использованы в качестве оконных функций, нужно лишь добавить выражение определения "окна". Область применения оконных функций чаще всего связана с аналитическими запросами, анализом данных.
Из чего состоит оконная функция
<название функции>(<выражение>) OVER (
<окно>
<сортировка>
<границы окна>
)
Лучше всего понять как работают оконные функции на практике. Представим, что у нас есть таблица с зарплатами сотрудников по департаментам. Вот как она выглядит:
Свершилось! Я закончил создание курса по построению дата-пайплайнов на Python, используя Luigi — Введение в Data Engineering: дата-пайплайны. Курс размещён на самописной, его стоимость составляет всего 590 рублей.
О чем он? В октябре 2017 года здесь была размещена статья про замечательный инструмент Luigi — Строим Data Pipeline на Python и Luigi. На тот момент это был первый материал на русском языке. Статья ничто иное как базовое введение в инструмент, плюс небольшая мотивация почему он лучше чем кастомные скрипты на коленке. С тех пор я активно использую Luigi в своей работе, и сейчас у нас в облаке AWS крутится более 1000+ дата-пайплайнов, написанных на нём. О выборе в пользую Luigi я ни разу не пожалел, даже несмотря на то, что инструменты вроде Apache Airflow комплексно выглядят круче и масштабно. Сила Luigi в простоте. А простота зачастую ключ к успешному построению надёжных и быстрых систем. Очень сложно понять проблему, когда она зарыта под тонной кода с множеством зависимостей. Да-да, речь о монстре вроде Airflow. Я ни сколько не умаляю комплексные workflow менеджеры, но к выбору того или иного инструмента нужно подходить исходя из потребностей, которые хочется удовлетворить.
Если вы data scientist, data engineer или backend-разработчик, который часто сталкивается с задачами по обработке, анализу и хранению данных, пожалуйста, обратите внимание на Luigi. Это тёмная лошадка, которая может значительно упростить вашу жизнь, а также правильно структурировать ваш код для удобства его дальнейшего сопровождения и развития.
Курс я постарался сделать максимально практичным, получился микс из текста и видео. В нём я разбираю 5 практических примеров (планирую добавить ещё как минимум 2): от записи Hello World до оповещения об ошибках в пайплайнах в Telegram через бота. Не обошел стороной и тему деплоя. В ней я затронул сборку Docker контейнера, а также уникальный материал про построение serverless дата-пайплана на Amazon Web Services через Docker, Fargate, Cloud Map. Такой дата-пайплайн, во-первых, будет максимально дешевым, т.к. оплата идёт только за время выполнения кода. А во-вторых, масштабируемым — вам не нужно настраивать и поднимать сервера, чтобы увеличить количество воркеров.
С развитием мобильных устройств, дешевого и доступного мобильного Интернета, объём генерируемых данных пользователями значительно увеличился. IoT устройства уже реалии нашего времени, а не удел фантастов прошлого века. Большая часть имеющихся данных была произведена в течение последнего десятилетия, мне страшно представить что будет в следующие 10 лет.
Инфографика ниже показывает масштабы этой дата-эпидемии.читать дальше