异步任务 定时任务选型 - CongGreat/async-await GitHub Wiki



1.方案对比

方案 官方文档 应用场景 优点 缺点
APScheduler https://apscheduler.readthedocs.io

/en/latest/userguide.html
定时任务 使用简单,动态添加和删除日任务,

支持持久化,

易于在web框架种集成
不支持异步任务,

没有独立的监控,需要开发
celery https://www.celerycn.io/ru-men/celery-jian-jiehttps://www.celerycn.io/ru-men/celery-jian-jie 异步队列和定时任务 支持异步任务和定时任务

支持持久化

方便扩展

有完整的监控方案
依赖配置更复杂。

定时任务也需要单独启动启动celery进程
scheduler https://schedule.readthedocs.io/en/stable

/_modules/schedule.html#Scheduler
类似于linux的cron 使用简单,也不需要做什么配置; 不支持持久化

没有监控

2.celery的基本介绍

celery是一个简单的、灵活可靠分布式系统和调度框架,用于处理大量的信息(实时信息、定时任务)。

celery是一个功能完备即插即用的任务队列,异步任务的调度工具。

特点:

1.高效,单个celery进程每分钟可以处理数百万个任务。

2.灵活,celery中几乎每个部分都可以自定义扩展。

  • 支持的消息队列系统(redis rabbitmq),
  • 支持的结果存储(redis,mysql, mongodb, es,s3等),
  • 并发支持(prefork, eventlet, gevent, 线程池等),
  • 序列化方式(json,pickle, yaml, msgpack)。

3.celery非常易于集成到一些web开发框架中。

4.高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA(redis数据会丢失)

celery架构图1

celery各个组件的描述
  • Celery Beat: 定时任务调度器,Beat 进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列

  • Task:任务是构建 Celery 应用程序的组成模块,通过装饰器的方式(@app.task, 其中的app是worker中的application,将任务函数注册到Celery APP中。

  • Celery Worker: 执行任务的消费者,worker是Celery提供的任务执行的单元, 通常会在多台服务器运行多个消费者来提高运行效率。

  • Broker: 消息中间件(消息代理),Celery本身不提供消息服务. 需要对接第三方消息队列或者数据库,接收任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方。

  • Producer: 任务生产者,调用 Celery API 的函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

  • Backend,结果存储。用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

celery架构图2

celery

task调用流程

在客户端中调用定义的task, celery将task发到broker, 通过delay()调用的过程如图:

具体逻辑描述
  • 在客户端调用 apply_async 的时候,会调用 app.send_task 来具体发送任务

  • send_task 生成任务信息,调用amqp(celery自身对amqp协议的实现)发送任务

    # celery/app/task.py
    def apply_async():
        return app.send_task(
                    self.name, args, kwargs, task_id=task_id, producer=producer,
                    link=link, link_error=link_error, result_cls=self.AsyncResult,
                    shadow=shadow, task_type=self,
                    **options
                )
  • 调用 amqp 创建消息create_task_message,初始化Producer , 发送消息send_task_message;

    # celery/app/base.py
    def send_task():
        ...
        message = amqp.create_task_message(
                task_id, name, args, kwargs, countdown, eta, group_id, group_index,
                expires, retries, chord,
                maybe_list(link), maybe_list(link_error),
                reply_to or self.thread_oid, time_limit, soft_time_limit,
                self.conf.task_send_sent_event,
                root_id, parent_id, shadow, chain,
                ignore_result=ignore_result,
                argsrepr=options.get('argsrepr'),
                kwargsrepr=options.get('kwargsrepr'),
            )
        if connection:
            producer = amqp.Producer(connection, auto_declare=False)
    
        with self.producer_or_acquire(producer) as P:
            with P.connection._reraise_as_library_errors():
                if not ignore_result:
                    self.backend.on_task_call(P, task_id)
                amqp.send_task_message(P, name, message, **options)
        result = (result_cls or self.AsyncResult)(task_id)
        ...
  • producer调用publish发布消息,Channel 负责最终消息发布;

    # celery/app/ampq.py
     def send_task_message():
        ...
        ret = producer.publish(
                    body,
                    exchange=exchange,
                    routing_key=routing_key,
                    serializer=serializer or default_serializer,
                    compression=compression or default_compressor,
                    retry=retry, retry_policy=_rp,
                    delivery_mode=delivery_mode, declare=declare,
                    headers=headers2,
                    **properties
                )
        ...
    
    # kombu/message.py
    def _publish():
        ...
         return channel.basic_publish(
                message,
                exchange=exchange, routing_key=routing_key,
                mandatory=mandatory, immediate=immediate,
                timeout=timeout
            )
详细流程
  1. producer发出调用请求(message包含所调用任务的相关信息)

  2. 对应的交换机 接收请求message 【celery服务启动后,会产生一个或多个交换机(exchanges)】

  3. 交换机根据router_key,将message分发到一个或多个符合条件的队列(queue)

  4.  每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。

启动worker时根据配置监听指定的exchange, router_key, queue,生产者调用任务时根据exchange将消息通过router_key来路由消息,就把消息路由到其对应的队列上。

任务流程图和任务状态图

任务状态 说明
PENDING 任务等待中
STARTED 任务已开始
SUCCESS 任务执行成功
FAILURE 任务执行失败
RETRY 任务将被重试
REVOKED 任务取消

3.项目demo

4.任务监控

基础监控flower:
 celery -A celery_beat flower --address=127.0.0.1 --port=5556

https://flower.readthedocs.io/en/latest/

后续监控方案:

1.Flower+Prometheus+Grafana (待实现)

2.celery signal + statsd + graphite + grafana (待实现)

5.问题和疑点

1.Client发布任务后,任务会以一个消息的形式写入Broker队列,带有任务名称等相关参数,等待Worker获取。即使worker没有启动,消息也会被写入队列。

celery进程挂了之后,重启之后,任务还在队列中。

2.celery的broker目前支持,rabbitMQ , redis,以及数据库的持久化方案。

官方推荐是rabbitMQ,redis. redis目前只支持单机和哨兵。

3.修改定义的task内容,需要重启celery服务进程对任务进行重新注册。

官方推荐的Broker对比
名称 状态 监控 远程控制
RabbitMQ 稳定
Redis 稳定
Amazon SQS 稳定
Zookeeper 实验阶段

6. 具体集成到项目中的实施(待讨论)

定时任务脚本分散,部分是单独启的进程,部分是在项目中以http的方式调用,后台任务是以线程池的方式。

1.任务是否放在当前项目中管理,方便依赖当前项目中的配置,模型,以及代码复用。

  1. 任务最小化。某个大的任务,进行任务拆分。

  2. 如果使用celery,broker的选择。

图片参考:https://blog.csdn.net/weixin_43335288/article/details/123388030

参考:http://v5blog.cn/pages/fbe633/#_2-2-celery%E6%9E%B6%E6%9E%84%E5%9B%BE

celery 相关命令

from celery import Celery
celery_app = Celery("celery_beat",
                    include=[
                        "celery_tasks.tasks_demo.demo2.task4"
                    ])
celery_app.config_from_object(celery_config)
celery_app.conf.task_queues = (
    Queue('test_queue3', routing_key='key2', exchange="ex2"),
)
  1. 启动定时任务:
celery -A celery_app1 beat

celery_app1为Celery实例的名字,比如'celery_beat'

  1. 启动celery worker:
celery -A celery_app1 worker -l info -P eventlet -n w1

参数说明: -A 指定celery app

-P 指定并发方式

-c 并发数

-l 日志等级

-n worker名字 (多worker启动时)

-Q 启动时,指定队列名

3.启动flower监控

celery -A celery_app1 flower  --address=0.0.0.0  --port=5567
⚠️ **GitHub.com Fallback** ⚠️