Java 使用 EMQX 实现物联网 MQTT 通信

这篇具有很好参考价值的文章主要介绍了Java 使用 EMQX 实现物联网 MQTT 通信。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

EMQX 实现物联网 MQTT 通信。物联网的 MQ 消息通信方式。


一、介绍

1、MQTT

MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

特点:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供网络连接;
有三种消息发布服务质量:
小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

2、EMQX

EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
mria 集群架构,java程序实现,mqtt,emqx,java,物联网

优势:
海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。

3、Mria 集群架构​

支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。

在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。

具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

4、MQTTX

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

mria 集群架构,java程序实现,mqtt,emqx,java,物联网

主要功能
采用聊天界面设计,使得操作更加简单明了
跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
订阅的 MQTT 主题支持自定义颜色标签
支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
支持通过 WebSocket 连接 MQTT 服务器
支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
自定义脚本支持模拟 MQTT 发布/订阅测试
提供完整的日志记录功能
多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ??? ??? ??? ??? ???
自由切换 Light、Dark、Night 三种主题模式

二、SpringBoot 集成 EMQX

1、yaml 配置

# EMQX配置
emqx:
  # EMQX服务地址,端口号默认18083
  url: http://127.0.0.1:18083
  # 认证用户名
  username: admin
  # 密码
  password: admin123456

2、Properties 配置类

/**
 * EMQX配置
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "emqx")
public class EmqxConfig {

	private String url;

	private String username;

	private String password;
}

3、客户端连接实体 model

客户端连接实现模型类。

/**
 * EMQX客户端连接model
 */
@Data
public class EmqxClientModel {

	Integer awaiting_rel_cnt;
	Integer awaiting_rel_max;
	Boolean clean_start;
	
	/**
	 * 客户端id
	 */
	String clientid;
	
	/**
	 * 连接状态
	 */
	Boolean connected;
	Long connected_at;
	Long created_at;
	Long disconnected_at;
	Integer expiry_interval;
	Integer heap_size;
	Integer inflight_cnt;
	Integer inflight_max;
	
	/**
	 * ip地址
	 */
	String ip_address;
	Boolean is_bridge;
	
	/**
	 * 心跳检测时间s
	 */
	Integer keepalive;
	Integer mailbox_len;
	Integer mqueue_dropped;
	Integer mqueue_len;
	
	/**
	 * 消息队列最大长度
	 */
	Integer mqueue_max;
	String node;
	Integer port;
	String proto_name;
	Integer proto_ver;
	Integer recv_cnt;
	Integer recv_msg;
}

4、token 服务类

提供获取 token 的方法。

@Component
@RequiredArgsConstructor
public class EmqxTokenService {

	private final EmqxConfig emqxConfig;

	public String getToken(){
		String authentication = emqxConfig.getUsername() + ":" + emqxConfig.getPassword();
		return "Basic " + Base64.getEncoder().encodeToString(authentication.getBytes());
	}
}

5、客户端 api

提供对外调用的 api 服务。

  1. 查询客户端连接状态。
  2. 查询客户端连接信息。
  3. 删除客户端连接。
@Slf4j
@Component
@RequiredArgsConstructor
public class EmqxClientsApi {

	private final EmqxConfig emqxConfig;

	private final EmqxTokenService emqxTokenService;

	/**
	 * 根据客户端id查询连接状态
	 *
	 * @param clientId 客户端id
	 * @return 连接状态
	 */
	public boolean getConnectedStatus(String clientId) {
		EmqxClientModel client = getByClientId(clientId);
		if (client == null) {
			return false;
		}
		return client.getConnected();
	}

	/**
	 * 根据客户端id查询客户端信息
	 *
	 * @param clientId 客户端id
	 * @return 客户端信息
	 */
	public EmqxClientModel getByClientId(String clientId) {
		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
		HttpResponse httpResponse;
		try {
			httpResponse = HttpRequest.get(url)
					.header("Authorization", emqxTokenService.getToken())
					.execute();
		} catch (Exception e) {
			log.info("未查到emqx客户端:clientId=" + clientId + "[msg]:" + e.getMessage());
			return null;
		}
		if (httpResponse != null && httpResponse.getStatus() == 200) {
			return JSON.parseObject(httpResponse.body(), EmqxClientModel.class);
		}
		return null;
	}

	/**
	 * 根据客户端id删除客户端
	 *
	 * @param clientId 客户端id
	 * @return 客户端信息
	 */
	public void delete(String clientId) {
		String url = String.format(emqxConfig.getUrl() + "/clients/%s", clientId);
		HttpResponse httpResponse;
		try {
			httpResponse = HttpRequest.delete(url)
					.header("Authorization", emqxTokenService.getToken())
					.execute();
		} catch (IORuntimeException e) {
			throw new ServiceException("删除emqx客户端请求超时");
		} catch (Exception e) {
			throw new ServiceException("删除emqx客户端请求异常:clientId=" + clientId + "[msg]:" + e.getMessage());
		}
		if (httpResponse == null || httpResponse.getStatus() != 204) {
			throw new ServiceException("删除emqx客户端失败:clientId=" + clientId);
		}
	}

}

三、SpringBoot 集成 MQTT

1、pom 依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

2、yaml 配置

spring:
  # MQTT配置
  mqtt:
    # MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
    host-url: tcp://127.0.0.1:1883
    # 用户名
    username: admin
    # 密码
    password: admin123456
    # 客户端id(不能重复)
    client-id: real-mqtt-client
    # MQTT默认的消息推送主题,实际可在调用接口时指定
    default-topic: topic

3、Properties 配置类

@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttConfig {

	private String username;

	private String password;

	private String hostUrl;

	private String clientId;

	private String defaultTopic;
}

4、连接工厂类

执行 mq 初始化配置、连接、消息主题订阅。

@Slf4j
@Component
public class MqttFactory {

	public static ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

	@Autowired
	private MqttConfig mqttConfig;

	@Autowired
	private RealPersonAccessDeviceMapper realPersonAccessDeviceMapper;

	/**
	 * 在bean初始化后连接到服务器
	 */
	@PostConstruct
	public void init() {
		String mqttStartFlag = ParamResolver.getStr(RealCommonConstants.MQTT_START_FLAG);
		if (StrUtil.equals(mqttStartFlag, CommonConstants.SYS_YES_NO_Y)) {
			// 初始化订阅主题
			initSubscribeTopic(getInstance());
		}
	}

	/**
	 * 初始化客户端
	 */
	public MqttClient getInstance() {
		MqttClient client = null;
		if (clientMap.get(mqttConfig.getClientId()) == null) {
			try {
				client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId());
				// MQTT配置对象
				MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
				// 设置自动重连, 其它具体参数可以查看MqttConnectOptions
				mqttConnectOptions.setAutomaticReconnect(true);
				// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
				// mqttConnectOptions.setCleanSession(true);
				// 设置超时时间 单位为秒
				mqttConnectOptions.setConnectionTimeout(10);
				mqttConnectOptions.setUserName(mqttConfig.getUsername());
				mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
				// mqttConnectOptions.setServerURIs(new String[]{url});
				// 设置会话心跳时间 单位为秒
				mqttConnectOptions.setKeepAliveInterval(10);
				// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
				// mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);
				if (!client.isConnected()) {
					client.connect(mqttConnectOptions);
				}
				client.setCallback(new MqttCallBack());
				log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));
				clientMap.put(mqttConfig.getClientId(), client);
			} catch (MqttException e) {
				log.error("MQTT连接消息服务器[{}]失败", mqttConfig.getClientId() + "-" + mqttConfig.getHostUrl());
			}
		} else {
			client = clientMap.get(mqttConfig.getClientId());
			log.info("MQTT从map里获取到client,clientId=" + mqttConfig.getClientId());
			// TODO 已采用自动重连策略
//			log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
//			if (!client.isConnected()) {
//				initSubscribeTopic(client);
			// 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
//				clientMap.remove(mqttConfig.getClientId());
//				this.getInstance();
//			}
		}
		return client;
	}


	/**
	 * 初始化订阅主题
	 * <p>
	 * 消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
	 */
	public void initSubscribeTopic(MqttClient client) {
			// 订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			List<Integer> upstreamQos = new ArrayList<>();
			
			upstreamTopics.add("topic_1");
			upstreamQos.add(1);
			upstreamTopics.add("topic_2");
			upstreamQos.add(0);
			upstreamTopics.add("topic_3");
			upstreamQos.add(1);
		
			try {
				client.subscribe(upstreamTopics.toArray(new String[upstreamTopics.size()]), upstreamQos.stream().mapToInt(Integer::intValue).toArray());
			} catch (MqttException e) {
				e.printStackTrace();
			}
		}
	}

}

5、MQTT 回调类

连接断开回调,以及消息到达回调。根据消息主题进行消息分发。

@Slf4j
@Component
public class MqttCallBack implements MqttCallback, MqttCallbackExtended {

	/**
	 * 客户端断开连接的回调
	 */
	@Override
	public void connectionLost(Throwable throwable) {
		log.info("客户端断开连接回调");
	}

	@Override
	public void connectComplete(boolean reconnect, String serverURI) {
		log.info("客户端断开连接重连");
		// 重新订阅
		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
		client.initSubscribeTopic(client.getInstance());
		log.info("重连成功");
	}

	/**
	 * 消息到达的回调
	 */
	@Override
	public void messageArrived(String topic, MqttMessage mqttMessage) {
		// 日志输出
		MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
		log.info("mqtt客户端ID : {}", client.getInstance().getClientId());
		log.info("mqtt接收消息主题 : {}", topic);
		log.info("mqtt接收消息Qos : {}", mqttMessage.getQos());
		log.info("mqtt接收消息retained : {}", mqttMessage.isRetained());
		log.info("mqtt接收消息内容 : {}", new String(mqttMessage.getPayload()));

		String message = new String(mqttMessage.getPayload()); // 消息内容

		// TODO 根据消息主题进行消息分发
		
	}

	/**
	 * 消息发布成功的回调
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		IMqttAsyncClient client = token.getClient();
		log.info(client.getClientId() + " 发布消息成功!");
	}
}

6、MQ 服务类

提供消息发布等方法。

这里对发送消息方法进行了自定义封装,增加了 redis 对发送的消息进行存储,在异步的响应消息返回时,利用 redis 中发送消息的消息id实现响应数据的异步绑定。响应消息存在丢失的情况。需要合理设置发送消息的过期时间,防止时间过短导致返回的响应丢失,防止时间过长占用 redis 资源。

@Slf4j
@Data
@Configuration
public class UMqttClientService {

	private final MqttFactory mqttFactory;

	private final StringRedisTemplate redisTemplate;

	/**
	 * 订阅主题
	 */
	public void subscribeTopic(String deviceNo) {
		try {
			// 订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			int[] upstreamQos = {1, 1, 2, 0};
			mqttFactory.getInstance().subscribe(upstreamTopics.toArray(new String[0]), upstreamQos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 取消订阅主题
	 */
	public void stopSubscribeTopic(String deviceNo) {
		try {
			// 取消订阅设备发布消息主题
			List<String> upstreamTopics = new ArrayList<>();
			upstreamTopics.add(String.format(UMqttCommonConstants.EVENT, deviceNo));
			upstreamTopics.add(UMqttCommonConstants.ONLINE);
			upstreamTopics.add(String.format(UMqttCommonConstants.RESPONSE, deviceNo));
			upstreamTopics.add(String.format(UMqttCommonConstants.UPLOAD, deviceNo));
			mqttFactory.getInstance().unsubscribe(upstreamTopics.toArray(new String[0]));
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 断开连接
	 */
	public void disConnect() {
		try {
			mqttFactory.getInstance().disconnect();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 订阅主题
	 */
	public void subscribe(String topic, int qos) {
		try {
			mqttFactory.getInstance().subscribe(topic, qos);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 发布请求设备消息
	 *
	 * @param deviceNo 设备编号
	 * @param message  消息
	 */
	public void publish(String deviceNo, String message) {
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, deviceNo), message);
	}

	/**
	 * 发布请求设备消息
	 */
	public void publish(UMqttPublishDate publishDate) {
		// 将消息id和方法名存到redis中:缓存3分钟
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.REQUEST, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 发布响应设备消息
	 */
	public void publishResponse(UMqttPublishDate publishDate) {
		// 将消息id和方法名存到redis中:缓存3分钟
		redisTemplate.opsForValue().set(UMqttCommonConstants.UMQTT_MSG_REDIS_KEY + publishDate.getId(),
				JSON.toJSONString(publishDate), 24, TimeUnit.HOURS);
		publish(1, false, String.format(UMqttCommonConstants.RESPONSE, publishDate.getDeviceNo()), publishDate.getMessage());
	}

	/**
	 * 发布消息
	 *
	 * @param qos      qos
	 * @param retained retained
	 * @param topic    主题
	 * @param message  消息
	 */
	public void publish(int qos, boolean retained, String topic, String message) {
		log.info("发布消息topic:" + topic);
		log.info("发布消息message:" + message);
		MqttMessage mqttMessage = new MqttMessage();
		mqttMessage.setQos(qos);
		mqttMessage.setRetained(retained);
		mqttMessage.setPayload(message.getBytes());
		//主题的目的地,用于发布/订阅信息
		MqttTopic mqttTopic = mqttFactory.getInstance().getTopic(topic);
		//提供一种机制来跟踪消息的传递进度
		//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
		MqttDeliveryToken token;
		try {
			//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
			//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
			token = mqttTopic.publish(mqttMessage);
			token.waitForCompletion();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}
}

四、MQTT 的重连策略

  1. 在 mqtt 工厂类中进行初始化连接时,设置自动重连状态为开启。
// 设置自动重连
mqttConnectOptions.setAutomaticReconnect(true);
  1. 在 mqtt 回调类中,设置重连处理业务。
@Override
public void connectComplete(boolean reconnect, String serverURI) {
	log.info("客户端断开连接重连");
	// 重新订阅
	MqttFactory client = SpringContextHolder.getBeanFactory().getBean(MqttFactory.class);
	client.initSubscribeTopic(client.getInstance());
	log.info("重连成功");
}

MQTT 连接失效时,会自动进行重连,执行自定义的重连策略,重连过程中 MQTT 消息服务停止,消息处理等业务会抛出异常。

五、EMQX 的 Windows 部署启动方式

EMQX 服务需要延时启动,因为部署服务器开机时 EMQX 服务需要等待完成一些初始化操作。保证 EMQX 服务在业务服务启动前启动。

bat 脚本部署方案:

  1. 方式一:(每次停止服务后需重启电脑)
@echo off
start cmd /k "cd /d D:\Program Files\emqx-5.0.11-windows-amd64\bin && emqx start"

echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar
  1. 方式二:(管理员权限cmd进入D:\Program Files\emqx-5.0.11-windows-amd64\bin,执行emqx install安装成服务,将emqx服务设置为手动启动)
@echo off
sc start emqx

echo start magic-demo-biz
java -jar -Dfile.encoding=utf-8 magic-demo-biz.jar

六、疑难解答

1、避免消息发送速率过快

当需要批量发送大量消息时,如果消息发送频率过快,会导致 EMQX 服务器会主动将当前发送消息的客户端连接断开,因此在发送消息时需要控制 MQTT 消息的发送频率。

for (int i = 0; i < recordSize; i++) {
	// 发送MQTT消息
	mqttClientService.sendMessage("topic", "发送消息");

	// 执行延时程序,控制消息发送速率。(速率过快会导致MQTT客户端连接掉线)
	try {
		TimeUnit.SECONDS.sleep(second);
	} catch (Exception e) {
		log.error("延时程序执行异常:" + e.getMessage());
	}
}

2、判断 MQTT 客户端连接状态

会有 MQTT 客户端连接存在但连接状态为已断开的情况。因此判断 MQTT 客户端连接状态时,需要获取 MQTT 客户端连接的实际连接状态,而不是仅判断 MQTT 客户端连接是否存在。

/**
 * 根据客户端id查询连接状态
 *
 * @param clientId 客户端id
 * @return 连接状态
 */
public boolean getConnectedStatus(String clientId) {
	EmqxClientModel client = getByClientId(clientId);
	if (client == null) {
		return false;
	}
	return client.getConnected(); // 查询实际连接状态
}

总结

MQTT 原理同 MQ 消息发送与接收,EMQX 的 MQTT 客服端连接即消息队列。文章来源地址https://www.toymoban.com/news/detail-777250.html

到了这里,关于Java 使用 EMQX 实现物联网 MQTT 通信的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 搭建阿里云物联网平台实现MQTT通信

    1,点击进入阿里云官方网站:阿里云-上云就上阿里云 (aliyun.com) 2,注册登录并且进行实名认证; 如下图: 点击右上角 控制台,进入如图界面:  3,在阿里物联网云平台创建设备:   首先创建产品:大致过程如下(多图预警)    添加自定义功能:  发布上线后,就成为了

    2024年02月08日
    浏览(61)
  • 使用4G通信模块和MQTT协议,完成物联网设备开发。

    (1)安装并使用4G模块通信模块,建立microPython开发环境; (2)使用提供的Demo开发例程,使用MQTT传输协议连接阿里或腾讯网站,完成物联网设备开发。 (3)将温湿度信息上传到网站; (4)手机APP查看数 这是第一步,在阿里云平台创建产品和设备,用来将实际的设备数据

    2024年02月04日
    浏览(59)
  • MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图

    MQTT(EMQX) - Linux CentOS Docker 安装 MQTT (Message Queue Telemetry Transport) 是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的

    2023年04月10日
    浏览(34)
  • STM32+ESP-01s+EMQX实现单片机MQTT协议传输数据上云(二)STM32F103与ESP-01s的Usart通信,实现STM32连接上网上云

    单片机:STM32F103c8t6 WiFi模块:ESP8266-01s EMQX:自身服务器上搭载emq服务器或者借用emqx window 版本  USB TO TTL模块:CH340 因为CH340不能给ESP-01s供3.3V的电,所以测试时需要外加供电           本章中涉及到的技术原理主要为ESP01S wfi模块的AT指令通信,我在上一篇文章给大家提到了

    2024年02月16日
    浏览(52)
  • springboot当中使用EMQX(MQTT协议)

    本篇博客主要围绕EMQX是什么?、能干什么?、怎么用? 三点来进行整理。 1.1、MQTT简介 在了解EMQX前首先了解一下MQTT协议,MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于 发布/订阅 模式的 轻量级物联网消息传输协议。IBM 公司的安迪·斯坦福-克拉

    2024年02月21日
    浏览(37)
  • MQTT协议原理介绍及如何使用emqx

    MQTT(Message Queuing Telemetry Transport)协议是一种轻量级的、基于发布/订阅模式的通信协议。它最初由IBM开发,用于在低带宽和不稳定的网络环境中传输小型数据包。MQTT协议被广泛应用于物联网(IoT)领域,例如传感器数据采集、远程监控和控制等。 MQTT协议使用了一种异步的、

    2024年02月12日
    浏览(38)
  • uniAPP开发小程序使用MQTT通讯EMQX Cloud

    首先感谢大佬 参考案例 下载并安装工具 1.Hbuilderx 2. nodejs 3.MQTTX 链接放这,自己下载安装 MQTT服务器:EMQX 第一步:测试MQTTX通讯 1.记住这地址, 你的服务器地址 2.随便创建几个用户 3.打开MQTTX 填入刚刚的服务器地址 注意我选的参数 用户就是上图的用户和密码 点击连接,成功

    2024年01月15日
    浏览(32)
  • EMQX(MQTT)----基本用法以及使用Python程序进行模拟流程

            EMQX是大规模分布式物联网MQTT消息服务器,除了发送接送的流量不能太大(不能用于生产!),在学习MQTT方面上有很大的优势的!         在使用该协议时,主要需要弄懂的一个知识点就是“发布者”和“订阅者”的关系,在最简单的模型中,一般会含有以上两

    2023年04月24日
    浏览(80)
  • 搭建自己的MQTT服务器,实现设备上云(Ubuntu+EMQX)

    这篇文章教大家在ECS云服务器上部署EMQX,搭建自己私有的MQTT服务器,配置EMQX实现设备上云,设备数据转发,存储;服务器我采用的华为云的ECS服务器,系统选择Ubuntu系统。 Windows版本的看这里: https://blog.csdn.net/xiaolong1126626497/article/details/134280836 EMQX是一款大规模可弹性伸缩

    2024年02月04日
    浏览(47)
  • C#通过MQTT与其他物联网设备通信

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,常用于物联网设备之间的通信。在C#中,我们可以使用MQTT库来实现与其他物联网设备之间的通信,本文将介绍如何使用C#中的MQTT库进行通信。 一、安装MQTT库 C#中有多个MQTT库可供选择,例如M2Mqtt、MQTTnet等,本文

    2024年02月16日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包