Using Celery with multiple queues, retries and scheduled tasks

In this post, I’ll show how to work with multiple queues, schedule tasks, and retry when something goes wrong.

If you don’t know how to use celery, read this post first: https://fernandofreitasalves.com/executing-time-consuming-tasks-asynchronously-with-django-and-celery/

Retrying a task

Let’s say your task depends on an external API or connects to another web service and, for some reason, it raises a ConnectionError. It’s plausible to think that after a few seconds the API, web service, or whatever you are using may be back on track and working again. In cases like this, you may want to catch the exception and retry your task.


from celery import shared_task

@shared_task(bind=True, max_retries=3)  # set the maximum number of retries here
def access_awful_system(self, my_obj_id):
    from core.models import Object
    from requests import ConnectionError

    o = Object.objects.get(pk=my_obj_id)

    # On ConnectionError, try again in 180 seconds
    try:
        o.access_awful_system()
    except ConnectionError as exc:
        self.retry(exc=exc, countdown=180)  # the task goes back to the queue

The self.retry call inside the function is what’s interesting here. It’s possible thanks to bind=True on the shared_task decorator, which turns our function access_awful_system into a method of the Task class. That’s also why self must be the first argument of the function.

Another nice way to retry a function is using exponential backoff:

self.retry(exc=exc, countdown=2 ** self.request.retries)
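
If you’re on Celery 4.2 or newer, the task decorator can handle the catch-and-retry for you. A minimal sketch, assuming a recent Celery and the same access_awful_system example:

from celery import shared_task
from requests import ConnectionError

# Assumes Celery >= 4.2: autoretry_for catches the exception and retries
# automatically; retry_backoff=True applies an exponential countdown.
@shared_task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def access_awful_system(my_obj_id):
    from core.models import Object
    Object.objects.get(pk=my_obj_id).access_awful_system()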

ETA – Scheduling a task for later

Now, imagine that your application has to call an asynchronous task, but needs to wait one hour before running it.

In this case, we just need to call the task using the eta (estimated time of arrival) parameter, which means your task will be executed some time after the ETA. To be precise, not exactly at the ETA, because execution depends on there being workers available at that time. If you want to schedule tasks exactly as you do in crontab, you may want to take a look at Celery Beat.


from django.utils import timezone
from datetime import timedelta

now = timezone.now()

# `later` is one hour from now
later = now + timedelta(hours=1)

# note the trailing comma: the positional args must be a tuple
access_awful_system.apply_async((object_id,), eta=later)
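
If all you need is a relative delay, apply_async also accepts a countdown in seconds, which is simpler for this case:

# equivalent relative delay: run roughly one hour from now
access_awful_system.apply_async((object_id,), countdown=3600)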

Using more queues

When you run Celery, it creates a default queue (named celery) on your broker (in the last blog post it was RabbitMQ). If you have several asynchronous tasks and use only that default queue, all of them will end up in the same queue.

Suppose we have another task called too_long_task and one more called quick_task, and imagine that we have one single queue and four workers.

In that scenario, imagine the producer sends ten messages to the queue for too_long_task and, right after that, ten more messages for quick_task. What is going to happen? All your workers may be busy executing too_long_task, which arrived first in the queue, and no workers are left for quick_task.

The solution is to route each task to its own named queue.


# CELERY ROUTES
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}
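
You can also pick the queue per call, which overrides the routing table. A minimal sketch:

# send this particular call to a specific queue, overriding CELERY_ROUTES
quick_task.apply_async(queue='quick_queue')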

Now we can split the workers, telling each one which queue to consume from.


# For the too_long_queue
celery --app=proj_name worker -Q too_long_queue -c 2

# For the quick_queue
celery --app=proj_name worker -Q quick_queue -c 2

I’m using a concurrency of 2 for each queue’s worker, but the right number depends on your system.

As in the last post, you may want to run these workers under Supervisor.

There are a lot of interesting things you can do with your workers here.

Calling Sequential Tasks

Another common issue is having to call two asynchronous tasks one after the other. It can happen in a lot of scenarios, e.g. when the second task uses the result of the first task as a parameter.

You can use chain to do that:


from celery import chain
from tasks import first_task, second_task

# build the chain: second_task runs after first_task finishes
my_chain = chain(first_task.s(my_object_id), second_task.s())
my_chain.apply_async()
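
The .s() calls create signatures, and the return value of first_task is passed along as the first argument of second_task. A hypothetical pair of tasks to make that data flow explicit:

from celery import shared_task

@shared_task
def first_task(my_object_id):
    return my_object_id * 2  # this return value feeds second_task

@shared_task
def second_task(result_from_first):
    print(result_from_first)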

A chain is a task too, so you can pass parameters to apply_async, for instance an ETA:


chain(save_data.s(my_object_id), process_data.s()).apply_async(eta=later)

Ignoring the results from the result backend

If you use tasks only to execute something that doesn’t need a return value, you can ignore the results and improve your performance.

If you’re just saving something to your models, you may want to add this to your settings.py:


CELERY_IGNORE_RESULT = True
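
If you only want to drop results for specific tasks instead of globally, the same thing can be set per task. A minimal sketch:

from celery import shared_task

@shared_task(ignore_result=True)  # this task's result is never stored
def save_something(my_obj_id):
    ...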

 

Sources:

http://docs.celeryproject.org/en/latest/userguide/tasks.html

http://docs.celeryproject.org/en/latest/userguide/optimizing.html#guide-optimizing

https://denibertovic.com/posts/celery-best-practices/

https://news.ycombinator.com/item?id=7909201

http://docs.celeryproject.org/en/latest/userguide/workers.html

http://docs.celeryproject.org/en/latest/userguide/canvas.html

 

Super Bonus

Celery Messaging at Scale at Instagram – Pycon 2013

Executing time-consuming tasks asynchronously with Django and Celery

This post is based on a lightning talk I gave in 2015 at GruPy-SP (July/15) in São Paulo.

What’s the problem with time-consuming tasks on the server side?

Every time the client makes a request, the server has to read the request, parse the received data, retrieve or create something in the database, process what the user will receive, render a template, and send a response to the client. That’s usually what happens in a Django app.

Depending on what you are executing on the server, the response can take too long, which leads to problems such as a poor user experience or even a timeout error. It’s a big issue: loading time is a major contributing factor to page abandonment, so slow pages lose money.

There are a lot of functions that can take a long time to run: for instance, generating a large data report requested by a web client, emailing a long list of recipients, or even editing a video after it’s uploaded to your website.

Real Case:

That’s a problem I faced once when I was creating a report. The report took around 20 minutes to be generated and sent, the client got a timeout error, and obviously nobody wants to wait 20 minutes to get something. So, to handle this, the task had to run in the background. (On Linux, you can do this by putting an & at the end of a command, and the OS will execute it in the background.)

[Image: a Django view using os.system to run the report in the background]

Note that right after starting the background command, the view tells the user they will be emailed when the report is done. Something doesn’t seem right here. It looks like the worst code ever.

I immediately regret this decision.

Celery – the solution for those problems!

Celery is a distributed system for processing lots of messages. You can use it to run a task queue (fed by messages), you can schedule tasks in your own project without using crontab, and it integrates easily with the major Python frameworks.

How does Celery work?

Celery Architecture Overview. (from this SlideShare)

[Image: Celery architecture overview]

  • The User (or Client, or Producer) is your Django application.
  • The AMQP Broker is a message broker: a program responsible for the message queue. It receives messages from the Client and delivers them to the workers when requested. For Celery, the broker is generally RabbitMQ or Redis.
  • The workers (or consumers) are the processes that will run your tasks asynchronously.
  • The Result Store is a persistence layer where workers can store the results of tasks.

The client produces messages and delivers them to the message broker; the workers read these messages from the broker, execute them, and can store the results in Memcached, an RDBMS, MongoDB, or whatever store the client can access later to read the result.

Installing and configuring RabbitMQ

There are a lot of examples of how to use Celery with Redis, but the broker recommended by the Celery project is RabbitMQ, so that’s what I’m using here.

  1. Install RabbitMQ
    sudo apt-get install rabbitmq-server
  2. Create a user, create a virtual host, and grant the user permissions on the virtual host (see the broker URL sketch after this list):
    
    sudo rabbitmqctl add_user myuser mypassword
    sudo rabbitmqctl add_vhost myvhost
    sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
    
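
With the user and vhost created above, the broker URL in your settings would look like this (a sketch assuming the credentials from step 2):

# settings.py: point Celery at the user/vhost created above
BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'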

Installing and configuring Celery

pip install celery

In your settings.py:

# Celery config
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

In your project’s directory (the same folder as settings.py), create a celery.py file as follows.


from __future__ import absolute_import

import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings')

app = Celery('proj_name')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

This autodiscover_tasks call allows your project to find the asynchronous tasks of each Django app. In the same directory, you have to modify your __init__.py:


from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

Creating tasks for your app

In your app’s directory, put a tasks.py:


from __future__ import absolute_import
from celery import shared_task
from reports import generate_report_excel

@shared_task  # this decorator turns the function into an asynchronous task
def generate_report(ini_date, final_date, email):
    generate_report_excel(
        ini_date=ini_date,
        final_date=final_date,
        email=email,
    )

Now you just have to import this function wherever you want and call its delay method, which was added by the shared_task decorator.

from django.http import HttpResponse
from django.contrib.auth.decorators import login_required

from tasks import generate_report

@login_required()
def my_view(request):
    ...
    generate_report.delay(ini_date, final_date, email)
    return HttpResponse("You will receive an email when the report is done")

 

Running celery workers

Now you have to run the Celery workers so they can execute the tasks, reading the messages from the RabbitMQ broker. By default, Celery starts one worker process per available CPU, but you can change that with the concurrency parameter (-c):

celery --app=proj_name worker --loglevel=INFO
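
For example, to cap the worker at four processes (an arbitrary number; tune it to your machine):

celery --app=proj_name worker --loglevel=INFO -c 4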

And your screen will look like this:

[Image: Celery worker startup output]

In another terminal, you can open the shell and call your task to see it working:

In [1]: from app.tasks import generate_report

In [2]: generate_report.delay("2012-01-01", "2015-03-14", "[email protected]")

 

And you will see something like this on Celery:

[Image: the task being received and executed in the Celery worker output]

Deploying Celery

To use Celery in production, you’ll need a process control system like Supervisor.

To install supervisor:

sudo apt-get install supervisor

Now you have to create a configuration file for Celery in /etc/supervisor/conf.d/:

[program:celery]
command=/home/deploy/.virtualenvs/my_env/bin/celery --app=proj_name worker --loglevel=INFO
directory=/home/deploy/webapps/django_project
user=user_name
autostart=true
autorestart=true
redirect_stderr=true

Now inform Supervisor that there is a new process:

sudo supervisorctl reread
sudo supervisorctl update

And now start Celery through Supervisor:

sudo supervisorctl start celery

My presentation in Brazilian Portuguese:

Originally posted in Portuguese

Sources:

http://www.celeryproject.org/

https://www.rabbitmq.com/

http://stackoverflow.com/questions/9140716/whats-the-advantage-of-using-celery-with-rabbitmq-over-redis-mongodb-or-django/9176046#9176046

http://www.slideshare.net/fireantology/europython-2011-playing-tasks-with-django-celery?qid=f37c05cf-f153-4ee3-bbbb-3e6e48a10e4d&v=default&b=&from_search=3

https://speakerdeck.com/allisson/tarefas-assincronas-com-django-e-celery?#stargazers

http://www.slideshare.net/alexef/django-celery-18659986?qid=f37c05cf-f153-4ee3-bbbb-3e6e48a10e4d&v=default&b=&from_search=6

http://www.slideshare.net/idangazit/an-introduction-to-celery

http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

http://supervisord.org/

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django

http://www.onurguzel.com/supervisord-restarting-and-reloading/

Bonus:

How the Instagram Feed Works – Celery and RabbitMQ
