前言
在网络通信中,心跳(Heartbeat)指的是一种周期性的消息,用于维持通信连接的活动状态。心跳包的主要作用是检测连接是否处于活动状态,及时发现连接异常并重新恢复连接,维护网络通信的稳定性和可靠性。
MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放式的消息协议,用于在低带宽和不可靠的网络环境下传输消息。MQTT协议规定了PINGREQ和PINGRESP消息类型,用于实现心跳机制。PINGREQ消息是由客户端向服务器发送的心跳包,PINGRESP消息是服务器回复的心跳包确认消息。
本文通过对mosquitto源码(mosquitto-2.0.15)分析,介绍mosquitto如何实现基于MQTT协议的心跳和网络重连机制。
一、MQTT各个版本对心跳机制的定义
不同版本的MQTT协议对心跳机制的定义略有不同,以下是各个版本对心跳机制的简要介绍:
- MQTT v3.1.0
MQTT v3.1.0协议定义了keepalive选项,它是一个16位的值,表示客户端与服务器之间的最大空闲时间。如果在这段时间内没有任何通信活动,客户端将发送PINGREQ消息给服务器,以确认服务器是否仍然在线。如果服务器没有在规定时间内响应PINGREQ消息,客户端将关闭连接。keepalive的默认值为60秒。 - MQTT v3.1.1
MQTT v3.1.1协议在v3.1.0的基础上做了一些改进。其中,keepalive选项的最小值改为了1.5倍的发送间隔,最大值改为65535秒。这样可以避免由于keepalive时间过短而导致频繁发送PINGREQ消息,从而影响性能。此外,如果客户端在规定时间内没有收到服务器的响应,它可以重新发送PINGREQ消息,最多重试三次。 - MQTT v5.0
MQTT v5.0协议进一步改进了心跳机制。它定义了一个心跳超时属性,用于指定服务器应该在多长时间内发送PINGREQ消息。客户端也可以在PINGREQ消息中设置超时属性。如果服务器在规定时间内没有响应PINGREQ消息,它将被视为已断开连接。同时,MQTT v5.0还引入了会话恢复机制,它可以让客户端在断开连接后重新连接并恢复之前的会话状态。 - MQTT-SN
MQTT-SN(MQTT for Sensor Networks)是专门设计用于传感器网络的MQTT版本。它定义了一个心跳间隔选项,表示客户端和网关之间的最大空闲时间。如果客户端在规定时间内没有发送消息,网关将发送PINGREQ消息。如果网关在规定时间内没有收到任何消息,它将发送DISCONNECT消息,断开连接。心跳间隔选项的默认值为30秒。
总的来说,MQTT协议的不同版本都对心跳机制进行了规定,旨在确保客户端和服务器之间的连接状态,并避免不必要的资源浪费。MQTT协议的心跳机制对于保证网络稳定性和消息传递的可靠性非常重要。
Mosquitto是一款常用的MQTT代理服务器,它支持多个MQTT协议版本。以下是Mosquitto版本与MQTT协议版本大体的对应关系:
- Mosquitto 0.x版本支持MQTT 3.1协议。
- Mosquitto 1.5.x版本支持MQTT 3.1和MQTT 3.1.1协议。
- Mosquitto 1.6.x以上版本支持MQTT 3.1、MQTT 3.1.1和MQTT 5.0协议。
不同版本的Mosquitto对应不同版本的MQTT协议,对于相同版本的MQTT协议,Mosquitto的实现与协议规范是相符合的。在Mosquitto中,心跳超时的实现遵循对应的MQTT协议规范,因此在不同版本的Mosquitto中,心跳部分的实现与相应版本的MQTT协议规范是一致的。
特别需要说明的是从Mosquitto 1.5.x开始,Mosquitto增加了对MQTT-SN的支持,同时包含了MQTT-SN网关功能,可以将MQTT-SN消息转发到MQTT broker。虽然Mosquitto支持MQTT-SN,但是MQTT-SN和MQTT其他协议的使用方式略有不同,必须仔细阅读Mosquitto的官方文档,确保正确设置选项和命令行参数。
二、Mosquitto心跳和网络重连机制的实现
MQTT 协议中的心跳机制用于维持客户端和服务器之间的连接,确保连接不会因为长时间没有数据交互而被断开。Mosquitto MQTT 代理服务器中的心跳功能和网络重连由服务器和客户端共同完成:
1.心跳功能实现过程
1)客户端向服务器发送心跳
在客户端使用的struct mosquitto
结构体中,有一个 last_msg_in
字段和一个 last_msg_out
字段,分别表示客户端最近一次收到消息(注意:这里是所有消息,包括PINGREQ、PINGRESP、PUBLISH等类型的消息)和发送消息的时间戳。当客户端在一段时间内没有发送任何消息时,主动向服务器发送一次心跳消息( PINGREQ 消息类型)。
客户端发送心跳包之间的时间间隔由客户端keepalive参数决定,发送心跳的时间应该是last_msg_out + keepalive
(不是last_msg_in + keepalive
)。
在客户端使用 MQTT 协议连接到服务器时,通过设置 MQTT CONNECT 消息中的 keep alive 字段, keepaliv参数发送到服务器。
keepalive参数的设置过程如下:
在客户端,mosquitto源码定义了两个重要的数据结构struct mosq_config
和struct mosquitto
,分别用于存储mosquitto客户端的配置信息和运行时状态和信息。对于keepalive
参数,首先需要在struct mosq_config
的对象cfg中设置,然后在连接前拷贝到struct mosquitto
结构体对象。
对于cfg->keepalive参数,客户端可以用三种方式确定(三种方式后面会覆盖前面):
(1)第一次清空struct mosq_config对象cfg数值,并对部分参数赋初值,其中cfg->keepalive = 60;
(2)如果客户端配置文件配置了keepalive参数,在初始化函数中赋值cfg->keepalive;
(3)用命令行参数设置的参数设置keepalive参数,在初始化函数中赋值cfg->keepalive。
2)服务器接收和回应来自客户端的心跳
当 Mosquitto 服务器接收到客户端的心跳包后,发送 PINGRESP 消息到此客户作为响应。同时更新链表中保存的此客户端struct mosquitto
结构体中的 last_msg_in
成员变量,记录最后一次接收到此客户消息的时间戳。
3)客户端接收来自服务器的心跳响应
Mosquitto 客户端接收来自服务器的心跳响应。当客户端接收到来自服务器的心跳响应时,更新struct mosquitto
结构体中 last_msg_in
成员变量,设置为当前的时间戳 。
2.断线的判定和重连
1)客户端
客户端当前正在使用的struct mosquitto
结构体实例中保存着当前状态和相关信息,其中包括客户端 ID、连接参数、订阅信息、回调函数等。如果一定时间客户端没有收到任何消息,则可以认为连接已经断开,这个超时时间就是keepalive
参数,而struct mosquitto
结构体中的last_msg_in
则记录了客户端最后一次收到消息的时间。换句话说如果在last_msg_in + keepalive
内未能收到任何消息,可以认为连接已经断开。此时,客户端向服务器发送DISCONNECT`消息进行断开连接操作,并尝试重新连接服务器。
2)服务器
在Mosquitto服务器中,当一个客户端连接到服务器时,Mosquitto服务器将客户端加入到 客户端列表中。判断客户端连接超时就是通过客户端列表中的最后连接时间来实现的,当客户端和 Mosquitto 服务器之间建立连接时,会向服务器发送一个 CONNECT
消息,消息中有一个 keepalive
参数,用来设置客户端需要在多长时间内发送至少一个 MQTT 消息或心跳包来保持连接。如果在 keepalive 值的两倍时间内,Mosquitto 服务器没有接收到任何来自客户端的 MQTT 消息或心跳包,那么服务器就会判定客户端连接已经超时。
在Mosquitto中,如果发现客户端连接超时,或收到客户端发送来的DISCONNECT 消息,服务器会关闭连接并从客户端列表中删除该客户端。这个过程并不会自动删除客户端所有的数据。如果需要删除客户端所有的数据,可以使用Mosquitto提供的on_disconnect
或on_client_disconnect
回调函数来执行相关操作,用户可以在这些回调函数中进行相关的清理操作。
在MQTTv5中有一项新特性,即服务器可以使用
mosquitto.conf
文件中的max_keepalive
参数,通过发送CONNACK
报文中的Max Keep Alive字段覆盖原来客户端设置的保持活动值keepalive
。如果服务器端将这个参数设置为0,意味着不会进行任何keepalive
检查,换句话说,即使收不到任何消息,客户端与服务器的连接也不会断开。
3)小结
Mosquitto客户端或服务器是否断开网络与心跳机制并无直接关系。心跳机制是为了当客户端与服务器出现长时间无需交换数据时,用于保持客户端与服务器之间的连接状态。
三、若干实现细节
1. 相关数据结构
1)每个客户端的状态和信息struct mosquitto
结构体
在 Mosquitto 2.0.15 版本中,这个结构体的定义在 lib/mosquitto_internal.h
文件中,当它用于客户端时,表示 Mosquitto 客户端的状态和相关信息,包括客户端 ID、连接参数、订阅信息、回调函数等。
当这个结构体用于服务器的时候同样表示单个 MQTT 客户端连接的状态和信息,但会用,记录所有连接的客户端信息,客户端 ID、连接参数、订阅信息、遗嘱消息等。
在 Mosquitto 服务器中,每个已连接的客户端都生成一个 struct mosquitto
结构体实例。服务器内部使用哈希表(Hash Table)组织和管理客户端数据。而这个哈希表则由struct mosquitto_db
结构体中的一个成员变量contexts_by_id
进行关联。
2)服务器管理客户端 struct mosquitto_db
结构体
struct mosquitto_db
是 mosquitto 服务器的数据结构,用于内部管理连接到服务器的客户端信息、订阅信息和已发布消息等许多重要状态信息。contexts_by_id
是 struct mosquitto_db
结构中的一个成员变量,它是一个哈希表,用于将客户端 ID(Client ID)与客户端上下文(Client Context)相关联。
struct mosquitto_db {
/* ... */
struct mosquitto *contexts_by_id; /* 客户端上下文哈希表(按客户端ID索引) */
/* ... */
};
具体来说,当客户端连接到 mosquitto 服务器时,客户端需要使用一个唯一的客户端 ID,用于标识该客户端。客户端 ID 可以用来查找客户端的相关信息,例如其订阅的主题、已发布的消息等。contexts_by_id
哈希表用于将客户端 ID 映射到相应的客户端上下文,以便更快速地访问该客户端的相关信息,或者将新的客户端上下文插入到哈希表中。
需要注意的是,在 MQTT 协议中,Client ID 可以由客户端指定,也可以由服务器自动生成,但需要保证唯一性。如果客户端在发送 CONNECT 消息时没有指定 Client ID,服务器会自动生成一个唯一的 Client ID。一般情况下,服务器会根据一定的规则生成 Client ID,例如可以使用时间戳、随机数、MAC 地址等信息生成唯一的标识符。如果客户端在发送 CONNECT 消息时指定了 Client ID,服务器将使用该 ID 来标识该客户端。在实际应用中,通常建议客户端使用一个固定的、唯一的 Client ID 来连接服务器。例如,可以使用设备的序列号、MAC 地址、IP 地址等信息作为 Client ID,以便在服务器端管理和维护客户端连接状态和消息订阅等信息。
3)服务器自身状态和信息 struct mosquitto__listener
结构体
struct mosquitto__listener
结构体用于 mosquitto 服务器本身的信息,如监听地址和端口等。mosquitto 服务器可以同时监听多个端口,每个监听器可以绑定一个特定的主机名或 IP 地址。在服务器运行期间,如果有新的连接请求到达监听器,服务器将使用 socks
数组中的文件描述符来处理这些连接。
4)服务器配置信息 struct mosquitto__config
结构体
struct mosquitto__config
是 mosquitto 服务器用于表示配置选项的结构体。该结构体定义在 mosquitto_broker_internal.h
头文件中。主要用于表示 mosquitto 服务器的配置选项,这些选项包括监听地址和端口、持久化存储、日志输出、身份验证等。在服务器启动时,服务器将使用 struct mosquitto__config
中的选项来配置服务器的行为。具体来说,服务器将从配置文件中读取配置选项并将其存储在 struct mosquitto__config
结构体中,然后使用这些选项来初始化服务器。
5)客户端配置信息 struct mosq_config
结构体
struct mosq_config
是 mosquitto 客户端使用的结构体,用于表示 MQTT 连接和消息传输的配置选项。其中包含了大量的配置选项,用于指定客户端连接 MQTT 代理服务器的方式、订阅主题、发布消息等行为,以及设置连接属性、发布消息的属性、订阅主题的属性等,这些选项可以通过命令行参数、配置文件或者代码中直接设置。用于保持客户端连接的是其中的keepalive
参数。
struct mosq_config {
/* ... */
int keepalive;
/* ... */
};
`keepalive` 参数是一个以秒为单位的时间间隔,用于指定客户端和服务器之间的最大允许空闲时间。
2. 主要用到的函数
客户端生命周期
Mosquitto 客户端的生命周期包括创建、连接、开启线程接收消息(订阅/发布、心跳)、断开连接和销毁等阶段。
1)创建客户端实例:mosquitto_new
mosquitto_new
函数是 Mosquitto C 语言客户端库的入口函数,用于创建一个新的 Mosquitto 客户端实例,是使用 Mosquitto 客户端库进行 MQTT 通信的第一步。
函数原型如下:
struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata);
参数解释如下:
-
id
:客户端的标识符,可以是任何字符串,如果为NULL
则表示由服务器自动生成一个唯一的标识符。 -
clean_start
:一个布尔值,表示客户端是否希望使用“清除会话”功能。如果为true
,则客户端会话结束后,服务器将清除客户端的订阅和 QoS 1 和 QoS 2 消息,否则服务器会将这些信息保留下来,以便在下一次客户端连接时恢复。建议在大多数情况下将其设置为true
。 -
userdata
:一个指向用户数据的指针,用于存储客户端特定的上下文信息,例如程序状态或回调函数的指针等。
函数返回一个struct mosquitto
类型的指针,表示创建的 Mosquitto 客户端实例。如果创建失败,返回NULL
。
在创建 Mosquitto 客户端实例后,需要调用client_connect
函数连接到 MQTT 代理服务器,并通过mosquitto_loop_start
函数启动客户端消息循环。之后可以通过mosquitto_subscribe
函数订阅主题、通过mosquitto_publish
函数发布消息等来实现 MQTT 客户端的功能。
2)连接MQTT服务器:client_connect
client_connect
函数是 Mosquitto 客户端代码库中用于连接到 MQTT 服务器的函数,其定义如下:
int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
该函数的参数 mosq
是一个 Mosquitto 客户端实例,而 cfg
则是一个 Mosquitto 客户端的配置参数结构体。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
和配置参数结构体 cfg
是否为空。然后,调用 mosquitto_loop_start
函数开启线程运行 Mosquitto 客户端消息循环,返回线程是否启动成功。同时检查客户端的认证信息、超时时间和遗嘱消息等参数是否设置正确等,连接成功后将相应参数发送到 MQTT 服务器。
为了 client_connect
函数能够正确连接到 MQTT 服务器,运行的程序需要正确设置服务器地址、端口号及其他连接参数,并确保服务器当前正处于正常工作状态。
3)创建线程开启客户端的消息循环:mosquitto_loop_start
mosquitto_loop_start
函数是 Mosquitto 客户端代码库中用于启动一个消息循环线程的函数,其定义如下:
int mosquitto_loop_start(struct mosquitto *mosq)
该函数的参数 mosq
是一个 Mosquitto 客户端实例。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,然后调用 mosquitto__threaded_client
函数创建一个消息循环线程,并将其保存到客户端实例的内部结构体中。同时,为了确保线程的安全性和稳定性,还会设置一些线程属性和回调函数,并检查线程是否创建成功。如果线程创建成功,则返回 MOSQ_ERR_SUCCESS
;否则,返回相应的错误代码。
需要注意的是,在使用 mosquitto_loop_start
函数启动消息循环线程时,需要保证客户端实例已经正确初始化,并且在线程创建之后,需要及时处理客户端的事件和消息(订阅/发布、心跳),并在必要的情况下关闭线程和释放相关资源。
4)订阅消息:mosquitto_subscribe
mosquitto_subscribe
函数是 Mosquitto 客户端代码库中用于向 MQTT 3.1/3.1.1 服务器订阅一个主题的函数,其定义如下:
int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
该函数的参数 mosq
是一个 Mosquitto 客户端实例,mid
是一个整型指针,用于返回订阅消息的 ID,sub
是一个字符串,表示要订阅的主题,qos
表示订阅的 QoS 等级。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,并检查客户端是否已经连接到 MQTT 3.1/3.1.1 服务器。然后,调用 mosquitto__mid_generate
函数生成订阅消息的 ID,并调用 mosquitto_send_subscribe_v3
函数向服务器发送订阅消息,并检查是否发送成功。如果成功,则将消息 ID 保存到参数 mid
中,并返回 MOSQ_ERR_SUCCESS
;否则,返回相应的错误代码。
需要注意的是,在使用 mosquitto_subscribe
函数订阅主题时,需要先确保客户端已经正确连接到服务器,并且在订阅主题之后,需要及时处理服务器返回的消息和事件,并在必要的情况下取消订阅主题和释放相关的资源和内存,以避免程序出现异常或资源泄露等问题。同时,为了提高程序的稳定性和安全性,也需要注意避免重复订阅相同的主题或订阅不存在的主题。
5)发布消息:mosquitto_publish
mosquitto_publish
函数是 Mosquitto 客户端代码库中用于向 MQTT 3.1/3.1.1 服务器发布一条消息的函数,其定义如下:
int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
该函数的参数 mosq
是一个 Mosquitto 客户端实例,mid
是一个整型指针,用于返回消息 ID,topic
是一个字符串,表示要发布消息的主题,payloadlen
表示消息内容的长度,payload
是一个指针,指向消息内容的缓冲区,qos
表示发布消息的 QoS 等级,retain
表示消息是否需要保留。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,并检查客户端是否已经连接到 MQTT 3.1/3.1.1 服务器。然后,调用 mosquitto__mid_generate
函数生成消息 ID,并调用 mosquitto_send_publish_v3
函数向服务器发送消息,并检查是否发送成功。如果成功,则将消息 ID 保存到参数 mid
中,并返回 MOSQ_ERR_SUCCESS
;否则,返回相应的错误代码。
需要注意的是,在使用 mosquitto_publish
函数发布消息时,需要先确保客户端已经正确连接到服务器,并且在发送消息之后,需要及时处理服务器返回的消息和事件,并在必要的情况下取消发布消息和释放相关的资源和内存,以避免程序出现异常或资源泄露等问题。同时,为了提高程序的稳定性和安全性,也需要注意避免重复发送相同的消息或发送无效的消息。
6)心跳超时检测和处理:mosquitto_loop_misc
mosquitto_loop_misc
函数是 Mosquitto 客户端代码库中的一个循环处理函数,用于在后台线程中处理客户端的网络事件、心跳消息和超时事件等,其定义如下:
int mosquitto_loop_misc(struct mosquitto *mosq)
该函数的参数 mosq
是一个 Mosquitto 客户端实例。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,并检查客户端是否已经连接到 MQTT 3.1/3.1.1 服务器。然后,调用 mosquitto__check_keepalive
函数检查客户端是否需要发送心跳消息,并检查客户端是否需要重新连接到服务器。接着,调用 mosquitto__packet_alloc
函数为客户端分配一个数据包,并调用 mosquitto__send_pingreq
函数向服务器发送心跳消息。最后,调用 mosquitto__check_pending_write
函数检查是否有待发送的消息,并检查是否需要将发送消息的缓冲区写入网络套接字中。
需要注意的是,在使用 mosquitto_loop_misc
函数处理客户端网络事件、心跳消息和超时事件时,需要先确保客户端已经正确连接到服务器,并且在处理事件之后,需要及时取消事件和释放相关的资源和内存,以避免程序出现异常或资源泄露等问题。同时,为了提高程序的稳定性和安全性,也需要注意避免重复发送心跳消息或发送无效的数据包。
7)断开连接:mosquitto_disconnect
mosquitto_disconnect
函数是 Mosquitto 客户端代码库中用于与 MQTT 3.1/3.1.1 服务器断开连接的函数,其定义如下:
int mosquitto_disconnect(struct mosquitto *mosq)
该函数的参数 mosq
是一个 Mosquitto 客户端实例。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,并检查客户端是否已经连接到 MQTT 3.1/3.1.1 服务器。然后,调用 mosquitto__disconnect
函数断开客户端与服务器的连接,并检查是否断开成功。如果成功,则返回 MOSQ_ERR_SUCCESS
;否则,返回相应的错误代码。
需要注意的是,在使用 mosquitto_disconnect
函数与 MQTT 3.1/3.1.1 服务器断开连接时,需要先确保客户端已经正确连接到服务器,并且在断开连接之后,需要及时释放相关的资源和内存,以避免程序出现异常或资源泄露等问题。同时,为了保证客户端的稳定性和安全性,也需要注意避免在未断开连接的情况下重复调用该函数。
8)销毁客户端实例:mosquitto_destroy
mosquitto_destroy
函数是 Mosquitto 客户端代码库中用于销毁 Mosquitto 客户端实例的函数,其定义如下:
void mosquitto_destroy(struct mosquitto *mosq)
该函数的参数 mosq
是一个 Mosquitto 客户端实例。
在函数内部,首先会检查 Mosquitto 客户端实例 mosq
是否为空,并调用 mosquitto_disconnect
函数与 MQTT 3.1/3.1.1 服务器断开连接。然后,释放客户端的资源和内存,包括客户端实例、消息缓冲区、网络套接字等。
需要注意的是,在使用 mosquitto_destroy
函数销毁 Mosquitto 客户端实例时,需要先确保客户端已经正确连接到服务器,并且在销毁客户端之前,需要及时释放相关的资源和内存,以避免程序出现异常或资源泄露等问题。同时,为了提高程序的稳定性和安全性,也需要注意避免在未销毁客户端实例的情况下重复创建客户端实例或使用客户端实例的相关函数。
服务器对客户端的支撑
在 Mosquitto 客户端的生命周期中,服务器扮演了重要的角色,负责管理客户端连接状态、消息传输和路由等工作。在客户端的连接、订阅和消息发布等操作中,服务器会提供相应的响应和支持,为客户端提供消息传输和管理的服务。
具体来说,在客户端的生命周期中,服务器需要配合客户端进行如下工作:
-
连接:服务器需要接收来自客户端的 CONNECT 消息,对客户端进行身份验证和授权,并分配 Client ID、维护连接状态等。
-
订阅/发布:服务器需要接收客户端的 SUBSCRIBE 和 PUBLISH 消息,对订阅和发布进行管理和路由,并将消息传递给订阅方或发布到指定的主题。
-
断开连接:服务器需要处理客户端的 DISCONNECT 请求,关闭与客户端的连接,清除客户端的订阅和 QoS 1、QoS 2 消息等信息,并在需要时发送 Last Will and Testament 消息。
-
消息传输保证:服务器需要支持消息传输的 QoS 0、QoS 1 和 QoS 2 等级,并对消息传输进行保证,以确保消息传输的可靠性和正确性。文章来源:https://www.toymoban.com/news/detail-779667.html
因此,在 Mosquitto 客户端的生命周期中,服务器与客户端密切配合,为客户端提供必要的支持和服务,同时也维护和管理整个 MQTT 系统的稳定和安全运行。文章来源地址https://www.toymoban.com/news/detail-779667.html
到了这里,关于mosquitto心跳和网络重连机制(基于MQTT协议)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!