随心一记

一二三四五,上山打老鼠


  • 首页

  • 归档

  • 标签
ywcsb

ywcsb

游戏可以不玩,小说不能不看。

153 日志
3 分类
42 标签
RSS
GitHub 知乎 随心一记
Links
  • 随心一记
  • 追梦人物的
  • MSDN

celery rabbitmq 延迟消息

发表于 2023-02-17 | 阅读 1544 | 分类于 Python |

使用 RabbitMQ 延迟消息插件来调度celery任务。

RabbitMQ 延迟消息插件:

RabbitMQ 延迟交换插件用于实现消息到达交易所和传递到队列之间的等待时间。每次发布消息时,都可以指定以毫秒为单位的偏移量。

我们可以声明类型为“x-delayed-message”的交换,然后使用自定义标头 x-delay 发布任务,以毫秒为单位表示任务的延迟时间。消息将在 x 延迟毫秒后传递到相应的队列。

对 rabbitMq 队列和交换感到困惑,请单击此处获取相同的惊人解释(来自 rabbitMq 的核心开发人员之一)

此方法的问题:

  1. Celery 不支持开箱即用的“RabbitMQ 延迟消息插件”。因此,我们需要在 rabbit-MQ 中手动安装插件。
  2. 此插件的最新版本针对 RabbitMQ 3.10.x。3.9.x 之前的系列不受支持。
  3. 如果延迟消息的总数超过特定数量 (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
  4. 有关详细信息,请参阅 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#limitations

实施步骤

1、安装rabbitmq 延迟消息插件,创建延迟消息exchange,celery只能创建topic的交换机不会创建x-delayed-message交换。

首先,在rabbitmq中安装rabbitmq_delayed_message_exchange插件。可以使用rabbitmq-plugins命令启用插件。如下所示。

注意:有可能会提示没有此插件,需要去GitHub去下载插件并且上传到服务器。

# 开启 x-delayed-message类型的插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后再创建一个名为delay-tasks的延迟交换机,可以使用以下命令创建一个x-delayed-message类型的交换机。

rabbitmqadmin declare exchange name=delay-tasks type=x-delayed-message arguments='{"x-delayed-type":"direct"}'

或者直接在管理页面创建

image-20230217144200753

声明使用该类型的交换,然后发布带有自定义标头的消息,以毫秒为单位表示消息的延迟时间。消息将在毫秒后传递到相应的队列。x-delayed-message x-delay

2、celery配合rabbitmq延迟消息使用,创建一个celery应用程序并且使用rabbitmq的延迟消息。

import os
from typing import NamedTuple

from celery import Celery, platforms
from django.conf import settings
from kombu import Exchange, Queue

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'devops_api.settings')

app = Celery('devops_api', broker='amqp://guest@localhost//')

# 获取Django settings中的配置。
# app.config_from_object('django.conf:settings')
# 获取Django的应用配置 文件名需要命名为tasks.py
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 可以使用root启动程序,不然使用root启动会出现报错。
# platforms.C_FORCE_ROOT = True
# 配置默认时区
# app.conf.timezone = 'Asia/Shanghai'

# 定义结构
class DelayedTaskDeliveryKit(NamedTuple):
    destination_queue: Queue
    destination_exchange: Exchange
    routing_key: str


def get_delayed_task_delivery_kit(destination_queue_name: str) -> DelayedTaskDeliveryKit:
    """
    For getting the utils using which we can schedule celery tasks.
    Publish tasks with the custom header x-delay expressing a delay time
    for the task in milliseconds.The message will be delivered to the
    respective queues after x-delay milliseconds.
    (For setting 'x-delay' header,
    see: https://stackoverflow.com/questions/35449234/how-could-i-send-a-delayed-message-in-rabbitmq-using-the-rabbitmq-delayed-messag )
    """
    # 要使用延迟消息功能,
    # 声明一个 x 延迟消息类型的交换
    destination_exchange = Exchange(
            destination_queue_name,
            type='x-delayed-message',
            # 可以在 exchange.declare 期间传递的 x 延迟类型参数。
            # 这里我们使用“直接”作为交换类型。这意味着插件
            # 将具有与直接交换相同的路由行为。
            arguments={"x-delayed-type": "direct"},
    )
    destination_queue = Queue(
            destination_queue_name,
            exchange=destination_exchange,
            routing_key=destination_queue_name,
    )
    return DelayedTaskDeliveryKit(
            destination_queue=destination_queue,
            destination_exchange=destination_exchange,
            routing_key=destination_queue_name,
    )
# 更改celery的默认队列,需要将要创建的队列添加到celery配置中,因为对于celery,默认的交换机类型是direct, 对于使用rabbitmq 延迟消息插件的延迟任务,需要我们自己手动声明。
get_delayed_task_delivery: DelayedTaskDeliveryKit = get_delayed_task_delivery_kit("delay-tasks")
# 添加Queue 队列必须先添加一个默认的队列不然有些 tasks会找不到路由,
app.conf.task_default_queue = 'devops_api'
app.conf.task_queues = [
    # 添加默认的 Queue
    Queue('devops_api', routing_key='task.#'),
    # 添加刚刚创建的 Queue
    get_delayed_task_delivery.destination_queue
]

3、定义一个celery任务

from celery_app import app
@app.task()
def add(a: int, b: int):
    print(f"{a} + {b} is {a+b}")

5、将任务发布到celery add

需要使用apply_async方法将celery任务发送到延迟队列,并添加x-delay,指定延迟值(以毫秒为单位)。

from tasks import add

if __name__ == '__main__':
  add.apply_async(kwargs={"a": 1, "b": 3},, queue='delay-tasks', headers={'x-delay': 10000}
觉得不错,支持一下!
geerniya WeChat Pay

微信打赏

geerniya Alipay

支付宝打赏

# Python # 系统运维 # PyCharm # Django
ssh MaxStartups 导致ansible使用代理连接时出现断开的情况
Python JsonPath模块使用

发表评论

共 0 条评论

    暂无评论
© 2018 - 2022 ywcsb
冀ICP备17022045号-1
Supported by 腾讯云