Android 实现MQTT客户端,用于门禁消息推送

这篇具有很好参考价值的文章主要介绍了Android 实现MQTT客户端,用于门禁消息推送。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

添加MQTT依赖

implementation ‘org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2’
implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’

在Manifest清单文件中添加服务

<service android:name="org.eclipse.paho.android.service.MqttService" />

两种实现方法


MqttClient的实现方式

MQTT初始化连接线程,实现与服务器的连接、订阅、发布消息
    private class ConnectTask extends AsyncTask<Void, String, Boolean> {
        //MQTT连接线程,实现与服务器的连接、订阅、发布消息
        /**
         * 异步任务:AsyncTask<Params, Progress, Result>
         * 1.Params:UI线程传过来的参数。
         * 2.Progress:发布进度的类型。
         * 3.Result:返回结果的类型。耗时操作doInBackground的返回结果传给执行之后的参数类型。
         *
         * 执行流程:
         * 1.onPreExecute()
         * 2.doInBackground()-->onProgressUpdate()
         * 3.onPostExecute()
         */

        @Override
        protected void onPreExecute() //执行耗时操作之前处理UI线程事件
        {
            super.onPreExecute();
            isConnecting = true;
            connectionStatusTextView.setText("Connecting...");
        }

        @Override
        protected Boolean doInBackground(Void... voids)
        {
            //在此方法执行耗时操作,耗时操作中收发MQTT服务器的数据
            //MQTT服务器地址
            String brokerUrl = "tcp://www.10086.com:1883";
            //客户端ID,用于在MQTT服务器上
            String clientId = MqttClient.generateClientId();
            try {
                mqttClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setCleanSession(true);
            //mqtt服务器用户名和密码
            connectOptions.setUserName("username");
            connectOptions.setPassword("password".toCharArray());
            connectOptions.setWill("断线消息主题","断线消息内容".getBytes(),1,true);
            connectOptions.setConnectionTimeout(10);
            connectOptions.setKeepAliveInterval(20);
            try {
                mqttClient.connect(connectOptions);
                mqttClient.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) {
                        Log.e(TAG, "连接丢失");//连接丢失的时候可以在这里进行重新连接
                        publishProgress("Connection lost, reconnecting...");
                        new ReconnectTask().execute();
                    }

                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        Log.i(TAG, "收到消息:"+message.toString());
                        if (topic.equals("sensors/temperature")) {
                            // Update temperature reading
                            publishProgress("Temperature: " + message.toString());
                        } else if (topic.equals("sensors/humidity")) {
                            // Update humidity reading
                            publishProgress("Humidity: " + message.toString());
                        } else if (topic.equals("leds/led1/status")) {
                            // Update LED 1 status
                            if (message.toString().equals("on")) {
                                publishProgress("LED 1 is on");
                                ledStatusImageView.setText("LED 1 is on");
                            } else {
                                publishProgress("LED 1 is off");
                                ledStatusImageView.setText("LED 1 is off");
                            }
                        } else if (topic.equals("leds/led2/status")) {
                            // Update LED 2 status
                            if (message.toString().equals("on")) {
                                publishProgress("LED 2 is on");
                                ledStatusImageView.setText("LED 2 is on");
                            } else {
                                publishProgress("LED 2 is off");
                                ledStatusImageView.setText("LED 2 is off");
                            }
                        }
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        Log.i(TAG, "成功发送");
                    }
                });
                //这里是订阅的话题
                /**
                 * subscribe的第二个参数如果是消息等级,其代表的意义是:
                 *
                 * qos = 0 : 消息最多发送一次,在客户端离线等不可用的情况下客户端将会丢失这条消息。
                 *
                 * qos = 1 : 消息至少发送一次,能保证客户端能收到该条消息。
                 *
                 * qos = 2 : 消息仅传送一次。
                 */
                mqttClient.subscribe("zhyj/temperature",1);
                mqttClient.subscribe("zhyj/mj1/status",1);
                mqttClient.subscribe("zhyj/mj22/status",1);
            } catch (MqttException e) {
                Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
                publishProgress("Error connecting to MQTT broker: " + e.getMessage());
                return false;
            }
            return true;
        }

        @Override
        protected void onProgressUpdate(String... values) {
            //用于在主线程处理doInBackground()方法执行完毕后的结果,更新UI或者执行其它操作
            super.onProgressUpdate(values);
            connectionStatusTextView.setText(values[0]);
        }

        @Override
        protected void onPostExecute(Boolean aBoolean)
        {
            //用于在主线程处理doInBackground()方法执行完毕后的结果,更新UI或者执行其它操作
            super.onPostExecute(aBoolean);
            isConnecting = false;
            if (aBoolean) {
                connectionStatusTextView.setText("Connected");
            }
        }
    }

MQTT重连
    private class ReconnectTask extends AsyncTask<Void, String, Boolean> {

        @Override
        protected void onPreExecute() {
            super.onPreExecute();
            isConnecting = true;
            connectionStatusTextView.setText("Reconnecting...");
        }

        @Override
        protected Boolean doInBackground(Void... voids) {
            if (mqttClient != null && mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                } catch (MqttException e) {
                    Log.e(TAG, "Error disconnecting from MQTT broker: " + e.getMessage());
                }
            }
            return new ConnectTask().doInBackground();
        }

        @Override
        protected void onPostExecute(Boolean aBoolean) {
            super.onPostExecute(aBoolean);
            isConnecting = false;
            if (aBoolean) {
                connectionStatusTextView.setText("Connected");
            }
        }
    }
MQTT断开
 private class DisconnectTask extends AsyncTask<Void, Void, Void> {

        @Override
        protected void onPreExecute() {
            super.onPreExecute();
            connectionStatusTextView.setText("Disconnecting...");
        }

        @Override
        protected Void doInBackground(Void... voids) {
            if (mqttClient != null && mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                } catch (MqttException e) {
                    Log.e("dbj", "Error disconnecting from MQTT broker: " + e.getMessage());
                }
            }
            return null;
        }

        @Override
        protected void onPostExecute(Void aVoid) {
            super.onPostExecute(aVoid);
            connectionStatusTextView.setText("Disconnected");
            ledStatusImageView.setText("Disconnected");
        }
    }
发送消息
private class ToggleLedTask extends AsyncTask<String, Void, Void> {

        @Override
        protected Void doInBackground(String... strings) {
            if (mqttClient != null && mqttClient.isConnected()) {
                String topic = "zhyj/" + strings[0] + "/control";
                MqttMessage message = new MqttMessage();
                if (ledStatusImageView.getText().toString().contains("on")) {
                    message.setPayload("off".getBytes());
                } else {
                    message.setPayload("on".getBytes());
                }
                try {
                    //publish()的第三个参数和subscribe的第二个参数的qos同理
                    mqttClient.publish(topic, message);
                } catch (MqttException e) {
                    Log.e(TAG, "Error publishing message: " + e.getMessage());
                }
            }
            return null;
        }
    }


MqttAndroidClient service的实现方式

配置文件添加 MQTTService

public class MQTTService extends Service {

    public static final String TAG = "dxj";

    private static MqttAndroidClient client;
    private MqttConnectOptions conOpt;

    private String host = "tcp://www.10086.com:1883";
    private String userName = "";
    private String passWord = "";
    private static String myTopic = "zhyj_mj";      //要订阅的主题
    private String clientId = "";//客户端标识
    private IGetMessageCallBack IGetMessageCallBack;



    @Override
    public void onCreate() {
        super.onCreate();
        Log.e(getClass().getName(), "onCreate");
        init();
    }

    public static void publish(String msg) {
        try {
            if (client != null) {
                client.publish(myTopic, msg.getBytes(), 1, false);
            }
        } catch (MqttException e) {
            e.printStackTrace();
            Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
        }
    }

    private void init() {
        // 服务器地址(协议+地址+端口号)
        String uri = host;
        client = new MqttAndroidClient(this, uri, clientId);
        // 设置MQTT监听并且接受消息
        client.setCallback(mqttCallback);

        conOpt = new MqttConnectOptions();
        // 这个标志是标志客户端,服务端是否要保持持久化的一个标志。默认是true
        //设置客户端和服务端重启或重连后是否需要记住之前的状态
        conOpt.setCleanSession(false);
        // 设置超时时间,单位:秒
        conOpt.setConnectionTimeout(10);
        // 心跳包发送间隔,单位:秒
        conOpt.setKeepAliveInterval(20);
        //连接丢失的情况下,客户端将尝试重新连接到服务器。
        // 在尝试重新连接之前,它最初将等待1秒,对于每次失败的重新连接尝试,
        // 延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟
//        conOpt.setAutomaticReconnect(true);

        // 用户名
        conOpt.setUserName(userName);
        // 密码
        conOpt.setPassword(passWord.toCharArray());     //将字符串转换为字符串数组

        // last will message
        boolean doConnect = true;
        String message = "{\"terminal_uid\":\"" + clientId + "\"}";
        Log.e(getClass().getName(), "message是:" + message);
        String topic = myTopic;
        // 最后的遗嘱
        // MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
        //当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
        //当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
        try {
            conOpt.setWill(topic, message.getBytes(), 1, false);
        } catch (Exception e) {
            Log.i(TAG, "Exception Occured", e);
            doConnect = false;
            iMqttActionListener.onFailure(null, e);
        }

        if (doConnect) {
            doClientConnection();
        }

    }


    @Override
    public void onDestroy() {
         stopSelf();
        try {
            if (isAlreadyConnected()) {
                client.disconnect();
                client.unregisterResources();
            }
            IGetMessageCallBack.setStatus("断开连接");
        } catch (MqttException e) {
            e.printStackTrace();
            Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
        }
      
        super.onDestroy();
    }

    /**
     * 连接MQTT服务器
     */
    private void doClientConnection() {
        if (!isAlreadyConnected() && isConnectIsNormal()) {
            try {
                client.connect(conOpt, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
                Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
            }
        }

    }

    // MQTT是否连接成功
    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            try {
                /**
                 * subscribe的第二个参数如果是消息等级,其代表的意义是:
                 *
                 * qos = 0 : 消息最多发送一次,在客户端离线等不可用的情况下客户端将会丢失这条消息。
                 *
                 * qos = 1 : 消息至少发送一次,能保证客户端能收到该条消息。
                 *
                 * qos = 2 : 消息仅传送一次。
                 */
                client.subscribe("zhyj/mj/status", 1);
                IGetMessageCallBack.setStatus("连接成功");
            } catch (MqttException e) {
                e.printStackTrace();
                IGetMessageCallBack.setStatus("连接失败");
                Log.e(TAG, "Error connecting to MQTT broker: " + e.getMessage());
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            // 连接失败,重连
            Log.e(TAG, "Error publishing message: " + arg0.toString());
            IGetMessageCallBack.setStatus("连接失败");
        }
    };

    // MQTT监听并且接受消息
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            IGetMessageCallBack.setStatus("收到消息");

            String str1 = new String(message.getPayload());
            if (IGetMessageCallBack != null) {
                IGetMessageCallBack.setMessage(str1);
            }
            String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
            Log.i(TAG, "messageArrived:" + str1);
            Log.i(TAG, str2);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            IGetMessageCallBack.setStatus("成功发送");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // 失去连接,重连
            Log.e(TAG, "连接丢失 isAlreadyConnected=" + isAlreadyConnected());
            IGetMessageCallBack.setStatus("连接丢失");

        }
    };

    /**
     * 判断网络是否连接
     */
    private boolean isConnectIsNormal() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.e(TAG, "MQTT当前网络名称:" + name);
            return true;
        } else {
            Log.e(TAG, "MQTT 没有可用网络");
            return false;
        }
    }


    @Override
    public IBinder onBind(Intent intent) {
        Log.e(getClass().getName(), "onBind");
        return new CustomBinder();
    }

    public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack) {
        this.IGetMessageCallBack = IGetMessageCallBack;
    }

    public class CustomBinder extends Binder {
        public MQTTService getService() {
            Log.e(getClass().getName(), "CustomBinder");
            return MQTTService.this;
        }
    }

    public void toCreateNotification(String message) {
        PendingIntent pendingIntent = PendingIntent.getActivity(this, 1, new Intent(this, MQTTService.class), PendingIntent.FLAG_UPDATE_CURRENT);
        NotificationCompat.Builder builder = new NotificationCompat.Builder(this);//3、创建一个通知,属性太多,使用构造器模式

        Notification notification = builder
                .setTicker("test_title")
                .setSmallIcon(R.mipmap.ic_launcher)
                .setContentTitle("")
                .setContentText(message)
                .setContentInfo("")
                .setContentIntent(pendingIntent)//点击后才触发的意图,“挂起的”意图
                .setAutoCancel(true)        //设置点击之后notification消失
                .build();
        NotificationManager notificationManager = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
        startForeground(0, notification);
        notificationManager.notify(0, notification);

    }

    public boolean isAlreadyConnected() {
        if (client != null) {
            try {
                boolean result = client.isConnected();
                if (result) {
                    return true;
                } else {
                    return false;
                }
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        } else {
            return false;
        }
    }
}
为了方便Service与Acitivity之间的通信,创建一个工具类作为桥梁
public class MyServiceConnection implements ServiceConnection {

    private MQTTService mqttService;
    private IGetMessageCallBack IGetMessageCallBack;

    @Override
    public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
        mqttService = ((MQTTService.CustomBinder)iBinder).getService();
        mqttService.setIGetMessageCallBack(IGetMessageCallBack);
    }

    @Override
    public void onServiceDisconnected(ComponentName componentName) {

    }

    public MQTTService getMqttService(){
        return mqttService;
    }

    public void setIGetMessageCallBack(IGetMessageCallBack IGetMessageCallBack){
        this.IGetMessageCallBack = IGetMessageCallBack;
    }

几个重要的参数
MqttConnectOptions.setAutomaticReconnect(true)

true表示支持自动重连
连接丢失的情况下,客户端将尝试重新连接到服务器。
在尝试重新连接之前,它最初将等待1秒,对于每次失败的重新连接尝试,
延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟

MqttConnectOptions.setCleanSession(true)

官方注释:
如果设置为false,则客户端和服务器将在重新启动客户端、服务器和连接时保持状态。当状态保持时:
即使重新启动客户端、服务器或连接,消息传递也将可靠地满足指定的QOS。
服务器会将订阅视为持久订阅。
如果设置为true,则客户端和服务器将不会在重新启动客户端、服务器或连接时保持状态。这意味着
如果重新启动客户端、服务器或连接,则无法维持向指定QOS的消息传递

这个标志是标志客户端,服务端是否要保持持久化的一个标志。默认是true
设置客户端和服务端重启或重连后是否需要记住之前的状态。
当setCleanSession为true时,客户端掉线或服务端重启后,服务端和客户端会清掉之前的 session, 重连后客户端会有一个新的session。离线期间发来QoS=0,1,2的消息一律接收不到,且所有之前订阅的topic需要重新订阅。
··························································································
当setCleanSession为false时, 客户端掉线或服务端重启后,服务端和客户端不会清除之前的session。重连后session将会恢复,客户端不用重新订阅主题。且离线期间发来QoS=0,1,2的消息仍然可以接收到。

这里有个需要注意的地方,即setCleanSession为true时,掉线后客户端设置了setAutomaticReconnect为true才会自动重连。为当setCleanSession为false时。不管setAutomaticReconnect为true或者false都会自动重连。

MqttConnectOptions.setKeepAliveInterval(30);

心跳包发送间隔,单位:秒
MQTT客户端(client)在没有收到或发布任何消息时仍然是保持连接的。服务端(the broker)需要跟踪客户端的连接状态。 所有需要发送心跳包来确定客户端是否是连接状态。心跳包发送的时间间隔就是keepalive设置的。
服务端会维持一个timer。当这个timer记录的时间超过1.5倍keepalive值时,服务端会将这个客户端标记为断开连接,并发送Last Will and Testament (LWT)遗言广播。
每次客户端发送或接收一个消息, 服务端会重置这个timer。文章来源地址https://www.toymoban.com/news/detail-629484.html

按具体业务场景修改使用即可

到了这里,关于Android 实现MQTT客户端,用于门禁消息推送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Android】MQTT入门——服务器部署与客户端搭建

    MQTT(Message Queuing Telemetry Transport) 是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽、和不稳定网络环境的物联网应用而设计,它可以用极少的代码为互联网设备提供实时可靠的消息服务。 MQTT 协议主要用于物联网和移动设备等资源有限的场景中,其中包括

    2024年02月04日
    浏览(72)
  • mqtt服务器搭建与qt下的mqtt客户端实现

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(Io

    2024年02月06日
    浏览(79)
  • Python实现的mqtt客户端工具分享,小巧且超轻量级(python+tkinter+paho.mqtt)

    mqtt协议调试时需要个客户端工具,但网上找的体积包都很大,都不够小巧和便携。于是趁周末时间用python搞出来了个客户端工具,使用python+tinker+paho.mqtt实现。源码量很少但功能不弱,相当的轻量级。分享给有需要的小伙伴,喜欢的可以点击收藏。 用python实现个跨平台的mq

    2024年02月07日
    浏览(33)
  • Qt实现客户端与服务器消息发送

    里用Qt来简单设计实现一个场景,即: (1)两端:服务器QtServer和客户端QtClient (2)功能:服务端连接客户端,两者能够互相发送消息,传送文件,并且显示文件传送进度。 环境:VS20013 + Qt5.11.2 + Qt设计师 先看效果: 客户端与服务器的基本概念不说了,关于TCP通信的三次握

    2024年02月11日
    浏览(44)
  • SpringBoot中使用Spring integration加Eclipse Paho Java Client 实现MQTT客户端

    Spring Integration 是一个开源的集成消息处理框架,它提供了消息传递、消息过滤、消息转换、消息路由等功能,可以用于构建异步、分布式的系统。 Spring-integration-stream是Spring Integration框架的一个组件,用于在不同的系统和应用之间进行消息传递、集成和流处理。 它提供了一套

    2024年02月10日
    浏览(44)
  • 使用HTTP/2实现服务端主动推送消息给客户端

    77. 使用HTTP/2实现服务端主动推送消息给客户端 HTTP/2 协议的服务器主动推送机制是通过服务器在接收到客户端请求后,主动向客户端推送相关资源的方式来实现的。下面将详细解释如何在服务器端和客户端实现 HTTP/2 的服务器主动推送,并给出相应的代码示例。 客户端实现:

    2024年02月11日
    浏览(51)
  • mqtt安卓客户端

    1.MQTT(消息队列遥测传输协议),是一种基于 发布/订阅 (publish/subscribe)模式的\\\"轻量级\\\"通讯协议, 该协议构建于TCP/IP协议上 。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使

    2024年02月10日
    浏览(38)
  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(43)
  • 【MQTT】MQTT简介+安装+使用 python MQTT客户端

    目录 前言 MQTT 协议简介 为何选择 MQTT MQTT 通讯运作方式 MQTT 协议帧格式 MQTT服务器搭建和使用  公共MQTT 测试服务器 MQTT服务器搭建 各种MQTT代理服务程序比较 Mosquitto安装 MQTT使用方法 测试MQTT服务器 程序中使用MQTT 本文随时更新,转载请注明出处,源地址:http://t.csdn.cn/kCC0B 文

    2024年02月01日
    浏览(34)
  • java后端使用websocket实现与客户端之间接收及发送消息

    客户端请求websocket接口,连接通道=》我这边业务成功客户端发消息=》客户端自动刷新。 接口:ws://localhost:8080/websocket/xx 经测试,成功 如果是线上服务器连接,则需要在nginx里配置websocket相关内容,再重启nginx,代码如下 本地连接的时候用的是ws://,因为是http链接,但是如果是

    2024年02月16日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包