Celery Best Practices: practical approach
Celery is the most advanced task queue in the Python ecosystem and is usually considered the de facto choice when it comes to processing tasks in the background. The latest version is 4.0.2, the community around Celery is pretty big (it includes big corporations such as Mozilla, Instagram, Yandex and so on) and constantly evolves. Hence investing your time in this technology pays off.
This article is not about how to set up Celery: installation is pretty straightforward and instructions can be found on the official website. Instead, I would like to draw your attention to best practices while working with Celery.
Tips & Tricks with Celery
Broker and backend usage
When it comes to web apps and Python, developers usually prefer the following toolset: Django, Gunicorn, nginx. And many people will agree. Moreover, despite the fact that Celery has built-in support for Django, developers tend to use the django-celery application, which has more features, including monitoring of periodic tasks and the ORM as broker and backend. But... the idea of using the database as a broker and backend in production is ridiculous, because a DB is not suitable for it. As your web app evolves and traffic increases, the load on your database increases as well, and it becomes a bottleneck which should be fixed as soon as possible, otherwise you will start losing tasks, data and clients. The best practice here is to use production-ready systems like Redis or RabbitMQ as brokers. They are very well tested and have proved their reliability and consistency. But there is a trade-off as well: Redis is a fast and responsive advanced in-memory key-value store, but not reliable with default settings; RabbitMQ is a reliable and robust system, but a little bit harder to set up and tune.
Speaking about the result backend in Celery, I recommend using memcached or Redis. Please, do not use the database. When I started using task queues I tended to use the database as a backend very often, for several reasons:
- it is very easy to set up
- django-celery supports it very well
- no additional dependency
But the reasons not to use it are stated above. I have been using memcached and Redis under a pretty loaded website for about 3 years. Everything works perfectly.
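For reference, a minimal sketch of such a configuration with Redis as both broker and result backend might look like this (the host, port and database numbers are just example values for a local setup):
# settings.py -- example values for a local Redis instance
BROKER_URL = 'redis://localhost:6379/0'              # broker: where tasks are queued
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'   # backend: where results are stored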
Logging is the answer
When it comes to errors and abnormal behaviour, logs are the key point for investigation. The same applies to background task processing systems. Celery supports the standard Python logging mechanism (which is pretty straightforward). By default it sends errors to stderr and uses stdout for everything else. The best practice for running a celery worker is to use a separate process manager like supervisord. In its config file you may provide a filename and a path where it will put the logs. But manual log analysis using grep and/or sed can be tedious, can't it? I prefer to use Sentry for every suspicious behaviour during a task's runtime.
This is what my logging config looks like:
CELERYD_HIJACK_ROOT_LOGGER = False

LOGGING = {
    'version': 1,  # required by dictConfig
    'handlers': {
        'celery_sentry_handler': {
            'level': 'ERROR',
            'class': 'core.log.handlers.CelerySentryHandler',
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery_sentry_handler'],
            'level': 'ERROR',
            'propagate': False,
        },
    },
}
This is what CelerySentryHandler looks like:
from django.conf import settings
from raven.handlers.logging import SentryHandler


class CelerySentryHandler(SentryHandler):

    def __init__(self):
        super(CelerySentryHandler, self).__init__(settings.RAVEN_CELERY_DSN)
Take a look at CELERYD_HIJACK_ROOT_LOGGER. By default this parameter is set to True, which prevents execution of any custom logging handlers subscribed to the "celery" logger. With this approach you do not have to intentionally set up any additional exception handling mechanisms in your tasks.
Do not store the result of execution when you do not need it
We usually use Celery as a background task processor and hence do not care about the return value (e.g. when sending an email or resizing an image). The result of a task execution is stored in the backend by default, but when you do not need the result it is better to turn off the saving mechanism via the setting called CELERY_IGNORE_RESULT. Just set it to True.
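As a sketch, this can be done globally in your settings, or per task via the ignore_result option of the task decorator (resize_image here is just a hypothetical example task):
# settings.py: do not store any task results by default
CELERY_IGNORE_RESULT = True

# or per task, if only some results are irrelevant
@app.task(ignore_result=True)
def resize_image(path):
    ...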
Visibility Timeout and Broker settings
Besides background task execution, Celery also supports so-called delayed tasks (via the apply_async method). In order to use them, all you have to do is provide a datetime when the task should be invoked via eta, or a delay in seconds via countdown. This mechanism works perfectly and, combined with periodic tasks, replaces the traditional crontab. But when developers just start using it, they regularly face abnormal behaviour of workers, specifically multiple executions of the same task by several workers. The reason is the visibility timeout setting. Put simply, the visibility timeout determines the number of seconds the broker waits for a worker to acknowledge the task before it is redelivered to another one (the celery retry mechanism). By default for the Redis broker it is set to 1 hour, hence if you have a task which should be called in 2 hours, it will be redelivered to all available workers and executed several times. Simply set visibility_timeout to your highest eta/countdown (I usually have tasks which are due in 1 day). Detailed information can be found here.
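A minimal sketch of both pieces, assuming a Redis broker and tasks that may run up to a day in the future (send_report here stands for the example task shown later in this article):
# settings.py: raise the visibility timeout above the largest eta/countdown you use
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}  # 1 day, in seconds

# invoking a delayed task
from datetime import datetime, timedelta

send_report.apply_async(countdown=7200)                             # in 2 hours
send_report.apply_async(eta=datetime.utcnow() + timedelta(days=1))  # at a specific time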
Task queue separation
When you invoke a task, it is put into the default queue unless stated otherwise. This is a pretty simple and straightforward idea until you have tons of different tasks and your app continues to evolve rapidly. In this case every task starts having its own priority: some are less important than others. For example, an email sending task can have a higher priority than image resizing in your app. It is better to separate them into different queues and workers in order to avoid a future overflow of a particular queue (when you have only one queue, an overflow can introduce problems with the execution of all tasks). I usually do it this way:
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('normal', Exchange('normal'), routing_key='normal'),
    Queue('low', Exchange('low'), routing_key='low'),
)

CELERY_DEFAULT_QUEUE = 'normal'
CELERY_DEFAULT_EXCHANGE = 'normal'
CELERY_DEFAULT_ROUTING_KEY = 'normal'

CELERY_ROUTES = {
    # -- HIGH PRIORITY QUEUE -- #
    'myapp.tasks.check_payment_status': {'queue': 'high'},
    # -- LOW PRIORITY QUEUE -- #
    'myapp.tasks.close_session': {'queue': 'low'},
}
Starting a celery worker to listen to a particular queue is straightforward:
celery worker -E -l INFO -n worker.high -Q high
celery worker -E -l INFO -n worker.normal -Q normal
celery worker -E -l INFO -n worker.low -Q low
I usually have 3 queues:
- high
- normal
- low
It depends on your app's domain and its architecture; you could have more or fewer.
WARNING: when you set a queue name for a particular task (let's say high) and have workers running without a particular queue, like this:
celery worker -E -l INFO -n worker.whatever
then, when the queue named high is full and its workers are busy processing tasks, your pending task may be executed by one of the workers that does not listen to that specific queue. So, when it comes to task queue separation, please be careful.
Try to keep tasks simple
This simply means: do not put your business logic into the task itself. For example, if you want to send emails using Celery, define your core email sending function outside the task:
from .utils import generate_report, send_email


@app.task(bind=True)
def send_report(self, subject, message):
    filename = generate_report()
    send_email(subject, message, attachments=[filename])
This approach is more flexible when it comes to unit testing.
Use task monitoring systems
The best practice here is to use the Flower web app as a task execution monitor. Use it on a daily basis. Spend some time investigating this tool; it pays off.
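For reference, assuming Flower is installed alongside your project, starting it looks roughly like this (myapp and the port are just placeholders):
pip install flower
celery flower -A myapp --port=5555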
Do not pass ORM objects as arguments
Django ORM is great. But passing ORM objects/model instances as arguments to celery task functions is bad taste. Let's explore the following example:
from .models import Profile


@app.task(bind=True)
def send_notification(self, profile):
    send_email(profile.user.email, subject, message_body)
    profile.notified = True
    profile.save()


def notify_user():
    profile = Profile.objects.get(id=1)
    check_smthng()
    send_notification.delay(profile)
    profile.activated = True
    profile.save()
Probably not the best example to demonstrate the side effect, but anyway: when the task is executed, profile.activated will still be equal to False inside the task, even if the code below the task invocation has already run, because the task received a stale copy of the object. The best practice here is to pass the database ID and then retrieve the object using it.
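A minimal sketch of that ID-based variant of the task above (the subject and message text are placeholders, and send_email is assumed to come from the same utils module as in the earlier example):
from .models import Profile
from .utils import send_email


@app.task(bind=True)
def send_notification(self, profile_id):
    # fetch a fresh copy of the object inside the task
    profile = Profile.objects.get(id=profile_id)
    send_email(profile.user.email, 'Notification', 'Your profile has been updated.')
    profile.notified = True
    profile.save()


def notify_user():
    profile = Profile.objects.get(id=1)
    # pass only the primary key, not the instance
    send_notification.delay(profile.id)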
Provide task timeout
It is always better to provide a timeout for task execution. You can do this using one of the following approaches:
- Provide the soft_time_limit and time_limit arguments to the @app.task decorator (see the sketch below)
- Globally set a timeout for a particular worker via the CELERYD_TASK_SOFT_TIME_LIMIT and CELERYD_TASK_TIME_LIMIT settings
Setting a time limit is very important in order to avoid worker "freezes" and queue "deadlocks".
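A minimal sketch of the per-task variant; the limits and the task body (build_report, cleanup_partial_report) are hypothetical:
from celery.exceptions import SoftTimeLimitExceeded


@app.task(soft_time_limit=60, time_limit=120)
def generate_heavy_report():
    try:
        build_report()  # hypothetical long-running call
    except SoftTimeLimitExceeded:
        # the soft limit was hit: clean up before the hard limit kills the worker process
        cleanup_partial_report()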