em Django, Português, programação, Python

Celery com múltiplas filas, retry e tarefas agendadas

No ano passado eu escrevi um post sobre Tarefas demoradas de forma assíncrona com Django e Celery, que serve para a maioria dos casos de uso do celery. O problema é que você pode querer e fazer muito mais com o celery, como ter filas diferentes para tarefas prioritárias, executar um retry após a falha de uma tarefa ou agendar a tarefa para ser executada apenas em outro horário ou outro dia.

Retry em uma Tarefa

Suponhamos que isso seja uma caveira que sua tarefa depende de uma API externa, conexão com algum site ou algum outro motivo que possa levá-la a um ConnectionError, por exemplo.

É provável que dentro de alguns instantes a API, site, etc. esteja no ar e que se você executar sua tarefa novamente ela funcione. Para isso podemos agendar um retry para esta tarefa:


from celery import shared_task
 
@shared_task(bind=True, max_retries=3)
def acessar_sistema_horrivel(self, meu_objeto_id):
    from core.models import MeuObjeto
    from requests import ConnectionError
 
    objeto = MeuObjeto.objects.get(pk=meu_objeto_id)
 
    # Se tiver ConnectionError tenta de novo em 180 segundos
    try:
 
        objeto.acessar_sistema_horrivel()
  
    except ConnectionError as exc:
        self.retry(exc=exc, countdown=180)
 
 

Interessante que dentro de uma função você está usando self.retry. Isso é possível graças ao bind=True no decorator do shared_task, que faz com que nossa função acessar_sistema_horrivel seja na verdade um método da classe Task. Isso nos obriga a colocar self como primeiro argumento da função também.

O número máximo de tentativas para executar essa tarefa é o max_retries=3 definido no decorator

ETA – Agendando a tarefa para começar mais tarde

Agora o cenário é o seguinte, seu sistema vai chamar uma tarefa assíncrona, mais quer que ela seja executada apenas daqui a uma hora.

Para isso podemos chamar a task setando a propriedade ETA (estimated time of arrival) e isso significa que sua tarefa só será executada depois daquele horário (não necessariamente para começar aquele horário, isso vai depender se você possui workers disponíveis, para agendar exatamente como no cron você pode usar o CeleryBeat) .

 


from django.utils import timezone

agora = timezone.now() 
 
# Depois é daqui a uma hora 
#(usei o replace pq quis mostrar, mas vc poderia usar o timedelta para somar também)
depois = agora.replace(hour=agora.hour + 1)
 
acessar_sistema_horrivel.apply_async((meu_objeto_id), eta=depois)
 

Colocando mais de uma fila de execução

Quando você executa o celery ele cria uma fila no seu broker (no caso do post anterior, era o RabbitMQ). A fila padrão chama celery. Qual a implicação? Se você possui várias tarefas assíncronas a serem executadas ficam todas na mesma fila, das qual os workers vão pegar as mensagens e executar.

Imagine que temos além da task acessar_sistema_horrivel, uma outra task chamada tarefa_demorada.

Então dado o seguinte cenário:

3 tarefas: uma lenta e uma mais rápida e a acessar_sistema_horrivel
1 fila
4 workers

E imagine que o sistema produziu 10 requisições da tarefa_demorada para serem executadas e em seguida produziu mais 10 da outra tarefa. O que vai acontecer? Seus workers ficarão todos ocupados buscando executando as requisições de tarefa_demorada que entraram primeiro e nenhum estará trabalhando na tarefa mais rápida.

A solução para isso é produzir filas diferenciadas para as tarefas prioritárias.


# CELERY ROUTES
CELERY_ROUTES = {
    'core.tasks.acessar_sistema_horrivel': {'queue': 'fila_horrivel'},
    'core.tasks.tarefa_demorada': {'queue': 'fila_demorada'},
    'core.tasks.tarefa_mais_rapida': {'queue': 'fila_rapida'},
}

Isso é suficiente para que as mensagens para essas tarefas sejam enviadas para essas filas do RabbitMQ. Detalhe, essas filas são criadas de forma permanente no RabbitMQ, mas isso também pode ser configurado.

Agora para rodar os consumidores para essas filas quando formos executar o celery podemos definir com o parâmetro -Q. No caso fica assim:


celery --app=nome_projeto worker -Q fila_horrivel,fila_demorada,fila_rapida --autoscale=10,5

Estamos iniciando os workers para todas essas filas. Neste caso estou colocando também o parâmetro autoscale, indicando para trabalhar com no mínimo 5 workers e caso haja muitas tarefas o celery adiciona mais workers até o máximo de 10. Isso é bom para economizar recursos do seu sistema, caso você tenha horários com mais produção de tarefas.

Como mostramos no outro post, para usar isto sob o supervisord, você pode modificar aquele mesmo script


[program:celery]
command=/home/deploy/.virtualenvs/meu_virtual_env/bin/celery --app=nome_projeto worker -Q fila_horrivel,fila_demorada,fila_rapida --autoscale=10,5
directory=/home/deploy/webapps/projeto_django
user=nobody
autostart=true
autorestart=true
redirect_stderr=true

Em seguida rodar o reread e o update, porque o arquivo de configuração mudou e reiniciar o supervisord para esse arquivo.


sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl restart gunicorn

Com isso seus workers vão executar tarefas de cada uma das filas, não travando os workers para só um tipo de tarefa. Há diversas outras formas de colocar seus workers para funcionar.

Chamando Tarefas Sequenciais

Outro problema comum, é precisar chamar duas tarefas assíncronas , uma depois da outra. Isso pode acontecer em diversos cenários, ex: o caso de a segunda tarefa receber como parâmetro o resultado da primeira, ou da primeira tarefa salvar algum dado e a segunda tarefa trabalhar aquele dado, etc…

Para isso nós podemos usar o Chain, que é o recomendado ao invés de chamar a segunda tarefa assíncrona com o delay dentro da primeira tarefa.


from celery import chain
from tasks import salvar_dados, trabalhar_dados
 
# Estamos chamando o método s das tasks, que as usa como subtasks de uma task gerada pelo chain.
chain(salvar_dados.s(meu_objeto_id) | trabalhar_dados.s())

A tarefa trabalhar_dados vai utilizar o return ou resultado da tarefa salvar_dados como parâmetro.
O chain em sí está gerando uma task, então para ele também serve chamar o apply_async com um eta definido, por exemplo:


chain(salvar_dados.s(meu_objeto_id) | trabalhar_dados.s()).apply_async(eta=depois)

Ignorando os resultados do ResultBackend

Se você só usa as tasks para executar alguma coisa que não precisa do return da task, você pode ignorar o ResultBackend e ter um ganho de performance.

A maioria dos casos não precisa guardar o resultado do return, então é só uma dica interessante se você já está salvando algo nos models, ou fazendo whatever diretamente dentro da sua task, coloque isso no seu settings.py:


CELERY_IGNORE_RESULT = True

 

Na verdade tem muito mais coisas legais no Celery, que ficarão pra um próximo post.

Fontes

http://docs.celeryproject.org/en/latest/userguide/tasks.html

http://docs.celeryproject.org/en/latest/userguide/optimizing.html#guide-optimizing

https://denibertovic.com/posts/celery-best-practices/

https://news.ycombinator.com/item?id=7909201

http://docs.celeryproject.org/en/latest/userguide/workers.html

http://docs.celeryproject.org/en/latest/userguide/canvas.html

 

Super Bônus

Celery Messaging at Scale at Instagram – Apresentação Pycon 2013

Escreva um comentário

Comentário

Webmentions

  • Tarefas demoradas de forma assíncrona com Django e Celery - Fernando Alves novembro 3, 2016

    […] Celery com múltiplas filas, retry e tarefas agendadas […]