источник: https://docs.celeryproject.org/en/stable/userguide/tasks.html
Basics
Создание задачи используя декоратор task()
from .models import User
@app.task
def create_user(username, password):
User.objects.create(username=username, password=password)
Важно, если используются различные декораторы в комбинации с декоратором task, то он должен быть применен последним, т. е. первая в списке.
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
Наследование Task
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask)
def add(x, y):
raise KeyError()
Логгирование
Воркеры автоматически логгируют, но можно вручную определить логгирование.
Хорошей практикой создание основного логгера для всех своих задач в модуле:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Повторение задачи
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
Повторение задачи используя задержку
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
Статусы задач
Встроенные статусы
PENDING
Task is waiting for execution or unknown. Any task id that’s not known is implied to be in the pending state.
STARTED
Task has been started. Not reported by default, to enable please see app.Task.track_started
.
- meta-data
pid and hostname of the worker process executing the task.
SUCCESS
Task has been successfully executed.
- meta-data
result contains the return value of the task.
- propagates
Yes
- ready
Yes
FAILURE
Task execution resulted in failure.
- meta-data
result contains the exception occurred, and traceback contains the backtrace of the stack at the point when the exception was raised.
- propagates
Yes
RETRY
Task is being retried.
- meta-data
result contains the exception that caused the retry, and traceback contains the backtrace of the stack at the point when the exceptions was raised.
- propagates
No
REVOKED
Task has been revoked.
- propagates
Yes
Task classes
@app.task
def add(x, y):
return x + y
=>
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
Создание экземпляра
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
сохраняем состояние между процессами.
Также можно использовать кеш ресурсов, например, класс задачи кеширует соединение с БД:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
можно добавлять для каждой задачи:
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
Вызов задачи
источник: https://docs.celeryproject.org/en/stable/userguide/calling.html
Quick Cheat Sheet
- T.delay(arg, kwarg=value) Star arguments shortcut to .apply_async. (.delay(args, *kwargs) calls .apply_async(args, kwargs)).
- T.apply_async((arg,), {'kwarg': value})
- T.apply_async(countdown=10) executes in 10 seconds from now.
- T.apply_async(eta=now + timedelta(seconds=10)) executes in 10 seconds from now, specified using eta
- T.apply_async(countdown=60, expires=120) executes in one minute from now, but expires after 2 minutes.
- T.apply_async(expires=now + timedelta(days=2)) expires in 2 days, set using datetime.
Linking (callbacks/errbacks)
@app.task
def add(x, y):
return x + y
@app.task
def error_handler(request, exc, traceback):
print('Task {0} raised exception: {1!r}\n{2!r}'.format(
request.id, exc, traceback))
add.apply_async((2, 2), link=add.s(16))
add.apply_async((2, 2), link_error=error_handler.s())
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
Проектирование потоков worker'ов
источник: https://docs.celeryproject.org/en/stable/userguide/canvas.html