SpringBoot中使用Spring integration加Eclipse Paho Java Client 实现MQTT客户端

这篇具有很好参考价值的文章主要介绍了SpringBoot中使用Spring integration加Eclipse Paho Java Client 实现MQTT客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spring Integration 是一个开源的集成消息处理框架,它提供了消息传递、消息过滤、消息转换、消息路由等功能,可以用于构建异步、分布式的系统。
Spring-integration-stream是Spring Integration框架的一个组件,用于在不同的系统和应用之间进行消息传递、集成和流处理。

它提供了一套流式编程模型,允许开发者通过定义输入源(Source)、处理器(Processor)和输出目标(Sink)来构建消息流。开发者可以使用不同的数据转换器和处理器来操纵和转换消息,在消息流中实现各种类型的操作和转换。

Spring-integration-stream还提供了一些用于连接外部系统和应用的适配器(Adapters),如JMS、HTTP、AMQP等,以便与不同的中间件和协议进行交互。开发者可以轻松地将spring-integration-stream与其他Spring Integration组件结合使用,构建更加复杂和灵活的集成解决方案。
Spring Integration MQTT 是一个基于 Spring Framework 的集成 MQTT 协议的消息通信模块,它提供了构建 MQTT 集成应用的工具和组件。通过 Spring Integration,你可以轻松地实现 MQTT 消息的发送、接收和处理。

Eclipse Paho Java Client 客户端是 Java 语言中使用最为广泛的 MQTT 客户端库。

1.连接MQTT 3.1.1协议代理服务器

1.1 添加maven依赖

添加以下依赖到项目 pom.xml 文件中

<!--集成MQTT-->
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>
		<!--开启流支持-->
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

1.2 添加配置类,配置入站订阅消息,出站发布消息


import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * mqtt消息处理配置
 */
@Configuration
@Slf4j
public class MqttConfig {

    /**
     * 服务器地址以及端口
     */
    @Value("${mqtt.services}")
    private String[] mqttServices;

    /**
     * 用户名
     */
    @Value("${mqtt.user}")
    private String user;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 心跳时间,默认为5分钟
     */
    @Value("${mqtt.KeepAliveInterval}")
    private Integer KeepAliveInterval;

    /**
     * 通信质量,详见MQTT协议
     */
    @Value("${mqtt.Qos:1}")
    private Integer Qos;
    /**
     * 是否不保持session,默认为session保持
     */
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;

    /**
     * 是否自动重联,默认为开启自动重联
     */
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;

    /**
     * 消费超时时间
     */
    @Value("${mqtt.CompletionTimeout:5000}")
    private Long completionTimeout;

    /**
     * 连接超时,默认为30秒
     */
    @Value("${mqtt.connectionTimeout}")
    private int  mqttConnectionTimeout;

    /**
     * 恢复间隔
     */
    @Value("${mqtt.recoveryInterval}")
    private int mqttRecoveryInterval;


    @Value("${mqtt.maxinflight:1000}")
    private int maxinflight;


    /**
     * //订阅主题,可以是多个主题
     */
    @Value("${mqtt.input.topic}")
    private String[] inputTopic;

    @Resource
    private MqttMessageReceiver mqttMessageReceiver;

    /**
     * 生成配置对象,用户名,密码等
     */
    public MqttConnectOptions getOptions() {
        //MQTT连接器选项
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置代理端的URL地址,可以是多个
        options.setServerURIs(mqttServices);
        // 配置mqtt服务端地址,登录账号和密码
        options.setUserName(user);
        options.setPassword(password.toCharArray());
        // 配置 最大传输中数,默认值10,qos!=0 时生效
        //表示允许同时在传输中的最大消息数量。
        // MQTT 协议规定,在未收到 ACK 确认之前,客户端只能同时传输一定数量的消息。
        // MaxInflight 指标用来控制该数量,以避免网络拥塞
        options.setMaxInflight(maxinflight);
        //心跳时间
        // 设置会话心跳时间 单位为秒 服务器会每隔KeepAliveInterval秒的时间向客户端发送心跳判断客户端是否在线,
        options.setKeepAliveInterval(KeepAliveInterval);
        //断开是否自动重联
        options.setAutomaticReconnect(AutomaticReconnect);
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(CleanSession);
        options.setConnectionTimeout(mqttConnectionTimeout);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //options.setWill("willTopic", WILL_DATA, 1, true);
        log.info("----生成mqtt配置对象----");
        return options;
    }


    /**
     * MQTT客户端
     * 配置DefaultMqttPahoClientFactory
     * 1. 配置基本的链接信息
     * 2. 配置maxInflight,在mqtt消息量比较大的情况下将值设大
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getOptions());
        return factory;
    }


    /**
     * MQTT消息处理器(生产者)
     * mqtt消息出站通道默认配置,用于向外发出mqtt消息,当前案例中发到了同一个mqtt服务器
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {
        String clientId = BaseConstants.MQTT_OUTBOUND + IdUtil.simpleUUID();
        log.info("MQTT消息处理器(生产者)clientId:{}", clientId);
        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, factory);
        // 如果设置成true,即异步,发送消息时将不会阻塞。
        messageHandler.setAsync(true);
        // 设置默认QoS
        messageHandler.setDefaultQos(Qos);
        // Paho消息转换器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }

    /**
     * MQTT信息通道(生产者)
     * mqtt消息出站通道,用于发送出站消息
     *
     * @return
     */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT信息通道(消费者)
     * mqtt消息入站通道,订阅消息后消息进入的通道。
     * 可创建多个入站通道,对应多个不同的消息生产者。
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }


    /**
     * MQTT消息订阅绑定(消费者)
     * 对于当前应用来讲,接收的mqtt消息的生产者。将生产者绑定到mqtt入站通道,即通过入站通道进入的消息为生产者生产的消息。
     * 可创建多个消息生产者,对应多个不同的消息入站通道,同时生产者监听不同的topic
     *
     * @return
     */
    @Bean
    public MessageProducer channelInbound(MessageChannel mqttInputChannel, MqttPahoClientFactory factory) {
        String clientId = BaseConstants.MQTT_INBOUND + IdUtil.simpleUUID();
        log.info("MQTT消息处理器(消费者)clientId:{}", clientId);
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, inputTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setRecoveryInterval(mqttRecoveryInterval);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(Qos);
        adapter.setOutputChannel(mqttInputChannel);
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }

    @Bean
    public ApplicationListener<?> eventListener() {
        return new ApplicationListener<MqttConnectionFailedEvent>() {
            @Override
            public void onApplicationEvent(MqttConnectionFailedEvent event) {
                log.info("MqttConnection异常:", event.getCause());
            }
        };
    }
}



1.3 消息发送器

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    /**
     * 定义重载方法,用于消息发送
     * @param  payload
     */
    void sendToMqtt(String payload);

    /**
     * 指定topic进行消息发送
     * @param topic
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 指定topic进行消息发送
     * @param topic
     * @param qos
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * 指定topic和消息保留时间进行消息发送
     * @param topic
     * @param qos
     * @param messageExpiryInterval
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos,@Header(MqttHeaders.MESSAGE_EXPIRY_INTERVAL) int messageExpiryInterval, String payload);


    /**
     * 指定topic进行消息发送
     * @param topic
     * @param qos
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

1.4 消息接收器

@Component
@Slf4j
public class MqttMessageReceiver implements MessageHandler {

    @Value("${mqtt.client.message.expiry.interval}")
    private Long messageExpiryInterval;


    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        log.info("接收到mqtt消息{}", message);
        String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
        String payload = String.valueOf(message.getPayload());
    }
}



1.5 配置文件

#订阅主题,多个主题用逗号分隔
mqtt.input.topic=test/client
#MQTT服务器地址,可以是多个地址
mqtt.services=tcp://127.0.0.0:1883
#mqtt用户名,默认无
mqtt.user=xx
#mqtt密码,默认无
mqtt.password=xx
#心跳间隔时间,默认60
mqtt.KeepAliveInterval=60
#连接超时时间30
mqtt.connectionTimeout=30
#是否不保持session,默认false
mqtt.CleanSession=false
#是否自动连接,默认true
mqtt.AutomaticReconnect=true
mqtt.maxinflight=1000
#操作的完成超时时间
mqtt.CompletionTimeout=30000
mqtt.recoveryInterval=10000
#传输质量,默认1
mqtt.Qos=1

2.连接MQTT 5.0协议代理服务器

使用单向认证连接MQTT 5.0协议代理服务器,设置了断线自动重连,连接MQTT 5.0协议代理服务器

2.1 添加maven依赖

添加以下依赖到项目 pom.xml 文件中

   <!--集成MQTT-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!--开启流支持-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>5.5.14</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.5.14</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

2.2 添加配置类,配置入站订阅消息,出站发布消息

import cn.hutool.core.util.IdUtil;
import com.xx.message.util.SSLUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * mqtt消息处理配置
 *
 */
@Configuration
@Slf4j
public class MqttConfig {

    /**
     * 服务器地址以及端口
     */
    @Value("${mqtt.services}")
    private String[] mqttServices;

    /**
     * 用户名
     */
    @Value("${mqtt.user}")
    private String user;

    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;

    /**
     * 心跳时间,默认为5分钟
     */
    @Value("${mqtt.KeepAliveInterval}")
    private Integer KeepAliveInterval;

    /**
     * 通信质量,详见MQTT协议
     */
    @Value("${mqtt.Qos:1}")
    private Integer Qos;
    /**
     * 是否不保持session,默认为session保持
     */
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;

    /**
     * 是否自动重联,默认为开启自动重联
     */
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;

    /**
     * 消费超时时间
     */
    @Value("${mqtt.CompletionTimeout:5000}")
    private Long completionTimeout;

    /**
     * 连接超时,默认为30秒
     */
    @Value("${mqtt.connectionTimeout}")
    private int mqttConnectionTimeout;

    /**
     * 恢复间隔
     */
    @Value("${mqtt.recoveryInterval:5000}")
    private int mqttRecoveryInterval;

    @Value("${mqtt.automaticReconnectMinDelay:5}")
    private int mqttAutomaticReconnectMinDelay;

    @Value("${mqtt.automaticReconnectMaxDelay:30}")
    private int mqttAutomaticReconnectMaxDelay;

    @Value("${mqtt.maxReconnectDelay:40}")
    private int mqttMaxReconnectDelay;

    @Value("${mqtt.maxinflight:1000}")
    private int maxinflight;

    @Value("${mqtt.sessionExpiryInterval:172800}")
    private Long sessionExpiryInterval;

    /**
     * 订阅主题,可以是多个主题
     */
    @Value("${mqtt.input.topic}")
    private String[] inputTopic;


    @Resource
    private MqttMessageReceiver mqttMessageReceiver;

    /**
     * 生成配置对象,用户名,密码等
     */
    public MqttConnectionOptions getOptions() {
        //MQTT连接器选项
        MqttConnectionOptions options = new MqttConnectionOptions();
        // 设置代理端的URL地址,可以是多个
        options.setServerURIs(mqttServices);
        // 配置mqtt服务端地址,登录账号和密码
        options.setUserName(user);
        options.setPassword(password.getBytes());
        // 配置 最大传输中数,默认值10,qos!=0 时生效
        //表示允许同时在传输中的最大消息数量。
        // MQTT 协议规定,在未收到 ACK 确认之前,客户端只能同时传输一定数量的消息。
        // MaxInflight 指标用来控制该数量,以避免网络拥塞
        options.setReceiveMaximum(maxinflight);
        options.setAutomaticReconnectDelay(mqttAutomaticReconnectMinDelay, mqttAutomaticReconnectMaxDelay);
        options.setMaxReconnectDelay(mqttMaxReconnectDelay);
        //心跳时间
        // 设置会话心跳时间 单位为秒 服务器会每隔KeepAliveInterval秒的时间向客户端发送心跳判断客户端是否在线,
        options.setKeepAliveInterval(KeepAliveInterval);
        //断开是否自动重联
        options.setAutomaticReconnect(AutomaticReconnect);
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanStart(false);
        options.setSessionExpiryInterval(sessionExpiryInterval);
        options.setConnectionTimeout(mqttConnectionTimeout);
        try {
            options.setSocketFactory(SSLUtils.getSingleSocketFactory("ca.crt"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("----生成mqtt配置对象----");
        return options;
    }




    /**
     * MQTT消息处理器(生产者)
     * mqtt消息出站通道默认配置,用于向外发出mqtt消息,当前案例中发到了同一个mqtt服务器
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        String clientId = BaseConstants.MQTT_OUTBOUND + IdUtil.simpleUUID();
        log.info("MQTT消息处理器(生产者)clientId:{}", clientId);
        // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
        MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
        Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(getOptions(), clientId);
        messageHandler.setHeaderMapper(mqttHeaderMapper);
        // 如果设置成true,即异步,发送消息时将不会阻塞。
        messageHandler.setAsync(false);
        // 设置默认QoS
        messageHandler.setDefaultQos(Qos);
        return messageHandler;
    }

    /**
     * MQTT信息通道(生产者)
     * mqtt消息出站通道,用于发送出站消息
     *
     * @return
     */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT信息通道(消费者)
     * mqtt消息入站通道,订阅消息后消息进入的通道。
     * 可创建多个入站通道,对应多个不同的消息生产者。
     *
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }


    /**
     * MQTT消息订阅绑定(消费者)
     * 对于当前应用来讲,接收的mqtt消息的生产者。将生产者绑定到mqtt入站通道,即通过入站通道进入的消息为生产者生产的消息。
     * 可创建多个消息生产者,对应多个不同的消息入站通道,同时生产者监听不同的topic
     *
     * @return
     */
    @Bean
    public MessageProducer channelInbound(MessageChannel mqttInputChannel) {
        String clientId = BaseConstants.MQTT_INBOUND + IdUtil.simpleUUID();
        log.info("MQTT消息处理器(消费者)clientId:{}", clientId);
        MyMqttv5PahoMessageDrivenChannelAdapter adapter = new MyMqttv5PahoMessageDrivenChannelAdapter (getOptions(), clientId, inputTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setPayloadType(String.class);
        adapter.setQos(Qos);
        adapter.setOutputChannel(mqttInputChannel);
        return adapter;
    }

    /**
     * MQTT消息处理器(消费者)
     * mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler mqttMessageHandler() {
        return this.mqttMessageReceiver;
    }

    @Bean
    public ApplicationListener<?> eventListener() {
        return new ApplicationListener<MqttConnectionFailedEvent>() {
            @Override
            public void onApplicationEvent(MqttConnectionFailedEvent event) {
                log.info("MqttConnection异常:", event.getCause());
            }
        };
    }

    @Bean
    public ApplicationListener<?> mqttSubscribedEventListener() {
        return new ApplicationListener<MqttSubscribedEvent>() {
            @Override
            public void onApplicationEvent(MqttSubscribedEvent event) {
                log.info("连接完成做订阅动作:{}", event.getMessage());
            }
        };
    }
}

SSLUtils

 // 单向认证
    public static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
        Security.addProvider(new BouncyCastleProvider());
        X509Certificate caCert = null;
        Resource resource = new ClassPathResource(caCrtFile);
        BufferedInputStream bis = new BufferedInputStream(resource.getInputStream());
        CertificateFactory cf = CertificateFactory.getInstance("X.509");

        while (bis.available() > 0) {
            caCert = (X509Certificate) cf.generateCertificate(bis);
        }
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("cert-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);
        SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext.getSocketFactory();
    }

    public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
                                                    final String password) throws Exception {
        Security.addProvider(new BouncyCastleProvider());

        // load CA certificate
        PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
        X509Certificate caCert = (X509Certificate) reader.readObject();
        reader.close();

        // load client certificate
        reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))));
        X509Certificate cert = (X509Certificate) reader.readObject();
        reader.close();

        // load client private key
        reader = new PEMReader(
                new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
                new PasswordFinder() {
                    @Override
                    public char[] getPassword() {
                        return password.toCharArray();
                    }
                }
        );
        // CA certificate is used to authenticate server
        KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
        caKs.load(null, null);
        caKs.setCertificateEntry("ca-certificate", caCert);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(caKs);

        // client key and certificates are sent to server so it can authenticate us
        KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
        ks.load(null, null);
        ks.setCertificateEntry("certificate", cert);
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());

        // finally, create SSL socket factory
        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        return context.getSocketFactory();
    }
}

解决重连后不重新订阅的问题

/**
 * @Desc   解决重连后不重新订阅的问题
 * @create 2023-08-31 16:07
 **/
@Slf4j
public class MyMqttv5PahoMessageDrivenChannelAdapter extends Mqttv5PahoMessageDrivenChannelAdapter {

    public MyMqttv5PahoMessageDrivenChannelAdapter (String url, String clientId, String... topic) {
        super(url, clientId, topic);
    }

    public MyMqttv5PahoMessageDrivenChannelAdapter (MqttConnectionOptions connectionOptions, String clientId, String... topic) {
        super(connectionOptions, clientId, topic);
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (reconnect) {
            log.info("重连需要重新订阅");
            reconnect = false;
        }
        super.connectComplete(reconnect, serverURI);
    }


}

2.3 消息发送器

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    /**
     * 定义重载方法,用于消息发送
     * @param  payload
     */
    void sendToMqtt(String payload);

    /**
     * 指定topic进行消息发送
     * @param topic
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 指定topic进行消息发送
     * @param topic
     * @param qos
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * 指定topic和消息保留时间进行消息发送
     * @param topic
     * @param qos
     * @param messageExpiryInterval
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload,@Header(MqttHeaders.MESSAGE_EXPIRY_INTERVAL) Long messageExpiryInterval);


    /**
     * 指定topic进行消息发送
     * @param topic
     * @param qos
     * @param payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}


2.4 消息接收器

@Component
@Slf4j
public class MqttMessageReceiver implements MessageHandler {

    @Value("${mqtt.client.message.expiry.interval}")
    private Long messageExpiryInterval;


    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        log.info("接收到mqtt消息{}", message);
        String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
        String payload = String.valueOf(message.getPayload());
    }
}



2.5 配置文件

#订阅主题,多个主题用逗号分隔
mqtt.input.topic=$share/zm/test/client
#MQTT服务器地址,可以是多个地址
mqtt.services=ssl://127.0.0:8883
#mqtt用户名,默认无
mqtt.user=xx
#mqtt密码,默认无
mqtt.password=xx
#心跳间隔时间,默认60
mqtt.KeepAliveInterval=60
#连接超时时间30
mqtt.connectionTimeout=30
#是否不保持session,默认false
mqtt.CleanSession=false
#是否自动连接,默认true
mqtt.AutomaticReconnect=true
mqtt.maxinflight=2000
#操作的完成超时时间
mqtt.CompletionTimeout=30000
#自动重连执行时使用的最小和最大延迟-没用
mqtt.automaticReconnectMinDelay=5
mqtt.automaticReconnectMaxDelay=30
#自动重连执行时最大延迟
mqtt.maxReconnectDelay=60
#离线有效期要少于会话过期时间
mqtt.sessionExpiryInterval=172800
#传输质量,默认1
mqtt.Qos=1

2.6 ca证书

ca.crt文章来源地址https://www.toymoban.com/news/detail-689140.html

-----BEGIN CERTIFICATE-----
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
-----END CERTIFICATE-----

到了这里,关于SpringBoot中使用Spring integration加Eclipse Paho Java Client 实现MQTT客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于 SpringBoot 2.7.x 使用最新的 Elasticsearch Java API Client 之 ElasticsearchClient

    从 Java Rest Client 7.15.0 版本开始,Elasticsearch 官方决定将 RestHighLevelClient 标记为废弃的,并推荐使用新的 Java API Client,即 ElasticsearchClient. 为什么要将 RestHighLevelClient 废弃,大概有以下几点: 维护成本高 :RestHighLevelClient 需要和 Elasticsearch APIs 的更新保持一致,而 Elasticsearch A

    2024年02月08日
    浏览(38)
  • SpringBoot集成Elasticsearch8.x(7)|(新版本Java API Client使用完整示例)

    章节 第一章链接: SpringBoot集成Elasticsearch7.x(1)|(增删改查功能实现) 第二章链接: SpringBoot集成Elasticsearch7.x(2)|(复杂查询) 第三章链接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指标聚合查询) 第四章链接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合查询)

    2024年02月16日
    浏览(46)
  • 使用Eclipse搭建SpringBoot项目

    01、点击eclipse左上角file—new—other 02、选择 Maven Project,点击Next 03、 选择项目存放位置,点击Next 04、选择以 quickstart 或者webapp结尾的点击Next 05、填写好Group Id、 Artifact Id , 填写好后,Package一栏会自动生成,这也就是项目中的包名,点击Finish 06、项目创建完成 第一次创建

    2023年04月08日
    浏览(19)
  • SpringBoot整合最新Elasticsearch Java API Client 7.16教程

        最新在学习SpringBoot整合es的一些知识,浏览了网上的一些资料,发现全都是es很久之前的版本了,其中比较流行的是Java REST Client的High Level Rest Client版本,但是官方文档的说明中,已经申明该版本即将废弃,不再进行维护了。可见:官方文档     目前官方推荐的版本是

    2023年04月24日
    浏览(29)
  • SpringBoot整合ElasticSearch之Java High Level REST Client

    1 搭建SpringBoot工程 2 引入ElasticSearch相关坐标。 3 编写核心配置类 编写核心配置文件: 这里可以不写在配置,可以直接写在代码中,只是一般都是写在配置文件中 编写核心配置类 4 测试客户端对象 记得把maven的单元测试关了 注意:使用@Autowired注入RestHighLevelClient 如果报红线

    2024年02月05日
    浏览(44)
  • Java与es8实战之二:Springboot集成es8的Java Client

    配置springboot的application.yml 配置es的自签证书 执行如下命令将es容器中的crt文件复制到本地 docker cp 容器名称:/usr/share/elasticsearch/config/certs/http_ca.crt . 将crt文件放至springboot项目的resource路径下

    2024年02月12日
    浏览(35)
  • Spring Integration 快速入门教程

    本文通过小的实际示例介绍Spring Integration(SI)的核心概念。Spring Integration提供了许多功能强大的组件,这些组件可以极大地增强企业架构内系统和流程的互连互通。 它实现了一些优秀且常用的设计模式,帮助开发人员避免从头设计自己的模式。我们将探讨SI如何在企业级应用程

    2024年02月03日
    浏览(24)
  • Java系统eclipse和idea都可以SpringBoot 高校宿舍管理系统附源码带论文

    个人名片: 😊作者简介:一名大三在校生,计科专业 🤡 个人主页:莫白媛 🐼座右铭:舒服是留给死人的 🎅**学习目标: 坚持每一次的学习打卡,掌握更多知识!还要学iOS开发和考研呢! 基于 SpringBoot 的高校宿舍管理系统设计与开发 摘 要:宿舍是大学生学习与生活的主要

    2024年02月08日
    浏览(35)
  • 【java】Spring Cloud --Feign Client超时时间配置以及单独给某接口设置超时时间方法

    FeignClient面对服务级有三种超时时间配置 feign配置是在ribbon配置的基础上做了扩展,可以支持服务级超时时间配置,所以,feign配置和ribbon配置的效果应该是一样的。 SpringCloud对这两种配置的优先级顺序如下: Feign局部配置 Feign全局配置 Ribbon局部配置 Ribbon全局配置 在feign-co

    2024年02月12日
    浏览(31)
  • Java21 + SpringBoot3使用spring-websocket时执行mvn package报错

    近日心血来潮想做一个开源项目,目标是做一款可以适配多端、功能完备的模板工程,包含后台管理系统和前台系统,开发者基于此项目进行裁剪和扩展来完成自己的功能开发。 本项目为前后端分离开发,后端基于 Java21 和 SpringBoot3 开发,前端提供了vue、angular、react、uniap

    2024年02月02日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包