netty websocket uri 连接时 传参

这篇具有很好参考价值的文章主要介绍了netty websocket uri 连接时 传参。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

netty websocket uri 连接时 传参

方案1

在URL后面加上参数 new WebSocket("ws://127.0.0.1:20683/ws/serialPort?name=value") 然后自己解析参数

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
		System.out.println("触发事件");
		if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
			WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
			String uri = complete.requestUri();
			System.out.println("uri: " + uri);
			//输出  ws/serialPort?name=value , 然后截取
			if (null != uri && uri.contains("/ws") && uri.contains("?")) {
				String[] uriArray = uri.split("\\?");
				if (null != uriArray && uriArray.length > 1) {
					String[] paramsArray = uriArray[1].split("=");
					if (null != paramsArray && paramsArray.length > 1) {
						String token = paramsArray[1];
						logger.info("提取token成功");
					}
					System.out.println("握手成功");
				}
			}
		}

方案2

在客户端设置 连接成功回调 ,一旦连接成功发送参数:文章来源地址https://www.toymoban.com/news/detail-596697.html

var websocket = new WebSocket("ws://127.0.0.1:20683/ws/serialPort");
websocket.onopen = function(evt) {
    websocket.send("some parameters");
};

附 nety websoket service 代码

 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ws服务器
 *
 * @author YZD
 */
@Slf4j
@Component
public class WebSocketServer {

	/**
	 * websocketPath
	 */
	public static final String WEBSOCKET_PATH = "/ws";

	/**
	 * 单线程-线程池
	 */
	private final ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
		0L, TimeUnit.MILLISECONDS,
		new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.AbortPolicy());
	/**
	 * 用来接收进来的连接
	 */
	private final EventLoopGroup bossGroup = new NioEventLoopGroup();
	/**
	 * 用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
	 */
	private final EventLoopGroup workerGroup = new NioEventLoopGroup();

	/**
	 * 端口号
	 */
	private final Integer port = 8888;

	@PostConstruct
	public void contextInitialized() {
		// 要用线程,否则会阻塞主线程
		singleThreadPool.execute(this::runServer);
	}

	public void runServer() {
		log.info(String.format("【netty ws server start run port: %d】", port));

		try {
			// 是一个启动NIO服务的辅助启动类。
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						//因为基于http协议,使用http的编码和解码器
						pipeline.addLast(new HttpServerCodec());
						//是以块方式写,添加ChunkedWriteHandler处理器 主要作用是支持异步发送的码流(大文件传输),但不专用过多的内存,防止java内存溢出
						pipeline.addLast(new ChunkedWriteHandler());
						// http数据聚合器 用于将大数据量分段传输的数据 聚合 加入ObjectAggregator解码器,作用是他会把多个消息转换为单一的FullHttpRequest或者FullHttpResponse
						pipeline.addLast(new HttpObjectAggregator(port));
						// websocket协议处理器
						pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true, 65536, true, true, 10000L));
						// pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH));
						// 自定义的业务处理
						pipeline.addLast(new WebSocketHandler());
					}
				})
				// 默认128
				.option(ChannelOption.SO_BACKLOG, 128)
				.childOption(ChannelOption.SO_KEEPALIVE, true);

			ChannelFuture future = bootstrap.bind(port).sync();
			log.info(String.format("【netty ws server  running port: %d】", port));
			future.channel().closeFuture().sync();
		} catch (Exception e) {
			log.error("netty ws server  error ", e);
		}
	}


	@PreDestroy
	public void contextDestroyed() {
		// 关闭netty服务器
		workerGroup.shutdownGracefully();
		bossGroup.shutdownGracefully();
		// 关闭线程池
		singleThreadPool.shutdown();
		log.info(String.format("【netty ws  server stop port: %d】", port));
		// 睡眠3秒,等待netty关闭结束,不睡的话会抛异常,但不影响流程
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			log.warn("【netty ws  server stop error 】", e);
		}
		this.shutdownGracefully();
	}

	/**
	 * 优雅关闭线程池
	 */
	private void shutdownGracefully() {
		ThreadPoolExecutor threadPool = SpringUtil.getBean(ThreadPoolExecutor.class);
		if (Func.isNotEmpty(threadPool)) {
			// 使新任务无法提交
			threadPool.shutdown();
			try {
				// 等待未完成任务结束
				if (!threadPool.awaitTermination(3, TimeUnit.SECONDS)) {
					threadPool.shutdown();
				}
			} catch (InterruptedException ie) {
				log.error("多次尝试关闭线程池出现异常,可能造成数据不一致");
				// 重新  进行中断
				threadPool.shutdown();
				// 保留中断状态 该线程完成任务之前停止其正在进行的一切,有效地中止其当前的操作。
				Thread.currentThread().interrupt();
			}
			log.warn("【threadPool shutdown Gracefully 】: " + threadPool.toString());
		}
	}
}


附 msgUtils

 
 
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.enums.MsgTypeEnum;
import org.springblade.common.enums.YesNoEnum;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.core.tool.utils.StringPool;
import org.springblade.modules.course.service.IChatGroupUserService;
import org.springblade.modules.course.service.IChatMsgService;
import org.springblade.modules.course.service.IChatWordService;
import org.springblade.modules.im.ImConstants;
import org.springblade.modules.im.model.ReqDeviceData;
import org.springblade.modules.system.entity.User;
import org.springblade.modules.system.service.IUserService;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>Title:</p>
 * <p>Description:</p>
 *
 * @author YZD
 * @date 2022年10月19日11:31:58
 */
@Slf4j
public class MsgUtils {

	/**
	 * channelId 与  userId
	 * 一个通道对应一个用户
	 */
	private static final ConcurrentHashMap<String, String> CHANNEL_USER_MAP = new ConcurrentHashMap<>();

	/**
	 * user 与  channelId 链接通道
	 * 一个用户 可 建立 多个连接
	 */
	private static final ConcurrentHashMap<String, CopyOnWriteArraySet<ChannelHandlerContext>> USER_CONN_LIST_MAP = new ConcurrentHashMap<>();

	/**
	 * 用户端类型 pc app
	 */
	public static final String TYPE = "type";

	/**
	 * 用户 userId
	 */
	public static final String USER_ID = "userId";

	/**
	 * 用户 心跳时间
	 */
	private static final AttributeKey<String> ATTR_KEY_LAST_HEARTBEAT_TIME = AttributeKey.valueOf("lastHeartBeatTime");

	/**
	 * 心跳
	 */
	private static final String PONG = "pong";

	/**
	 * 刷新 心跳时间
	 * @param channel channel
	 */
	public static void refreshLastHeartBeatTime(Channel channel,String userId, String type) {
		long now = System.currentTimeMillis();
		channel.attr(AttributeKey.valueOf(USER_ID)).set(userId);
		channel.attr(AttributeKey.valueOf(TYPE)).set(type);
		channel.attr(ATTR_KEY_LAST_HEARTBEAT_TIME).set(Long.toString(now));
	}

	/**
	 * 获取最后一次心跳时间
	 * @param channel channel
	 * @return long 心跳时间
	 */
	public static Long getLastHeartBeatTime(Channel channel) {
		if (channel.hasAttr(ATTR_KEY_LAST_HEARTBEAT_TIME)) {
			String value = channel.attr(ATTR_KEY_LAST_HEARTBEAT_TIME).get();
			if (value != null) {
				return Long.valueOf(value);
			}
		}
		return null;
	}

	/**
	 * 从缓存中移除 Channel,并且关闭Channel
	 */
	public static  void scanNotActiveChannel() {
		log.warn("ws 会话清理");
		// 如果这个直播下已经没有连接中的用户会话了,删除频道
		if (USER_CONN_LIST_MAP.size() == 0) {
			return;
		}
		//循环 判读 当前登录人的 连接
		USER_CONN_LIST_MAP.forEach((userId,list)-> {
			if (Func.isNotEmpty(list)) {
				// 当前人 新的 容器
				CopyOnWriteArraySet<ChannelHandlerContext> newContextSet = new CopyOnWriteArraySet<>();
				// 当前人 连接
				for (ChannelHandlerContext channelHandlerContext : list) {
					Channel channel = channelHandlerContext.channel();
					if (Func.isNotEmpty(channel)) {
						Long lastHeartBeatTime = getLastHeartBeatTime(channel);
						if (Func.isNotEmpty(lastHeartBeatTime)) {
							long intervalMillis = System.currentTimeMillis() - lastHeartBeatTime;
							// 若 未 开启 、未激活 、已超时  则 断开 连接 清理资源
							if (!channel.isOpen()
								|| !channel.isActive()
								|| intervalMillis > 24 * 60 * 60 * 1000L) {
								String channelId = getLongText(channel);
								CHANNEL_USER_MAP.remove(channelId);
								// 若开启 或 连接中 ,则 关闭
								if (channel.isOpen() || channel.isActive()) {
									channel.close();
								}
							} else {
								// 未失效
								newContextSet.add(channelHandlerContext);
							}
						}
					}
					// 未失效连接
					USER_CONN_LIST_MAP.put(userId, newContextSet);
				}
			} else {
				// 若 没有 连接 ,则 清理 内存
				USER_CONN_LIST_MAP.remove(userId);
			}
		});
	}


	/**
	 * 发送 pong
	 */
	public static void heat() {
		if (USER_CONN_LIST_MAP.size() == 0) {
			return;
		}
		//循环 判读 当前登录人的 连接
		USER_CONN_LIST_MAP.forEach((userId, list) -> {
			if (Func.isNotEmpty(list)) {
				// 当前人 连接
				for (ChannelHandlerContext channelHandlerContext : list) {
					Channel channel = channelHandlerContext.channel();
					if (Func.isNotEmpty(channel)) {
						// 若开启 或 连接中
						if (channel.isOpen() || channel.isActive()) {
							channel.writeAndFlush(new TextWebSocketFrame(PONG));
						}
					}
				}
			}
		});
	}

	/**
	 * 发送 pong
	 */
	public static void heat(ChannelHandlerContext ctx) {
		if (Func.isEmpty(ctx)) {
			return;
		}
		Channel channel = ctx.channel();
		// 若开启 或 连接中
		if (channel.isOpen() || channel.isActive()) {
			channel.writeAndFlush(new TextWebSocketFrame(PONG));
		}
	}


	/**
	 * channelId 与  userId
	 * user 与  channelId
	 * 一个用户多个连接
	 */
	public static void putUserConn(ChannelHandlerContext ctx, String userId, String type) {
		//  channelId 与  userId
		String channelId = getLongText(ctx);
		MsgUtils.addChannel(channelId, userId);
		// user 与  channelId 一个用户多个连接
		String ip = MsgUtils.getClientIpAddress(ctx);
		log.info("ws userId 加入连接, userId= {}, ip= {}, type= {}", userId, ip,type);
		// 刷新心跳时间
		refreshLastHeartBeatTime(ctx.channel(),userId,type);
		// 加入 连接 list
		CopyOnWriteArraySet<ChannelHandlerContext> contextSet = new CopyOnWriteArraySet<>();
		contextSet.add(ctx);
		if (USER_CONN_LIST_MAP.containsKey(userId)) {
			CopyOnWriteArraySet<ChannelHandlerContext> contexts = USER_CONN_LIST_MAP.get(userId);
			if (Func.isNotEmpty(contexts)) {
				contextSet.addAll(contexts);
			}
		}
		USER_CONN_LIST_MAP.put(userId, contextSet);
		// 异步去检测 需要清理的会话
	}

	/**
	 * 功能描述:  判断是否在线
	 *
	 * @param userId 用户标识
	 * @return : java.lang.Boolean
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2022/12/29 19:48
	 */
	public static Boolean online(Long userId) {
		if (Func.isEmpty(userId)) {
			return false;
		}
		String userIdStr = userId.toString();
		return USER_CONN_LIST_MAP.containsKey(userIdStr) && Func.isNotEmpty(USER_CONN_LIST_MAP.get(userIdStr));
	}

	/**
	 * 连接通道设置用户标识
	 */
	private static void addChannel(String channelId, String userId) {
		CHANNEL_USER_MAP.put(channelId, userId);
	}

	/**
	 * 根据链接通道 获取 userId
	 */
	public static String getUserIdByChannel(String channelId) {
		if (CHANNEL_USER_MAP.containsKey(channelId)) {
			return CHANNEL_USER_MAP.get(channelId);
		}
		return "";
	}

	/**
	 * 获取连接的通道 连接 唯一标识
	 */
	public static String getLongText(ChannelHandlerContext client) {
		return client.channel().id().asLongText().replace(StringPool.DASH, "");
	}

	/**
	 * 获取连接的通道 连接 唯一标识
	 */
	public static String getLongText(Channel channel) {
		return channel.id().asLongText().replace(StringPool.DASH, "");
	}


	/**
	 * 断开userId连接
	 */
	public static boolean offline(String userId) {
		log.info("ws 断开userId连接, userId={}", userId);
		if (USER_CONN_LIST_MAP.containsKey(userId)) {
			Set<ChannelHandlerContext> contextSet = USER_CONN_LIST_MAP.get(userId);
			for (ChannelHandlerContext context : contextSet) {
				context.close();
			}
		} else {
			log.warn("userId未连接, userId={}", userId);
			return false;
		}
		return true;
	}

	/**
	 * userId 往 toUserId   发送普通文本消息 批量发
	 *
	 * @return 是否发送成功
	 */
	private static boolean sendBatchTextMsg(ReqDeviceData reqDeviceData) {
		List<Long> toUserIdList = reqDeviceData.getToUserIdList();
		if (Func.isEmpty(toUserIdList)) {
			return false;
		}
		AtomicBoolean success = new AtomicBoolean(true);
		toUserIdList.stream().distinct().forEach(toUserId->{
			success.set(sendTextMsg(Long.valueOf(reqDeviceData.getUserId()), toUserId, reqDeviceData.getMsgType(), reqDeviceData));
		});
		return success.get();
	}

	/**
	 * 功能描述:  群发
	 *
	 * @return : boolean
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2022/12/28 11:31
	 */
	private static boolean sendGroupTextMsg(ReqDeviceData reqDeviceData) {
		log.info("群发:[{}]",JsonUtil.toJson(reqDeviceData));
		Long userId = Long.valueOf(reqDeviceData.getUserId());
		if (Func.isEmpty(userId)) {
			userId = -1L;
		}
		boolean success = true;
		// 统一消息体 普通文本消息
		Integer msgType = reqDeviceData.getMsgType();
		reqDeviceData.setMsgTypeName(MsgTypeEnum.getNameByCode(msgType));

		IChatMsgService iChatMsgService = SpringUtil.getBean(IChatMsgService.class);
		Long groupId = reqDeviceData.getGroupId();
		// 查询组内成员
		IChatGroupUserService chatGroupUserService = SpringUtil.getBean(IChatGroupUserService.class);
		List<Long> userIdList = chatGroupUserService.listByGroupId(groupId);
		// 保存消息记录
		List<Long> logList = iChatMsgService.saveGroupMsg(userId, groupId, reqDeviceData, msgType, userIdList);
		if (Func.isNotEmpty(userIdList)) {
			for (Long toUserId : userIdList) {
				// 判断 在线
				log.info(String.format("userId: %s 往: %s 发送消息: [ %s]", userId, toUserId, JsonUtil.toJson(reqDeviceData)));
				boolean containsKey = USER_CONN_LIST_MAP.containsKey(String.valueOf(toUserId));
				if (!containsKey) {
					log.warn("userId: [{}] 未连接", toUserId);
					success = false;
					continue;
				}
				// 异步发送
				Set<ChannelHandlerContext> contexts = USER_CONN_LIST_MAP.get(String.valueOf(toUserId));
				for (ChannelHandlerContext context : contexts) {
					context.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toJson(reqDeviceData)));
				}
			}
		}
		if (success) {
			// 发送成功 更新 消息成功状态
			// iChatMsgService.updateMsgSuccess(logList);
		}
		return success;
	}

	/**
	 * userId 往 toUserId 发送文本消息
	 *
	 * @param userId        发送者
	 * @param toUserId      接受者
	 * @param reqDeviceData 消息内容
	 * @return 是否发送成功
	 */
	private static boolean sendTextMsg(Long userId, Long toUserId, Integer msgType, ReqDeviceData reqDeviceData) {
		if (Func.isEmpty(userId)) {
			userId = -1L;
		}
		boolean success = true;
		if (toUserId.equals(userId)) {
			return false;
		}
		reqDeviceData.setMsgTypeName(MsgTypeEnum.getNameByCode(msgType));
		// 保存 普通消息 消息记录
		IChatMsgService iChatMsgService = SpringUtil.getBean(IChatMsgService.class);
		Long logId = null;
		if (reqDeviceData.getMsgType().equals(MsgTypeEnum.GENERAL_MSG.getCode())) {
			logId = iChatMsgService.saveMsg(userId, toUserId, reqDeviceData, msgType);
		}
		// 判断 在线
		log.info(String.format("userId: %s 往: %s 发送消息: [ %s]", userId, toUserId, JsonUtil.toJson(reqDeviceData)));
		boolean containsKey = USER_CONN_LIST_MAP.containsKey(String.valueOf(toUserId));
		if (!containsKey) {
			log.warn("userId: [{}] 未连接", toUserId);
			success = false;
		}
		// 异步发送
		Set<ChannelHandlerContext> contexts = USER_CONN_LIST_MAP.get(String.valueOf(toUserId));
		if (Func.isNotEmpty(contexts)) {
			for (ChannelHandlerContext context : contexts) {
				context.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toJson(reqDeviceData)));
			}
		}
		// 发送成功 更新 消息成功状态
		if (success && reqDeviceData.getMsgType().equals(MsgTypeEnum.GENERAL_MSG.getCode())) {
			iChatMsgService.updateMsgSuccess(logId);
		}
		return success;
	}

	/**
	 * 从连接池删除 channelId 与 userId
	 */
	public static void removeClient(ChannelHandlerContext client) {
		String channelId = getLongText(client);
		if (CHANNEL_USER_MAP.containsKey(channelId)) {
			String userId = CHANNEL_USER_MAP.get(channelId);
			if (USER_CONN_LIST_MAP.containsKey(userId)) {
				CopyOnWriteArraySet<ChannelHandlerContext> contextSet = USER_CONN_LIST_MAP.get(userId);
				CopyOnWriteArraySet<ChannelHandlerContext> newContextSet = new CopyOnWriteArraySet<>();
				for (ChannelHandlerContext context : contextSet) {
					String thisChannelId = getLongText(context);
					if (thisChannelId.equals(channelId)) {
						context.close();
					} else {
						newContextSet.add(context);
					}
				}
				USER_CONN_LIST_MAP.put(userId, newContextSet);
			}
		}
		CHANNEL_USER_MAP.remove(channelId);
	}

	/**
	 * 根据ChannelId获取userId编号
	 */
	public static String getUserId(String channelId) {
		return CHANNEL_USER_MAP.get(channelId);
	}

	/**
	 * 获取userId ip地址
	 */
	public static String getClientIpAddress(ChannelHandlerContext client) {
		return client.channel().remoteAddress().toString().substring(1);
	}

	/**
	 * 获取 type  地址
	 */
	public static String getType(ChannelHandlerContext client) {
		AttributeKey<Object> type = AttributeKey.valueOf(TYPE);
		if (client.channel().hasAttr(type)) {
			return String.valueOf(client.channel().attr(type).get());
		}
		return   "";
	}

	/**
	 * 功能描述:  根据对象发送消息
	 *
	 * @param reqDeviceData 发送小婆媳
	 * @return : void
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2022/12/28 10:51
	 */
	public static void sendMsg(ReqDeviceData reqDeviceData) {
		// 发送者 头像
		IUserService userService = SpringUtil.getBean(IUserService.class);
		User sendUser = userService.getById(reqDeviceData.getUserId());
		if (Func.isNotEmpty(sendUser)) {
			reqDeviceData.setUserAvatar(sendUser.getAvatar());
			reqDeviceData.setFromUserAvatar(sendUser.getAvatar());
			reqDeviceData.setFormUserId(reqDeviceData.getUserId());
		}

		reqDeviceData.setCreateTime(new Date());
		// 异步 发送 消息
		CompletableFuture.runAsync(() -> {
			if (reqDeviceData.getBizMsgType().equalsIgnoreCase(ImConstants.SEND_MSG_TO_USER)) {
				// 向其他用户发送消息
				MsgUtils.sendBatchTextMsg(reqDeviceData);
				// 自动回复
				autoReply(reqDeviceData);
			} else if (reqDeviceData.getBizMsgType().equalsIgnoreCase(ImConstants.SEND_GROUP_MSG_TO_USER)) {
				//  群发 向其他用户发送消息
				MsgUtils.sendGroupTextMsg(reqDeviceData);
			} else {
				log.error("消息发送类型错误:[{}]", reqDeviceData);
			}
		}, SpringUtil.getBean(ThreadPoolTaskExecutor.class));
	}

	/**
	 *
	 * 功能描述: 自动回复
	 * @param reqDeviceData reqDeviceData
	 * @return : void
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2023/2/6 11:44
	 */
	public static void autoReply(ReqDeviceData reqDeviceData) {
		List<Long> toUserIdList = reqDeviceData.getToUserIdList();
		Integer toType = reqDeviceData.getToType();
		Long userId = Long.valueOf(reqDeviceData.getUserId());

		// 消息为空 结束
		if (Func.isEmpty(reqDeviceData.getMsgContent())) {
			return;
		}

		String msgContent = reqDeviceData.getMsgContent().toString();

		if (Func.isNotEmpty(toUserIdList) && Func.isNotEmpty(toType) && toType.equals(YesNoEnum.YES.getCode())) {
			IChatWordService iChatWordService = SpringUtil.getBean(IChatWordService.class);
			for (Long toUserId : toUserIdList) {
				String replyText = iChatWordService.autoReplyList(msgContent, toUserId);
				if (Func.isNoneBlank(replyText)) {
					String toUserIdStr = String.valueOf(toUserId);
					// 自动回复
					ReqDeviceData clone = Func.clone(reqDeviceData);
					Objects.requireNonNull(clone).setToUserIdList(Collections.singletonList(userId));
					clone.setMsgContent(replyText);
					clone.setUserId(toUserIdStr);
					clone.setFormUserId(toUserIdStr);
					clone.setAutoReply(YesNoEnum.YES.getCode());
					// 自动回复 向其他用户发送消息
					MsgUtils.sendBatchTextMsg(reqDeviceData);
				}
			}
		}

	}
}

附 消息体


import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * @author YZD
 */
@ApiModel(value = "请求消息体")
@Data
public class ReqDeviceData implements Serializable {

	private static final long serialVersionUID = 1L;

	@ApiModelProperty(value = "消息发送类型:  向其他用户发消息 send_msg_to_user ,  群发 send_group_msg_to_user")
	private String bizMsgType;


	@ApiModelProperty(value = "系统消息-业务id")
	private Long bizId;

	@ApiModelProperty(value = "系统消息-业务类型")
	private String bizType;

	@ApiModelProperty(value = "接受者类型:1:用户,0:组")
	private Integer toType;

	@ApiModelProperty(value = "消息类型: 1 普通消息 2系统消息")
	private Integer msgType;

	@ApiModelProperty(value = "消息类型名称: 1 普通消息 2系统消息")
	private String msgTypeName;

	@ApiModelProperty(value = "发送到的用户标识")
	private List<Long> toUserIdList;

	@ApiModelProperty(value = "发送到的组标识")
	private Long groupId;

	@ApiModelProperty(value = "发送者的用户标识")
	private String userId;


	@ApiModelProperty(value = "发送者的用户标识")
	private String formUserId;

	@ApiModelProperty(value = "消息内容类型: 前端自定义,原样返回")
	private String msgContentType;

	@ApiModelProperty(value = "消息内容")
	private Object msgContent;

	@ApiModelProperty(value = "发送者的用户头像")
	private String userAvatar;

	@ApiModelProperty(value = "发送时间")
	private Date createTime;

	@ApiModelProperty(value = "消息标题")
	private String title;

	@JsonIgnore
	@ApiModelProperty(value = "是否自动回复 0 否 1 是")
	private Integer autoReply;

	@ApiModelProperty(value = "发送者头像")
	private String  fromUserAvatar ;
}

附 消息常量


/**
 * <p>Title: Constants</p>
 * <p>Description: 常量类</p>
 *
 * @author YZD
 */
public class ImConstants {

	/*========userId通信数据========*/
	/**
	 * 向其他用户 发送消息
	 */
	public static final String SEND_MSG_TO_USER = "send_msg_to_user";

	/**
	 *  群发 向其他用户 发送消息
	 */
	public static final String SEND_GROUP_MSG_TO_USER = "send_group_msg_to_user";

	private ImConstants() {
		throw new RuntimeException("常量类不能被初始化");
	}
}

附 多线程配置


import lombok.extern.slf4j.Slf4j;
import org.springblade.modules.im.utils.MsgUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Title: ExecutorConfig
 * @author: yzd e-mail: 121665820@qq.com
 * @date: 2022/12/15 8:53
 * @ClassName: ExecutorConfig
 * @Description: 线程池配置
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig implements SchedulingConfigurer {

	/**
	 * 功能描述:  线程池
	 *
	 * @return : org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2023/2/11 16:04
	 */
	@Bean
	@Primary
	public ThreadPoolTaskExecutor asyncServiceExecutor() {
		log.info("start asyncService Executor");
		//使用visibleThreadPoolTaskExecutor
		ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
		// 通过Runtime方法来获取当前服务器cpu内核,根据cpu内核来创建核心线程数和最大线程数
		int availableProcessors = Runtime.getRuntime().availableProcessors();
		/**
		 * 配置线程个数
		 如果是CPU密集型任务,那么线程池的线程个数应该尽量少一些,一般为CPU的个数+1条线程(大量计算)
		 如果是IO密集型任务,那么线程池的线程可以放的很大,如2*CPU的个数(IO操作)
		 */
		executor.setCorePoolSize(availableProcessors + 1);
		// 允许线程池超时
		executor.setAllowCoreThreadTimeOut(true);
		//配置最大线程数
		executor.setMaxPoolSize(availableProcessors * 4);
		// 空闲存活时间
		executor.setKeepAliveSeconds(60);
		// 设置 等待终止秒数
		executor.setAwaitTerminationSeconds(60);
		//配置队列大小
		executor.setQueueCapacity(availableProcessors * 100);
		//配置线程池中的线程的名称前缀
		executor.setThreadNamePrefix("async-thread-pool-");
		// 等待所有任务结束再关闭线程池
		executor.setWaitForTasksToCompleteOnShutdown(true);
		// rejection-policy:当pool已经达到max size的时候,如何处理新任务
		// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		//执行初始化
		executor.initialize();
		return executor;
	}

	/**
	 * 创建一个定长线程池,支持定时及周期性任务执行
	 */
	@Bean
	public ThreadPoolTaskScheduler scheduledThreadPoolExecutor() {
		log.info("start Scheduled Executor");
		ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
		executor.setPoolSize(10);
		executor.setThreadNamePrefix("task-thread");
		//设置饱和策略
		//CallerRunsPolicy:线程池的饱和策略之一,当线程池使用饱和后,直接使用调用者所在的线程来执行任务;如果执行程序已关闭,则会丢弃该任务
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		executor.initialize();
		//定时任务1:扫描所有的 Channel,关闭失效的Channel
		executor.scheduleWithFixedDelay(MsgUtils::scanNotActiveChannel, 24 * 60 * 60 * 1000L);
		return executor;
	}

	/**
	 * 功能描述:  配置 @Scheduled 定时器所使用的线程池
	 * 配置任务注册器:ScheduledTaskRegistrar 的任务调度器
	 *
	 * @param scheduledTaskRegistrar scheduledTaskRegistrar
	 * @return : void
	 * "@Scheduled" 默认是单线程执行的,所以在需要的时候,我们可以设置一个线程池去执行定时任务。
	 * @author : yzd e-mail: 121665820@qq.com
	 * @create : 2023/2/11 16:30
	 */
	@Override
	public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
		//可配置两种类型:TaskScheduler、ScheduledExecutorService
		// scheduledTaskRegistrar.setScheduler(scheduledThreadPoolExecutor());
		//只可配置一种类型:taskScheduler
		scheduledTaskRegistrar.setTaskScheduler(scheduledThreadPoolExecutor());
	}
}

附 连接 处理


import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.tool.utils.Func;
import org.springblade.modules.im.model.ReqDeviceData;
import org.springblade.modules.im.utils.MsgUtils;
import org.springblade.modules.im.utils.WsAuthUtil;
import org.springblade.modules.mq.WsSendMq;

import java.util.Objects;


/**
 * 业务处理器
 * TextWebSocketFrame 文本处理
 * 测试连接地址:如下
 * ws://192.168.2.18:8888/ws?Blade-Auth=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJpc3N1c2VyIiwiYXVkIjoiYXVkaWVuY2UiLCJ0ZW5hbnRfaWQiOiIwMDAwMDAiLCJyb2xlX25hbWUiOiJhZG1pbmlzdHJhdG9yLHN5c3RlbSxkb2N0b3IsbWFzc2VzLGNhc2VfbWFuYWdlciIsInBvc3RfaWQiOiIxMTIzNTk4ODE3NzM4Njc1MjAxIiwidXNlcl9pZCI6IjExMjM1OTg4MjE3Mzg2NzUyMDEiLCJyb2xlX2lkIjoiMTEyMzU5ODgxNjczODY3NTIwMSwxNTk1OTQ1ODQ0Mjg3MzA3Nzc3LDE1OTU5NDYwNDIwNjk3MTI4OTcsMTU5NTk0NTk3NTc3NDU0Mzg3MywxNTk1OTQ1NzgyNjg5NzU5MjMzIiwidXNlcl9uYW1lIjoiYWRtaW4iLCJuaWNrX25hbWUiOiLotoXnuqfnrqHnkIblkZgiLCJ0b2tlbl90eXBlIjoiYWNjZXNzX3Rva2VuIiwiZGVwdF9pZCI6IjE2MjQ5MzgxMTM4MDA0NDE4NTgiLCJhY2NvdW50IjoiYWRtaW4iLCJjbGllbnRfaWQiOiJzYWJlciIsImV4cCI6MTc2MjY2Njc5NSwibmJmIjoxNjc2MjY2Nzk1fQ.AlhLnFGB0iodL6GngjuiuxF8n5b-v_bZJ9fH7bfvHM1KNQewNEku9Z_k72YNypMKvzb31QykGYAqkr_OJeM4Eg&type=pc
 * @author YZD
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

	/**
	 * 心跳包
	 */
	public static final String PING = "ping";

	/**
	 * 通道消息读取
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
		String channelId = MsgUtils.getLongText(ctx);
		String userId = MsgUtils.getUserIdByChannel(channelId);
		String ip = MsgUtils.getClientIpAddress(ctx);
		String type = MsgUtils.getType(ctx);
		String msgText = msg.text();
		log.info("ws 收到消息:[{}], userId={}, ip={}, type={}", msgText, userId, ip,type);
		if (log.isDebugEnabled()){
			// log.info("ws 收到消息:[{}], userId={}, ip={}, type={}", msgText, userId, ip,type);
		}
		try {
			// 添加userId到连接池
			boolean canSerialize = JSONUtil.isTypeJSON(msgText);
			if (canSerialize) {
				ReqDeviceData reqDeviceData = JSONUtil.toBean(msgText, ReqDeviceData.class);
				if (Func.isEmpty(reqDeviceData)) {
					log.error("ws 收到错误格式数据");
					ctx.close();
				}
				// 收到数据 需要服务处理
				this.handleDeviceResult(reqDeviceData, MsgUtils.getLongText(ctx));
			} else {
				if (PING.equals(msgText)) {
					// 刷新心跳时间
					MsgUtils.refreshLastHeartBeatTime(ctx.channel(),userId,type);
					// 心跳包 发送
					MsgUtils.heat(ctx);
					return;
				}
				log.error("ws 收到未知数据:[{}], userId={}, ip={}, type={}", msgText, userId, ip,type);
				ctx.close();
			}
		} catch (Exception e) {
			log.error("ws userId={}, ip={}, type={},接收到消息,处理异常", userId, ip,type, e);
		}
	}

	/**
	 * 处理业务数据
	 */
	private void handleDeviceResult(ReqDeviceData reqDeviceData, String channelId) {
		// 消息类型为空 拒绝
		if (Func.isAllEmpty(reqDeviceData.getMsgType(),reqDeviceData.getBizMsgType())){
			return;
		}
		// 收到数据 需要服务处理
		log.info("ws 收到消息-处理业务..., data={}", reqDeviceData);
		WsSendMq wsSendMq = SpringUtil.getBean(WsSendMq.class);
		if (Func.isNotEmpty(wsSendMq)) {
			// wsSendMq.send(reqDeviceData);
		}
		MsgUtils.sendMsg(reqDeviceData);
	}

	/**
	 * 连接初始化
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) {
		String ip = MsgUtils.getClientIpAddress(ctx);
		log.info("ws 新连接,  ip= {}", ip);
	}

	/**
	 * 退出连接
	 */
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) {
		String channelId = MsgUtils.getLongText(ctx);
		String userId = MsgUtils.getUserIdByChannel(channelId);
		String ip = MsgUtils.getClientIpAddress(ctx);
		String type = MsgUtils.getType(ctx);
		log.info("ws 客户端断开 userId= {}, ip= {}, type= {}", userId, ip,type);
		MsgUtils.removeClient(ctx);
	}

	/**
	 * 异常处理
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		String channelId = MsgUtils.getLongText(ctx);
		String userId = MsgUtils.getUserIdByChannel(channelId);
		String ip = MsgUtils.getClientIpAddress(ctx);
		String type = MsgUtils.getType(ctx);
		log.error("ws 客户端 异常, userId= {}, ip= {} ,  type={} 异常: {}", userId, ip,type,cause.getMessage());
		MsgUtils.removeClient(ctx);
	}

	/**
	 * token 当前登录人,当前连接人
	 */
	private static final String TOKEN = "Blade-Auth";
	/**
	 * 用户端类型 pc app
	 */
	private static final String TYPE = "type";

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
		if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
			WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
			String uri = complete.requestUri();
			if (null != uri && uri.contains(WebSocketServer.WEBSOCKET_PATH) && uri.contains(StringPool.QUESTION_MARK)) {
				String[] uriArray = uri.split(StringPool.BACK_SLASH + StringPool.QUESTION_MARK);
				if (Func.isNotEmpty(uriArray) && uriArray.length > 1) {
					// 多个参数
					String[] split = uriArray[1].split(StringPool.AMPERSAND);
					String token = null;
					String type = null;
					for (String str : split) {
						String[] paramsArray = str.split(StringPool.EQUALS);
						if (Func.isNotEmpty(paramsArray) && paramsArray.length > 1) {
							String paramName = paramsArray[0];
							String paramStr = paramsArray[1];
							if (TOKEN.equalsIgnoreCase(paramName)) {
								token = paramStr;
							}
							if (TYPE.equalsIgnoreCase(paramName)) {
								type = paramStr;
							}
						}
					}
					if (Func.isNoneBlank(token, type)) {
						log.info("ws token 和 type 提取参数成功, type: [{}]", type);
						BladeUser bladeUser = WsAuthUtil.getUser(token);
						if (Func.isNotEmpty(bladeUser) && Func.isNotEmpty(Objects.requireNonNull(bladeUser).getUserId())) {
							Long userId = bladeUser.getUserId();
							// 添加设备到连接池
							MsgUtils.putUserConn(ctx, userId.toString(), type);
						} else {
							log.error("ws 握手失败, type:[{}], bladeUser:[{}]",type,bladeUser);
							ctx.close();
						}
					}else {
						log.error("ws 握手失败, 连接参数为空");
						ctx.close();
					}
				}else {
					log.error("ws 握手失败, 连接参数:[{}]错误",uri);
					ctx.close();
				}
			}else {
				log.error("ws 握手失败, 连接地址:[{}]错误",uri);
				ctx.close();
			}
		} 
	}

}

pom

        <!--netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

到了这里,关于netty websocket uri 连接时 传参的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • json传参到java接口部分参数接收不到

    使用postman传参时,接口参数中有部分参数被传递进了接口内部,还有部分参数没有接收到,如下图:第一张图是postman接口参数,第二张图是接收到的参数 图一: 图二: 这个问题其实解决很简单,就是json数据转化为实体的过程,找寻get、set方法失败,导致数据无法正常从

    2023年04月10日
    浏览(32)
  • 如何给Java中的main函数参数args传参

    创建java测试类: JAVA类中main函数的参数String [] args,指的是运行时给main函数传递的参数。 传参的方式有以下几种: 1、idea工具的测试类中,左上角菜单中点击Run,选择Edit Configurations…,找到Program Arguments,添加参数   参数添加成功后,运行Java测试类后,结果如下:   2、

    2024年02月08日
    浏览(31)
  • Java实战:Netty+SpringBoot打造TCP长连接通讯方案

    本文将详细介绍如何使用Netty和SpringBoot来打造一个高性能的TCP长连接通讯方案。我们将从Netty和SpringBoot的基本概念讲起,然后逐步介绍如何整合这两个框架来实现TCP长连接通讯。通过阅读本文,我们将了解到如何利用Netty的高性能和SpringBoot的便捷性来构建一个稳定、高效的

    2024年04月10日
    浏览(33)
  • Springboot使用Netty连接Tcp接口(c语言二进制字节码转java字符串)

    一、引入netty的jar包 io.netty netty-all 二、使用netty框架 1、创建客户端 package com.iflytek.digtal.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel

    2024年02月20日
    浏览(37)
  • Netty 教程 – 实现WebSocket通讯

    WebSocket 协议是基于 TCP 的一种新的网络协议,它实现了浏览器与服务器 全双工(full-duplex)通信 ,允许 服务器主动发送信息给客户端 优点及作用 Http协议的弊端: Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输) Http协议冗长且繁琐 易

    2024年02月09日
    浏览(26)
  • Netty和Websocket的区别

    Netty 和 WebSocket 没有直接可比性,因为它们在网络编程环境中具有不同的用途。 Netty: Netty 是一个高性能、事件驱动的网络框架,用于用 Java 构建网络应用程序。 它提供了一组用于处理各种网络协议(例如 TCP 和 UDP)的工具和抽象。 Netty 通常用于构建需要低延迟、高吞吐量

    2024年01月22日
    浏览(29)
  • Java连接websocket优雅断线、重连功能

          为了实现优雅重连和重试,您需要在代码中添加一些逻辑来处理连接失败或断开连接的情况。 实现代码如下:

    2024年02月10日
    浏览(30)
  • 谷粒学院项目redirect_uri 参数错误微信二维码登录

    谷粒学院项目redirect_uri 参数错误_redirect_uri\\\": \\\"http%3a%2f%2fguli.shop%2fapi%2fuce-CSDN博客 修改本地配置 成功后的二维码

    2024年02月01日
    浏览(30)
  • netty整合websocket支持自签证书出现netty websocket ssl Received fatal alert: certificate_unknown

    win+r cmd 生成自己jks文件,指向自己要生成jks的文件位置下,我直接生成到项目resources下 2.生成证书 3.迁移到行业标志 成功生成证书 将jks文件考入项目resources下 yaml配置: netty证书加载 这里我就只上关键代码了 不添加信任netty websocket ssl Received fatal alert: certificate_unknown。 错误原

    2024年02月02日
    浏览(34)
  • netty对websocket协议的实现

    1. websocket协议 websocket协议是对http协议的扩充, 也是使用的TCP协议可以全双工通信的应用层协议。 websocket协议允许服务端向客户端推送消息。 浏览器和服务端只需要进行一次握手,不必像http协议一样,每次连接都要新建立连接,两者之间创建持久性的连接,并进行双向的数

    2024年01月20日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包