Fork me on GitHub

Celery - планировщик фоновых и возможно периодческих задач

Общие ссылки

Пример использования

источник: https://habr.com/ru/company/biggo/blog/102742/

Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ

Конфигурация хранится celeryconfig.py

Запуск демона: celeryd -l INFO -B Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat

Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:

from celery.decorators import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab

@periodic_task(run_every=timedelta(seconds=60))
def mail_queue():

    print "Task is executed every minute"

@periodic_task(run_every=crontab(hour=0, minute=10))
def transactions():
    print "Task is executed every day on 0:10"


@task
def delayed_function(id):
    some_function()

@task
def delayed_heavy_function(id):
    some_heavy_function()

Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:

from tasks import delayed_function, delayed_heavy_function

delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд

r = delayed_heavy_function.delay(id) #Будет запущена сразу(как только появится возможность), в асинхронном режиме

Первые шаги с Celery

источник: https://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html

Выберем брокера сообщений

остановлюсь на RabbitMQ

установка

 sudo apt-get install rabbitmq-server

или запустить через docker

docker run -d -p 5672:5672 rabbitmq

Установка Celery

pip install celery

Application

создадим файл tasks.py

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Запуск Celery worker server

celery -A tasks worker --loglevel=INFO

В продакшене надо будет запустить worker как демон.

Вызов задачи

>>> from tasks import add
>>> add.delay(4, 4)

результатом работы будет AsyncResult

Сохранение результата работы задачи

Для сохранения состояния задачи используется так называемые result backends to choose from: SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP), and – или что-то определяемые вами.

app = Celery('tasks', backend='rpc://', broker='pyamqp://')
>>> result = add.delay(4, 4)

>>> result.ready()   # возвращает завершен ли процесс или нет
False

>>> result.get(timeout=1) # получаем результат задачи
8

Бэкенды используют ресурсы для сохранения и передачу результатов, поэтому нужно вызывать или get(), или forget() для каждого AsyncResult экземпляра который возвращается после вызова задачи.

Конфигурация

можно конфигурировать в самом приложении или в отдельном модуле.

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

или

app.config_from_object('celeryconfig')

и определяем модуль celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

для проверки есть ли синтаксические ошибки можно использовать следующее:

python -m celeryconfig

Также можно определить конфигурационный класс/объект:

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

Конфигурация через переменные окружения, например CELERY_CONFIG_MODULE - модуль конфигурации

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO

Использование Celery в приложении

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

proj/tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

Запуск worker

celery -A proj worker -l INFO

запуск в фоновом режиме: смотрите Daemonization

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

Вызов/запуск задач

>>> from proj.tasks import add

>>> add.delay(2, 2)

>>> add.apply_async((2, 2))

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

>>> add(2, 2)
4

Если хотите получите результат:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

#  статус задачи
>>> res.failed()
True

>>> res.successful()
False

>>> res.state
'FAILURE

Задача может переходить с следующие статусы:

PENDING -> STARTED -> SUCCESS

Если задача будет

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Примитивы

Groups - Группы

группы вызывают параллельно список задач и возвращает специальный экземплятор который можно получать результат по порядку

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • Partial groups
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains - цепочки

связываем цепочки вызово различные задачи которые выполняются последовательно используя результат предыдущей задачи

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

Chords - связка

chord - это группа с callback

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

Маршрутизация

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
$ celery -A proj worker -Q hipri

Удаленное управление

можно контролировать и мониторить worker'ов в рантайме.

Можно посмотреть какие воркеры работают сейчас :

celery -A proj inspect active

Получить помощь по команду мониторинга:

celery -A proj inspect --help

Получить помощь по команде контроля:

celery -A proj control --help

Можно насильно воркерам включить event messages (используется для мониторига и конроля воркеров):

celery -A proj control enable_events

Когда события включены можно запустить событие dumper и посмотреть что делают воркеры:

celery -A proj events --dump

Завершить мониториг:

celery -A proj control disable_events

Статус удаленного управления и вывод списка воркеров которые онлайн в класетере:

celery -A proj status

Временные зоны

app.conf.timezone = 'Europe/London'

social