Celery: начинаем правильно

Python Celery
В этой статье мне хотелось бы поделиться с читателями своим опытом работы с таким замечательным инструментом в Python как Celery. Celery это ничто иное как распределённая очередь заданий, реализованная на языке Python. На момент написания этой статьи, самой последней версией является 3.1.20. Неосведомлённый читатель может не знать для чего вообще нужна система очередей задач наподобие Celery, поэтому кратко поясню этот момент.

Что такое Celery и зачем оно нам?

Часто ли вам приходилось сталкиваться с типовыми задачами в веб-приложениях вроде отправки электронного письма посетителю или обработки загруженных данных. Чаще всего такого рода манипуляции не требуют участия конечного пользователя вашего проекта, то есть их можно выполнять в фоновом режиме. Те из нас, кто реализует выполнение этих задач в одном из процессов веб-сервера, "тормозят" тем самым его работу, увеличивая время отклика и ухудшают user experience.

В данной заметке я опущу вводную информацию по установке и настройке Celery в вашем проекте. Кстати, Celery из коробки умеет работать с Django. Ранее был отдельный python пакет, соединяющий Django и Celery,именовался он django-celery. Сейчас он заброшен, так как последнее обновление было более года назад. Стоит отметить, что django-celery не работает Django 1.9 из-за изменений в работе cache backend. Исправленную версию можно посмотреть в моём форке. Одной из удобных фич django-celery является интеграция с Django Admin по части управления periodic tasks.

Советы по работе с Celery

Не используйте базу данных в качестве broker/backend

Брокер отвечает за передачу сообщений (задач) между так называемыми исполнителями (workers). Проблема использования базы данных заключается в её ограничениях - она просто не предназначена для этого. Дело в том, что с ростом количества исполнителей, нагрузка на базу будет только возрастать, а учитывая тот факт, что каждый worker имеет ещё ряд потоков, ситуация может стать катастрофической даже при малых нагрузках. Всё это приведёт к бутылочному горлышку в виде затыка на I/O, потере задач, а возможно и неоднократному их исполнению (два воркера могут получить одну и ту же задачу на исполнение). Отличным production-ready решением является использование RabbitMQ или Redis для этой роли.

Бэкэнд в случае с Celery выступает в качестве хранилища результатов выполнения задач (task). Одной из причин создания django-celery как раз являлась возможность подключения БД для сохранения результатов. Признаюсь, что в самом начале работы с Celery я неоднократно в проектах использовал этот подход. Пожалуйста, не повторяйте мою ошибку. С ростом нагрузки на приложение проблемы будут расти словно грибы после дождя (более того, "из коробки" celery не чистит базу от "устаревших" результатов) . Правда тут есть нюансы касательно вашего приложения. Об этом читайте ниже. Production-ready решением для роли backend неплохо зарекомендовал себя демон memcached. Пользуемся более 2-х лет, проблем ни разу не было.

Разделяйте задачи по очередям

Это очень важный момент. По мере развития вашего приложения, в проекте будут появляться критичные для выполнения задачи: проверка статуса платежа, формирование отчёта, отправка электронных писем и так далее. Терять их недопустимо. Если все задачи складировать в одну очередь, то в один прекрасный момент она может забиться, поставив под угрозу выполнение критически важного кода. Мой подход: разделяйте очереди по приоритетам.

  • high
  • normal
  • low

Несомненно очередей может быть больше, тут всё на усмотрение разработчика и архитектуры его приложения.

В базовых настройках Celery это выглядит следующим образом:

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)
CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'
CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #
    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # -- LOW PRIORITY QUEUE -- #
    'myapp.tasks.close_session': {'queue': 'low'},
}

В данном конкретном примере объявлена очередь по-умолчанию под названием normal. То есть задачи явно не указанные в списке будут автоматически распределены в эту очередь. В high попадает задача под названием check_payment_status, а в low задача close_session.

Запускать исполнителей Celery для этих очередей необходимо следующим образом:

celery worker -E -l INFO -n worker.high -Q high
celery worker -E -l INFO -n worker.normal -Q normal
celery worker -E -l INFO -n worker.low -Q low

Здесь мы явно задаём имена исполнителей и названия очередей в которых необходимо мониторить задачи на исполнение.

ВАЖНО! Если вы явно указали для задачи очередь в которую ей нужно будет падать, и при этом запустили одного из исполнителей Celery без явного указания очереди, например вот так:

celery worker -E -l INFO -n worker.whatever

То при наступлении ситуации, когда все исполнители очереди high будут заняты, Celery автоматически перенаправит новую задачу исполнителям без конкретной очереди. Поэтому при использовании раздельных очередей задач, не запускайте исполнителей без указания для них явного наименования очереди.

Логгируйте ошибки

Логгирование ошибок и своевременный их анализ это основа надёжных приложений. Очень важно иметь полную картину происходящего внутри вашего кода. По-умолчанию Celery все ошибки пишет в stderr, а прочая информация, связанная с исполнением попадает в stdout. Контролировать вывод ошибок можно через стандартный python logging, достаточно повесить свой handler на logger под названием "celery". Практика развёртывания боевых приложений, использующих Celery, показывает, что в качестве процесс-менеджера используют supervisord. В его настройках можно задавать путь до файла в который он будет складировать всю информацию, генерируемую демоном. Но вручную анализировать текстовые логи на предмет ошибок неудобно и неэффективно. Лично я использую для этих целей Sentry. Вот как выглядит у меня logging config:

CELERYD_HIJACK_ROOT_LOGGER = False
LOGGING = {
    'handlers': {
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler'
        }
    },
    'loggers': {
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    }
}

Важной опцией здесь является наличие CELERYD_HIJACK_ROOT_LOGGER = False. По-умолчанию значение этой переменной является True, что позволяет celery "перекрывать" все ранее объявленные кастомные обработчики logging.

При указанном выше подходе нет необходимости дополнительно в коде задач (task) логгировать ошибки/исключения отдельно. О том что такое Sentry, для чего оно используется и как его настроить я напишу отдельную статью немного позже.

Пишите задачи маленькими

При написании задач старайтесь придерживаться принципа минимализма кода. То есть не нужно в самом celery task описывать бизнес логику задачи. Например, если вам необходимо генерировать и отправлять отчёт, то не нужно в самом task писать код генерации и отправки. Разбейте его на 3 части:

  1. Код генерации отчёта
  2. Код отправки письма
  3. Задача (task) по выполнению этих действий
from .utils import generate_report, send_email
@app.task(bind=True)
def send_report():
    filename = generate_report()
    send_email(subject, message, attachments=[filename])

Это, во-первых, позволит легче читать код (есть явное разделение на подзадачи). Во-вторых, тестировать такой код намного легче (привет модульным тестам!). В-третьих, отлавливать ошибки также будет намного легче и прозрачнее.

"Гасите" задачи вовремя

Явно указывайте лимит на выполнение задачи. Это можно сделать несколькими способами:

  • Через декоратор @app.task, передавая soft_time_limit, time_limit.
  • Глобально задать таймлимит при запуске исполнителя (worker), передав ему соответствующие аргументы (их можно найти в документации к Celery). В этом случае для всех задач, попадающих в заданную очередь будет один и тот же таймлимит.

Указание таймлимита очень важно, так как в некоторых случаях его отсутствие попросту приведёт к "зависанию" исполнителя при выполнении неоднозначных задач (требующих длительного времени, коннект к внешнему сервису и так далее).

Не храните результаты исполнения без необходимости

В большинстве случаев результат выполнения вашей задачи вам не нужен (например, если происходит отправка письма). В такой ситуации вам нет необходимости хранить что-то. Если ваши задачи полностью попадают в эту категорию, то в настройках Celery можно задать глобальный параметр CELERY_IGNORE_RESULT = True, который будет игнорировать результат исполнения всех ваших task-функций.

Используйте Flower для мониторинга исполнения задач

Всегда используйте Flower при работе с Celery. Всегда! Данный инструмент это небольшое веб приложение, написанное с использованием микрофреймворка Flask, а также Tornado для поддержки веб-сокетов. Flower позволяет вам всегда быть в курсе того как исполняются ваши задачи. Немного скриншотов:

Celery Flower Monitoring

Flower Dashboard

Не поленитесь и потратьте время на его изучение. Оно окупится многократно!

Не передавайте ORM объекты в качестве аргументов

Я пару раз попадался на этом хитром трюке, который потрепал мне изрядно нервы. Рассмотрим вот такой код:

from .models import Profile
@app.task(bind=True):
def send_notification(profile):
    send_email(profile.user.email, subject, message_body)
    profile.notified = True
    profile.save()

def notify_user():
    profile = Profile.objects.get(id=1)
    check_smthng()
    send_notification.delay(profile)
    profile.activated = True
    profile.save()

Не самый лучший пример для демонстрации побочного эффекта при передаче ORM объекта, но всё же. В данной ситуации код, описанный в send_notification, сохранит объект, изменив лишь notified = True, но activated останется по-прежнему равен False. Лучшим решением будет передача идентификатора объекта в базе данных, а в самой task функции необходимо непосредственно обращаться к объекту через его id.

BROKER_TRANSPORT_OPTIONS и visibility_timeout

При использовании Celery нередко приходиться прибегать к помощи отложенных задач, используя apply_async и передавая аргументы eta или countdown. Но делать это нужно осторожно, так как даже здесь нас поджидают "подводные камни". О чём речь? Очень часто у разработчиков, начинающих использовать очередь задач вроде Celery, происходят аномалии вроде выполнения одного и того же таска несколькими воркерами одновременно. Согласитесь, нежелательный сценарий. Так может происходить по причине того, что время, через которое должна выполниться задача, превышает visibility_timeout. По умолчанию для Redis этот параметр равен 1 часу. То есть если вы укажете выполнение задачи через 2 часа, то демон celery подождёт 1 час, поймёт, что никто из доступных воркеров не откликнулся и насильно назначит всем воркерам её выполнение при наступлении дедлайна (eta/countdown). Поэтому не забывайте про этот параметр, если вы собираетесь использовать механизмы eta/countdown/retry, задайте visibility_timeout равным самому длительному eta/countdown в вашем проекте. Подробнее можно почитать тут.

UPD: С недавних пор у блога появился свой Telegram канал, где я стараюсь делиться со своими подписчиками интересными находками из сети на тему разработки программного обеспечения и смежных с этой областью материалов.

Long-running tasks

Старайтесь не использовать Celery для выполнения долгих задач. На этот аргумент есть ряд причин:

  1. Процессы, живущие долго, потребляют память, но не освобождают её. Даже с учётом работы сборщика мусора. Такой механизм необходим, чтобы избежать фрагментации оперативной памяти.
  2. Celery заточен на выполнение большого количества задач, требующих мало времени на их исполнение. Когда задачи тяжелые и выполняются долго, образуются очереди.

Если нет возможности использовать что-то другое, то при работе с long-running tasks в Celery знайте следующее:

По-умолчанию 1 воркер процесс будет забирать из очереди 4 задачи за раз. Это особенно актуально знать, если Celery масштабируется на кластере через центрального брокера. То есть, если у вас 3 отдельные машины и на каждой крутится по 10 воркеров на очередь, то каждая машина будет забирать по 40 задач. Отсюда очевидно возникает проблема равномерного распределения задач по кластеру. Такое поведение оправдано в некоторых случаях, т.к. оно уменьшает количество обращений к брокеру, увеличивая производительность при выполнении небольших тасков. Чтобы изменить это, переопределите параметр CELERYD_PREFETCH_MULTIPLIER. Например:

CELERYD_PREFETCH_MULTIPLIER = 1

Долгоживущие процессы имеют тенденцию к пожиранию памяти, но вот назад её зачастую не возвращают, поэтому в контексте использования Celery с ними иногда имеет смысл перезагружать воркеры после выполнения заданного количества тасков. За это отвечает параметр CELERYD_MAX_TASKS_PER_CHILD

CELERYD_MAX_TASKS_PER_CHILD= 1

Настройка выше будет перезагружать воркер-процесс после выполнения 1 таска.

Полезные ссылки