mica-mqtt 低延迟、高性能的 mqtt 物联网组件

这篇具有很好参考价值的文章主要介绍了mica-mqtt 低延迟、高性能的 mqtt 物联网组件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

mica-mqtt 低延迟、高性能的 mqtt 物联网组件,Java,物联网,java,中间件

一、简介

mica-mqtt 是基于 java aio 实现,简单易用、低延迟、高性能百万级 mqtt client 物联网开源组件和 mqtt broker 服务,更加易于集成到已有服务和二次开发,降低自研物联网平台开发成本。

二、使用场景

  • 物联网(云端 mqtt broker)

  • 物联网(边缘端消息通信)

  • 群组类 IM

  • 消息推送

  • 简单易用的 mqtt 客户端

三、优势

  • 平凡却不单调,简单却不失精彩。

  • 手动档(更加易于二次开发或扩展)。

  • 牛犊初生,无限可能。

四、功能

  • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。

  • 支持 websocket mqtt 子协议(支持 mqtt.js)。

  • 支持 http rest api,http api 文档详见[1]。

  • 支持 MQTT client 客户端。

  • 支持 MQTT server 服务端。

  • 支持 MQTT client、server 共享订阅支持(捐助VIP版采用 topic 树存储,跟 topic 数无关,百万 topic 性能依旧)。

  • 支持 MQTT 遗嘱消息。

  • 支持 MQTT 保留消息。

  • 支持自定义消息(mq)处理转发实现集群。

  • MQTT 客户端 阿里云 mqtt 连接 demo。

  • 支持 GraalVM 编译成本机可执行程序。

  • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。

  • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。

  • 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块[2]。

五、依赖

Spring boot 项目

1. 客户端

<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
  <version>${mica-mqtt.version}</version>
</dependency>
1.1 客户端配置项示例
mqtt:
  client:
    enabled: true               # 是否开启客户端,默认:true
    ip: 127.0.0.1               # 连接的服务端 ip ,默认:127.0.0.1
    port: 1883                  # 端口:默认:1883
    name: Mica-Mqtt-Client      # 名称,默认:Mica-Mqtt-Client
    clientId: 000001            # 客户端Id(非常重要,一般为设备 sn,不可重复)
    user-name: mica             # 认证的用户名
    password: 123456            # 认证的密码
    timeout: 5                  # 超时时间,单位:秒,默认:5秒
    reconnect: true             # 是否重连,默认:true
    re-interval: 5000           # 重连时间,默认 5000 毫秒
    version: mqtt_3_1_1         # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1
    read-buffer-size: 8KB       # 接收数据的 buffer size,默认:8k
    max-bytes-in-message: 10MB  # 消息解析最大 bytes 长度,默认:10M
    buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存
    keep-alive-secs: 60         # keep-alive 时间,单位:秒
    clean-session: true         # mqtt clean session,默认:true
    ssl:
      enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证
      keystore-path:            # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
      keystore-pass:            # 可选参数:ssl 双向认证 keystore 密码
      truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
      truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码

注意:ssl 存在三种情况

服务端开启ssl 客户端
ClientAuth 为 NONE(不需要客户端验证) 仅仅需要开启 ssl 即可不用配置证书
ClientAuth 为 OPTIONAL(与客户端协商) 需开启 ssl 并且配置 truststore 证书
ClientAuth 为 REQUIRE (必须的客户端验证) 需开启 ssl 并且配置 truststore、 keystore证书
1.2 可实现接口(注册成 Spring Bean 即可)
接口 是否必须 说明
IMqttClientConnectListener 客户端连接成功监听
1.3 客户端上下线监听

使用 Spring event 解耦客户端上下线监听,注意: 1.3.4 开始支持。会跟自定义的 IMqttClientConnectListener 实现冲突,取一即可。


/**
 * 示例:客户端连接状态监听
 *
 * @author L.cm
 */
@Service
public class MqttClientConnectListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);

    @Autowired
    private MqttClientCreator mqttClientCreator;

    @EventListener
    public void onConnected(MqttConnectedEvent event) {
        logger.info("MqttConnectedEvent:{}", event);
    }

    @EventListener
    public void onDisconnect(MqttDisconnectEvent event) {
        // 离线时更新重连时的密码,适用于类似阿里云 mqtt clientId 连接带时间戳的方式 
        logger.info("MqttDisconnectEvent:{}", event);
        // 在断线时更新 clientId、username、password
        mqttClientCreator.clientId("newClient" + System.currentTimeMillis())
            .username("newUserName")
            .password("newPassword");
    }

}
1.4 客户端自定义 java 配置(可选)

@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfiguration {

  @Bean
  public MqttClientCustomizer mqttClientCustomizer() {
    return new MqttClientCustomizer() {
      @Override
      public void customize(MqttClientCreator creator) {
        // 此处可自定义配置 creator,会覆盖 yml 中的配置
        System.out.println("----------------MqttServerCustomizer-----------------");
      }
    };
  }

}
1.5 客户端订阅示例

@Service
public class MqttClientSubscribeListener {
  private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);

  @MqttClientSubscribe("/test/#")
  public void subQos0(String topic, byte[] payload) {
    logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
  }

  @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)
  public void subQos1(String topic, byte[] payload) {
    logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
  }

  @MqttClientSubscribe("/sys/${productKey}/${deviceName}/thing/sub/register")
  public void thingSubRegister(String topic, byte[] payload) {
    // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
    // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
    logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
  }

}
1.6 共享订阅 topic 说明

mica-mqtt client 支持两种共享订阅方式:

  1. 共享订阅:订阅前缀 $queue/,多个客户端订阅了 $queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。

  2. 分组订阅:订阅前缀 $share/<group>/,组客户端订阅了$share/group1/topic$share/group2/topic..,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

1.7 MqttClientTemplate 使用示例
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * @author wsq
 */
@Service
public class MainService {
    private static final Logger logger = LoggerFactory.getLogger(MainService.class);
    @Autowired
    private MqttClientTemplate client;

    public boolean publish() {
        client.publish("/test/client", "mica最牛皮".getBytes(StandardCharsets.UTF_8));
        return true;
    }

    public boolean sub() {
        client.subQos0("/test/#", (context, topic, message, payload) -> {
            logger.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
        });
        return true;
    }

}

2 服务端
<dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-server</artifactId>
  <version>${mica-mqtt.version}</version>
</dependency>
2.1 服务端使用

// 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
MqttServer mqttServer = MqttServer.create()
    // 服务端 ip 默认为空,0.0.0.0,建议不要设置
    .ip("0.0.0.0")
    // 默认:1883
    .port(1883)
    // 默认为:8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
    .readBufferSize(512)
    // 最大包体长度,如果包体过大需要设置此参数,默认为:8092
    .maxBytesInMessage(1024 * 100)
    // 自定义认证
    .authHandler((clientId, userName, password) -> true)
    // 消息监听
    .messageListener((context, clientId, message) -> {
        logger.info("clientId:{} message:{} payload:{}", clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
    })
    // 堆内存和堆外内存选择,默认:堆内存
    .bufferAllocator(ByteBufferAllocator.HEAP)
    // 心跳超时时间,默认:120s
    .heartbeatTimeout(120_1000L)
    // ssl 配置
    .useSsl("", "", "")
    // 自定义客户端上下线监听
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) {

        }

        @Override
        public void offline(String clientId) {

        }
    })
    // 自定义消息转发,可用 mq 广播实现集群化处理
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) {

        }

        @Override
        public boolean send(Message message) {
            return false;
        }

        @Override
        public boolean send(String clientId, Message message) {
            return false;
        }
    })
    .debug() // 开启 debug 信息日志
    .start();

// 发送给某个客户端
mqttServer.publish("clientId","/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));

// 发送给所有在线监听这个 topic 的客户端
mqttServer.publishAll("/test/123", "mica最牛皮".getBytes(StandardCharsets.UTF_8));

// 停止服务
mqttServer.stop();

六、默认端口

端口号 协议 说明
1883 tcp mqtt tcp 端口
8083 http、websocket http api 和 websocket mqtt 子协议端口

mica-mqtt 低延迟、高性能的 mqtt 物联网组件,Java,物联网,java,中间件文章来源地址https://www.toymoban.com/news/detail-720667.html

到了这里,关于mica-mqtt 低延迟、高性能的 mqtt 物联网组件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 高性能、高扩展、高稳定:解读 EasyMR 大数据组件自定义可扩展能力

    随着互联网技术的不断发展以及大数据时代的兴起,企业对于数据分析和洞察的需求日益增长。大多数企业都积累了大量的数据,需要从这些数据中快速灵活地提取有价值的信息,以便为用户提供更好的服务或者帮助企业做出更明智的决策。 然而在不同的数据场景中,企业往

    2024年02月16日
    浏览(35)
  • 轻松掌握组件启动之Redis集群扩展秘籍:轻松扩容与缩容,释放高性能潜能

    在我们原始的集群基础上,我们决定增加一台主节点(8007)和一台从节点(8008),这样新增的节点将会在下图中以虚线框的形式显示在集群中。 1: 首先,在 /usr/local/redis-cluster 目录下创建两个文件夹,分别命名为 8007 和 8008。接下来,将 8001 文件夹下的 redis.conf 文件复制到 8007 和

    2024年02月08日
    浏览(57)
  • 结合 tensorflow.js 、opencv.js 与 Ant Design 创建美观且高性能的人脸动捕组件并发布到InsCode

    如何在前端项目中使用opencv.js | opencv.js入门 如何使用tensorflow.js实现面部特征点检测 tensorflow.js 如何从 public 路径加载人脸特征点检测模型 tensorflow.js 如何使用opencv.js通过面部特征点估算脸部姿态并绘制示意图 tensorflow.js 使用 opencv.js 将人脸特征点网格绘制与姿态估计线绘制结

    2024年04月17日
    浏览(33)
  • 《高性能MySQL》——创建高性能的索引(笔记)

    索引(在MySQL中也叫做“键(key)”) 是存储引擎用于快速找到记录的一种数据结构。 索引对于良好的性能非常关键。尤其是当表中的数据量越来越大时,索引对性能的影响愈发重要。 在数据量较小且负载较低时,不恰当的索引对性能的影响可能还不明显,但当数据量逐渐增大时

    2024年02月07日
    浏览(115)
  • 【Linux高性能服务器编程】——高性能服务器框架

      hello !大家好呀! 欢迎大家来到我的Linux高性能服务器编程系列之高性能服务器框架介绍,在这篇文章中, 你将会学习到高效的创建自己的高性能服务器,并且我会给出源码进行剖析,以及手绘UML图来帮助大家来理解,希望能让大家更能了解网络编程技术!!! 希望这篇

    2024年04月25日
    浏览(62)
  • 读高性能MySQL(第4版)笔记08_创建高性能索引(上)

    2.4.2.1. 按照索引列中的数据大小顺序存储的 2.4.3.1. 键前缀查找只适用于根据最左前缀的查找 2.4.4.1. 在查询某些条件的数据时,存储引擎不再需要进行全表扫描 2.4.4.2. 通过比较节点页的值和要查找的值可以找到合适的指针进入下层子节点,这些指针实际上定义了子节点页中

    2024年02月08日
    浏览(51)
  • 读高性能MySQL(第4版)笔记09_创建高性能索引(下)

    1.4.4.1. InnoDB的二级索引在叶子节点中保存了记录的主键值,所以如果二级索引能够覆盖查询,则可以避免对主键索引的二次查询 7.1.5.1. 常见的类似错误通常是由于尝试使用rsync备份InnoDB导致的 7.3.3.1. 否则,对于范围查询、索引覆盖扫描等操作来说,速度可能会降低很多 7

    2024年02月08日
    浏览(63)
  • 《高性能MYSQL》-- 查询性能优化

    查询性能优化 深刻地理解MySQL如何真正地执行查询,并明白高效和低效的原因何在 查询的生命周期(不完整):从客户端到服务器,然后服务器上进行语法解析,生成执行计划,执行,并给客户端返回结果。 一条查询,如果查询得很慢,原因大概率是访问的数据太多 对于低

    2024年03月11日
    浏览(74)
  • 高性能MySQL实战(三):性能优化

    大家好,我是 方圆 。这篇主要介绍对慢 SQL 优化的一些手段,而在讲解具体的优化措施之前,我想先对 EXPLAIN 进行介绍,它是我们在分析查询时必要的操作,理解了它输出结果的内容更有利于我们优化 SQL。为了方便大家的阅读,在下文中规定类似 key1 的表示二级索引,key_

    2024年02月11日
    浏览(73)
  • 《高性能MySQL》——查询性能优化(笔记)

    将查询看作一个任务,那么它由一系列子任务组成,实际我们所做的就是: 消除一些子任务 减少子任务的执行次数 让子任务运行更快 查询的生命周期大概可分为 = { 客户端 服务器 : 进行解析 , 生成执行计划 执行:包括到存储引擎的调用,以及用后的数据处理 { 排序 分组

    2024年02月13日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包