Celery笔记 - wongxinjie/wongxinjie.github.io GitHub Wiki

celery

celery是一个优秀的任务的分布式任务调度系统,可以取代crontab来做一些任务调度。

纯celery

# coding: utf-8
# sample/celery.py
from __future__ import absolute_import
from celery import Celery

app = Celery('sample',
             include=['sample.tasks'])

app.config_from_object('sample.config')

app.conf.update(CELERY_TASK_RESULT_EXPIRES=3600,)


if __name__ == "__main__":
    app.run()
# coding: utf-8
# sample/config.py
from __future__ import absolute_import

from celery.schedules import crontab
from kombu import Exchange, Queue

BROKER_URL = 'redis://127.0.0.1:6379/2'

# 指定时区
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_add_task', Exchange('for_add_task'), routing_key='add_task'),
    Queue('for_minus_task', Exchange('for_minus_task'),
          routing_key='minus_task'),
)

# 指定是那个队列处理
CELERY_ROUTES = {
    'sample.tasks.add': {'queue': 'for_add_task', 'routing_key': 'add_task'},
    'sample.tasks.minus': {'queue': 'for_task_minus',
                           'routing_key': 'minus_task'},
    'sample.tasks.remind': {'queue': 'default', 'routing_key': 'default'}

}

# 添加一些定时任务
CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'sample.tasks.add',
        'schedule': 10,
        'args': (16, 15),
    },
    'minus': {
        'task': 'sample.tasks.minus',
        'schedule': 30,
        'args': (24, 13),
    },
    'remind': {
        'task': 'sample.tasks.remind',
        # crontab 的最小粒度是minute
        'schedule': crontab(minute='*/1'),
    },
}
# coding: utf-8
# sample/tasks.py
from __future__ import absolute_import
import logging
from datetime import datetime

from sample.celery import app


@app.task(name='sample.tasks.add')
def add(x, y):
    result = x+y
    logging.info('add result %s', str(result))
    return result


@app.task(name='sample.tasks.minus')
def minus(x, y):
    result = x - y
    logging.info('minus result %s', str(result))
    return result


@app.task(name='sample.tasks.remind')
def remind():
    current = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    logging.info('now: %s', current)

调用命令:

celery worker --app=sample -l info

然后:

celery -A sample beat

这个命令会定时将任务发送到celery的任务队列