伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

一文搞定 celery 任务远程调用

来源:本站原创 浏览:128次 时间:2022-02-18

celery 是分布式的异步任务队列,既然是分布式,那么肯定是支持远程调度任务的,那么它是如何实现的呢?

celery 主要是通过中间人来实现远程调度的,中间人 broker 的工具如 RabbitMQ,Redis 服务支持远程访问。

由于官方的示例都是基于本地的任务调用,本文向大家展示如何使用 Celery 调用远程主机上的任务- 在主机 C 上调用主机 A 上的任务 taskA,调用主机 B 上的任务 taskB。

主机C ip地址:192.168.0.107  主机A ip地址:192.168.0.111   主机B ip地址:192.168.0.112

基于上一篇文章 分布式异步任务队列神器-Celery 中 myCeleryProj 项目的源码。

开始动手:

第一步:定义任务队列。

修改 settings.py 使任务 taskA 运行在队列 tasks_A 上,任务 taskB 运行在队列 tasks_B 上,中间人均指向主机 C 上的 redis 数据库:redis://192.168.0.107:6379/0,redis 数据库不是必须在主机C上启用,redis 数据库可运行在任意一台主机上,只要确保其允许远程访问即可。完整的settings.py如下所示

from kombu import Queue

CELERY_TIMEZONE='Asia/Shanghai'

CELERY_QUEUES = (  # 定义任务队列
   Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
   Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
   Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
)

CELERY_ROUTES = (
  [
      ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
      ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
      ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
  ],
)

BROKER_URL = "redis://192.168.137.129:6379/0"  # 使用redis 作为消息代理

CELERY_RESULT_BACKEND = "redis://192.168.137.129:6379/0"  # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = "json"  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
第二步:确认中间人已启动,并部署源码。

1 确保主机C上的redis数据库服务已经启动,如有以下信息说明已经成功启动。

$ps -ef|grep redis
aaron 2229 2187 0 21:43 pts/1 00:00:00 ./redis-server 0.0.0.0:6379
aaron 2452 2437 0 21:47 pts/2 00:00:00 grep --color=auto redis

2 将 myCeleryProj 目录分别复制到三台主机中。

scp -r myCeleryProj aaron@192.168.0.111:~
scp -r myCeleryProj aaron@192.168.0.112:~
scp -r myCeleryProj aaron@192.168.0.107:~
第三步:启动主机 A、B 上的 worker。

在主机A 上启用worker,监控队列tasks_A(前提是已经完安装python库celery和redis),命令如下所示:

celery -A myCeleryProj.app worker -Q tasks_A -l info

在主机B上执行同样的操作:

celery -A myCeleryProj.app worker -Q tasks_B -l info
第四步:调用程序。

编写 start_tasks.py,如下所示:

from myCeleryProj.tasks import taskA,taskB
import time
#异步执行 方法一
#resultA = taskA.delay()
#resultB = taskB.delay()

#异步执行 方法二
resultA = taskA.apply_async()
resultB = taskB.apply_async(args=[])
resultC = taskA.apply_async(queue='tasks_B')

while not (resultA.ready() and resultB.ready()):# 循环检查任务是否执行完毕
   time.sleep(1)

print(resultA.successful()) #判断任务是否成功执行
print(resultB.successful()) #判断任务是否成功执行

其中上述代码第 9 行可以通过指定 queue=’tasks_B’ 的方式来在调用任务时改变taskA执行的队列,这在实用中是非常方便的。
执行 python start_tasks.py 得到如下结果

aaron@ubuntu:~$ python start_tasks.py
True
True

主机A,主机B上worker运行情况如下所示:

主机 A

主机 B

可以看出在主机 B 上,我们调用时指定队列的 taskA 也在队列 tasks_B 执行。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net