Celery: начинаем правильно

Python Celery
В этой статье мне хотелось бы поделиться с читателями своим опытом работы с таким замечательным инструментом в Python как Celery.

Celery это ничто иное как распределённая очередь заданий, реализованная на языке Python. На момент написания этой статьи, самой последней версией является 3.1.20. Неосведомлённый читатель может не знать для чего вообще нужна система очередей задач наподобие Celery, поэтому кратко поясню этот момент.

Что такое Celery и зачем оно нам?

Часто ли вам приходилось сталкиваться с типовыми задачами в веб-приложениях вроде отправки электронного письма посетителю или обработки загруженных данных. Чаще всего такого рода манипуляции не требуют участия конечного пользователя вашего проекта, то есть их можно выполнять в фоновом режиме. Те из нас, кто реализует выполнение этих задач в одном из процессов веб-сервера, «тормозят» тем самым его работу, увеличивая время отклика и ухудшают user experience.

В данной заметке я опущу вводную информацию по установке и настройке Celery в вашем проекте. Кстати, Celery из коробки умеет работать с Django. Ранее был отдельный python пакет, соединяющий Django и Celery, именовался он django-celery. Сейчас он заброшен, так как последнее обновление было более года назад. Стоит отметить, что django-celery не работает Django 1.9 из-за изменений в работе cache backend. Исправленную версию можно посмотреть в моём форке. Одной из удобных фич django-celery является интеграция с Django Admin по части управления periodic tasks.

Советы по работе с Celery

Не используйте базу данных в качестве broker/backend

Брокер отвечает за передачу сообщений (задач) между так называемыми исполнителями (workers). Проблема использования базы данных заключается в её ограничениях — она просто не предназначена для этого. Дело в том, что с ростом количества исполнителей, нагрузка на базу будет только возрастать, а учитывая тот факт, что каждый worker имеет ещё ряд потоков, ситуация может стать катастрофической даже при малых нагрузках. Всё это приведёт к бутылочному горлышку в виде затыка на I/O, потере задач, а возможно и неоднократному их исполнению (два воркера могут получить одну и ту же задачу на исполнение). Отличным production-ready решением является использование RabbitMQ или Redis для этой роли.

Бэкэнд в случае с Celery выступает в качестве хранилища результатов выполнения задач (task). Одной из причин создания django-celery как раз являлась возможность подключения БД для сохранения результатов. Признаюсь, что в самом начале работы с Celery я неоднократно в проектах использовал этот подход. Пожалуйста, не повторяйте мою ошибку. С ростом нагрузки на приложение проблемы будут расти словно грибы после дождя (более того, «из коробки» celery не чистит базу от «устаревших» результатов) . Правда тут есть нюансы касательно вашего приложения. Об этом читайте ниже. Production-ready решением для роли backend неплохо зарекомендовал себя демон memcached. Пользуемся более 2-х лет, проблем ни разу не было.

Разделяйте задачи по очередям

Это очень важный момент. По мере развития вашего приложения, в проекте будут появляться критичные для выполнения задачи: проверка статуса платежа, формирование отчёта, отправка электронных писем и так далее. Терять их недопустимо. Если все задачи складировать в одну очередь, то в один прекрасный момент она может забиться, поставив под угрозу выполнение критически важного кода. Мой подход: разделяйте очереди по приоритетам.

  • high
  • normal
  • low

Несомненно очередей может быть больше, тут всё на усмотрение разработчика и архитектуры его приложения.

В базовых настройках Celery это выглядит следующим образом:

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)

CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'

CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #
    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # -- LOW PRIORITY QUEUE -- #
    'myapp.tasks.close_session': {'queue': 'low'},
}

В данном конкретном примере объявлена очередь по-умолчанию под названием normal. То есть задачи явно не указанные в списке будут автоматически распределены в эту очередь. В high попадает задача под названием check_payment_status, а в low задача close_session.

Запускать исполнителей Celery для этих очередей необходимо следующим образом:

celery worker -E -l INFO -n worker.high -Q high
celery worker -E -l INFO -n worker.normal -Q normal
celery worker -E -l INFO -n worker.low -Q low

Здесь мы явно задаём имена исполнителей и названия очередей в которых необходимо мониторить задачи на исполнение.

ВАЖНО! Если вы явно указали для задачи очередь в которую ей нужно будет падать, и при этом запустили одного из исполнителей Celery без явного указания очереди, например вот так:

celery worker -E -l INFO -n worker.whatever

То при наступлении ситуации, когда все исполнители очереди high будут заняты, Celery автоматически перенаправит новую задачу исполнителям без конкретной очереди. Поэтому при использовании раздельных очередей задач, не запускайте исполнителей без указания для них явного наименования очереди.

Логгируйте ошибки

Логгирование ошибок и своевременный их анализ это основа надёжных приложений. Очень важно иметь полную картину происходящего внутри вашего кода. По-умолчанию Celery все ошибки пишет в stderr, а прочая информация, связанная с исполнением попадает в stdout. Контролировать вывод ошибок можно через стандартный python logging, достаточно повесить свой handler на logger под названием «celery». Практика развёртывания боевых приложений, использующих Celery, показывает, что в качестве процесс-менеджера используют supervisord. В его настройках можно задавать путь до файла в который он будет складировать всю информацию, генерируемую демоном. Но вручную анализировать текстовые логи на предмет ошибок неудобно и неэффективно. Лично я использую для этих целей Sentry. Вот как выглядит у меня logging config:

CELERYD_HIJACK_ROOT_LOGGER = False

LOGGING = {
    'handlers': {
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler'
        }
    },

    'loggers': {
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    }
}

Важной опцией здесь является наличие CELERYD_HIJACK_ROOT_LOGGER = False. По-умолчанию значение этой переменной является True, что позволяет celery «перекрывать» все ранее объявленные кастомные обработчики logging.

При указанном выше подходе нет необходимости дополнительно в коде задач (task) логгировать ошибки/исключения отдельно. О том что такое Sentry, для чего оно используется и как его настроить я напишу отдельную статью немного позже.

Пишите задачи маленькими

При написании задач старайтесь придерживаться принципа минимализма кода. То есть не нужно в самом celery task описывать бизнес логику задачи. Например, если вам необходимо генерировать и отправлять отчёт, то не нужно в самом task писать код генерации и отправки. Разбейте его на 3 части:

  1. Код генерации отчёта
  2. Код отправки письма
  3. Задача (task) по выполнению этих действий
from .utils import generate_report, send_email

@app.task(bind=True)
def send_report():
    filename = generate_report()
    send_email(subject, message, attachments=[filename])

Это, во-первых, позволит легче читать код (есть явное разделение на подзадачи). Во-вторых, тестировать такой код намного легче (привет модульным тестам!). В-третьих, отлавливать ошибки также будет намного легче и прозрачнее.

«Гасите» задачи вовремя

Явно указывайте лимит на выполнение задачи. Это можно сделать несколькими способами:

  • Через декоратор @app.task, передавая soft_time_limit, time_limit.
  • Глобально задать таймлимит при запуске исполнителя (worker), передав ему соответствующие аргументы (их можно найти в документации к Celery). В этом случае для всех задач, попадающих в заданную очередь будет один и тот же таймлимит.

Указание таймлимита очень важно, так как в некоторых случаях его отсутствие попросту приведёт к «зависанию» исполнителя при выполнении неоднозначных задач (требующих длительного времени, коннект к внешнему сервису и так далее).

Не храните результаты исполнения без необходимости

В большинстве случаев результат выполнения вашей задачи вам не нужен (например, если происходит отправка письма). В такой ситуации вам нет необходимости хранить что-то. Если ваши задачи полностью попадают в эту категорию, то в настройках Celery можно задать глобальный параметр CELERY_IGNORE_RESULT = True, который будет игнорировать результат исполнения всех ваших task-функций.

Используйте Flower для мониторинга исполнения задач

Всегда используйте Flower при работе с Celery. Всегда! Данный инструмент это небольшое веб приложение, написанное с использованием микрофреймворка Flask, а также Tornado для поддержки веб-сокетов. Flower позволяет вам всегда быть в курсе того как исполняются ваши задачи. Немного скриншотов:

Celery Flower Monitoring

Flower Dashboard

Не поленитесь и потратьте время на его изучение. Оно окупится многократно!

Не передавайте ORM объекты в качестве аргументов

Я пару раз попадался на этом хитром трюке, который потрепал мне изрядно нервы. Рассмотрим вот такой код:

from .models import Profile

@app.task(bind=True):
def send_notification(profile):
    send_email(profile.user.email, subject, message_body)
    profile.notified = True
    profile.save()

def notify_user():
    profile = Profile.objects.get(id=1)
    check_smthng()
    send_notification.delay(profile)
    profile.activated = True
    profile.save()

Не самый лучший пример для демонстрации побочного эффекта при передаче ORM объекта, но всё же. В данной ситуации код, описанный в send_notification, сохранит объект, изменив лишь notified = True, но activated останется по-прежнему равен False. Лучшим решением будет передача идентификатора объекта в базе данных, а в самой task функции необходимо непосредственно обращаться к объекту через его id.

BROKER_TRANSPORT_OPTIONS и visibility_timeout

При использовании Celery нередко приходиться прибегать к помощи отложенных задач, используя apply_async и передавая аргументы eta или countdown. Но делать это нужно осторожно, так как даже здесь нас поджидают «подводные камни». О чём речь? Очень часто у разработчиков, начинающих использовать очередь задач вроде Celery, происходят аномалии вроде выполнения одного и того же таска несколькими воркерами одновременно. Согласитесь, нежелательный сценарий. Так может происходить по причине того, что время, через которое должна выполниться задача, превышает visibility_timeout. По умолчанию для Redis этот параметр равен 1 часу. То есть если вы укажете выполнение задачи через 2 часа, то демон celery подождёт 1 час, поймёт, что никто из доступных воркеров не откликнулся и насильно назначит всем воркерам её выполнение при наступлении дедлайна (eta/countdown). Поэтому не забывайте про этот параметр, если вы собираетесь использовать механизмы eta/countdown/retry, задайте visibility_timeout равным самому длительному eta/countdown в вашем проекте. Подробнее можно почитать тут.

UPD: С недавних пор у блога появился свой Telegram канал, где я стараюсь делиться со своими подписчиками интересными находками из сети на тему разработки программного обеспечения и смежных с этой областью материалов.

Полезные ссылки

  • Антон

    Всё по делу.
    По очередям надо точно разделять, помимо описанных тут приоритетов, заметил ещё непонятный баг — криво написанное, выполняющееся на одном воркере из всего пула задание способно повесить весь пул, и задания будут копиться у брокера.

    Насчёт таймлимита на задачу — поспорил бы. Но если писать задачи маленькими и короткими — то можно и поставить для надёжности. С длительными заданиями сложнее вычислить заранее, да и надо ли. Кстати, не стоит забывать пользоваться таймлимитами и в других местах, где они доступны — например, при осуществлении запроса с помощью urllib или requests.

    По поводу передачи аргументов: да, чем проще тип аргументов при передаче, тем лучше. И ещё лучше использовать сериализатор pickle — он более всеяден, чем тот же json. Иногда может понадобиться дополнительно подготавливать данные для передачи в задание — всегда держите в голове, что хоть и пишется .delay(args) или .apply(args), но под капотом там идёт сериализация-десериализация и передача данных по сети), например, мне как-то пришлось извлекать из объекта request (Django) данные, которые затем передавал в задание для последующей обработки.

    Ещё как-то столкнулся с проблемой: из выполняющегося задания хотел ставить другие задания и ожидать результата их выполнения. Например, выполняя парсинг веб-страницы, я собираю из html все адреса картинок и хотел бы загружать каждую из них в отдельном задании, ожидая результата выполнения их всех. Так вот, такой финт невозможен.

    • Johnny

      Насчет последнего, http://docs.celeryproject.org/en/latest/reference/celery.html#celery.group вам в помощь

    • MskGuy

      Антон, здравствуйте! Так уж вышло, что Ваш комментарий, это самое информативное, что я нашел по своей теме.

      Мучаюсь, с недавних пор, с celery и задачка также подразумевает ожидание параллельных результатов для последующей обработки. Судя по документации,примитив chord именно для этого и предназначен.

      Собственно вопрос — не изменилось ли мнение о осуществимости такого средствами celery за прошедшие 8 месяцев?)

  • Pingback: Python-RQ: очередь задач на базе Redis — Персональный блог Адиля Хаштамова()

  • Johnny

    Вы бы хоть указали первоисточник, с которого 90% статьи списали, а именно https://habrahabr.ru/post/269347/

    • Странно считать первоисточником перевод статьи. Видно, что вы мало работали с Celery, иначе бы пришли к тем же заключениям.

      • Johnny

        Причем здесь моя работа с Celery и факт того, что в вашей статье практически все пункты повторяют пункты той статьи (даже в той же последовательности)?

        • В том, что все эти пункты — common sense, который возникает при тесной работе с Celery продолжительное время.

          • Johnny

            Слишком мало пунктов возникло при тесной работе с Celery продолжительное время. Причем большая часть из них должна была возникнуть сразу после прочтения «First Steps with Celery» что на сайте целери лежит.

          • Значит плохо работал. Напишите гайд лучше.

          • Johnny

            зачем, если уже есть Ваш и https://habrahabr.ru/post/269347/

          • Так там же мало пунктов =)

          • Johnny

            Не люблю писателей, который, когда их статью критикуют, сразу заявляют мол «напиши свою». Я вижу, что вы хорошо поработали над этой статьей, точнее доработали мною приведенную по ссылке. Я просто сказал вам, что хорошим тоном будет указать статьи, которые взяты за основу. И не надо тут говорить, что эти пункты common sense и поэтому и тут и там они одинаковые. Не врите ни себе, ни людям. Я уверен, что при написании данной статьи вы пользовались уже написанными. Я не верю в такие совпадения.

          • Ваше право не любить и не верить. Хорошим тоном по-вашему является фраза «списал 90% статьи»? Я впервые вижу статью с хабра. Да, я пользовался разными источниками, в том числе официальной документацией.

  • Vitaliy Kopachyov

    Адиль, У вас постоянно нахожу то, что мне нужно.
    Сначала бот для телеграмма, потом requests. Вот заитересоввался Selery. И опять из Вашего блога можно почерпнуть дохотчивую актуальную информацию. Спасибо за труды.

    • Спасибо за отзыв) Приятно осознавать, что материал полезен для читателей.

  • Vitaliy Kopachyov

    Доброго времени суток.
    есть необходимость запускать по одной задаче в один момент времени и не начинать другую пока предыдущая будет выполняться. задачи будут стартовать по расписанию и если время выполнения следующей подошло, а предыдущая еще выполняется, то выполнение не начинать. как дать понять селери, что воркер еще работает над задачей?
    Не подскажите как это можно сделать? или где об этом почитать?
    Спасибо

  • zzzago

    Здраствуйте,
    у меня стоит задача, в одновременно делать около десяти XML и Soap запросов. Для выполнения XML запросов на данный момент использую urllib2, для Soap — suds. Подскажите пожалуйста подходит ли для таких задач Celery? И еще один вопрос, полученные в ответ данные, я плнаровал сохранять временно в Redis, и только выбранные пользователем данные сохранять в реляционную БД, правильно ли это? Заранее спасибо.

    • Добрый день. Да,вполне подходит. Что касается второго вопроса, то всё зависит от контекста вашего приложения, нагрузки, объёма возвращаемых данных от внешних сервисов.

      • zzzago

        Спасибо!