paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

这篇具有很好参考价值的文章主要介绍了paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目需求:原本做的项目是单进程单线程模式订阅mqtt,发现在消息回调处理消息时耗时较久,我们业务对消息处理是一次性的,只要求处理一次,所以需要提升并发处理能力。看了网上建议改为多线程模式,然而本人实践过程,采用多进程or多线程模式方式运行,发现并没达到预期效果。下面时本人的一下实践记录,仅供参考学习。

环境:python3.7

本地mqtt服务使用的emqx

操作工具用的MQTTX客户端

 1、下面是mqtt多线程模式运行代码实现,只实现消息订阅端。

import random, string
from paho.mqtt.client import Client
from threading import Thread

broker = '192.168.8.205'
port = 1883
topic = "python-mqtt"


def connect_mqtt():
    def on_connect(_, __, ___, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client_id = f"test-client_{''.join(random.choice(string.ascii_lowercase) for _ in range(4))}"
    client = Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client):
    def on_message(_, __, mesgage):
        print(f"Received `{mesgage.payload.decode()}` from `{mesgage.topic}` topic\n")
    client.subscribe(topic)
    client.on_message = on_message


def main():
    c = connect_mqtt()
    subscribe(c)
    c.loop_forever()
    # c.loop_start()


if __name__ == '__main__':

    lt = []
    for i in range(10):
        t = Thread(target=main, args=(), name=f'thread-{i}')
        lt.append(t)
    for t in lt:
        t.start()
        print(t.name)
    for t in lt:
        t.join()

 2、MQTTX连接发送消息:

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

 3、运行效果,发现发布一条消息可以被接收10次(订阅客户端10个,分别被处理了10次),而我的需求是想要发布一条消息,被这个10个客户端之一消费掉,且只处理一次。

代码参考前文,修改如下

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

 4、后来又换个思路,尝试了下还是10个线程,客户端唯一(client_id唯一),这种模式由于mqtt协议要求客户端唯一,导致10个线程并发启动,出现抢占式连接mqqt服务,出现不停的断开连接,重新连接。这种模式下运行客户端实际也只有一个,订阅处理等能力同于一个线程模式下的客户端方式,也无法达到预期。

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

-----------------------------------------------分割线------------------------------- 

后面无意间找到一篇文章,了解到mqtt服务有一种叫共享订阅模式。以emqx为例,emqx支持两种格式的共享订阅前缀:$share/topic 和$queue/topic,然后通过修改emqx服务的配置etc/emqx.conf

如图:paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

消息发布时,topic配置不变,订阅时,沿用$share/topic 和$queue/topic即可。

代码参考前文修改如下:

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

 运行效果:

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

 可以看到程序运行只收到一条消息了paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

 参考文章:

   1.(mqtt集群订阅如何只消费一个(一次)消息? - 程序新视界)

   2.(paho-mqtt 实现通信_nuc_baixu的博客-CSDN博客_paho mqtt)文章来源地址https://www.toymoban.com/news/detail-432557.html

到了这里,关于paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连

    简介: 之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT? 之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用

    2024年02月05日
    浏览(42)
  • MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)

    需要spring-boot集成spring-integration-mqtt代码的直接跳到第5部分 1.1 MQTT是什么呢? message queue telemetry translation 是一种基于发布与订阅的轻量级消息传输协议.适用于低带宽或网络不稳定的物联网应用.开发者可以使用极少的代码来实现物联网设备之间的消息传输.mqtt协议广泛应用于物

    2024年02月12日
    浏览(46)
  • MQTT 客户端出现连接订阅等问题时如何排查?

    大家好,这是一期社区专题 FAQ。我们整理了近期社区中关注度较高的问题,在这里进行统一汇总解答。 今后本系列内容将不定期推送,敬请关注。 同时,如果大家在使用 EMQX 的过程中遇到问题,欢迎通过以下方式进行解决: 查阅 EMQX 产品文档与博客文章。 如果在现有资料

    2023年04月20日
    浏览(38)
  • mqtt服务器搭建与qt下的mqtt客户端实现

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(Io

    2024年02月06日
    浏览(89)
  • Android 实现MQTT客户端,用于门禁消息推送

    添加MQTT依赖 implementation ‘org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2’ implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’ 在Manifest清单文件中添加服务 MqttClient的实现方式 MQTT初始化连接线程,实现与服务器的连接、订阅、发布消息 MQTT重连 MQTT断开 发送消息 MqttAndroid

    2024年02月14日
    浏览(50)
  • SpringBoot集成Milo库实现OPC UA客户端:连接、遍历节点、读取、写入、订阅与批量订阅

    前面我们搭建了一个本地的 PLC 仿真环境,并通过 KEPServerEX6 读取 PLC 上的数据,最后还使用 UAExpert 作为OPC客户端完成从 KEPServerEX6 这个OPC服务器的数据读取与订阅功能。在这篇文章中,我们将通过 SpringBoot 集成 Milo 库实现一个 OPC UA 客户端,包括连接、遍历节点、读取、写入

    2024年02月09日
    浏览(60)
  • mqtt安卓客户端

    1.MQTT(消息队列遥测传输协议),是一种基于 发布/订阅 (publish/subscribe)模式的\\\"轻量级\\\"通讯协议, 该协议构建于TCP/IP协议上 。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使

    2024年02月10日
    浏览(44)
  • 【MQTT】MQTT简介+安装+使用 python MQTT客户端

    目录 前言 MQTT 协议简介 为何选择 MQTT MQTT 通讯运作方式 MQTT 协议帧格式 MQTT服务器搭建和使用  公共MQTT 测试服务器 MQTT服务器搭建 各种MQTT代理服务程序比较 Mosquitto安装 MQTT使用方法 测试MQTT服务器 程序中使用MQTT 本文随时更新,转载请注明出处,源地址:http://t.csdn.cn/kCC0B 文

    2024年02月01日
    浏览(48)
  • MQTT 客户端 MQTT.fx 使用说明

    官网:https://softblade.de/en/download-2/ 说明:最后的免费版本是 MQTT.fx 1.7.1,官网已经没有免费的版本 下载 MQTT.fx 1.7.1 https://nowjava.com/download/44364 【需关注其公众号才能下载】 一路 Next 即可 安装好后,直接启动MQTT.fx 点击第 1 步中界面设置按键(齿轮图标)打开新窗口创建一个

    2024年02月03日
    浏览(42)
  • 通过Milo实现的OPC UA客户端连接并订阅Prosys OPC UA Simulation Server模拟服务器

    前面我们搭建了一个本地的 PLC 仿真环境,并通过 KEPServerEX6 读取 PLC 上的数据,最后还使用 UAExpert 作为 OPC 客户端完成从 KEPServerEX6 这个OPC服务器的数据读取与订阅功能:SpringBoot集成Milo库实现OPC UA客户端:连接、遍历节点、读取、写入、订阅与批量订阅。 注意,如果实际工

    2024年02月16日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包