RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表

这篇具有很好参考价值的文章主要介绍了RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表

RabbitMQ 3.12 中发布的原生 MQTT 为物联网用例提供了显著的可扩展性和性能改进。

RabbitMQ 3.13 将支持 MQTT 5.0,因此将成为我们使 RabbitMQ 成为领先的 MQTT 代理之一的下一个重要步骤。

这篇博文解释了如何在 RabbitMQ 中使用新的 MQTT 5.0 功能。

1. MQTT概览

MQTT 是物联网 (IoT) 的标准协议。

物联网远程设备在连接到代理时网络质量可能较差。 因此,MQTT是轻量级的:MQTT协议头很小,可以节省网络带宽。

由于物联网设备可能经常断开连接并重新连接(想象一下一辆汽车驶过隧道),MQTT 也很高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。

MQTT协议已经存在了很多年。 如下表所示,最新的 MQTT 协议版本为 5.0。

MQTT 版本 CONNECT 数据包中的协议版本 MQTT 规范发布年份 自 Year 以来的 RabbitMQ 支持(版本)
3.1 3 2010 2012 (3.0)
3.1.1 4 2014 2014 (3.3)
5.0 5 2019 2024 (3.13)

值得一提的是,面向用户的协议版本和“内部”协议版本(也称为协议级别)是有区别的。 后者在 CONNECT 数据包中从客户端发送到服务器。 由于面向用户的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步的混淆,MQTT 委员会决定跳过面向用户的版本 4.0,以便面向用户的版本 5.0 映射到内部协议版本 5。

2. MQTT 5.0 特性

1. 特性概要

附录 C. MQTT v5.0 中的新功能摘要提供了 MQTT 5.0 新功能的完整列表。

由于您在 Web 上找到了很棒的 MQTT 5.0 资源,包括说明性图表和使用模式,因此这篇博文仅关注 RabbitMQ 的细节。 本节介绍 PR #7263 中实现的最重要功能。 对于每个功能,我们提供了一个如何将其与 RabbitMQ 一起使用的示例,或者概述了如何在 RabbitMQ 中实现它的高级描述。

2. Docker中安装RabbitMQ及启用MQTT5.0协议

要自己运行示例,请启动 RabbitMQ 服务器 3.13, 例如,使用以下 Docker 镜像标记:

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management

在另一个终端窗口中,启用 MQTT 插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt

由于 MQTT 插件是动态启用的,因此 MQTT 插件定义的功能标志被禁用。 启用所有功能标志,包括功能标志:mqtt_v5

docker exec rabbitmq rabbitmqctl enable_feature_flag all

现在,列出功能标志应显示所有功能标志都已启用:

docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table

以下示例使用 MQTTX CLI V1.9.4。 我们使用 CLI 而不是图形 UI,以便您可以通过复制粘贴命令轻松运行示例。

所有新功能也适用于 RabbitMQ Web MQTT 插件。

3. MQTT 5.0 功能列表

1. 消息过期

1. 描述

可以为发布到代理的每条消息设置以秒为单位的到期间隔。 如果在该过期时间间隔内未使用邮件,则该邮件将被丢弃或死信。

2. 举例

为 主题 创建订阅。 这将在 RabbitMQ 中创建一个队列。 通过键入终端断开客户端连接。 由于我们使用 600 秒的会话到期间隔,因此此队列将再存在 10 分钟。t/1``Ctrl+C

mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/1...
✔  Subscribed to t/1
^C

将消息发布到同一主题,消息过期间隔为 30 秒:

mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
…  Connecting...
✔  Connected
…  Message publishing...
✔  Message published

在接下来的 30 秒内,列出队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name                        │ type    │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1        │
└─────────────────────────────┴─────────┴──────────┘

等待 30 秒,然后再次列出队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name                        │ type    │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0        │
└─────────────────────────────┴─────────┴──────────┘

该消息已过期,因为客户端尚未连接到代理以使用该消息。 如果设置了死字策略,则邮件将死信发送到交易所。 在我们的例子中,死字被禁用。 查询 Prometheus 端点可证明经典队列中有 1 条消息已过期。sub-1

curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

另一个有趣的功能是以下要求:

服务器发送到客户端的 PUBLISH 数据包必须包含设置为接收值减去应用程序消息在服务器中等待的时间的消息到期间隔。

向代理发送第二条消息,消息到期间隔为 60 秒:

mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1

等待 20 秒,然后重新连接订阅客户端:

mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0  --qos 1 --output-mode clean
{
  "topic": "t/1",
  "payload": "m2",
  "packet": {
    ...
    "properties": {
      "messageExpiryInterval": 40
    }
  }
}

根据 MQTT 5.0 协议规范的规定,客户端接收第二条消息,消息到期间隔设置为 40 秒: 代理接收的 60 秒减去消息在代理中等待的 20 秒。

3. 实现

MQTT 5.0 消息到期是在 RabbitMQ 中使用每条消息 TTL 实现的,类似于 AMQP 0.9.1 发布者中的字段。expiration

2. 订阅标识符

1.描述

客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。 如果客户端因该订阅而收到消息,则代理会将该订阅标识符包含在 PUBLISH 数据包中。

订阅标识符的用例列在 SUBSCRIBE 操作部分。

2. 举例

从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符:

mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean

在第二个终端窗口中,我们看到从同一队列到同一主题交换的 3 个绑定,每个绑定都具有不同的路由键:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
    source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name            │ destination_kind │ routing_key                 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│             │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.#                         │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.1                         │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.2                         │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘

第一个条目是与默认交换的隐式绑定。

每个具有 MQTT 主题筛选器的 MQTT 订阅对应一个带有绑定键的 AMQP 0.9.1 绑定。 准确地说,表列的名称错误:应该改为调用它。 MQTT 中的主题级分隔符是 “” 字符,而 AMQP 0.9.1 主题交换中的主题级分隔符是 “” 字符。routing_key``binding_key``/``.

再次在第二个终端窗口中,向主题发送消息:t/1

mqttx pub --topic t/1 --message m1

(订阅客户端的)第一个终端窗口接收以下 PUBLISH 数据包:

{
  "topic": "t/1",
  "payload": "m1",
  "packet": {
    ...
    "properties": {
      "subscriptionIdentifier": [
        1,
        3
      ]
    }
  }
}

它包含订阅标识符 1 和 3,因为 topic filters 和 match topic .t/1``t/#``t/1

同样,如果向主题发送第二条消息,订阅客户端将收到包含订阅标识符 2 和 3 的 PUBLISH 数据包。t/2

3. 实现

订阅标识符是 MQTT 会话状态的一部分。 因此,在客户端断开连接时,订阅标识符必须保留在服务器的数据库中,直到 MQTT 会话结束。 RabbitMQ 将订阅标识符存储在绑定参数中:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key                 │ arguments                                                                         │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │                                                                                   │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.#                         │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1                         │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2                         │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>} │
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘

绑定参数的确切结构并不重要,并且可能会在将来的 RabbitMQ 版本中更改。 但是,可以在绑定参数中看到整数 1、2 和 3,这些参数对应于订阅标识符。

当主题交换路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含在消息中。 订阅 MQTT 客户端的 Erlang 进程将匹配的绑定密钥与其知道的 MQTT 主题过滤器进行比较,并将订阅标识符包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

发布 Erlang 进程可以是 MQTT 连接进程,也可以是 AMQP 0.9.1 通道进程。 一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换发送消息时, 正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

3. 订阅选项

1. 描述

MQTT 5.0 提供了 3 个新的订阅选项:

  1. 无本地
  2. 保留为已发布
  3. 保留处理

所有订阅选项均由 RabbitMQ 实现。 在这里,我们只关注“保留处理”选项:

此选项指定在建立订阅时是否发送保留的消息。
这些值为:
0 = 在订阅
时发送保留消息 1 = 仅在订阅当前不存在
时在订阅时发送保留消息 2 = 在订阅时不发送保留的消息

2. 举例例

发送保留的消息:

mqttx pub --topic mytopic --message m --retain

保留处理值 0 将接收保留的消息,而值 2 不会:

mqttx sub --topic mytopic --retain-handling 0
…  Connecting...
✔  Connected
…  Subscribing to mytopic...
✔  Subscribed to mytopic
payload: m
retain: true
^C

mqttx sub --topic mytopic --retain-handling 2
…  Connecting...
✔  Connected
…  Subscribing to mytopic...
✔  Subscribed to mytopic

4 所有ACK上的原因代码

1. 描述

数据包 CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 包含原因码。

2. 实现

一个实现示例是,如果消息未路由到任何队列,则 RabbitMQ 将在 PUBACK 数据包中使用原因代码进行回复。 MQTT 5.0 原因代码在概念上对应于 AMQP 0.9.1 中的强制消息属性和处理程序。No matching subscribers``No matching subscribers``BasicReturn

5. 用户属性

1. 描述

大多数 MQTT 数据包可以包含用户属性。 用户属性的含义不是由 MQTT 规范定义的。

2. 示例 PUBLISH 数据包

PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原封不动地转发。

在第一个终端窗口中订阅:

mqttx sub --topic t/5

在第二个终端窗口中发布包含用户属性的消息:

mqttx pub --topic t/5 --message m --user-properties "key1: value1"

第一个终端窗口将接收用户属性,原封不动:

payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]

MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的消息属性。headers

3. 示例 CONNECT 数据包

使用用户属性进行连接:

mqttx conn --client-id myclient --user-properties "connecting-from: London"

在浏览器中打开管理 UI http://localhost:15672/#/connections(用户名和密码都是 ),然后单击 MQTT 连接:guest

rabbitmq mqtt协议版本,MQ,Springboot,rabbitmq,分布式,消息中间件

RabbitMQ 将在管理 UI 中显示 CONNECT 数据包中的用户属性。

6. 有效负载格式和内容类型

1. 描述

发布者可以指定 MIME 内容类型。 它还可以设置有效负载格式指示器,指示有效负载是由 UTF-8 编码的字符数据还是未指定的二进制数据组成。

2. 举例

在第一个终端窗口中,订阅一个主题:

mqttx sub --topic t/6 --output-mode clean

在第二个终端窗口中,发送一条带有内容类型和有效负载格式指示符的消息:

mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator

第一个终端窗口将原封不动地接收内容类型和有效负载格式指示器:

{
  "topic": "t/6",
  "payload": "my UTF-8 encoded data 🙂",
  "packet": {
    ...
    "properties": {
      "payloadFormatIndicator": true,
      "contentType": "text/plain"
    }
  }
}

7. 请求/响应

1. 描述

MQTT 5.0 正式化了请求/响应模式。

在发布消息之前,MQTT 客户端(请求者)订阅响应主题。 请求者将响应主题和一些关联数据包含在请求消息中。

另一个 MQTT 客户端(响应者)接收到请求消息,执行一些操作,并将具有相同关联数据的响应消息发布到响应主题。

MQTT 5.0 请求/响应功能对应于 AMQP 0.9.1 中的远程过程调用。 但是,在 AMQP 0.9.1 中,请求者将在 AMQP 0.9.1 消息属性中包含回调队列的名称。 MQTT 协议没有定义队列的概念。因此,在 MQTT 中,被回复的“地址”是一个主题名称。reply_to

尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面大放异彩: 因此,RabbitMQ 支持跨协议的请求/响应交互。

例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。 如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换的队列,该队列的绑定密钥与请求消息的主题匹配,它将收到一条 AMQP 0.9.1 消息,其属性设置为 MQTT 客户端发送的关联数据和名为 . 然后,AMQP 0.9.1 客户端可以使用相同的 MQTT 5.0 客户端响应,并将响应消息发布到具有标头中存在的主题的主题交换。amq.topic``correlation_id``x-opt-reply-to-topic``correlation_id``amq.topic``x-opt-reply-to-topic

2. 举例

此示例仅关注 MQTT 客户端。

在第一个终端窗口中,响应的 MQTT 客户端订阅主题t/7;

mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1

在第二个终端窗口中,请求的 MQTT 客户端订阅了一个名为 :my/response/topic

mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to my/response/topic...
✔  Subscribed to my/response/topic
^C

在第二个终端窗口中,请求者随后发布一条请求消息:

mqttx pub --client-id requester --topic t/7 --message "my request" \
    --correlation-data abc-123 --response-topic my/response/topic \
    --session-expiry-interval 600 --no-clean

在第 1 个终端窗口中,响应方收到请求消息:

{
  "topic": "t/7",
  "payload": "my request",
  "packet": {
    ...
    "properties": {
      "responseTopic": "my/response/topic",
      "correlationData": {
        "type": "Buffer",
        "data": [
          97,
          98,
          99,
          45,
          49,
          50,
          51
        ]
      }
    }
  }
}
^C

在第一个终端窗口中,响应方通过复制关联数据并发布到响应主题来响应请求者:

mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123

在第 2 个终端窗口中,请求者收到响应。

mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
  "topic": "my/response/topic",
  "payload": "my response",
  "packet": {
    ...
    "properties": {
      "correlationData": {
        "type": "Buffer",
        "data": [
          97,
          98,
          99,
          45,
          49,
          50,
          51
        ]
      }
    }
  }
}

关联数据可用于将响应与请求相关联。 请求者通常为其发布的每个请求选取唯一的关联数据。

8. 分配的客户端标识符

1. 描述

如果客户端使用零长度的客户端标识符进行连接,则服务器必须使用包含分配的客户端标识符的 CONNACK 进行响应。

与 MQTT 3.1.1 相比,这解除了服务器分配的客户端 ID 只能用于连接的限制。Clean Session = 1

2. 实现

RabbitMQ 将生成一些随机的客户端 ID(例如 ),并在 CONNACK 数据包中返回它。dcGB2kSwS0JlXnaBa1A6QA

9. 主题别名

1. 描述

主题别名是一个整数值,用于标识主题,而不是使用主题名称。 这减小了 PUBLISH 数据包的大小,并且在主题名称很长且在网络连接中重复使用相同的主题名称时非常有用。

2. 实现

RabbitMQ 中的默认主题别名最大值为 16。 您可以在 中配置此值,例如:rabbitmq.conf

mqtt.topic_alias_maximum = 32

此配置值映射到从 RabbitMQ 发送到客户端的 CONNACK 数据包中的 Topic Alias Maximum。 它限制了任一方向的主题别名数,即从客户端到 RabbitMQ 和 RabbitMQ 到客户端。 如果客户端发送到许多不同的主题或从许多不同的主题接收,则设置更高的值将需要更多的内存使用量。

RabbitMQ 运算符可以通过设置以下设置来禁止使用主题别名:

mqtt.topic_alias_maximum = 0

10. 流量控制

1. 描述

MQTT 5.0 属性 Receive Maximum 定义了未确认的 QoS 1 PUBLISH 数据包的上限。

2. 实现

从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由 CONNECT 数据包中从客户端发送到 RabbitMQ 的 Receive Maximum 和配置值:mqtt.prefetch

mqtt.prefetch = 10

默认值为 10。mqtt.prefetch

该值在 MQTT 3.1 和 3.1.1 的 RabbitMQ 3.13 之前已存在。 它映射到 RabbitMQ 中的使用者预取。 换句话说,它定义队列发送到其 MQTT 连接进程的动态消息数量。mqtt.prefetch

11. 最大数据包大小

1. 描述

客户端和服务器可以独立指定它们支持的最大数据包大小。

2. 举例

此示例演示如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。

假设身份验证成功后,RabbitMQ 操作员不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。 将以下配置写入 rabbitmq.conf(在当前工作目录中):

mqtt.max_packet_size_authenticated = 1024

停止 RabbitMQ 服务器后,启动 RabbitMQ 服务器并应用新配置:

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
    --mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
    rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all

在第一个终端窗口中,订阅一个主题:

mqttx sub --topic t/11

在第二个终端窗口中,向该主题发送有效负载为 3 字节的消息:

payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

第一行从特殊文件中读取 3 个字节(3 个 null 字符),将每个 null 字符转换为 ASCII 字符并将结果保存在变量中。/dev/zero``x``xxx``payload

第一个终端窗口将收到该消息:

payload: xxx

接下来,在第 2 个终端窗口中,发送有效负载为 2,000 字节的消息:

payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

这一次,第一个终端窗口不会收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。

相反,RabbitMQ 会记录一条描述性错误消息:

[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)

日志消息声明 2,007 字节,因为 PUBLISH 数据包的固定和可变标头需要 7 个字节(其中 4 个字节用于主题名称)。t/11

12. 服务器启动的断开连接

1. 描述

在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。

2. 实现

在终止连接之前,RabbitMQ 会在以下情况下向客户端发送 DISCONNECT 数据包:

DISCONNECT 原因代码名称 情况
接管的会话 使用同一客户端 ID 连接的另一个客户端。
服务器关闭 RabbitMQ 进入维护模式。
保持活动超时 客户端无法在“保持活动”时间内进行通信。
数据包太大 RabbitMQ 收到大小超过mqtt.max_packet_size_authenticated

13. 会话到期

1. 描述

在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话到期间隔。 服务器可以接受建议的会话到期间隔,也可以在 CONNACK 数据包中强制要求不同的会话到期间隔。

会话可以跨一系列网络连接继续进行。它的持续时间与最新的网络连接加上会话到期间隔一样长。

当会话到期间隔到期时,客户端和服务器都将删除任何会话状态。

2. 实现

只要会话持续,客户端和服务器就会保持会话状态。

服务器中的会话状态包括已发送到客户端但尚未确认的消息、待发送到客户端的消息以及客户端的订阅。 RabbitMQ 以队列和绑定的形式对这种 MQTT 会话状态进行建模。

因此,会话到期间隔映射到 RabbitMQ 中的队列 TTL。 当 MQTT 会话过期时,队列及其消息和绑定将被删除。

3. 举例

默认情况下,服务器允许的最大会话到期间隔为 1 天。 如果 MQTT 客户端在 1 天内没有重新连接,则其会话状态将在 RabbitMQ 中删除。

此值是可配置的。 出于此示例的目的,让我们在以下情况下设置一个非常低的会话到期间隔 1 分钟:rabbitmq.conf

mqtt.max_session_expiry_interval_seconds = 60

设置名称包含前缀,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话到期间隔来选择较低的值。 如最大数据包大小示例中所做的那样,重新启动 RabbitMQ 节点,以便应用新设置。max

以 20 秒的会话到期间隔连接到 RabbitMQ 并创建订阅:

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/13...
✔  Subscribed to t/13
^C

键入终端以断开客户端连接。Ctrl+C

在接下来的 20 秒内,列出队列和绑定:

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name             │ routing_key                  │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│             │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic   │ mqtt-subscription-sub-13qos1 │ t.13                         │
└─────────────┴──────────────────────────────┴──────────────────────────────┘

20 秒后,再次列出队列和绑定:

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...

RabbitMQ 删除了队列及其绑定,因为我们的客户端未在 20 秒的会话到期间隔内连接到 RabbitMQ。Clean Session = 0

接下来,执行相同的测试,但会话到期间隔较长,例如 1 小时:

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/13...
✔  Subscribed to t/13
^C

您应该注意到,队列及其绑定将在 1 分钟后被删除,因为有效的会话到期间隔 是客户端请求的最小值(1 小时)和 RabbitMQ 中配置的值(1 分钟)。mqtt.max_session_expiry_interval_seconds

14. 会延迟

1. 描述

客户端可以在 CONNECT 数据包中定义 Will Delay Interval。

服务器会延迟发布客户的遗嘱消息,直到遗嘱延迟间隔过去或会话结束,以先发生者为准。 如果在将延迟间隔过去之前与此会话建立了新的网络连接,则服务器不得发送将消息。 这样做的一个用途是,如果存在临时网络断开连接,并且客户端在发布遗嘱消息之前成功重新连接并继续其会话,则避免发布遗嘱消息。

Will Delay Interval 的另一个用例是通知会话到期:

客户端可以通过将 Will Delay Interval 设置为 Session Expiry Interval 长并发送带有原因0x04代码的 DISCONNECT(Disconnect with Will Message),来安排 Will Message 通知会话到期已发生。

2. 实现

尽管 will 消息有效负载通常很小,但 MQTT 规范允许 will 消息有效负载大小高达 64 KiB。

为了避免在 Khepri(RabbitMQ 未来的元数据存储)中存储大型二进制数据,RabbitMQ 创建了一个包含此单个遗嘱消息的经典队列。 我们称此队列为 Will 队列。 此消息具有每条消息的 TTL 集,该集以毫秒为单位定义,对应于以秒为单位的 Will Delay Interval。 此外,Will 队列还设置了一个队列 TTL,该队列以毫秒为单位定义,对应于以秒为单位的会话到期间隔。 每条消息的有效 TTL 至少比队列 TTL 低几毫秒,以便消息将在队列(会话)过期前不久发布。

Will 队列还定义(MQTT 插件使用的默认主题交换)为死信交换,将 will 主题定义为死信路由键。amq.topic

如果 MQTT 客户端未在其 Will 延迟间隔内重新连接,则 Will 队列中的消息将死信发送到主题交换。

让我们用一个例子来说明这一点。

3. 举例

在第一个终端窗口中,创建一个将使用遗嘱消息的订阅:

mqttx sub --client-id sub-14 --topic t/14

在第二个终端窗口中,创建一个 Will Delay Interval 为 20 秒的连接:

mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40

在第 3 个终端窗口中,我们看到到目前为止,订阅 MQTT 客户端创建了一个队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name                         │ type       │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00        │           │
└──────────────────────────────┴────────────┴──────────┴───────────┘

在第二个终端窗口中,键入 to disconnect the MQTT connection with client ID。Ctrl+C``conn-14

这一次,列出队列显示已创建 Will 队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name                         │ type       │ messages │ arguments                                                  │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00        │                                                            │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14            │ classic    │ 1{<<"x-expires">>,long,40000}                               │
│                              │            │          │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>}     │
│                              │            │          │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>}       │
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘

Will 队列的命名模式为 。 它包含一条消息:遗嘱消息。mqtt-will-<MQTT Client ID>

如上一节所述,队列 TTL () 为 40,000 毫秒,因此与上面命令中的 40 秒会话到期间隔匹配。 如果您等待 20 秒,您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户没有在遗嘱延迟间隔内重新连接:x-expires

› payload: my-will-message

15. 可选的服务器功能可用性

1. 描述

定义一组服务器不允许的功能,并为服务器提供一种机制,以便将其指定给客户端。 可以通过这种方式指定的功能包括:

  • 最大 QoS
  • 保留可用
  • 提供通配符订阅
  • 可用的订阅标识符
  • 提供共享订阅

客户端使用服务器声明不可用的功能是错误的。

2. 实现

RabbitMQ 3.13 在 CONNACK 属性中包括 Maximum QoS = 1 和 Shared Subscription Available = 0。

RabbitMQ 不支持 QoS 2。

如下一节所述,将来的 RabbitMQ 版本将支持共享订阅。

4. 局限性

本节列出了 RabbitMQ MQTT 实现的限制。

1. MQTT 5.0 特定限制

1. 共享订阅

共享订阅将在将来的 RabbitMQ 版本中添加。 尽管此功能很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要进行某些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。

1. 延迟和保留的遗嘱消息

延迟和保留的遗嘱信息将不会被保留。 这是因为延迟的遗嘱消息将死信到主题交换,但保留进程当前不会从队列中使用。 将来可以通过保留邮件的新存储来解决此限制。

2. 非 MQTT 5.0 特定限制

为了完整起见,本节列出了在 RabbitMQ 3.13 中支持 MQTT 5.0 之前和 RabbitMQ 3.12 中提供原生 MQTT 之前存在的限制。

1. 保留的消息

保留消息的功能在 RabbitMQ 中受到限制。

保留的消息仅在本地节点上存储和查询。

一个有效的示例如下: MQTT 客户端向节点 A 发布一条保留的消息,主题为 。此后,另一个客户端在节点 A 上使用主题过滤器进行订阅。新订阅者将收到保留的消息。topic/1``topic/1

但是,如果主题筛选器包含通配符(多级通配符 “”或单级通配符 “”),则不会发送保留的消息 (问题 #8824)。#``+

此外,如果客户端在节点 A 上发布了保留的消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。

将来的 RabbitMQ 版本将复制集群中保留的消息,并发送与包含通配符的主题筛选器匹配的保留消息。

5. 总结

综上所述,RabbitMQ

  • 是领先的 AMQP 0.9.1 代理
  • 是一个流是代理
  • 擅长跨协议互操作性
  • 由于支持 3.13 中发布的 MQTT 5.0 和 3.12 中发布的原生 MQTT,它正在成为领先的 MQTT 代理之一

我们将 RabbitMQ 转变为成熟的物联网代理的旅程尚未完成,并计划在未来几个月和几年内进行更多的开发工作。 敬请关注!

6. 相关链接

MQTT 5.0 support is coming in RabbitMQ 3.13 | RabbitMQ文章来源地址https://www.toymoban.com/news/detail-850265.html

到了这里,关于RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ安装配置,笔记整理 RabbitMQ3.12.2版本安装配置

    官网下载 RabbitMQ 官方地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ 下载时需注意Erlang Versions的版本  这里下载的是3.12.2  2. 安装依赖环境 在线安装依赖环境: yum   install   build-essential   openssl   openssl-devel   unixODBC   unixODBC-devel   make   gcc   gcc-c ++   kernel-d

    2024年02月12日
    浏览(38)
  • 麒麟V10环境安装RabbitMQ3.6.10

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 1.安装依赖 2.erlang安装 链接:软件安装下载

    2024年02月05日
    浏览(49)
  • sensitive-word v0.13 特性版本发布 支持英文单词全词匹配

    sensitive-word-admin v1.3.0 发布 如何支持分布式部署? sensitive-word-admin 敏感词控台 v1.2.0 版本开源 sensitive-word 基于 DFA 算法实现的高性能敏感词工具介绍 更多技术交流 对于英文单词 Disburse 之类的,其中的 sb 字母会被替换,要怎么处理,能不能只有整个单词匹配的时候才替换。

    2024年02月20日
    浏览(46)
  • 【阿里云】物联网平台配置ESP8266真实设备AT串口连接,支持MQTT协议通信

    1 阿里云物联网平台 官方文档:https://help.aliyun.com/product/30520.html 官方控制台:https://iot.console.aliyun.com/lk/summary/new 左边有产品和设备, 产品是抽象的品类 ,比如说电灯,空调等等。 设备是品类下具体的物品 ,和实物一一对应的云端信息,比如电灯1,电灯2,电灯-卧室,电灯

    2024年02月01日
    浏览(55)
  • centos7安装erlang23.3.4.11及rabbitmq3.9.16版本

    rpm包有系统版本要求,el是Red Hat Enterprise Linux(EL)的缩写。 EL7是Red Hat 7.x,Centos 7.x EL8是Red Hat 8.x, Centos 8.x 所以我们在安装erlang及rabbitmq时需要选择与自己的服务器相对应的rpm包 # rabbitmq的rpm安装包 https://github.com/rabbitmq/rabbitmq-server/releases?page=10 # erlang的rpm安装包 https://github.com/

    2024年02月07日
    浏览(40)
  • Mainflux IoT:Go语言轻量级开源物联网平台,支持HTTP、MQTT、WebSocket、CoAP协议

    Mainflux是一个由法国的创业公司开发并维护的 安全、可扩展 的开源物联网平台,使用 Go语言开发、采用微服务的框架。Mainflux支持多种接入设备,包括设备、用户、APP;支持多种协议,包括HTTP、MQTT、WebSocket、CoAP,并支持他们之间的协议互转。 Mainflux的南向接口连接设备,北

    2024年02月01日
    浏览(109)
  • 用QUARTUS13.0自带仿真工具进行仿真

    quartus 13.0 自带仿真的使用 1、选中一个文件,右键设为顶层文件,编译它。 2、新建一个波形文件 3、打开波形文件,双击左侧空白处,再单击鼠标所处的键。 4、点击LIST菜单,并把信号从左面加到右面 5、这个比较关键,点击simulation,选options, 6、把输入信号加进去,点击仿真

    2024年02月12日
    浏览(46)
  • Android 13.0 rom定制专栏系列解读

    在从事android系统rom定制化的这几年里,经历了坎坎坷坷,开发过好几种类型的产品,也随着google对android系统的更新加快,也需要跟随上时代的进步,所以需要把平时工作中遇到的问题总结出来,及时做好记录归纳总结,然后进一步的提升自己能力,本专栏提供给在13.0的rom定

    2024年02月06日
    浏览(73)
  • 关于Quartus II 13.0破解失败问题

    问题描述: 安装quartus II 13.0时,使用破解器生成licence.dat文件时,提示了大致如下内容: 提示框的标题是sys_cpt.dll。 显示问题:该文件正在使用。。。。 (注意不是某些人遇到的未找到sys_cpt.dll未被找到) 请在检查确保破解器正确安装在对应的bin/bin64目录下无效后再寻找其

    2023年04月09日
    浏览(49)
  • QUARTUS联合modelsim仿真(quartus13.0)

    设置仿真软件(Modelsim/ Modelsim-Altera)路径 (1)点击tools -Options (2)选择EDA Tool Options PS:看自己情况设置,使用独立Modelsim仿真时设置ModelSim处的值为ModelSim安装路径下的win64(或者32)路径,使用独立Modelsim-Altera仿真时,设置Modelsim-Altera处的值为Modelsim-Altera路径(一般在quart

    2024年02月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包