Общие ссылки
Пример использования
источник: https://habr.com/ru/company/biggo/blog/102742/
Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ
Конфигурация хранится celeryconfig.py
Запуск демона: celeryd -l INFO -B Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat
Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:
from celery.decorators import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
@periodic_task(run_every=timedelta(seconds=60))
def mail_queue():
print "Task is executed every minute"
@periodic_task(run_every=crontab(hour=0, minute=10))
def transactions():
print "Task is executed every day on 0:10"
@task
def delayed_function(id):
some_function()
@task
def delayed_heavy_function(id):
some_heavy_function()
Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:
from tasks import delayed_function, delayed_heavy_function
delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд
r = delayed_heavy_function.delay(id) #Будет запущена сразу(как только появится возможность), в асинхронном режиме
Первые шаги с Celery
источник: https://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html
Выберем брокера сообщений
остановлюсь на RabbitMQ
установка
sudo apt-get install rabbitmq-server
или запустить через docker
docker run -d -p 5672:5672 rabbitmq
Установка Celery
pip install celery
Application
создадим файл tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
Запуск Celery worker server
celery -A tasks worker --loglevel=INFO
В продакшене надо будет запустить worker как демон.
Вызов задачи
>>> from tasks import add
>>> add.delay(4, 4)
результатом работы будет AsyncResult
Сохранение результата работы задачи
Для сохранения состояния задачи используется так называемые result backends to choose from: SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP), and – или что-то определяемые вами.
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
>>> result = add.delay(4, 4)
>>> result.ready() # возвращает завершен ли процесс или нет
False
>>> result.get(timeout=1) # получаем результат задачи
8
Бэкенды используют ресурсы для сохранения и передачу результатов, поэтому нужно вызывать или get(), или forget() для каждого AsyncResult экземпляра который возвращается после вызова задачи.
Конфигурация
можно конфигурировать в самом приложении или в отдельном модуле.
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
или
app.config_from_object('celeryconfig')
и определяем модуль celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
для проверки есть ли синтаксические ошибки можно использовать следующее:
python -m celeryconfig
Также можно определить конфигурационный класс/объект:
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
Конфигурация через переменные окружения, например CELERY_CONFIG_MODULE - модуль конфигурации
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO
Использование Celery в приложении
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='rpc://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
Запуск worker
celery -A proj worker -l INFO
запуск в фоновом режиме: смотрите Daemonization
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
Вызов/запуск задач
>>> from proj.tasks import add
>>> add.delay(2, 2)
>>> add.apply_async((2, 2))
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
>>> add(2, 2)
4
Если хотите получите результат:
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
# статус задачи
>>> res.failed()
True
>>> res.successful()
False
>>> res.state
'FAILURE
Задача может переходить с следующие статусы:
PENDING -> STARTED -> SUCCESS
Если задача будет
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Примитивы
Groups - Группы
группы вызывают параллельно список задач и возвращает специальный экземплятор который можно получать результат по порядку
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- Partial groups
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains - цепочки
связываем цепочки вызово различные задачи которые выполняются последовательно используя результат предыдущей задачи
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
Chords - связка
chord - это группа с callback
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
Маршрутизация
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
$ celery -A proj worker -Q hipri
Удаленное управление
можно контролировать и мониторить worker'ов в рантайме.
Можно посмотреть какие воркеры работают сейчас :
celery -A proj inspect active
Получить помощь по команду мониторинга:
celery -A proj inspect --help
Получить помощь по команде контроля:
celery -A proj control --help
Можно насильно воркерам включить event messages (используется для мониторига и конроля воркеров):
celery -A proj control enable_events
Когда события включены можно запустить событие dumper и посмотреть что делают воркеры:
celery -A proj events --dump
Завершить мониториг:
celery -A proj control disable_events
Статус удаленного управления и вывод списка воркеров которые онлайн в класетере:
celery -A proj status
Временные зоны
app.conf.timezone = 'Europe/London'