Python-RQ: очередь задач на базе Redis

Не так давно я рассказывал о том как лучше всего работать с очередью задач Celery. Бесспорно, Celery является наиболее продвинутым инструментом в экосистеме Python с массой возможностей для работы и управления выполнением ваших задач, включая кастомизацию брокеров и бэкэндов. Но Python мир не им един.

В этой заметке я расскажу об инструменте под названием Python RQ. Аббревиатура RQ обозначает Redis Queue. Из названия можно догадаться, что инструмент создан вокруг системы Redis. Она в данном случае выступает в качестве брокера и бэкэнда. RQ следует философии UNIX, то есть является маленьким инструментом, решающим одну задачу очень хорошо.

Установка

Тут всё просто и обыденно. Устанавливаем в отдельное виртуальное окружение:

$ pip install rq

Для любителей попробовать самое свежее следует устанавливать прямо из master ветки репозитория с кодом:

$ pip install -e git+git@github.com:nvie/rq.git@master#egg=rq

Для работы с очередью задач, также необходимо в систему установить redis-server (RQ требует Redis версии >= 2.6.0):

$ sudo apt-get install redis-server

Под Windows RQ работать не будет, так как полагается на механизм fork для порождения процессов, выполняющих задачи. Если вы используете Windows, то можно воспользоваться Vagrant для выполнения кода на виртуальной машине под управлением, например, Ubuntu 16.04 LTS.

Механизм работы RQ следующий:

  • Мы инициализируем worker процессы, которые выполняют мониторинг очередей в Redis (по-умолчанию default)
  • Внутри приложения устанавливаем соединение с очередью/очередями и кладём в неё задачу на исполнение
  • Как только задача приходит, worker её получает и исполняет
  • RQ использует в качестве механизма сериализации стандартный модуль pickle

Кодинг

Я решил не заморачиваться с названиями файлов, поэтому буду придерживаться практики с Celery. Задачи буду хранить в файле tasks.py:

tasks.py (пример кода взят с официальной документации python-rq)

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

Код считает количество слов на заданном url. Из кода можно заметить, что нам также необходимо установить пакет requests. О том как работать с HTTP в Python я ранее уже писал.

app.py

import time
from rq import Queue
from redis import Redis
from tasks import count_words_at_url

# задаём соединение с Redis по умолчанию
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# кладём выполнение нашей задачи в очередь
job = queue.enqueue(count_words_at_url, 'https://khashtamov.com/')
print(job.result)   # функция возвратит None, так как задача скорее всего не будет выполнена к этому момент
# подождём 4 секунды
time.sleep(4)
print(job.result)   # => результат выполнения функции (кол-во слов)

Запускаем первый worker из корневой директории с кодом, чтобы он мог импортировать модули без ошибок импорта:

$ rq worker
12:10:28 RQ worker 'rq:worker:ultrabook-acer.7756' started, version 0.5.6
12:10:28
12:10:28 *** Listening on default...

В другом окне терминала выполняем наш код с приложением:

$ python3 app.py

И видимо вот такой результат:

$ python3 app.py
None
3039
09:11:23 default: tasks.count_words_at_url('https://khashtamov.com/') (fc4f5151-598a-49d1-ab6f-8d9907b7a89b)
09:11:25 Job OK

Задача успешно выполнена воркером.

Функциональные возможности

Сохранение результата

По умолчанию результат, возвращаемый функцией-таском, хранится в Redis 500 секунд. Это можно изменить, передав значение в секундах в именованный аргумент result_ttl.

job = queue.enqueue(count_words_at_url, 'https://khashtamov.com/', result_ttl=60*60*24)  # храним результат 1 день

Таймаут

Таймаут на выполнение можно задать как на уровне очереди, так и конкретно для задачи:

queue = Queue('low', connection=redis_conn, default_timeout=30) 

В данном случае мы задали 30 секунд на выполнение для задач, попадающих в очередь с названием low.

job = queue.enqueue(count_words_at_url, 'https://khashtamov.com/', timeout=5)

Тут указан таймаут для конкретной задачи - 5 секунд на выполнение.

Отложенные задачи

К сожалению (или к счастью, для кого как), в стандартном пакете RQ нет возможности откладывать выполнение задачи до наступления определенного времени (конкретной даты или спустя какое-то время). Но умельцы разработали расширение под названием RQ Scheduler.

Вот как можно его заюзать:

$ pip install rq-scheduler

sched.py

from datetime import timedelta
from redis import Redis
from rq_scheduler import Scheduler
from tasks import count_words_at_url

# задаём соединение с Redis по умолчанию
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)
scheduler.enqueue_in(timedelta(seconds=5), count_words_at_url, 'https://khashtamov.com/')

Для того, чтобы задачи корректно были переданы воркерам, необходимо запустить демон rqscheduler

$ rqscheduler -i 5

Параметр -i указывает интервал для проверки очередей (по умолчанию 60 секунд). Помимо отложенных задач, RQ Scheduler поддерживает выполнение с заданной периодичностью и повторением (замена системному cron). Для этого необходимо зарегистрировать задачу, используя вызов метода schedule:

scheduler.schedule(datetime.utcnow(), count_words_at_url, args=('https://khashtamov.com/', ), repeat=5, interval=10)

Я задал выполнение задачи каждые 10 секунд 5 раз подряд.

Интеграция с веб-фреймворками

Большой необходимости в специальной интеграции с python веб-фреймворками нет, но для тех, кто любит иметь всё в одном место, умельцы разработали плагин для работы с Django. Имя ему django-rq, разработкой занимался автор RQ Scheduler. Отдельно этот пакет описывать я думаю смысла нет, так как настроить его совсем просто.

Мониторинг

В статье про Celery я описывал, что в production среде очень важно иметь наглядную картину выполнения ваших задач, так как это помогает выявлять проблемы на ранних стадиях работы (а чем раньше мы выявим баг, тем дешевле он обойдётся). Аналогом Flower для RQ является RQ Dashboard. Это небольшое, но очень полезное веб-приложение на Flask, позволяющее мониторить выполнение задач в режиме реального времени. Установка и настройка его тривиальна (см. ссылку в конце статьи).

Заключение

Целью данной заметки являлось обратить ваше внимание на столь маленький и гибкий инструмент как Python RQ и показать, что для решения небольших задач он порой подходит куда лучше Celery. Несомненно, RQ не обладает широчайшим функционалом присущим монструозному Celery. Например, RQ не умеет выполнять задачи в группе параллельно, дожидаясь результата от всех тасков (group в Celery), к нему невозможно прикрутить сторонний брокер, скажем RabbitMQ. В своей практике я использовал RQ на production, но, к сожалению, мне не доводилось его проверять под большими нагрузками.

Если вас заинтересовал инструмент, то более подробную информацию можно узнать по ссылкам, приведённым ниже.

Полезные ссылки