Django, RQ и FakeRedis

Я часто в своих проектах использую связку Django + RQ вместо Celery. RQ удобный и максимально простой инструмент среди популярных Task Queue решений в экосистеме Python. Пару месяцев назад возникла необходимость тестировать код с сигналами в Django. Схема простая: в ответ на какое-то событие (создание объекта в БД, кастомный сигнал и т.д.) вызывался RQ Job через delay. Дело в том, что такое событие транслировалось ко всем получателям (receivers) как только объект удалялся из базы. Я активный пользователь pytest и создаю промежуточные объекты через стандартные фикстуры. Одно из решений — патчить/mockать job-функции во всех местах, где такие объекты создаются. Но это неудобно и непрактично, с развитием системы количество получателей может расти. Я нашел выход в подмене connection-класса в зависимости от условий. Условие в моём случае это наличие переменной FAKE_REDIS в Django settings.py. Когда FAKE_REDIS=True, то мы заменяем соединение с redis на инстанс класса FakeRedis из пакета fakeredis.

Чтобы этот подход сносно работал необходимо переписать job-декоратор. Вот что получилось у меня:

from rq.decorators import job as rq_job_decorator

def async_job(queue_name: str, *args: t.Any, **kwargs: t.Any) -> t.Any:
    """
    The same as RQ's job decorator, but it automatically replaces the
    ``connection`` argument with a fake one if ``settings.FAKE_REDIS`` is set to ``True``.
    """

    class LazyAsyncJob:
        def __init__(self, f: t.Callable[..., t.Any]) -> None:
            self.f = f
            self.job: t.Optional[t.Callable[..., t.Any]] = None

        def setup_connection(self) -> t.Callable[..., t.Any]:
            if self.job:
                return self.job
            if settings.FAKE_REDIS:
                from fakeredis import FakeRedis

                queue = get_queue(queue_name, connection=FakeRedis())  # type: ignore
            else:
                queue = get_queue(queue_name)

            RQ = getattr(settings, 'RQ', {})
            default_result_ttl = RQ.get('DEFAULT_RESULT_TTL')
            if default_result_ttl is not None:
                kwargs.setdefault('result_ttl', default_result_ttl)

            return rq_job_decorator(queue, *args, **kwargs)(self.f)

        def delay(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
            self.job = self.setup_connection()
            return self.job.delay(*args, **kwargs)  # type: ignore

        def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
            self.job = self.setup_connection()
            return self.job(*args, **kwargs)

    return LazyAsyncJob

И все rq таски необходимо декорировать через async_job:

@async_job('default')
def process_image():
    pass

В Django тестах я использую фикстуры pytest:

@pytest.fixture(autouse=True)
def fake_redis(settings: t.Any) -> None:
    settings.FAKE_REDIS = True

Параметр autouse=True нужен для того, чтобы абсолютно во всех тестах использовался fake_redis без явного на то указания.

читать дальше

FastAPI, asyncio и multiprocessing

Недавно товарищ поделился со мной ссылкой на статью про FastAPI и кооперативную мультипоточность. В ней автор, во-первых, ссылается на исследование другого автора про сравнение производительности между синхронными и асинхронными Python фреймворками. А во-вторых, приводит личный пример падения производительности приложения и как итог значительного увеличение задержки ответов от сервера.

Суть его проблемы в том, что в одном из API Endpoint выполняется CPU-intensive задача, а именно генерация списка из Pydantic-моделей, который позже конвертируется в JSON. Сериализация/десериализация огромного количества объектов недешевая процедура, а тем более для pydantic-моделей, где есть дополнительные накладные расходы. Т.е. эта задача попадает под классический CPU-bound пример. Но нужно понимать, что любой CPU-intensive код полностью блокирует eventloop, и переключение между корутинами прекращается до полной его разблокировки. Автор статьи предлагает использовать несколько воркеров при запуске uvicorn и надеяться, что нагрузка не прилетит сразу на все. Другой вариант это запускать тяжелые задачи в отдельном пуле потоков, но я не понимаю каким образом это решает проблему CPU-intensive задач в связи с присутствием общего GIL (Global Interpreter Lock). В конце концов чтобы улучшить производительность приложения, он рекомендует забрать у FastAPI конвертацию Pydantic модели в json и сделать это самостоятельно, используя метод .json() у pydantic-объекта, выплюнув в итоге голый Response:

Было:

@app.get("/clients", response_model=ClientsResponse)
def clients():
    return ClientsResponse(
        items=[
            Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
            for i in range(40_000)
        ]
    )

Стало:

@app.get("/clients")
def clients():
    return Response(
        content=ClientsResponse(
            items=[
                Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
                for i in range(40_000)
            ]
        ).json(),
        media_type="application/json",
    )

Производительность стала лучше, но проблема блокировки eventloop не ушла. Потому что CPU-intensive задача всё ещё выполняется в рамках того же процесса что и eventloop. Как сделать лучше? Вынести “тяжелую” задачу в отдельный процесс. В рамках работы с asyncio можно создавать процесс и ждать выполнения задачи асинхронно. Сделать этого можно через Executor-классы из модуля concurrent.futures и метод run_in_executor у eventloop’а. Для CPU-intensive задач подойдёт ProcessPoolExecutor.

Вот как бы это выглядело:

from concurrent.futures import ProcessPoolExecutor

def generate_client_ids(n: int) -> Response:
    return Response(
        content=ClientsResponse(
            items=[
                Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
                for i in range(n)
            ]
        ).json(),
        media_type='application/json',
    )

@app.get('/clients')
async def clients():
    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, generate_client_ids, 50_000)
        return result

В этом случае не произойдёт ни одного таймаута или обрыва соединения, eventloop не блокируется и может переключаться между корутинами пока ожидает ответ от сабпроцесса. По-хорошему стоит вынести пул в общий доступ, чтобы не создавать его каждый раз при выполнении функции clients, но это уже другая история.

Внимательный читатель может заметить, что автор статьи использует синхронные функции (без ключевого слова async), такие функции будут запускаться через ThreadPool самим FastAPI, но сути это не меняет. Чтобы улучшить производительность и избежать таймаутов всё равно нужно запустить CPU-intensive задачу через ProcessPoolExecutor:

@app.get('/clients')
def clients(n: int) -> Response:
    with ProcessPoolExecutor() as pool:
        for f in as_completed([pool.submit(generate_client_ids, 40_000)]):
            return f.result()

Итог

  • если вы работаете в асинхронном режиме избегайте блокирующих операций (длительные CPU-intensive задачи)
  • если избежать не получается, то выносите их в отдельные процессы и общайтесь с ними асинхронно: очереди, ProcessPoolExecutor и т.д.
  • если ваше приложение в основном состоит из CPU-intensive задач, то возможно лучше использовать обычный синхронный фреймворк (Django, Flask, Bottle, Falcon), а все “тяжелые” операции проводить через очереди типа Celery, RQ и т.д.
читать дальше

TaskFlow API в Apache Airflow 2.0

Декабрьский релиз Apache Airflow 2.0 принёс много нововведений в инструмент. А самое, пожалуй, заметное из них это TaskFlow API. В этой заметке я подробно разберу что это такое и как стало красиво и удобно описывать Python операторы, используя обычные функции и декоратор @task.

PythonOperator

PythonOperator это один из самых популярных операторов для создания тасков в Apache Airflow. Он позволяет выполнять любой код на Python. Нужно всего лишь передать функцию в аргумент python_callable при инициализации класса.

from airflow.operators.python import PythonOperator

def do_something(**context):
    pass

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

Если функция явно делает return, то значение автоматически записывается в XCom. В тех случаях, когда необходимо получить записанное значение из другой функции, нужно явно обращаться к XCom:

from airflow.operators.python import PythonOperator

def do_something(**context):
    return 'important_string'

def do_something2(**context):
    ti = context['ti']
    value_from_do_something = ti.xcom_pull(task_ids='do_something')  # вернёт important_string

operator = PythonOperator(
    task_id='do_something',
    python_callable=do_something,
)

operator2 = PythonOperator(
    task_id='do_something2',
    python_callable=do_something2,
)

Такой код зачастую выглядит небрежно, теряется бизнес-логика функций. В Apache Airflow 2 решили эту проблему, разработчики постарались добавить синтаксического сахара и упростить взаимодействие между Python операторами. Теперь передача значений из одной функции в другую выглядит как привычный цепочный вызов функций, а вся магия взаимодействия с XCom спрятана под капотом.

читать дальше

Apache Airflow и XCom

XCom или Cross-Communication, это механизм Apache Airflow для передачи параметров из одного оператора в другой. Если просто, то это таблица в базе данных, хранящая значения, записанные операторами Airflow. У этой таблицы есть несколько столбцов:

  • key — ключ записи
  • value — значение, данные хранятся в двоичном формате. Для MySQL это тип BLOB, а для Postgres BYTEA.
  • timestamp — время создания записи в базе
  • execution_date — дата выполнения DAG
  • task_id — ID оператора, записавшего данные в XCom
  • dag_id — ID DAG

Несмотря на то, что BYTEA в Postgres может хранить до 1 Гб двоичных данных, сообщество не рекомендует передавать через XCom большие данные. Это связано с тем, что перед укладкой в таблицу они подвергаются сериализации (JSON/pickle), а при чтении происходит этап десериализации. Также не стоит забывать, что данные передаются по сети, а это значительно замедляет работу пайплайна. Цитата из официальной доки по PostgreSQL:

The BYTEA data type is not well suited for storing very large amounts of binary data. While a column of type BYTEA can hold up to 1 GB of binary data, it would require a huge amount of memory to process such a large value.

Зачем всё так сложно? Почему нельзя передавать аргументы как это происходит при написании обычного кода на Python?

Главная причина в том, что операторы могут исполняться в разных адресных пространствах (Local Executor) и даже на отдельных физических машинах (Celery/Dask Executors, Kubernetes Executor). В такой ситуации нужен механизм для передачи сообщений от одной машины к другой. Центральным хранилищем в этом случае является база данных Airflow (рекомендуется использовать PostgreSQL).

читать дальше

Курс Apache Airflow 2.0

UPDATE: Курс доступен на платформе StartDataJourney, разработанной мною же. Приятного обучения - Apache Airflow 2.0: практический курс.

Наверняка вы читали мой пост про введение в Apache Airflow. Многое с тех пор изменилось в инструменте, в декабре 2020 года вышла новая версия Apache Airflow 2.0. В ней появилось множество интересных фишечек:

  • TaskFlow API
  • полноценный REST API
  • обновлённый UI, он теперь выглядит свежим
  • отказоустойчивый планировщик, отныне он не является точкой отказа
  • серьёзные улучшения по производительности Airflow
  • Task Group на замену SubDAGs
  • умные сенсоры

Сейчас Apache Airflow чуть ли не главный инструмент в арсенале современного дата инженера. В описании почти любой вакансии на должность дата инженера требуется навык работы с ним.

В связи с этим я решил создать практический курс по работе с Apache Airflow версии 2 и выше. В нём я постараюсь раскрыть весь инструмент, рассказать про основные компоненты, подводные камни. Безусловно не обойдётся и без практических примеров. Я разработаю как можно больше реальных примеров построения дата пайплайнов, затрону самые популярные операторы, работу с облаками и деплой в продакшен на базе Celery и Kubernetes.

Формат курса будет смешанный, но доминировать будет текст. Там где целесообразно продемонстрировать функционал или пример через видео будет видеоряд, но я сторонник текстовых курсов как с точки зрения потребителя так и создателя. У меня уже был опыт создания подобного формата курса про менее известный инструмент Luigi.

читать дальше

Введение в logging на Python

В стандартной библиотеке Python есть замечательный пакет для логирования — logging. В сети бытует мнение, что он сложный и настраивать его сплошная боль. В этой статье я попробую убедить вас в обратном. Мы разберём что из себя представляет этот пакет, изучим основные компоненты и закрепим материал практическим примером.

Зачем нужны логи?

Логи это рентген снимок выполнения вашей программы. Чем детальнее лог, тем проще разобраться в нестандартных ситуациях, которые могут приключиться с вашим скриптом. Наиболее популярным примером логов служат access логи веб-сервера, например, Apache httpd или nginx. Пример куска access лога моего блога:

92.63.107.227 - - [04/Nov/2020:06:30:48 +0000] "GET /ru/hosted-open-vpn-server/ HTTP/1.1" 301 169 "-" "python-requests/2.11.1" "-"
92.63.107.227 - - [04/Nov/2020:06:30:49 +0000] "GET /ru/data-engineering-course/ HTTP/1.1" 301 169 "-" "python-requests/2.11.1" "-"
213.180.203.50 - - [04/Nov/2020:06:36:07 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (compatible; YandexMetrika/2.0; +http://yandex.com/bots yabs01)" "-"
114.119.160.75 - - [04/Nov/2020:06:36:41 +0000] "GET /robots.txt HTTP/1.1" 301 169 "-" "(compatible;PetalBot;+https://aspiegel.com/petalbot)" "10.179.80.67"
90.180.35.207 - - [04/Nov/2020:06:47:11 +0000] "GET / HTTP/1.0" 301 169 "-" "-" "-"
46.246.122.77 - - [04/Nov/2020:06:53:22 +0000] "GET / HTTP/1.1" 301 169 "<http://khashtamov.com>" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36" "-"
66.249.76.16 - - [04/Nov/2020:06:53:30 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" "-"
66.102.9.118 - - [04/Nov/2020:07:11:19 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.75 Safari/537.36 Google Favicon" "46.159.204.234"
71.6.167.142 - - [04/Nov/2020:07:11:55 +0000] "GET / HTTP/1.1" 301 169 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36" "-"

Помимо access логов веб-сервер также пишет error лог, там хранится информация обо всех ошибках при обработке HTTP запросов. Также и в ваших скриптах, логи могут делиться на информационные — вывод текущего состояния выполнения, отладочной информации, и на логи с ошибками — вывод исключений, ошибок с дополнительной информацией для отладки, содержащей контекст).

logging и Python

Точкой входа в работу с логированием в Python является библиотека logging. На первый взгляд может показаться, что библиотека сложная и запутанная, но потратив некоторое время на её изучение, можно убедиться в обратном. Для меня logging это классический пример дизайна ООП, где композиция преобладает над наследованием, поэтому в исходном коде библиотеки можно встретить множество функциональных классов. Цель этого туториала разобрать по косточкам каждый класс и воссоединить их в единый механизм логирования в Python. Начнём-с.читать дальше

Как настроить свой VPN сервер

Правительства разных стран так или иначе пытаются запретить доступ к определённым ресурсам в Интернете. В России "заблокирован" Telegram и LinkedIn, в Казахстане уже не первый год невозможно посетить сайт Медузы, под раздачу даже попал официальный сайт gRPC, в Украине на фоне политических проблем запретили доступ к Яндексу.

От мала до велика знают как обойти эти блокировки, никого уже не удивишь словом VPN. Спроси у продвинутого юзера про VPN и он лихо назовёт парочку приложений для своего любимого смартфона в названии которых точно будет VPN 😎 Но мало кто задумывается, что публичные впн-сервисы могут также таить угрозу. Например, они без труда могут перехватывать весь трафик между вашим телефоном (клиентом) и сервисом на который вы обращаетесь (сервером). Таким сервисом может быть в том числе и банковское приложение, интернет-банкинг. Согласитесь, неприятно будет в один прекрасный день увидеть 0 на балансе вашего личного счёта.

Чтобы чувствовать себя в безопасности при использовании VPN я рекомендую настроить свой собственный VPN сервер. Для этого не нужно уметь программировать и даже иметь навыки администрирования Linux. Всё что необходимо — это иметь аккаунт на облачном хостинге DigitalOcean и следовать инструкциям этого руководства.

Если у вас до сих пор нет аккаунта в DigitalOcean, то создать его можно по моей ссылке. Зарегистрировавшись по ней вы получите $100 на ваш аккаунт, которыми сможете воспользоваться в течение 2-х месяцев! То есть как минимум у вас будет 2 месяца бесплатного использования личного VPN сервиса! читать дальше

Обзор Python 3.9

Недавно в сети стала доступна для установки альфа-версия Python 3.9. Релиз планируется на октябрь 2020 года, но уже сейчас можно взглянуть, а что же он нам новенького готовит.

Установить альфа версию Python 3.9 можно с официального сайта.

Built-in Generic Types, PEP 585

Начиная с версии 3.9 появилась возможность использовать привычные для нас built-in коллекции в качестве аннотаций с указанием типа содержимого этих коллекций. Напомню, что ранее для таких целей использовались объекты List, Dict из модуля typing. Вот как это теперь выглядит:

# как было до 3.9
from typing import List

def func(payload: List[str]) -> str:
    return 'Python < 3.9'

# как можно начиная с 3.9
def func(payload: list[str]) -> str:
    return 'Python 3.9'

Операторы объединения и обновления словаря, PEP 584

В Python 3.9 появилось 2 новых оператора | и |=, применяемых для работы со словарями.

payload_1 = {'a': 1, 'b': 2, 'c': 3}
payload_2 = {'d': 4, 'e': 5, 'f': 6}

# объединение двух словарей в 1
payload_1 | payload_2 # -> {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}

# обновление словаря payload_1 содержимым словаря payload_2
payload_1 |= payload_2
print(payload_1)
0: {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}

Теперь и тип dict обзавёлся своим оператором объединения | как и его собрат тип set. Вкусовщина конечно, но мне ближе старые привычные способы "распаковки":

payload_1 = {'a': 1, 'b': 2, 'c': 3}
payload_2 = {'d': 4, 'e': 5, 'f': 6}

{**payload_1, **payload_2}

Статусы HTTP

В модуль http были добавлены новые HTTP статусы:

import http

http.HTTPStatus.EARLY_HINTS  # 103
http.HTTPStatus.IM_A_TEAPOT  # 418
http.HTTPStatus.TOO_EARLY    # 425

Если вам интересно, что означает статус I am teapot, то прочитать можно здесь. В остальном, "долгожданный" апдейт 🤣

Обновления в math

В модуле math появилось несколько интересных апдейтов, а именно:

  • функция нахождение наибольшего общего делителя теперь может принимать неограниченное количество аргументов, ранее она принимала только 2
import math

math.gcd(10, 15, 20, 100)
  • появилась функция нахождения наименьшего общего кратного, math.lcm

removeprefix() & removesuffix(), PEP 616

У строк появились 2 новых методы с говорящим за себя названием:

string = 'prefixText'
string.removeprefix('prefix')
0: 'Text'
string.removesuffix('Text')
1: 'prefix'

Метод removeprefix удаляет подстроку в начале строки, а removesuffix удаляет подстроку в конце строки.

Обновлённый парсер кода, PEP 617

Начиная с Python 3.9 для парсинга используется PEG, ранее использовали LL. Выигрыша в производительности нет, но благодаря PEG в будущем возможно вводить более гибкие конструкции в язык. Модуль ast уже использует PEG для разбора исходного кода скрипта. Одним из инициаторов этого PEP был наш любимый Гвидо, у него в блоге есть целая серия постов на эту тему.

читать дальше

Оконные функции SQL

Оконные функции SQL это, пожалуй, самая мистическая часть SQL для многих веб-разработчиков. Нередко встретишь и тех, кто и вовсе никогда о них не слышал. Да что греха таить, я сам продолжительное время не знал об их существовании, решая задачи далеко не самым оптимальным способом.

Оконные функции это функции применяемые к набору строк так или иначе связанных с текущей строкой. Наверняка всем известны классические агрегатные функции вроде AVG, SUM, COUNT, используемые при группировке данных. В результате группировки количество строк уменьшается, оконные функции напротив никак не влияют на количество строк в результате их применения, оно остаётся прежним.

Привычные нам агрегатные функции также могут быть использованы в качестве оконных функций, нужно лишь добавить выражение определения "окна". Область применения оконных функций чаще всего связана с аналитическими запросами, анализом данных.

Из чего состоит оконная функция

<название функции>(<выражение>) OVER (
    <окно>
  <сортировка>
  <границы окна>
)

Лучше всего понять как работают оконные функции на практике. Представим, что у нас есть таблица с зарплатами сотрудников по департаментам. Вот как она выглядит:

читать дальше

Введение в Data Engineering: дата-пайплайны. Курс.

data engineering

Свершилось! Я закончил создание курса по построению дата-пайплайнов на Python, используя Luigi — Введение в Data Engineering: дата-пайплайны. Курс размещён на самописной, его стоимость составляет всего 590 рублей.

О чем он? В октябре 2017 года здесь была размещена статья про замечательный инструмент Luigi — Строим Data Pipeline на Python и Luigi. На тот момент это был первый материал на русском языке. Статья ничто иное как базовое введение в инструмент, плюс небольшая мотивация почему он лучше чем кастомные скрипты на коленке. С тех пор я активно использую Luigi в своей работе, и сейчас у нас в облаке AWS крутится более 1000+ дата-пайплайнов, написанных на нём. О выборе в пользую Luigi я ни разу не пожалел, даже несмотря на то, что инструменты вроде Apache Airflow комплексно выглядят круче и масштабно. Сила Luigi в простоте. А простота зачастую ключ к успешному построению надёжных и быстрых систем. Очень сложно понять проблему, когда она зарыта под тонной кода с множеством зависимостей. Да-да, речь о монстре вроде Airflow. Я ни сколько не умаляю комплексные workflow менеджеры, но к выбору того или иного инструмента нужно подходить исходя из потребностей, которые хочется удовлетворить.

Если вы data scientist, data engineer или backend-разработчик, который часто сталкивается с задачами по обработке, анализу и хранению данных, пожалуйста, обратите внимание на Luigi. Это тёмная лошадка, которая может значительно упростить вашу жизнь, а также правильно структурировать ваш код для удобства его дальнейшего сопровождения и развития.

Курс я постарался сделать максимально практичным, получился микс из текста и видео. В нём я разбираю 5 практических примеров (планирую добавить ещё как минимум 2): от записи Hello World до оповещения об ошибках в пайплайнах в Telegram через бота. Не обошел стороной и тему деплоя. В ней я затронул сборку Docker контейнера, а также уникальный материал про построение serverless дата-пайплана на Amazon Web Services через Docker, Fargate, Cloud Map. Такой дата-пайплайн, во-первых, будет максимально дешевым, т.к. оплата идёт только за время выполнения кода. А во-вторых, масштабируемым — вам не нужно настраивать и поднимать сервера, чтобы увеличить количество воркеров.

Ознакомиться с содержимым, а также купить можно по ссылке — Введение в data engineering: дата-пайплайны.

читать дальше