celery 是分布式的异步任务队列,既然是分布式,那么肯定是支持远程调度任务的,那么它是如何实现的呢?
celery 主要是通过中间人来实现远程调度的,中间人 broker 的工具如 RabbitMQ,Redis 服务支持远程访问。
由于官方的示例都是基于本地的任务调用,本文向大家展示如何使用 Celery 调用远程主机上的任务- 在主机 C 上调用主机 A 上的任务 taskA,调用主机 B 上的任务 taskB。
主机C ip地址:192.168.0.107 主机A ip地址:192.168.0.111 主机B ip地址:192.168.0.112
基于上一篇文章 分布式异步任务队列神器-Celery 中 myCeleryProj 项目的源码。
开始动手:
第一步:定义任务队列。修改 settings.py 使任务 taskA 运行在队列 tasks_A 上,任务 taskB 运行在队列 tasks_B 上,中间人均指向主机 C 上的 redis 数据库:redis://192.168.0.107:6379/0,redis 数据库不是必须在主机C上启用,redis 数据库可运行在任意一台主机上,只要确保其允许远程访问即可。完整的settings.py如下所示
from kombu import Queue
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_QUEUES = ( # 定义任务队列
Queue("default", routing_key="task.#"), # 路由键以“task.”开头的消息都进default队列
Queue("tasks_A", routing_key="A.#"), # 路由键以“A.”开头的消息都进tasks_A队列
Queue("tasks_B", routing_key="B.#"), # 路由键以“B.”开头的消息都进tasks_B队列
)
CELERY_ROUTES = (
[
("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
],
)
BROKER_URL = "redis://192.168.137.129:6379/0" # 使用redis 作为消息代理
CELERY_RESULT_BACKEND = "redis://192.168.137.129:6379/0" # 任务结果存在Redis
CELERY_RESULT_SERIALIZER = "json" # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
第二步:确认中间人已启动,并部署源码。1 确保主机C上的redis数据库服务已经启动,如有以下信息说明已经成功启动。
$ps -ef|grep redis
aaron 2229 2187 0 21:43 pts/1 00:00:00 ./redis-server 0.0.0.0:6379
aaron 2452 2437 0 21:47 pts/2 00:00:00 grep --color=auto redis
2 将 myCeleryProj 目录分别复制到三台主机中。
scp -r myCeleryProj aaron@192.168.0.111:~
scp -r myCeleryProj aaron@192.168.0.112:~
scp -r myCeleryProj aaron@192.168.0.107:~
第三步:启动主机 A、B 上的 worker。在主机A 上启用worker,监控队列tasks_A(前提是已经完安装python库celery和redis),命令如下所示:
celery -A myCeleryProj.app worker -Q tasks_A -l info
在主机B上执行同样的操作:
celery -A myCeleryProj.app worker -Q tasks_B -l info第四步:调用程序。
编写 start_tasks.py,如下所示:
from myCeleryProj.tasks import taskA,taskB
import time
#异步执行 方法一
#resultA = taskA.delay()
#resultB = taskB.delay()
#异步执行 方法二
resultA = taskA.apply_async()
resultB = taskB.apply_async(args=[])
resultC = taskA.apply_async(queue='tasks_B')
while not (resultA.ready() and resultB.ready()):# 循环检查任务是否执行完毕
time.sleep(1)
print(resultA.successful()) #判断任务是否成功执行
print(resultB.successful()) #判断任务是否成功执行
其中上述代码第 9 行可以通过指定 queue=’tasks_B’ 的方式来在调用任务时改变taskA执行的队列,这在实用中是非常方便的。
执行 python start_tasks.py 得到如下结果
aaron@ubuntu:~$ python start_tasks.py
True
True
主机A,主机B上worker运行情况如下所示:
主机 A
主机 B
可以看出在主机 B 上,我们调用时指定队列的 taskA 也在队列 tasks_B 执行。