MQTT介绍
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。
该协议构建于TCP/IP协议上,它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
MQTT通信模型
MQTT 协议提供一对多的消息发布,可以降低应用程序的耦合性,用户只需要编写极少量的应用代码就能完成一对多的消息发布与订阅,该协议是基于<客户端-服务器>模型,在协议中主要有三种身份:发布者(Publisher)、服务器(Broker)以及订阅者(Subscriber)。
其中,MQTT消息的发布者和订阅者都是客户端,服务器只是作为一个中转的存在,将发布者发布的消息进行转发给所有订阅该主题的订阅者;发布者可以发布在其权限之内的所有主题,并且消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦,发布的消息可以同时被多个订阅者订阅。
MQTT通信模型示意图如下:
MQTT客户端
MQTT 客户端可以向服务端发布信息,也可以从服务端收取信息。我们把客户端发送信息的行为称为 “发布”信息。客户端要想从服务端收取信息,则首先要向服务端“订阅”信息。
客户端具体功能如下:
1.发布消息给其它相关的客户端。
2.订阅主题请求接收相关的应用消息。
3.取消订阅主题请求移除接收应用消息。
4.从服务端终止连接。
MQTT服务端
MQTT 服务端通常是一台服务器(broker),它是 MQTT 信息传输的枢纽,负责将 MQTT 客户端发送来的信息传递给 MQTT 客户端。MQTT 服务端还负责管理 MQTT 客户端,以确保客户端之间的通讯顺畅,保证 MQTT 信息得以正确接收和准确投递。
MQTT 服务器位于消息发布者和订阅者之间,以便用于接收消息并发送到订阅者之中,它的功能有:
1.接受来自客户端的网络连接请求。
2.接受客户端发布的应用消息。
3.处理客户端的订阅和取消订阅请求。
4.转发应用消息给符合条件的已订阅客户端(包括发布者自身)。
Android使用MQTT
集成MQTT库
MQTT有不同语言、不同版本的诸多实现,其中Eclipse Paho只是诸多Java实现中的一个。
我们将使用Eclipse Paho Java Client作为客户端,它是 Java 语言中使用最广泛的 MQTT 客户端库。
集成步骤:
- 在Android工程的bulid.gradle(:app) 文件中添加依赖包
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
org.eclipse.paho也实现了一套针对Android端的通讯服务框架https://github.com/eclipse/paho.mqtt.android
不少开发者直接引用这个库:
implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’
在Android 8.0以前这样做是没问题的,8.0以后Android Service行为发生了很大变更,需要进行适配,不然会出现异常。但是这个库的维护人员貌似对Android版本适配不是很积极,鉴于此,我们把库的源码下载下来,对源码进行改造,作为一个库工程使用:
implementation project(':org.eclipse.paho.android.service')
- 添加权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
- 注册Service
<service android:name="org.eclipse.paho.android.service.MqttService" />
定义MQTT管理者
该类设计为单例模式,实现MQTT初始化、连接、订阅消息、发布消息、处理消息、释放连接等操作。
初始化包括创建MqttAndroidClient对象,并设置回调接口,针对连接失败的情况作断线重连的尝试,针对接收的消息进行JSON解析、并重新封装成需要的数据内容,通过EventBus将消息对象抛出去,相关业务模块注册EventBus并接收该对应消息,然后进行处理。文章来源:https://www.toymoban.com/news/detail-694146.html
连接主要是设置连接相关参数和针对连接结果的处理,参数有连接认证校验、设置超时时间、设置心跳包发送间隔、设置用户名和密码。文章来源地址https://www.toymoban.com/news/detail-694146.html
/**
* Created by ZhangJun on 2019/1/3.
*/
class MqttManager private constructor() {
private var mqttAndroidClient: MqttAndroidClient? = null
private lateinit var mqttConnectOptions: MqttConnectOptions
private object MqttManagerHolder {
val INSTANCE = MqttManager()
}
fun init(node: String, port: Int, clientId: String) {
try {
if (mqttAndroidClient == null) {
mqttAndroidClient = MqttAndroidClient(XxApplication.instance, "ssl://$node:$port", clientId)
} else {
mqttAndroidClient!!.setCallback(object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String?) {
LogUtils.d(TAG, "mqtt connectComplete reconnect = $reconnect")
}
override fun connectionLost(cause: Throwable?) {
if (cause != null) {
LogUtils.d(TAG, "mqtt connectionLost cause = " + cause.message)
}
connect()
}
@Throws(Exception::class)
override fun messageArrived(topic: String, message: MqttMessage) {
val str = String(message.payload)
LogUtils.d(TAG, "messageArrived str = $str")
val jsonObject = JSONObject(str)
val event = jsonObject.optJSONObject("event")
val header = event.optJSONObject("header")
val namespace = header.optString("namespace")
val name = header.optString("name")
val payload = event.optJSONObject("payload")
val message1 = MqttMessageBean()
message1.messageId = namespace.plus(name)
message1.messageContent = payload
EventBus.getDefault().post(ServerEvent.MqttMessageEvent(message1))
}
override fun deliveryComplete(token: IMqttDeliveryToken) {
//do nothing
}
})
mqttConnectOptions = MqttConnectOptions()
mqttConnectOptions.socketFactory = sslSocketFactory
mqttConnectOptions.isAutomaticReconnect = true
mqttConnectOptions.isCleanSession = false
// 设置超时时间,单位:秒
mqttConnectOptions.connectionTimeout = 10
// 心跳包发送间隔,单位:秒
mqttConnectOptions.keepAliveInterval = 20
// 用户名
mqttConnectOptions.userName = CommonUtils.decryptToken()
// 密码
mqttConnectOptions.password = XxApplication.instance.packageName.toCharArray()
connect()
}
} catch (ex: Exception) {
ex.printStackTrace()
}
}
private fun connect() {
if (mqttAndroidClient != null && !mqttAndroidClient!!.isConnected) {
mqttAndroidClient!!.connect(mqttConnectOptions, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken) {
val disconnectedBufferOptions = DisconnectedBufferOptions()
disconnectedBufferOptions.bufferSize = 100
disconnectedBufferOptions.isBufferEnabled = true
disconnectedBufferOptions.isPersistBuffer = false
disconnectedBufferOptions.isDeleteOldestMessages = false
mqttAndroidClient!!.setBufferOpts(disconnectedBufferOptions)
}
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
LogUtils.d(TAG, " mqtt connect fail exception = " + exception.message)
}
})
}
}
private val sslSocketFactory: SSLSocketFactory
get() {
try {
val sslContext = SSLContext.getInstance("SSL")
sslContext.init(null, trustManager, SecureRandom())
return sslContext.socketFactory
} catch (e: Exception) {
throw RuntimeException(e)
}
}
private val trustManager: Array<TrustManager>
get() = arrayOf(object : X509TrustManager {
override fun checkClientTrusted(chain: Array<X509Certificate>, authType: String) {
//do nothing
}
override fun checkServerTrusted(chain: Array<X509Certificate>, authType: String) {
//do nothing
}
override fun getAcceptedIssuers(): Array<X509Certificate> {
return arrayOf()
}
})
/**
* 订阅消息
*/
fun subscribeTopic(subTopic: String, qos: Int) {
try {
if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {
mqttAndroidClient!!.subscribe(subTopic, qos)
}
} catch (ex: MqttException) {
System.err.println("Exception while subscribing")
ex.printStackTrace()
}
}
/**
* 发布消息
*/
fun publishMessage(pubTopic: String, qos: Int, content: String) {
try {
if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {
mqttAndroidClient!!.publish(pubTopic, content.toByteArray(), qos, false)
}
} catch (e: MqttException) {
System.err.println("Error Publishing: " + e.message)
e.printStackTrace()
}
}
fun release() {
try {
if (mqttAndroidClient != null) {
mqttAndroidClient!!.unregisterResources()
if (mqttAndroidClient!!.isConnected) {
mqttAndroidClient!!.disconnect()
}
mqttAndroidClient!!.close()
mqttAndroidClient = null
}
} catch (e: Exception) {
e.printStackTrace()
}
}
companion object {
private val TAG = MqttManager::class.java.simpleName
val instance: MqttManager
get() = MqttManagerHolder.INSTANCE
}
}
定义消息实体
/**
* Created by ZhangJun on 2019/1/5.
*/
class MqttMessageBean {
var messageId: String = ""
var messageContent: JSONObject = JSONObject()
}
到了这里,关于IOT开发---Android MQTT使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!