user photo

Tareas asíncronas y colas de mensajes con Django + Celery + RabbitMQ

Published on
Sergio Perea · 8 min read
python
django
microservicios

Celery es una cola de tareas asincrónica basada en el paso de mensajes distribuidos. Las colas de tareas se utilizan como estrategia para distribuir la carga de trabajo entre diferentes microservicios. En este tutorial explicaré cómo instalar y configurar Celery + RabbitMQ para ejecutar tareas asíncronas en una aplicación Django.

Django

Para trabajar con Celery, también necesitamos instalar RabbitMQ porque Celery requiere una solución externa para enviar y recibir mensajes. Esas soluciones se denominan agentes de mensajes. Actualmente, Celery es compatible con RabbitMQ, Redis y Amazon SQS como soluciones de agente de mensajes.

RabbitMQ

En ocasiones queremos comunicar nuestro sistema con otros sistemas internos o externos. Pero además queremos hacerlo de manera ordenada, rápida y fiable. Y ahí es donde entran los sistemas de colas de mensajes. Éstos surgen ante la necesidad de implantar un intermediario en la comunicación entre dos sistemas.

RabbitMQ es ese intermediario. Se trata de software libre, es multiplataforma e implamenta el estándar AMQP (Advanced Message Queuing Protocol). Se encarga de gestionar colas de mensajes así como de la negociación de los mismos, y por tanto podríamos encajarlo en la categoría de middleware de mensajería o como se le conoce más comunmente: gestor de colas de mensajes.

Los mensajes en RabbitMQ siguen este flujo:

  • El sistema productor genera el mensaje y lo envía a nuestro servidor RabbitMQ
  • RabbitMQ recibe el mensaje y lo enruta hacia su cola correspondiente.
  • El mensaje permanece en la cola hasta que el consumidor lo recibe y confirma dicha recepción.
  • El sistema consumidor procesa el mensaje.

Configurar nuestro servidor RabbitMQ

Para instalar RabbitMQ os recomiendo dockerizarlo. Pero si no podéis o no queréis, la forma más sencilla de instalarlo en Ubuntu es la siguiente:

# instalamos el servidor de RabbitMQ
sudo apt-get install rabbitmq-server 

# creamos un usuario con su password
sudo rabbitmqctl add_user celery_user celery_password 
sudo rabbitmqctl set_user_tags celery_user administrator # le asignamos permisos

# creamos permisos
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags celery_user mytag
sudo rabbitmqctl set_permissions -p myvhost celery_user ".*" ".*" ".*"

Configurar Celery

Antes de nada, debemos incluir los siguientes paquetes en nuestro requirements.txt

  • celery
  • amqp

Tenemos que añadir la siguiente configuración al settings.py de nuestro proyecto. Observa que por seguridad la password de nuestro usuario celery se recoge de las variables de entorno de nuestro Ubuntu, así que no olvides incluirla antes:

BROKER_URL = 'amqp://celery_user:celery_password@localhost:5672'
CELERY_RESULT_BACKEND = 'amqp://celery_user:celery_password@localhost:5672'
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = 'MiQueue'
CELERY_TIMEZONE = 'Europe/Madrid'
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

Ahora crearemos un archivo celery.py en la carpeta raíz de nuestra app, con el siguiente contenido:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery(project_name_celery)

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
   print('Request: {0!r}'.format(self.request))

Este código lo único que hace es:

  • Instancia una aplicación celery con un nombre dado (project_name_celery).
  • Cargamos la configuración que antes hemos incluído en nuestro settings.py
  • Activamos la capacidad de descubrir nuevas tareas.

Para que nuestra aplicación Django sea capaz de ver Celery, también tenemos que incluir lo siguiente en el init.py de nuestra carpeta raíz de nuestra app:

from .celery import app as celery_app

__all__ = ['celery_app']

Creando nuestra primera tarea asíncrona
Para crear nuestra primera tarea, podemos crear el archivo tasks.py en la app que queramos e incluir el siguiente código. Observa la anotación que uso al declarar la tarea:



from django.conf import settings
from project.celery import app as celery_app
import time


logger = logging.getLogger(__name__)

@celery_app.task
def hello_world(nombre):
   time.sleep(30)
   print("Hola {nombre}")

Y ahora ya podemos llamarla desde otra parte de nuestro código así:

from project.app.tasks import hola_mundo

def una_funcion_cualquiera(instance=None, created=False, **kwargs):

   hello_world.s(nombre="manolo").apply_async()

Programando tareas (cron)

Podríamos programar tareas dentro de nuestra aplicación Django con el cron del sistema operativo. Pero es mucho más flexible hacerlo con Celery Beats.

Celery Beats es un add-on de Celery que nos permite programar tareas asíncronas de manera parecida a como se hace con cron.

Pero ¿por qué no utilizar cron? Veamos algunas ventajas de utilizar celery sobre cron:

  • Celery + Celerybeat tiene una granularidad más fina que cron. Cron no puede ejecutarse más de una vez por minuto, mientras que Celerý sí.
  • Con una línea cron tienes que llamar a un script o un comando único, con ruta absoluta e información del usuario. Celery llama a funciones de Python, y puede utilizar todas las librerías de Django disponibles. Sólo hay que escribir código integrado en nuestra aplicación.
  • Celery está más indicado en momentos en que necesites coordinar trabajos en varias máquinas, garantizar que los trabajos se ejecuten incluso cuando se agregan o eliminan máquinas de un grupo de trabajo.
  • Celery tiene la capacidad de establecer tiempos de vencimiento para los trabajos, definir trabajos de varios pasos con estilo gráfico en lugar del típico flujo de dependencia.
  • Celery permite tener un único repositorio de lógica de programación que opera de la misma manera en múltiples sistemas operativos y versiones.
  • Programar una tarea Celery existente es tremendamente sencillo. Sólo hay que usar una anotación @periodic_task en la cabecera de la tarea como la siguiente:
@periodic_task(run_every=(crontab(minute='*/1')), name="hola_que_tal")
def hola_que_tal():
    logger = logging.getLogger("django")
    logger.setLevel(logging.INFO)
    logger.info('TAREA EJECUTADA')

Ejecutando todo esto

Necesitamos ejecutar dos workers. Uno para ejecutar Celery y otro para gestionar las tareas programadas. En desarrollo lo haríamos con estas dos llamadas, por separado:

celery --app=MiAPP worker  --loglevel=info -Q MiQueue

celery -A MiAPP beat -l info

Sin embargo, en producción, y por cuestiones de rendimiento, es más recomendable "demonizar" estas llamadas. Para ello, puedes una configuración de supervisor para el worker de Celery en /etc/supervisor/conf.d/ llamado celeryworker.conf:

[program:celeryworker]

; Set full path to celery program if using virtualenv
command=/bin/celery worker -A mycelery --loglevel=INFO

; The directory to your Django project
directory=

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/log/celeryworker-supervisor.log

; Put process stderr output in this file
stderr_logfile=/log/celeryworker-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; process’ configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if your broker is supervised, set its priority higher
; so it starts first
priority=998

Del mismo modo, crearemos otro para el worker de Celery Beat llamado celerybeat.conf:

[program:celerybeat]

; Set full path to celery program if using virtualenv
command=/bin/celery beat -A mycelery --loglevel=INFO

; The directory to your Django project
directory=

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/log/celerybeat-supervisor.log

; Put process stderr output in this file
stderr_logfile=/log/celerybeat-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will be restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; process’ configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds which the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if your broker is supervised, set its priority higher
; so it starts first
priority=999

Y ahora actualizamos el supervisor:

sudo supervisorctl reread
sudo supervisorctl update

Con la siguiente orden, podemos gestional el comienzo y finalización del demonio de cada worker:

sudo supervisorctl stop celeryworker
sudo supervisorctl start celeryworker
sudo supervisorctl status celeryworker

Bonus track:

Una vez todo esto nos funcione, podremos monitorizar todas nuestras tareas con Flower.

Para instalarlo sólo tendremos que hacer esto:

pip install flower
flower --port=5555

Y ya podremos acceder a través de la url: localhost:5555