netty websocket uri 连接时 传参
方案1
在URL后面加上参数 new WebSocket("ws://127.0.0.1:20683/ws/serialPort?name=value")
然后在服务端握手完成后自己解析 URI 中的参数:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
System.out.println("触发事件");
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String uri = complete.requestUri();
System.out.println("uri: " + uri);
//输出 ws/serialPort?name=value , 然后截取
if (null != uri && uri.contains("/ws") && uri.contains("?")) {
String[] uriArray = uri.split("\\?");
if (null != uriArray && uriArray.length > 1) {
String[] paramsArray = uriArray[1].split("=");
if (null != paramsArray && paramsArray.length > 1) {
String token = paramsArray[1];
logger.info("提取token成功");
}
System.out.println("握手成功");
}
}
}
方案2
在客户端设置连接成功回调,一旦连接成功就发送参数:
// Connect without query parameters; the parameters travel in the first message instead.
var websocket = new WebSocket("ws://127.0.0.1:20683/ws/serialPort");
// As soon as the connection is open, push the parameters to the server.
websocket.onopen = (evt) => {
    websocket.send("some parameters");
};
附 netty websocket server 代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ws服务器
*
* @author YZD
*/
@Slf4j
@Component
public class WebSocketServer {
/**
* websocketPath
*/
public static final String WEBSOCKET_PATH = "/ws";
/**
* 单线程-线程池
*/
private final ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadPoolExecutor.AbortPolicy());
/**
* 用来接收进来的连接
*/
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* 用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
*/
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* 端口号
*/
private final Integer port = 8888;
/**
 * Launches the Netty server once this bean has been constructed.
 */
@PostConstruct
public void contextInitialized() {
    // runServer() blocks until the server channel closes, so it is handed to the
    // dedicated single-thread pool rather than executed on the calling thread.
    singleThreadPool.execute(() -> runServer());
}
/**
 * Builds the channel pipeline, binds the server to {@code port} and then blocks until
 * the server channel is closed.
 *
 * <p>Failures are logged, not rethrown. If the thread is interrupted while waiting, the
 * interrupt flag is restored before returning.
 */
public void runServer() {
    // Upper bound (bytes) for one aggregated HTTP message, i.e. the handshake request.
    // This must be a size, not the TCP port number.
    final int maxHttpContentLength = 65536;
    log.info("【netty ws server start run port: {}】", port);
    try {
        // Helper that wires up and starts an NIO server.
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // WebSocket starts as an HTTP upgrade, so HTTP encoding/decoding comes first.
                    pipeline.addLast(new HttpServerCodec());
                    // Supports writing large payloads in chunks without buffering them fully.
                    pipeline.addLast(new ChunkedWriteHandler());
                    // Merges HTTP message fragments into a single full request/response.
                    pipeline.addLast(new HttpObjectAggregator(maxHttpContentLength));
                    // WebSocket protocol handling. Arguments after the path: sub-protocols,
                    // allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith
                    // (lets URIs such as "/ws?token=..." match), handshake timeout in ms.
                    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true, 65536, true, true, 10000L));
                    // Application-level message handling.
                    pipeline.addLast(new WebSocketHandler());
                }
            })
            // Accept backlog size.
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
        ChannelFuture future = bootstrap.bind(port).sync();
        log.info("【netty ws server running port: {}】", port);
        // Park this thread until the server channel is closed.
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        log.error("netty ws server error ", e);
    } catch (Exception e) {
        log.error("netty ws server error ", e);
    }
}
/**
 * Stops the Netty event loops and the launcher pool when the bean is destroyed, waits
 * briefly for Netty to wind down and then shuts down the shared thread pool.
 */
@PreDestroy
public void contextDestroyed() {
    // Stop the Netty event loops.
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
    // Stop the pool that runs runServer().
    singleThreadPool.shutdown();
    log.info("【netty ws server stop port: {}】", port);
    // Give Netty up to three seconds to finish closing; skipping the pause produces
    // exceptions during shutdown according to the original author.
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        log.warn("【netty ws server stop error 】", e);
        // Restore the flag so callers further up can observe the interruption.
        Thread.currentThread().interrupt();
    }
    this.shutdownGracefully();
}
/**
 * Shuts down the {@link ThreadPoolExecutor} bean obtained from the Spring context.
 *
 * <p>New tasks are rejected first, then running tasks get three seconds to finish. If
 * they do not finish in time, or the wait is interrupted, the remaining tasks are
 * cancelled with {@code shutdownNow()}.
 */
private void shutdownGracefully() {
    ThreadPoolExecutor threadPool = SpringUtil.getBean(ThreadPoolExecutor.class);
    if (Func.isNotEmpty(threadPool)) {
        // Stop accepting new tasks.
        threadPool.shutdown();
        try {
            // Wait for in-flight tasks; a second shutdown() would be a no-op, so
            // escalate to shutdownNow() when the grace period runs out.
            if (!threadPool.awaitTermination(3, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException ie) {
            log.error("多次尝试关闭线程池出现异常,可能造成数据不一致");
            // Cancel what is still running, then preserve the interrupt status.
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.warn("【threadPool shutdown Gracefully 】: {}", threadPool);
    }
}
}
附 msgUtils
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.enums.MsgTypeEnum;
import org.springblade.common.enums.YesNoEnum;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.core.tool.utils.Func;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.core.tool.utils.StringPool;
import org.springblade.modules.course.service.IChatGroupUserService;
import org.springblade.modules.course.service.IChatMsgService;
import org.springblade.modules.course.service.IChatWordService;
import org.springblade.modules.im.ImConstants;
import org.springblade.modules.im.model.ReqDeviceData;
import org.springblade.modules.system.entity.User;
import org.springblade.modules.system.service.IUserService;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* <p>Title:</p>
* <p>Description:</p>
*
* @author YZD
* @date 2022年10月19日11:31:58
*/
@Slf4j
public class MsgUtils {
/**
* channelId 与 userId
* 一个通道对应一个用户
*/
private static final ConcurrentHashMap<String, String> CHANNEL_USER_MAP = new ConcurrentHashMap<>();
/**
* user 与 channelId 链接通道
* 一个用户 可 建立 多个连接
*/
private static final ConcurrentHashMap<String, CopyOnWriteArraySet<ChannelHandlerContext>> USER_CONN_LIST_MAP = new ConcurrentHashMap<>();
/**
* 用户端类型 pc app
*/
public static final String TYPE = "type";
/**
* 用户 userId
*/
public static final String USER_ID = "userId";
/**
* 用户 心跳时间
*/
private static final AttributeKey<String> ATTR_KEY_LAST_HEARTBEAT_TIME = AttributeKey.valueOf("lastHeartBeatTime");
/**
* 心跳
*/
private static final String PONG = "pong";
/**
 * Stores the user id and client type on the channel and stamps it with the current time
 * as its last heartbeat.
 *
 * @param channel channel whose attributes are written
 * @param userId  value stored under the {@code userId} attribute
 * @param type    value stored under the {@code type} attribute
 */
public static void refreshLastHeartBeatTime(Channel channel,String userId, String type) {
    channel.attr(AttributeKey.valueOf(USER_ID)).set(userId);
    channel.attr(AttributeKey.valueOf(TYPE)).set(type);
    // The timestamp is kept as a decimal string of epoch milliseconds.
    String heartbeatMillis = Long.toString(System.currentTimeMillis());
    channel.attr(ATTR_KEY_LAST_HEARTBEAT_TIME).set(heartbeatMillis);
}
/**
 * Reads the last heartbeat timestamp stored on the channel.
 *
 * @param channel channel to inspect
 * @return epoch milliseconds of the last heartbeat, or {@code null} when the attribute
 *         is absent or holds no value
 */
public static Long getLastHeartBeatTime(Channel channel) {
    if (!channel.hasAttr(ATTR_KEY_LAST_HEARTBEAT_TIME)) {
        return null;
    }
    String stored = channel.attr(ATTR_KEY_LAST_HEARTBEAT_TIME).get();
    if (stored == null) {
        return null;
    }
    return Long.valueOf(stored);
}
/**
 * Removes stale connections from the registries and closes their channels.
 *
 * <p>A connection is stale when its channel is no longer open or active, when its last
 * heartbeat is more than 24 hours old, or when it carries no heartbeat timestamp at all
 * (its idle time cannot be verified). Users left without any connection are removed
 * from the user map so that key-based online checks stay accurate.
 */
public static void scanNotActiveChannel() {
    log.warn("ws 会话清理");
    if (USER_CONN_LIST_MAP.isEmpty()) {
        return;
    }
    final long now = System.currentTimeMillis();
    List<Channel> staleChannels = new ArrayList<>();
    for (String userId : USER_CONN_LIST_MAP.keySet()) {
        // Prune in place under the map's per-key atomicity instead of replacing the
        // set, so a connection registered concurrently is not overwritten.
        USER_CONN_LIST_MAP.computeIfPresent(userId, (key, contexts) -> {
            List<ChannelHandlerContext> staleContexts = new ArrayList<>();
            for (ChannelHandlerContext context : contexts) {
                if (isStaleChannel(context.channel(), now)) {
                    staleContexts.add(context);
                    staleChannels.add(context.channel());
                }
            }
            contexts.removeAll(staleContexts);
            // Returning null removes the user's entry altogether.
            return contexts.isEmpty() ? null : contexts;
        });
    }
    // Close outside the compute callback so channel callbacks never run while a map
    // bin is locked.
    for (Channel channel : staleChannels) {
        CHANNEL_USER_MAP.remove(getLongText(channel));
        channel.close();
    }
}

/**
 * Tells whether a channel should be dropped by the cleanup scan.
 *
 * @param channel channel to check
 * @param now     current time in epoch milliseconds
 * @return {@code true} when the channel is closed, inactive, has no heartbeat
 *         timestamp, or has been silent for more than 24 hours
 */
private static boolean isStaleChannel(Channel channel, long now) {
    final long maxIdleMillis = 24 * 60 * 60 * 1000L;
    if (channel == null || !channel.isOpen() || !channel.isActive()) {
        return true;
    }
    Long lastHeartBeatTime = getLastHeartBeatTime(channel);
    return lastHeartBeatTime == null || now - lastHeartBeatTime > maxIdleMillis;
}
/**
 * Sends a {@code pong} text frame to every registered connection whose channel is open
 * or active.
 */
public static void heat() {
    if (USER_CONN_LIST_MAP.size() == 0) {
        return;
    }
    // Walk every user's connection set.
    for (CopyOnWriteArraySet<ChannelHandlerContext> contexts : USER_CONN_LIST_MAP.values()) {
        if (!Func.isNotEmpty(contexts)) {
            continue;
        }
        for (ChannelHandlerContext context : contexts) {
            Channel channel = context.channel();
            if (!Func.isNotEmpty(channel)) {
                continue;
            }
            boolean reachable = channel.isOpen() || channel.isActive();
            if (reachable) {
                channel.writeAndFlush(new TextWebSocketFrame(PONG));
            }
        }
    }
}
/**
 * Sends a {@code pong} text frame on one connection if its channel is open or active.
 *
 * @param ctx connection to answer; ignored when empty
 */
public static void heat(ChannelHandlerContext ctx) {
    if (!Func.isEmpty(ctx)) {
        Channel target = ctx.channel();
        boolean reachable = target.isOpen() || target.isActive();
        if (reachable) {
            target.writeAndFlush(new TextWebSocketFrame(PONG));
        }
    }
}
/**
 * Registers a connection for a user.
 *
 * <p>Records the channel-id to user-id mapping, stamps the channel with user id, client
 * type and a fresh heartbeat time, and adds the context to the user's connection set. A
 * user may hold several connections at once.
 *
 * @param ctx    connection being registered
 * @param userId owner of the connection
 * @param type   client type (for example pc or app)
 */
public static void putUserConn(ChannelHandlerContext ctx, String userId, String type) {
    // channelId -> userId
    MsgUtils.addChannel(getLongText(ctx), userId);
    String ip = MsgUtils.getClientIpAddress(ctx);
    log.info("ws userId 加入连接, userId= {}, ip= {}, type= {}", userId, ip,type);
    // Stamp the channel so the cleanup scan sees it as alive.
    refreshLastHeartBeatTime(ctx.channel(),userId,type);
    // userId -> connections. compute() is atomic per key, so two connections of the
    // same user that arrive concurrently are both kept (a get/copy/put sequence can
    // silently drop one of them).
    USER_CONN_LIST_MAP.compute(userId, (key, contexts) -> {
        CopyOnWriteArraySet<ChannelHandlerContext> target = contexts;
        if (target == null) {
            target = new CopyOnWriteArraySet<>();
        }
        target.add(ctx);
        return target;
    });
}
/**
 * Tells whether a user currently has at least one registered connection.
 *
 * @param userId user identifier
 * @return {@code false} when {@code userId} is empty, the user has no entry, or the
 *         user's connection set is empty; {@code true} otherwise
 */
public static Boolean online(Long userId) {
    if (Func.isEmpty(userId)) {
        return false;
    }
    String key = userId.toString();
    if (!USER_CONN_LIST_MAP.containsKey(key)) {
        return false;
    }
    return Func.isNotEmpty(USER_CONN_LIST_MAP.get(key));
}
/**
 * Records which user owns a channel.
 *
 * @param channelId channel identifier used as the key
 * @param userId    user identifier stored for that channel; replaces any previous value
 */
private static void addChannel(String channelId, String userId) {
CHANNEL_USER_MAP.put(channelId, userId);
}
/**
 * Looks up the user that owns a channel.
 *
 * @param channelId channel identifier
 * @return the user id, or an empty string when the channel is not registered
 */
public static String getUserIdByChannel(String channelId) {
    // A single lookup: a containsKey() + get() pair could return null if the entry is
    // removed in between.
    return CHANNEL_USER_MAP.getOrDefault(channelId, "");
}
/**
 * Returns the identifier of a connection: the long text form of its channel id with
 * every {@code StringPool.DASH} occurrence removed.
 *
 * @param client connection whose channel id is wanted
 * @return the channel identifier
 */
public static String getLongText(ChannelHandlerContext client) {
    return getLongText(client.channel());
}
/**
 * Returns the identifier of a channel: the long text form of its id with every
 * {@code StringPool.DASH} occurrence removed.
 *
 * @param channel channel whose id is wanted
 * @return the channel identifier
 */
public static String getLongText(Channel channel) {
return channel.id().asLongText().replace(StringPool.DASH, "");
}
/**
 * Closes every connection registered for a user.
 *
 * @param userId user to disconnect
 * @return {@code true} when the user had an entry and its connections were asked to
 *         close, {@code false} when the user is not registered
 */
public static boolean offline(String userId) {
    log.info("ws 断开userId连接, userId={}", userId);
    // Fetch once: a containsKey() + get() pair can yield null if the entry disappears
    // between the two calls.
    Set<ChannelHandlerContext> contextSet = USER_CONN_LIST_MAP.get(userId);
    if (contextSet == null) {
        log.warn("userId未连接, userId={}", userId);
        return false;
    }
    for (ChannelHandlerContext context : contextSet) {
        context.close();
    }
    return true;
}
/**
 * Sends one message from its sender to every distinct recipient in
 * {@code toUserIdList}.
 *
 * @param reqDeviceData message to send; supplies sender, recipients and message type
 * @return {@code true} only when the recipient list is non-empty and every individual
 *         send succeeded
 */
private static boolean sendBatchTextMsg(ReqDeviceData reqDeviceData) {
    List<Long> toUserIdList = reqDeviceData.getToUserIdList();
    if (Func.isEmpty(toUserIdList)) {
        return false;
    }
    Long fromUserId = Long.valueOf(reqDeviceData.getUserId());
    boolean allSent = true;
    // LinkedHashSet drops duplicates while keeping the original order.
    for (Long toUserId : new LinkedHashSet<>(toUserIdList)) {
        // Send first, then combine: a failure must not skip the remaining recipients,
        // and a later success must not hide an earlier failure.
        boolean sent = sendTextMsg(fromUserId, toUserId, reqDeviceData.getMsgType(), reqDeviceData);
        allSent = allSent && sent;
    }
    return allSent;
}
/**
 * Sends a message to every member of a chat group.
 *
 * <p>The message record is saved first, then the message is written to every connection
 * of every member that is currently connected.
 *
 * @param reqDeviceData message to send; supplies sender, group id and message type
 * @return {@code true} when every member had at least one connection to write to
 */
private static boolean sendGroupTextMsg(ReqDeviceData reqDeviceData) {
    log.info("群发:[{}]",JsonUtil.toJson(reqDeviceData));
    // Default to -1 when no sender is given; the check has to come before parsing,
    // because Long.valueOf fails on a missing value.
    Long userId = Func.isEmpty(reqDeviceData.getUserId()) ? Long.valueOf(-1L) : Long.valueOf(reqDeviceData.getUserId());
    boolean success = true;
    Integer msgType = reqDeviceData.getMsgType();
    reqDeviceData.setMsgTypeName(MsgTypeEnum.getNameByCode(msgType));
    IChatMsgService iChatMsgService = SpringUtil.getBean(IChatMsgService.class);
    Long groupId = reqDeviceData.getGroupId();
    // Look up the group members.
    IChatGroupUserService chatGroupUserService = SpringUtil.getBean(IChatGroupUserService.class);
    List<Long> userIdList = chatGroupUserService.listByGroupId(groupId);
    // Persist the message records. The returned ids are not used at the moment: the
    // "mark as delivered" update (iChatMsgService.updateMsgSuccess) is disabled.
    iChatMsgService.saveGroupMsg(userId, groupId, reqDeviceData, msgType, userIdList);
    if (Func.isNotEmpty(userIdList)) {
        // Serialise once; the payload is identical for every recipient.
        String payload = JsonUtil.toJson(reqDeviceData);
        for (Long toUserId : userIdList) {
            log.info("userId: {} 往: {} 发送消息: [ {}]", userId, toUserId, payload);
            // Single lookup, so a concurrent removal cannot turn into a null dereference.
            Set<ChannelHandlerContext> contexts = USER_CONN_LIST_MAP.get(String.valueOf(toUserId));
            if (Func.isEmpty(contexts)) {
                log.warn("userId: [{}] 未连接", toUserId);
                success = false;
                continue;
            }
            // writeAndFlush is asynchronous.
            for (ChannelHandlerContext context : contexts) {
                context.channel().writeAndFlush(new TextWebSocketFrame(payload));
            }
        }
    }
    return success;
}
/**
 * Sends a message from one user to another.
 *
 * <p>General messages are persisted before sending and marked successful afterwards
 * when the recipient had at least one connection to write to.
 *
 * @param userId        sender; {@code -1} is used when empty
 * @param toUserId      recipient
 * @param msgType       message type code
 * @param reqDeviceData message content
 * @return {@code false} when the recipient is missing, equals the sender, or has no
 *         connection; {@code true} otherwise
 */
private static boolean sendTextMsg(Long userId, Long toUserId, Integer msgType, ReqDeviceData reqDeviceData) {
    if (Func.isEmpty(userId)) {
        userId = -1L;
    }
    // Nothing to do without a recipient, and users do not message themselves.
    if (toUserId == null || toUserId.equals(userId)) {
        return false;
    }
    reqDeviceData.setMsgTypeName(MsgTypeEnum.getNameByCode(msgType));
    // Null-safe comparison: the message type may be absent.
    boolean generalMsg = Objects.equals(reqDeviceData.getMsgType(), MsgTypeEnum.GENERAL_MSG.getCode());
    // Persist general messages before sending.
    IChatMsgService iChatMsgService = SpringUtil.getBean(IChatMsgService.class);
    Long logId = null;
    if (generalMsg) {
        logId = iChatMsgService.saveMsg(userId, toUserId, reqDeviceData, msgType);
    }
    // Serialise once for logging and sending.
    String payload = JsonUtil.toJson(reqDeviceData);
    log.info("userId: {} 往: {} 发送消息: [ {}]", userId, toUserId, payload);
    Set<ChannelHandlerContext> contexts = USER_CONN_LIST_MAP.get(String.valueOf(toUserId));
    // Delivered only if there is a connection to write to; an entry holding an empty
    // set does not count.
    boolean success = Func.isNotEmpty(contexts);
    if (!success) {
        log.warn("userId: [{}] 未连接", toUserId);
    } else {
        // writeAndFlush is asynchronous.
        for (ChannelHandlerContext context : contexts) {
            context.channel().writeAndFlush(new TextWebSocketFrame(payload));
        }
    }
    // Mark the stored record as sent.
    if (success && generalMsg) {
        iChatMsgService.updateMsgSuccess(logId);
    }
    return success;
}
/**
 * Unregisters a connection and closes it.
 *
 * <p>The channel-id to user-id mapping is removed, the context is taken out of the
 * owner's connection set, and the owner's entry is dropped once no connection is left,
 * so that key-based online checks no longer report the user as connected.
 *
 * @param client connection to remove; a connection that was never registered is ignored
 */
public static void removeClient(ChannelHandlerContext client) {
    String channelId = getLongText(client);
    String userId = CHANNEL_USER_MAP.remove(channelId);
    if (userId == null) {
        return;
    }
    List<ChannelHandlerContext> removed = new ArrayList<>();
    // Atomic per key: concurrent registrations for the same user are not lost.
    USER_CONN_LIST_MAP.computeIfPresent(userId, (key, contexts) -> {
        for (ChannelHandlerContext context : contexts) {
            if (channelId.equals(getLongText(context))) {
                removed.add(context);
            }
        }
        contexts.removeAll(removed);
        // Returning null removes the user's entry altogether.
        return contexts.isEmpty() ? null : contexts;
    });
    // Close outside the compute callback so channel callbacks never run while a map
    // bin is locked.
    for (ChannelHandlerContext context : removed) {
        context.close();
    }
}
/**
 * Looks up the user id registered for a channel.
 *
 * @param channelId channel identifier
 * @return the user id, or {@code null} when the channel has no entry
 */
public static String getUserId(String channelId) {
return CHANNEL_USER_MAP.get(channelId);
}
/**
 * Returns the remote address of a connection as text.
 *
 * <p>The address is rendered with {@code toString()} and everything up to and including
 * the first {@code '/'} is stripped, so {@code "/10.0.0.1:5000"} and
 * {@code "host/10.0.0.1:5000"} both become {@code "10.0.0.1:5000"}.
 *
 * @param client connection to inspect
 * @return the remote address, or an empty string when the channel has none
 */
public static String getClientIpAddress(ChannelHandlerContext client) {
    java.net.SocketAddress remote = client.channel().remoteAddress();
    if (remote == null) {
        // Happens for channels that are not connected; avoid failing in log statements.
        return "";
    }
    String text = remote.toString();
    // indexOf returns -1 when there is no '/', which keeps the whole text.
    return text.substring(text.indexOf('/') + 1);
}
/**
 * Reads the client type stored on a connection's channel.
 *
 * @param client connection to inspect
 * @return the string form of the {@code type} attribute value, or an empty string when
 *         the channel does not have that attribute
 */
public static String getType(ChannelHandlerContext client) {
    AttributeKey<Object> typeKey = AttributeKey.valueOf(TYPE);
    Channel channel = client.channel();
    if (!channel.hasAttr(typeKey)) {
        return "";
    }
    Object stored = channel.attr(typeKey).get();
    return String.valueOf(stored);
}
/**
 * Enriches a message with sender details and dispatches it asynchronously.
 *
 * <p>The sender's avatar and id are copied onto the message when the sender exists, and
 * the creation time is set to now. Dispatch then happens on the
 * {@code ThreadPoolTaskExecutor} bean: direct messages go to the listed recipients and
 * trigger the auto-reply check, group messages go to the group members. Failures of the
 * asynchronous task are logged.
 *
 * @param reqDeviceData message to send
 */
public static void sendMsg(ReqDeviceData reqDeviceData) {
    // Sender avatar.
    IUserService userService = SpringUtil.getBean(IUserService.class);
    User sendUser = userService.getById(reqDeviceData.getUserId());
    if (Func.isNotEmpty(sendUser)) {
        reqDeviceData.setUserAvatar(sendUser.getAvatar());
        reqDeviceData.setFromUserAvatar(sendUser.getAvatar());
        reqDeviceData.setFormUserId(reqDeviceData.getUserId());
    }
    reqDeviceData.setCreateTime(new Date());
    CompletableFuture.runAsync(() -> {
        String bizMsgType = reqDeviceData.getBizMsgType();
        // Constant-first comparison: a missing bizMsgType falls through to the error
        // branch instead of throwing.
        if (ImConstants.SEND_MSG_TO_USER.equalsIgnoreCase(bizMsgType)) {
            // Direct message to the listed users.
            MsgUtils.sendBatchTextMsg(reqDeviceData);
            // Auto reply.
            autoReply(reqDeviceData);
        } else if (ImConstants.SEND_GROUP_MSG_TO_USER.equalsIgnoreCase(bizMsgType)) {
            // Group message.
            MsgUtils.sendGroupTextMsg(reqDeviceData);
        } else {
            log.error("消息发送类型错误:[{}]", reqDeviceData);
        }
    }, SpringUtil.getBean(ThreadPoolTaskExecutor.class)).whenComplete((ignored, error) -> {
        // Without this, an exception in the task would vanish inside the future.
        if (error != null) {
            log.error("ws 消息发送异常:[{}]", reqDeviceData, error);
        }
    });
}
/**
 * Sends automatic replies on behalf of the recipients of a direct message.
 *
 * <p>Nothing happens when the message has no content, no recipients, or its
 * {@code toType} is not {@code YesNoEnum.YES}. Otherwise each recipient's auto-reply
 * text is looked up and, when present, sent back to the original sender as a message
 * from that recipient.
 *
 * @param reqDeviceData the message that may trigger auto replies
 */
public static void autoReply(ReqDeviceData reqDeviceData) {
    // No content, nothing to match against.
    if (Func.isEmpty(reqDeviceData.getMsgContent())) {
        return;
    }
    List<Long> toUserIdList = reqDeviceData.getToUserIdList();
    Integer toType = reqDeviceData.getToType();
    if (Func.isEmpty(toUserIdList) || Func.isEmpty(toType) || !toType.equals(YesNoEnum.YES.getCode())) {
        return;
    }
    String msgContent = reqDeviceData.getMsgContent().toString();
    // Parsed only after the guards, so messages that cannot trigger a reply never fail here.
    Long userId = Long.valueOf(reqDeviceData.getUserId());
    IChatWordService iChatWordService = SpringUtil.getBean(IChatWordService.class);
    for (Long toUserId : toUserIdList) {
        String replyText = iChatWordService.autoReplyList(msgContent, toUserId);
        if (!Func.isNoneBlank(replyText)) {
            continue;
        }
        String toUserIdStr = String.valueOf(toUserId);
        // Build the reply: sender and recipient swap roles.
        ReqDeviceData clone = Func.clone(reqDeviceData);
        Objects.requireNonNull(clone).setToUserIdList(Collections.singletonList(userId));
        clone.setMsgContent(replyText);
        clone.setUserId(toUserIdStr);
        clone.setFormUserId(toUserIdStr);
        clone.setAutoReply(YesNoEnum.YES.getCode());
        // Send the reply itself, not the original message.
        MsgUtils.sendBatchTextMsg(clone);
    }
}
}
附 消息体
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
 * Request message body exchanged over the WebSocket connection.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 *
 * @author YZD
 */
@ApiModel(value = "请求消息体")
@Data
public class ReqDeviceData implements Serializable {
private static final long serialVersionUID = 1L;
// Dispatch kind: send_msg_to_user (direct) or send_group_msg_to_user (group).
@ApiModelProperty(value = "消息发送类型: 向其他用户发消息 send_msg_to_user , 群发 send_group_msg_to_user")
private String bizMsgType;
// System message: business id.
@ApiModelProperty(value = "系统消息-业务id")
private Long bizId;
// System message: business type.
@ApiModelProperty(value = "系统消息-业务类型")
private String bizType;
// Recipient kind: 1 = user, 0 = group.
@ApiModelProperty(value = "接受者类型:1:用户,0:组")
private Integer toType;
// Message type code: 1 = general message, 2 = system message.
@ApiModelProperty(value = "消息类型: 1 普通消息 2系统消息")
private Integer msgType;
// Display name of the message type.
@ApiModelProperty(value = "消息类型名称: 1 普通消息 2系统消息")
private String msgTypeName;
// Ids of the users the message is addressed to.
@ApiModelProperty(value = "发送到的用户标识")
private List<Long> toUserIdList;
// Id of the group the message is addressed to.
@ApiModelProperty(value = "发送到的组标识")
private Long groupId;
// Id of the sending user.
@ApiModelProperty(value = "发送者的用户标识")
private String userId;
// Id of the sending user (same description as userId).
// NOTE(review): the name looks like a misspelling of "fromUserId"; renaming would change the JSON property.
@ApiModelProperty(value = "发送者的用户标识")
private String formUserId;
// Content type defined by the front end; returned unchanged.
@ApiModelProperty(value = "消息内容类型: 前端自定义,原样返回")
private String msgContentType;
// Message content.
@ApiModelProperty(value = "消息内容")
private Object msgContent;
// Avatar of the sending user.
@ApiModelProperty(value = "发送者的用户头像")
private String userAvatar;
// Time the message was sent.
@ApiModelProperty(value = "发送时间")
private Date createTime;
// Message title.
@ApiModelProperty(value = "消息标题")
private String title;
// Auto-reply flag: 0 = no, 1 = yes. Annotated with @JsonIgnore, so Jackson skips it.
@JsonIgnore
@ApiModelProperty(value = "是否自动回复 0 否 1 是")
private Integer autoReply;
// Avatar of the sender.
@ApiModelProperty(value = "发送者头像")
private String fromUserAvatar ;
}
附 消息常量
/**
 * Constants naming the message dispatch kinds used by the WebSocket messaging code.
 *
 * <p>Holds constants only and cannot be instantiated.
 *
 * @author YZD
 */
public final class ImConstants {

    /** Dispatch kind: send a message to other users. */
    public static final String SEND_MSG_TO_USER = "send_msg_to_user";

    /** Dispatch kind: send a message to the members of a group. */
    public static final String SEND_GROUP_MSG_TO_USER = "send_group_msg_to_user";

    /** Rejects instantiation, including attempts made through reflection. */
    private ImConstants() {
        throw new RuntimeException("常量类不能被初始化");
    }
}
附 多线程配置
import lombok.extern.slf4j.Slf4j;
import org.springblade.modules.im.utils.MsgUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Title: ExecutorConfig
* @author: yzd e-mail: 121665820@qq.com
* @date: 2022/12/15 8:53
* @ClassName: ExecutorConfig
* @Description: 线程池配置
*/
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig implements SchedulingConfigurer {
/**
 * Creates the primary task executor, sized from the number of available processors.
 *
 * @return the initialised executor
 */
@Bean
@Primary
public ThreadPoolTaskExecutor asyncServiceExecutor() {
    log.info("start asyncService Executor");
    // Pool sizes are derived from the processor count reported by the runtime.
    int cpuCount = Runtime.getRuntime().availableProcessors();
    ThreadPoolTaskExecutor pool = new VisibleThreadPoolTaskExecutor();
    // Sizing rule of thumb from the original author: CPU-bound work wants roughly
    // processors + 1 threads, IO-bound work tolerates a multiple of the processor count.
    pool.setCorePoolSize(cpuCount + 1);
    pool.setMaxPoolSize(cpuCount * 4);
    pool.setQueueCapacity(cpuCount * 100);
    // Idle threads, core threads included, are released after 60 seconds.
    pool.setAllowCoreThreadTimeOut(true);
    pool.setKeepAliveSeconds(60);
    // On shutdown, let queued tasks finish, waiting at most 60 seconds.
    pool.setWaitForTasksToCompleteOnShutdown(true);
    pool.setAwaitTerminationSeconds(60);
    pool.setThreadNamePrefix("async-thread-pool-");
    // When the pool is saturated, the submitting thread runs the task itself.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
/**
 * Creates the scheduler used for timed and periodic tasks and registers the WebSocket
 * session cleanup on it.
 *
 * @return the initialised scheduler
 */
@Bean
public ThreadPoolTaskScheduler scheduledThreadPoolExecutor() {
    log.info("start Scheduled Executor");
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(10);
    scheduler.setThreadNamePrefix("task-thread");
    // When the pool is saturated, the submitting thread runs the task itself.
    scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    scheduler.initialize();
    // Periodic task: scan all channels and close the stale ones, with a fixed delay of
    // 24 hours between runs.
    long cleanupDelayMillis = 24 * 60 * 60 * 1000L;
    scheduler.scheduleWithFixedDelay(MsgUtils::scanNotActiveChannel, cleanupDelayMillis);
    return scheduler;
}
/**
 * Makes {@code @Scheduled} tasks run on the scheduler created by
 * {@code scheduledThreadPoolExecutor()}.
 *
 * @param scheduledTaskRegistrar registrar that receives the task scheduler
 */
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
    // setTaskScheduler accepts a TaskScheduler only; setScheduler would also accept a
    // ScheduledExecutorService.
    ThreadPoolTaskScheduler scheduler = scheduledThreadPoolExecutor();
    scheduledTaskRegistrar.setTaskScheduler(scheduler);
}
}
附 连接 处理
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.tool.utils.Func;
import org.springblade.modules.im.model.ReqDeviceData;
import org.springblade.modules.im.utils.MsgUtils;
import org.springblade.modules.im.utils.WsAuthUtil;
import org.springblade.modules.mq.WsSendMq;
import java.util.Objects;
/**
* 业务处理器
* TextWebSocketFrame 文本处理
* 测试连接地址:如下
* ws://192.168.2.18:8888/ws?Blade-Auth=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJpc3N1c2VyIiwiYXVkIjoiYXVkaWVuY2UiLCJ0ZW5hbnRfaWQiOiIwMDAwMDAiLCJyb2xlX25hbWUiOiJhZG1pbmlzdHJhdG9yLHN5c3RlbSxkb2N0b3IsbWFzc2VzLGNhc2VfbWFuYWdlciIsInBvc3RfaWQiOiIxMTIzNTk4ODE3NzM4Njc1MjAxIiwidXNlcl9pZCI6IjExMjM1OTg4MjE3Mzg2NzUyMDEiLCJyb2xlX2lkIjoiMTEyMzU5ODgxNjczODY3NTIwMSwxNTk1OTQ1ODQ0Mjg3MzA3Nzc3LDE1OTU5NDYwNDIwNjk3MTI4OTcsMTU5NTk0NTk3NTc3NDU0Mzg3MywxNTk1OTQ1NzgyNjg5NzU5MjMzIiwidXNlcl9uYW1lIjoiYWRtaW4iLCJuaWNrX25hbWUiOiLotoXnuqfnrqHnkIblkZgiLCJ0b2tlbl90eXBlIjoiYWNjZXNzX3Rva2VuIiwiZGVwdF9pZCI6IjE2MjQ5MzgxMTM4MDA0NDE4NTgiLCJhY2NvdW50IjoiYWRtaW4iLCJjbGllbnRfaWQiOiJzYWJlciIsImV4cCI6MTc2MjY2Njc5NSwibmJmIjoxNjc2MjY2Nzk1fQ.AlhLnFGB0iodL6GngjuiuxF8n5b-v_bZJ9fH7bfvHM1KNQewNEku9Z_k72YNypMKvzb31QykGYAqkr_OJeM4Eg&type=pc
* @author YZD
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 心跳包
*/
public static final String PING = "ping";
/**
 * Handles one incoming text frame.
 *
 * <p>JSON payloads are parsed into {@code ReqDeviceData} and passed to the business
 * handler. The literal {@code ping} refreshes the heartbeat time and is answered by
 * {@code MsgUtils.heat}. Anything else, and JSON that yields no usable object, closes
 * the connection. Exceptions raised during handling are logged, not rethrown.
 *
 * @param ctx connection the frame arrived on
 * @param msg received text frame
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
    String channelId = MsgUtils.getLongText(ctx);
    String userId = MsgUtils.getUserIdByChannel(channelId);
    String ip = MsgUtils.getClientIpAddress(ctx);
    String type = MsgUtils.getType(ctx);
    String msgText = msg.text();
    log.info("ws 收到消息:[{}], userId={}, ip={}, type={}", msgText, userId, ip,type);
    try {
        if (JSONUtil.isTypeJSON(msgText)) {
            ReqDeviceData reqDeviceData = JSONUtil.toBean(msgText, ReqDeviceData.class);
            if (Func.isEmpty(reqDeviceData)) {
                log.error("ws 收到错误格式数据");
                ctx.close();
                // Stop here: there is nothing to hand to the business handler.
                return;
            }
            // Business message.
            this.handleDeviceResult(reqDeviceData, channelId);
            return;
        }
        if (PING.equals(msgText)) {
            // Heartbeat: refresh the timestamp and answer.
            MsgUtils.refreshLastHeartBeatTime(ctx.channel(),userId,type);
            MsgUtils.heat(ctx);
            return;
        }
        log.error("ws 收到未知数据:[{}], userId={}, ip={}, type={}", msgText, userId, ip,type);
        ctx.close();
    } catch (Exception e) {
        log.error("ws userId={}, ip={}, type={},接收到消息,处理异常", userId, ip,type, e);
    }
}
/**
 * Passes a parsed message on to {@code MsgUtils.sendMsg}.
 *
 * <p>Messages where both {@code msgType} and {@code bizMsgType} are empty are ignored.
 *
 * @param reqDeviceData parsed message
 * @param channelId     identifier of the originating channel (not used here)
 */
private void handleDeviceResult(ReqDeviceData reqDeviceData, String channelId) {
    boolean noTypeInfo = Func.isAllEmpty(reqDeviceData.getMsgType(),reqDeviceData.getBizMsgType());
    if (noTypeInfo) {
        return;
    }
    log.info("ws 收到消息-处理业务..., data={}", reqDeviceData);
    WsSendMq wsSendMq = SpringUtil.getBean(WsSendMq.class);
    if (Func.isNotEmpty(wsSendMq)) {
        // Forwarding through MQ is currently disabled:
        // wsSendMq.send(reqDeviceData);
    }
    MsgUtils.sendMsg(reqDeviceData);
}
/**
 * Logs the remote address when this handler is added to a connection's pipeline.
 *
 * @param ctx the new connection
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    log.info("ws 新连接, ip= {}", MsgUtils.getClientIpAddress(ctx));
}
/**
 * Logs the disconnect and unregisters the connection when this handler is removed from
 * the pipeline.
 *
 * @param ctx the connection being removed
 */
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
    String userId = MsgUtils.getUserIdByChannel(MsgUtils.getLongText(ctx));
    String ip = MsgUtils.getClientIpAddress(ctx);
    String type = MsgUtils.getType(ctx);
    log.info("ws 客户端断开 userId= {}, ip= {}, type= {}", userId, ip,type);
    // Drop the connection from the registries.
    MsgUtils.removeClient(ctx);
}
/**
 * Logs an error raised on the connection, unregisters the connection and closes it.
 *
 * @param ctx   connection on which the error occurred
 * @param cause the error
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    String channelId = MsgUtils.getLongText(ctx);
    String userId = MsgUtils.getUserIdByChannel(channelId);
    String ip = MsgUtils.getClientIpAddress(ctx);
    String type = MsgUtils.getType(ctx);
    // The throwable is passed as the extra last argument so the stack trace is logged
    // along with the message.
    log.error("ws 客户端 异常, userId= {}, ip= {} , type={} 异常: {}", userId, ip,type,cause.getMessage(), cause);
    MsgUtils.removeClient(ctx);
    // removeClient only closes registered connections; close explicitly so a
    // connection that failed before registration does not stay open.
    ctx.close();
}
/**
* token 当前登录人,当前连接人
*/
private static final String TOKEN = "Blade-Auth";
/**
* 用户端类型 pc app
*/
private static final String TYPE = "type";
/**
 * Authenticates a connection when its WebSocket handshake completes.
 *
 * <p>The query string of the handshake URI must carry a {@code Blade-Auth} token and a
 * {@code type} parameter (names compared case-insensitively). The token is resolved to
 * a user through {@code WsAuthUtil}. On success the connection is registered for that
 * user; on any failure the connection is closed. Events other than
 * {@code HandshakeComplete} are ignored.
 *
 * @param ctx connection the event belongs to
 * @param evt user event
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    if (!(evt instanceof WebSocketServerProtocolHandler.HandshakeComplete)) {
        return;
    }
    WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
    String uri = complete.requestUri();
    if (null == uri || !uri.contains(WebSocketServer.WEBSOCKET_PATH) || !uri.contains(StringPool.QUESTION_MARK)) {
        log.error("ws 握手失败, 连接地址:[{}]错误",uri);
        ctx.close();
        return;
    }
    // Everything after the first '?' is the query string.
    String query = uri.substring(uri.indexOf(StringPool.QUESTION_MARK) + 1);
    String token = null;
    String type = null;
    for (String pair : query.split(StringPool.AMPERSAND)) {
        // Limit 2: a value that itself contains '=' is kept whole.
        String[] paramsArray = pair.split(StringPool.EQUALS, 2);
        if (paramsArray.length < 2) {
            continue;
        }
        String paramName = paramsArray[0];
        String paramStr = paramsArray[1];
        if (TOKEN.equalsIgnoreCase(paramName)) {
            token = paramStr;
        }
        if (TYPE.equalsIgnoreCase(paramName)) {
            type = paramStr;
        }
    }
    if (!Func.isNoneBlank(token, type)) {
        log.error("ws 握手失败, 连接参数为空");
        ctx.close();
        return;
    }
    log.info("ws token 和 type 提取参数成功, type: [{}]", type);
    BladeUser bladeUser = WsAuthUtil.getUser(token);
    if (Func.isNotEmpty(bladeUser) && Func.isNotEmpty(Objects.requireNonNull(bladeUser).getUserId())) {
        Long userId = bladeUser.getUserId();
        // Register the connection for the authenticated user.
        MsgUtils.putUserConn(ctx, userId.toString(), type);
    } else {
        log.error("ws 握手失败, type:[{}], bladeUser:[{}]",type,bladeUser);
        ctx.close();
    }
}
}
pom
<!-- Netty (all-in-one artifact). No <version> is declared here, so it has to come from
     dependency management elsewhere (presumably a parent POM or BOM; confirm). -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
到了这里,关于netty websocket uri 连接时 传参的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!