需要spring-boot集成spring-integration-mqtt代码的直接跳到第5部分
1.MQTT介绍
1.1 MQTT是什么呢?
message queue telemetry translation
是一种基于发布与订阅的轻量级消息传输协议.适用于低带宽或网络不稳定的物联网应用.开发者可以使用极少的代码来实现物联网设备之间的消息传输.mqtt协议广泛应用于物联网,移动互联网,智能硬件,车联网,远程医疗,电力石油等领域
1.2 mqtt必须具备一下几点优势:
简单易实现
消息传递可靠,支持QoS
轻量省带宽
数据无关性,不关心数据格式
心跳模式(时刻感知客户端状态)
1.3 MQTT与HTTP协议的区别
mqtt最小报文仅为2字节,比http占用更少的网络开销
mqtt基于发布订阅模型,http基于请求相应.mqtt支持双工通信,http不支持
mqtt是有状态的,http是无状态的
mqtt具有断开重连机制,http不支持
1.4 可靠的消息传递(QoS)
MQTT 协议提供了 3 种消息服务质量等级(Quality of Service),保证了在不同的网络环境下消息传递的可靠性
QoS 0:消息最多传递一次
如果当时客户端不可用,则会丢失该消息。发布者发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制
QoS 1:消息传递至少 1 次
包含了简单的重发机制,发布者发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
QoS 2:消息仅传送一次
设计了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
2.docker部署一个emqx容器
安装运行emqx镜像
docker run -itd --restart=always --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
安装完成之后,我们相当于有了一个mqtt的服务器
看一下运行日志:docker logs -f emqx
[root@VM-4-3-centos dockerfile]# docker logs -f emqx
WARNING: Default (insecure) Erlang cookie is in use.
WARNING: Configure node.cookie in /opt/emqx/etc/emqx.conf or override from environment variable EMQX_NODE__COOKIE
WARNING: Use the same config value for all nodes in the cluster.
EMQX_RPC__PORT_DISCOVERY [rpc.port_discovery]: manual
EMQX_LOG__FILE_HANDLERS__DEFAULT__ENABLE [log.file_handlers.default.enable]: false
EMQX_LOG__CONSOLE_HANDLER__ENABLE [log.console_handler.enable]: true
EMQX_NODE__NAME [node.name]: emqx@172.17.0.7
Listener ssl:default on 0.0.0.0:8883 started.
Listener tcp:default on 0.0.0.0:1883 started.
Listener ws:default on 0.0.0.0:8083 started.
Listener wss:default on 0.0.0.0:8084 started.
Listener http:dashboard on :18083 started.
EMQX 5.0.16 is running now!
此时mqtt安装完毕,上述的四个端口只要防火墙允许通过,我们就可以用mqtt官方提供的客户端MQTTX进行连接了
查询启动的容器 docker ps
删除容器 docker rm -f 容器名
3.客户端工具MQTTBox和MQTTX安装(2选1)
3.1 MQTTBox安装
安装地址:http://workswithweb.com/mqttbox.html
如果无法访问,可以百度云获取
链接:https://pan.baidu.com/s/1HKd7qfHmezBwY6DXif9E1g
提取码:sei6
解压后找到MQTTBox.exe运行即可
客户端连接
3.2 MQTTX安装
安装地址:https://mqttx.app/zh
客户端连接
4.发布订阅测试(MQTTBOX)
mqttbox的好处是发布订阅在一个页面,便于观察,发布的topic是emqx/123/test,订阅的时候用emqx/+/test订阅,+是通配符
5.spring-boot项目基于spring-integration-mqtt实现
代码基于spring-integration-mqtt实现.spring-integration-mqtt内依赖了org.eclipse.paho.client.mqttv3(这个是Paho Java原生库)
下面的内容足以应对开发需求,实现了并发处理消息的能力,提供了回复消息的思路(往指定topic发送消息,让另一个客户端进行接收)
5.1 application.yml
mqtt:
protocol: tcp
host: yourHost
port: 1883
username: admin
password: admin
client-id: admin
inbound-topic: emqx/+/test,emqx/+/test_reply,emqx/young/+
keep-alive-interval: 60
connection-timeout: 120
5.2 pom.xml
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.4.11</version>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
//核心依赖就这个
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.5</version>
</dependency>
</dependencies>
5.3 代码实现
大致的逻辑
1.通过协议ip端口等参数进行mqtt连接
2.定义一个订阅者客户端进行数据统一接入
3.定义一个topic路由器进行消息分发,每一个topic对应一个channel
4.接收消息
5.定义一个发布者客户端进行消息发布
6.topic订阅与取消订阅
5.3.1 配置类(5个)
MqttConnectConfiguration:mqtt连接配置
MqttInboundConfiguration:订阅消息的客户端
MqttOutboundConfiguration:发布消息的客户端
MqttMessageChannel:消息通道
ThreadPoolOfInbound:统一入栈的线程池,处理并发消息
/**
* @Description: mqtt连接配置
* @Author: young
* @Date: 2023/2/15 16:26
*/
@Configuration
@Data
@ConfigurationProperties(prefix = "mqtt")
public class MqttConnectConfiguration {
//协议
private String protocol;
//ip
private String host;
//端口
private Integer port;
private String username;
private String password;
//客户端id
private String clientId;
//客户端连接时需要自动订阅的topic,多个用逗号分割
private String inboundTopic;
//心跳时间:默认60s,如果该时间内客户端没有收到消息,客户端ping一次服务端,判断服务端是否宕机
private Integer keepAliveInterval;
//定义了客户端等待建立到MQTT服务器的网络连接的最大时间间隔,默认超时为30秒
private Integer connectionTimeout;
@Bean
//连接对象
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
serviceUrl = new StringBuilder()
.append(protocol.trim())
.append("://")
.append(host.trim())
.append(":")
.append(port)
.toString();
mqttConnectOptions.setServerURIs(new String[]{serviceUrl});
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
mqttConnectOptions.setConnectionTimeout(connectionTimeout);
//重连不清除session
mqttConnectOptions.setCleanSession(false);
return mqttConnectOptions;
}
@Bean
//客户端工厂
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
//建立连接
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
//tcp://127.0.0.1:1883
private String serviceUrl;
}
/**
* @Description: 订阅消息的客户端
* @Author: young
* @Date: 2023/2/15 16:51
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
@Resource
private MqttConnectConfiguration mqttConnectConfiguration;
@Resource
private MqttPahoClientFactory mqttClientFactory;
@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;
/**
* 消费者订阅消息
*/
@Bean(name = "adapter")
public MessageProducerSupport mqttInbound() {
//消息适配器
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttConnectConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(),
mqttClientFactory, mqttConnectConfiguration.getInboundTopic().split(","));
//消息转换器
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
//统一字节传输
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
//只接收一次
adapter.setQos(2);
//入栈的消息统一交给inbound通道处理
adapter.setOutputChannel(inboundChannel);
return adapter;
}
/**
* 非法主题数据进入
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
public MessageHandler defaultInboundHandler() {
return message -> {
log.info("默认通道接收到数据但无法处理,topic:{},payload:{}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), new String((byte[]) message.getPayload()));
};
}
}
/**
* @Description: 发布消息的客户端
* @Author: young
* @Date: 2023/2/15 17:00
*/
@Configuration
public class MqttOutboundConfiguration {
@Resource
private MqttConnectConfiguration mqttConnectConfiguration;
@Resource
private MqttPahoClientFactory mqttClientFactory;
/**
* 生产者发布消息
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttConnectConfiguration.getClientId() + "_producer_" + System.currentTimeMillis(),
mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
messageHandler.setAsync(true);
//只发送一次,不关心是客户端(订阅者)是否接收到
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}
}
/**
* @Description: 消息的通道,通过ExecutorChannel引入线程池,可以并发接收数据
* @Author: young
* @Date: 2023/2/15 16:35
*/
@Configuration
public class MqttMessageChannel {
// @Bean(name = ChannelName.INBOUND)
// public MessageChannel inboundChannel() {
// return new DirectChannel();
// }
@Resource(name = "inboundThreadPool")
private Executor inboundThreadPool;
//通过ExecutorChannel引入线程池,可以并发接收数据
@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
return new ExecutorChannel(inboundThreadPool);
}
@Bean(name = ChannelName.DEFAULT)
public MessageChannel defaultChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.OUTBOUND)
public MessageChannel outboundChannel() {
return new DirectChannel();
}
//每一个指定的通道(或者topic)都可以指定一个线程池来并发处理接收的消息,这里只在inbound中做并发处理
@Bean(name = ChannelName.TEST)
public MessageChannel testChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.TEST_REPLY)
public MessageChannel testReplyChannel() {
return new DirectChannel();
}
@Bean(name = ChannelName.YOUNG)
public MessageChannel youngChannel() {
return new DirectChannel();
}
}
/**
* @Description: 统一入栈的线程池
* @Author: young
* @Date: 2023/2/17 16:30
*/
@Component
public class ThreadPoolOfInbound {
@Value("${thread.pool.core-pool-size: 20}")
private int corePoolSize;
@Value("${thread.pool.maximum-pool-size: 40}")
private int maximumPoolSize;
@Value("${thread.pool.keep-alive-time: 120}")
private long keepAliveTime;
@Value("${thread.pool.queue.capacity: 2000}")
private int capacity;
@Bean("inboundThreadPool")
public Executor inboundThreadPool() {
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(capacity),
//线程名前缀
new CustomizableThreadFactory("inboundThreadPool-"),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
}
5.3.2 topic声明
TopicConstant :topic常量类,可自定义正则
TopicEnum:topic枚举,关键的一个类,根据topic正则匹配到对应的channel中
/**
* @Description: topic 常量
* @Author: young
* @Date: 2023/2/16 11:02
*/
public class TopicConstant {
public static final String EMQX = "emqx/";
public static final String YOUNG = "young/";
public static final String TEST = "/test";
public static final String REPLY = "_reply";
public static final String REGEX = "[A-Za-z0-9]+";
}
/**
* @Description: 主题枚举
* @Author: young
* @Date: 2023/2/15 17:13
*/
@Getter
public enum TopicEnum {
//^限制开始位,$限制结束位
TEST(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + "$"), ChannelName.TEST),
TEST_REPLY(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.REGEX + TopicConstant.TEST + TopicConstant.REPLY + "$"), ChannelName.TEST_REPLY),
YOUNG(Pattern.compile("^" + TopicConstant.EMQX + TopicConstant.YOUNG + TopicConstant.REGEX + "$"), ChannelName.YOUNG),
//默认通道
UNKNOWN(Pattern.compile("^.*$"), ChannelName.DEFAULT);
//主题匹配规则
Pattern pattern;
//通道名称
String channelName;
TopicEnum(Pattern pattern, String channelName) {
this.pattern = pattern;
this.channelName = channelName;
}
public static TopicEnum find(String topic) {
//如果无法匹配topic,则分发到默认通道
return Arrays.stream(TopicEnum.values()).filter(topicEnum -> topicEnum.pattern.matcher(topic).matches()).findAny().orElse(UNKNOWN);
}
}
5.3.3 channel与router声明
ChannelName:通道名称
InboundMessageRouter:从INBOUND中获取topic,路由到指定的channel
/**
* @Description: 消息通道
* @Author: young
* @Date: 2023/2/15 16:32
*/
public class ChannelName {
public static final String INBOUND = "inbound";
public static final String DEFAULT = "default";
public static final String OUTBOUND = "outbound";
public static final String TEST = "test";
public static final String TEST_REPLY = "testReply";
public static final String YOUNG = "young";
/**
* @Description: 入站消息路由
* @Author: young
* @Date: 2023/2/15 17:09
*/
@Component
@Slf4j
public class InboundMessageRouter extends AbstractMessageRouter {
@Resource
private ApplicationContext applicationContext;
private static final ConcurrentHashMap<String, MessageChannel> channels = new ConcurrentHashMap<>(16);
/**
* 入站数据路由到指定通道
*/
@Override
@Router(inputChannel = ChannelName.INBOUND)
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
MessageHeaders headers = message.getHeaders();
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
byte[] payload = (byte[]) message.getPayload();
log.info("接收到消息,topic:{},payload:{}", topic, new String(payload));
//查询topic是否定义了
TopicEnum topicEnum = TopicEnum.find(topic);
if (channels.containsKey(topicEnum.getChannelName())) {
return Collections.singleton(channels.get(topicEnum.getChannelName()));
}
MessageChannel bean = (MessageChannel) applicationContext.getBean(topicEnum.getChannelName());
channels.put(topicEnum.getChannelName(), bean);
return Collections.singleton(bean);
}
}
5.3.4 消息订阅与发布
MessageListenService :消息监听类
重点是@ServiceActivator(inputChannel = ChannelName.TEST)指定要监听的通道
SubscribeTopicJob :定时器,每分钟获取订阅的topic文章来源:https://www.toymoban.com/news/detail-664228.html
/**
* @Description: 消息监听(订阅)
* @Author: young
* @Date: 2023/2/17 10:12
*/
@Service
@Slf4j
public class MessageListenService {
@Resource
private MqttMessageSenderService messageSenderService;
@ServiceActivator(inputChannel = ChannelName.TEST)
public void listenTest(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenTest receive message : {}", new String(payload));
//消息回复,回复的topic在连接客户端的时候我这边已经订阅了,第二个方法就能及时接收到回复的消息
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
messageSenderService.publish(topic + TopicConstant.REPLY, "test reply...");
}
@ServiceActivator(inputChannel = ChannelName.TEST_REPLY)
public void listenTestReply(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenTestReply receive message : {}", new String(payload));
}
@ServiceActivator(inputChannel = ChannelName.YOUNG)
public void listenYoung(Message<?> message) {
byte[] payload = (byte[]) message.getPayload();
log.info("listenYoung receive message : {}", new String(payload));
}
}
/**
* @Description: 消息发布
* @Author: young
* @Date: 2023/2/16 9:20
*/
@Service
@Slf4j
public class MqttMessageSenderServiceImpl implements MqttMessageSenderService {
@Resource
private MqttMessageGateway messageGateway;
@Resource
private ObjectMapper objectMapper;
@Override
public void publish(String topic, Object message) {
try {
messageGateway.publish(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
e.printStackTrace();
log.info("发布消息失败:{}", e.getMessage());
}
}
@Override
public void publish(String topic, Object message, int qos) {
try {
messageGateway.publish(topic, objectMapper.writeValueAsBytes(message), qos);
} catch (JsonProcessingException e) {
e.printStackTrace();
log.info("发布消息失败:{}", e.getMessage());
}
}
}
//topic订阅与取消订阅的接口实现
@Service
public class MqttTopicServiceImpl implements MqttTopicService {
@Resource
private MqttPahoMessageDrivenChannelAdapter adapter;
@Override
public void subscribe(String topic) {
adapter.addTopic(topic);
}
@Override
public void unsubscribe(String topic) {
adapter.removeTopic(topic);
}
@Override
public String[] getSubscribedTopic() {
return adapter.getTopic();
}
}
//消息发送网关,最终由MessagingGateway发送消息,这个注解不能少
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface MqttMessageGateway {
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}
//消息发送接口
public interface MqttMessageSenderService {
void publish(String topic, Object message);
void publish(String topic, Object message, int qos);
}
//topic订阅与取消订阅的接口
public interface MqttTopicService {
void subscribe(@Header(MqttHeaders.TOPIC) String topic);
void unsubscribe(@Header(MqttHeaders.TOPIC) String topic);
String[] getSubscribedTopic();
}
//定时器:每分钟监听一次订阅
@Component
@Slf4j
public class SubscribeTopicJob {
@Resource
private MqttTopicService mqttTopicService;
//每分钟打印一下订阅情况
@Scheduled(cron = "0 0/1 * * * ?")
@Async
public void subscribeTopicListen() {
log.info("订阅了:{}", JSONObject.toJSONString(mqttTopicService.getSubscribedTopic()));
}
}
代码已提供,大家可以结合工具进行测试,除了可以测试消息发布和订阅,还可以测试客户端自动重连(docker 删除容器:docker rm -f emqx后观察日志)
如有侵权,请告知删除文章来源地址https://www.toymoban.com/news/detail-664228.html
到了这里,关于MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!