- mqtt服务器的选择与安装
emqx拥有界面,可视化比较好,但是windows下安装有问题,后面采用虚拟机安装没问题
mosquitto:windows下安装简单,使用也简单,但是功能比较单一,只能通过命令操作,无界面
2.mosquitto的安装:
windows下搭建mqtt服务器
3.java相关坐标
<!--mqtt连接相关-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!--ssl相关,如果不使用ssl连接可以不引入 -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>
4.创建客户端:发布客户端、订阅客户端
import com.hanqian.common.queue.MessageEvent;
import com.hanqian.common.queue.MessageQueue;
import com.hanqian.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
/**
* MQTT 订阅客户端
* 本地mqtt服务器使用 mosquitto 测试命令
* 启动:mosquitto -c mosquitto.conf -v
* 订阅:mosquitto_sub.exe -h 127.0.0.1 -p 7777 -v -t sensor
* 发布:mosquitto_pub.exe -h 127.0.0.1 -p 7777 -t topic_hq -m "消息测试2323"
*/
@Configuration
@Slf4j
@ConditionalOnProperty(value = "mqtt.mqttIsLoad",havingValue="true")
public class MqttSubscribeClient {
/**
* MQtt服务器地址
*/
@Value("${mqtt.host}")
private String host;
/**
* 订阅主题
*/
@Value("${mqtt.topic}")
private String[] topic;
/**
* 连接mqtt服务器时使用的id,一般唯一。
* 不特殊指定可使用 MqttClient.generateClientId();
*/
@Value("${mqtt.clientId}")
private String clientId;
/**
* 设置是否清空session, 需要与qos配合
* 设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
*/
@Value("${mqtt.isCleanSession}")
private boolean isCleanSession;
/**
* 消息质量Qos:有0、1、2三个级别,默认用0,
* QoS0:至多一次;Sender 发送的一条消息,Receiver 最多能收到一次,如果发送失败,也就算了。
* QoS1:至少一次;Sender 发送的一条消息,Receiver 至少能收到一次,如果发送失败,会继续重试,
* 直到 Receiver 收到消息为止,但Receiver 有可能会收到重复的消息
* QoS2:确保只有一次。Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,
* 直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
* QoS=1通讯时的注意事项
* 接收端连接服务端时cleanSession设置为false
* 接收端订阅主题时QoS=1
* 发布端发布消息时QoS=1
*
* QoS=2通讯时的注意事项
* 接收端连接服务端时cleanSession设置为false
* 接收端订阅主题时QoS=2
* 发布端发布消息时QoS=2
*/
@Value("${mqtt.qos}")
private int[] qos;
/**
* 超时时间 单位为秒
*/
@Value("${mqtt.connectionTimeout}")
private int connectionTimeout;
/**
* 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
*/
@Value("${mqtt.keepAliveInterval}")
private int keepAliveInterval;
/**
* 是否设置遗嘱,只能设置一个主题且topic不可以带通配符
*/
@Value("${mqtt.isWill}")
private boolean isWill;
@Value("${mqtt.userName}")
private String userName;
@Value("${mqtt.passWord}")
private String passWord;
@Value("${mqtt.ssl.cacert}")
private String cacert;
@Value("${mqtt.ssl.clientCert}")
private String clientCert;
@Value("${mqtt.ssl.clientKey}")
private String clientKey;
@Value("${mqtt.ssl.sslPassWord}")
private String sslPassWord;
@Value("${mqtt.ssl.isTwoWay}")
private boolean isTwoWay;
private MqttConnectOptions options;
private MqttClient client;
@Autowired
private MessageQueue messageQueue;
@Bean
public MqttClient start(){
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, clientId, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 自动重连,10分钟内 or connectionLost 方法
//options.setAutomaticReconnect(true);
//options.setMaxReconnectDelay(600000);
options.setCleanSession(isCleanSession);
if (StringUtil.isNotEmpty(userName)){
options.setUserName(userName);
}
if ( StringUtil.isNotEmpty(passWord)){
options.setPassword(passWord.toCharArray());
}
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
if (host.indexOf("ssl")>=0){
if (!isTwoWay){
// 单向ssl,这里的 TrustManager 是自己实现的,没有去校验服务端的证书
options.setHttpsHostnameVerificationEnabled(false);
TrustManager[] trustAllCerts = new TrustManager[1];
TrustManager tm = new MyTM();
trustAllCerts[0] = tm;
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, null);
SocketFactory factory = sc.getSocketFactory();
options.setSocketFactory(factory);
//SSLSocketFactory socketFactory = SSLUtils.getSingleSocketFactory(cacert);
//options.setSocketFactory(socketFactory);
}else {
双向
SSLSocketFactory socketFactory = SSLUtils.getSocketFactory(cacert, clientCert, clientKey,
sslPassWord);
options.setSocketFactory(socketFactory);
}
}
// 设置回调
client.setCallback(new MQTTCallback());
if (isWill){
for (String s : topic) {
MqttTopic mqttTopic = client.getTopic(s);
String message = "我要关闭拉,客户端:"+clientId + "主题是:"+s;
options.setWill(mqttTopic, message.getBytes(), 2, true);
}
}
client.connect(options);
return client;
}catch (Exception e){
log.error("[MQTT]订阅客户端连接出错:{}",e);
return null;
}
}
class MQTTCallback implements MqttCallback, MqttCallbackExtended {
/**
* 连接成功后调用
*/
@Override
public void connectComplete(boolean b, String host) {
try {
//订阅消息: 单个 or 多个(topic与qos个数需一致,否则报错:已在进行连接)
client.subscribe(topic, qos);
} catch (MqttException e) {
log.error("[MQtt]订阅主题失败:{}",e);
}
log.info("[MQTT]连接完成:{},地址:{}",b,host);
}
/**
* 连接断开(也可在此配置重连)
*/
public void connectionLost(Throwable cause) {
log.error("[MQTT]:连接断开:{}", cause.getMessage());
// 需要先成功连接一次,断开连接才能进入此方法
while (!client.isConnected()) {
try {
client.reconnect();
if (client.isConnected()){
log.info("[MQTT]重新连接成功");
break;
}
log.info("[MQTT]重新连接失败,服务器未开启");
} catch (MqttException e) {
log.error("[MQTT]重连失败,错误信息:{}", e);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.error("[MQTT]重连失败每隔五秒尝试:{}", e);
}
}
}
/**
* 订阅接收消息
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("[MQTT]:订阅主题:{},接受消息Qos:{},接受消息内容:{}",topic,message.getQos(),new String(message.getPayload()));
MessageEvent event = new MessageEvent();
Map map = new HashMap<>();
map.put("topic",topic);
map.put("message",new String(message.getPayload()));
event.setParam(map);
event.addHandlerType(MessageEvent.HandlerType.MESSAGE);
event.setBeanName("mqttReceiveMessage");
messageQueue.add(event);
}
/**
* 消息传递成功
* @param token
*/
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("[MQTT]:deliveryComplete:{}",token.isComplete());
}
}
/**
* MyTM 是自己实现的认证管理类,里面并有校验服务端的证书就返回true,永久成功!
*/
static class MyTM implements TrustManager, X509TrustManager {
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
public boolean isServerTrusted(X509Certificate[] certs) {
return true;
}
public boolean isClientTrusted(X509Certificate[] certs) {
return true;
}
@Override
public void checkServerTrusted(X509Certificate[] certs, String authType)
throws CertificateException {
return;
}
@Override
public void checkClientTrusted(X509Certificate[] certs, String authType)
throws CertificateException {
return;
}
}
/**
* 发送消息
*/
public static void main(String[] args) throws Exception {
MqttClient client = new MqttClient("ssl://127.0.0.1:7777", "xdcfsdfsfsdfsdfs", new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName("admin");
options.setPassword("admin".toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
// 单向ssl,这里的 TrustManager 是自己实现的,没有去校验服务端的证书
//options.setHttpsHostnameVerificationEnabled(false);
//TrustManager[] trustAllCerts = new TrustManager[1];
//TrustManager tm = new MyTM();
//trustAllCerts[0] = tm;
//SSLContext sc = SSLContext.getInstance("SSL");
//sc.init(null, trustAllCerts, null);
//SocketFactory factory = sc.getSocketFactory();
//options.setSocketFactory(factory);
SSLSocketFactory socketFactory = SSLUtils.getSocketFactory("D:\\yyb_ssl\\new\\ca.crt",
"D:\\yyb_ssl\\new\\client.crt", "D:\\yyb_ssl\\new\\client.key",
"123456");
options.setSocketFactory(socketFactory);
try {
client.setCallback(null);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
MqttMessage message = new MqttMessage();
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
message.setPayload("hello,你好222阿".getBytes());
//MqttTopic topic11 = client.getTopic("topic_hq");
//MqttDeliveryToken token = topic11.publish(message);
//token.waitForCompletion();
client.publish("topic_hq2",message);
}
}
5.SSLUtils
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
public class SSLUtils {
public static SSLSocketFactory getSingleSocketFactory(String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
X509Certificate caCert = null;
BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile))));
//BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("cert-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
public static SSLSocketFactory getSocketFactory(final String caCrtFile,
final String crtFile, final String keyFile, final String password)
throws Exception {
Security.addProvider(new BouncyCastleProvider());
// load CA certificate
X509Certificate caCert = null;
FileInputStream fis = new FileInputStream(caCrtFile);
BufferedInputStream bis = new BufferedInputStream(fis);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
// load client certificate
bis = new BufferedInputStream(new FileInputStream(crtFile));
X509Certificate cert = null;
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
}
// load client private key
PEMParser pemParser = new PEMParser(new FileReader(keyFile));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
PEMParser pemParser2 = new PEMParser(new FileReader("D:\\yyb_ssl\\client\\client.pem"));
Object object2 = pemParser2.readObject();
PEMKeyPair pemKeyPair = new PEMKeyPair((SubjectPublicKeyInfo) object2, (PrivateKeyInfo) object);
KeyPair key = converter.getKeyPair(pemKeyPair);
pemParser.close();
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(caKs);
// client key and certificates are sent to server, so it can authenticate
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", cert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
new java.security.cert.Certificate[]{cert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}
6.配置文件相关配置文章来源:https://www.toymoban.com/news/detail-861901.html
#MQTT相关配置
mqtt:
host: ssl://127.0.0.1:7777
#需要与qos个数一致
topic: topic_hq,topic_hq2
Qos: 0,0
isCleanSession: false
clientId: client_test
connectionTimeout: 10
keepAliveInterval: 20
isWill: true
userName: admin
passWord: public
#配置是否加载
mqttIsLoad: true
ssl:
isTwoWay: true
cacert: D:\yyb_ssl\new\ca.crt
clientCert: D:\yyb_ssl\new\client.crt
clientKey: D:\yyb_ssl\new\client.key
sslPassWord:
7.ssl证书的生成:必须具备主题备用名称(Subject Alternative Name)文章来源地址https://www.toymoban.com/news/detail-861901.html
1.生成CA证书
1.1创建CA证书私钥
openssl genrsa -out ca.key 2048
1.2.请求证书 证数各参数含义如下
C—–国家(Country Name)
ST—-省份(State or Province Name)
L—-城市(Locality Name)
O—-公司(Organization Name)
OU—-部门(Organizational Unit Name)
CN—-产品名(Common Name)
emailAddress—-邮箱(Email Address)
openssl req -new -sha256 -key ca.key -out ca.csr -subj "/C=CN/ST=SZ/L=SZ/O=C.X.L/OU=C.X.L/CN=CA/emailAddress=123456@test.com"
1.3.自签署证书
openssl x509 -req -days 36500 -sha256 -extensions v3_ca -signkey ca.key -in ca.csr -out ca.crt
2.生成服务端证书
2.1.创建服务器私钥
openssl genrsa -out server.key 2048
2.2新建 openssl.cnf 文件,
req_distinguished_name :根据情况进行修改,
alt_names:BROKER_ADDRESS 修改为 EMQ X 服务器实际的 IP 或 DNS 地址,例如:IP.1 = 127.0.0.1,或 DNS.1 = broker.xxx.com
注意:IP 和 DNS 二者保留其一即可,如果已购买域名,只需保留 DNS 并修改为你所使用的域名地址。
[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = SZ
organizationName = C.X.L
organizationalUnitName = C.X.L
commonName = service
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 127.0.0.1
IP.2 = 192.168.5.249
2.3请求证书
openssl req -new -sha256 -key server.key -config openssl.cnf -out server.csr
2.4.使用CA证书签署服务器证书
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
2.5.验证服务端证书
openssl verify -CAfile ca.crt server.crt
2.6查看服务端证书
openssl x509 -noout -text -in server.crt
2.7.Netty需要支持PKCS8格式读取私钥
openssl pkcs8 -topk8 -nocrypt -in server.key -out pkcs8_key.pem
**注:**错误日志也很明确的打印了:at sun.security.pkcs.PKCS8Key.decode(PKCS8Key.java:351),采用PKCS8无法解析证书。这是因为部分MQTT broker使用的是netty,netty默认使用PKCS8格式对证书进行解析,然而我们使用openssl生成的服务端server.key是PKCS1格式的,所以MQTT broker采用PKCS8无法对证书进行解析。
问题处理:对证书进行格式转行,将PKCS1格式转换成PKCS8即可。
证书格式区别:
PKCS1的文件头格式 —–BEGIN RSA PRIVATE KEY—–
PKCS8的文件头格式 —–BEGIN PRIVATE KEY—–
生成客户端证书
1.生成客户端私钥
openssl genrsa -out client.key 2048
2.请求证书
openssl req -new -sha256 -key client.key -out client.csr -subj "/C=CN/ST=SZ/L=SZ/O=C.X.L/OU=C.X.L/CN=CLIENT/emailAddress=123456@test.com"
3.使用CA证书签署客户端证书
openssl x509 -req -days 36500 -sha256 -extensions v3_req -CA ca.cer -CAkey ca.key -CAserial ca.srl -CAcreateserial -in client.csr -out client.crt
4.验证服务端证书
openssl verify -CAfile ca.crt client.crt
5.查看服务端证书
openssl x509 -noout -text -in client.crt
证书转换
CRT转为PEM
#.key 转换成 .pem:
openssl rsa -in server.key -out server-key.pem
#.crt 转换成 .pem:
openssl x509 -in server.crt -out server.pem -outform PEM
既然PEM与DER只是编码格式上的不同,那么不管是证书还是密钥,都可以随意转换为想要的格式:
PEM转DER
openssl x509 -outform der -in server.pem -out server.der
DER专PEM
openssl x509 -inform der -in server.der -out server.crt
注:也可以直接生成PEM格式的证书,生成方式和CRT一样
区别:
// 生成CA证书
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem
// 生成服务端证书
openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out server.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
// 生成客户端证书
openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem
到了这里,关于java连接mqtt(tcp、ssl单双向)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!