前言:
WebSocket PING-PONG心跳机制,只需要服务端发送PING,客户端会自动回应PONG,本文中使用了两个@OnMassage注解一个用于接收Text消息,一个用于接收PONG响应消息,此外还有二进制格式(InputStream
,byte[],ByteBuffer
等)。
说明:
记录一下,自己使用的WebSocket方式。
性能可能不是最优,也有可能有其他隐患。
(作者逻辑可能也点问题,有大佬发现问题还请不用口下留情!)
一、引入依赖
还有Lombok等自行导入
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
二、创建WebSocket配置类
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
* 把带有@ServerEndpoint 注解的所有类都添加进来
*
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
三、创建WebSocket服务类
这类token并没有做过期相关的处理,可以根据个人需求添加
/**
* webSocket服务
*/
@Slf4j
@Component
@ServerEndpoint("/wsserver/{groupId}/{userId}")
public class WebSocketServer {
private static WebSocketGroupManager groupManager;
private ScheduledExecutorService executor= Executors.newSingleThreadScheduledExecutor();
@Autowired
public void setWebSocketGroupManager(WebSocketGroupManager manager) {
groupManager = manager;
}
@OnOpen
public void onOpen(Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
try {
//包含token校验
String queryString = session.getQueryString();
Map<String, List<String>> queryParams = decodeQueryString(queryString);
// 获取token参数
List<String> tokenValues = queryParams.get("token");
if (tokenValues != null && !tokenValues.isEmpty()) {
//根据自己的param顺序,实现不同的业务逻辑
String token = tokenValues.get(0);
// 进行校验操作
if (isValidToken(token)) {
WebSocketGroup group = groupManager.getOrCreateGroup(groupId);
WebSocketUser user = new WebSocketUser(session, userId,token);
group.addUser(user);
// 更新在线用户计数器
int count = groupManager.increaseOnlineCount();
session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));
log.info("校验通过!用户:{} 上线,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId,group != null ? group.getUserCount() : 0);
// 校验通过,进行其他操作
} else {
// 校验失败,关闭连接
closeSession(session,groupId,userId);
log.warn("校验失败!用户:{} token 错误!",userId);
}
} else {
// 没有提供token参数,直接过滤,关闭连接
closeSession(session,groupId,userId);
}
} catch (Exception e) {
log.error("用户:{} ,连接时发送异常!异常信息:{}", userId, e.getMessage());
closeSession(session, groupId, userId);
}
}
// 接收普通消息
@OnMessage
public void onMessage(String message, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
WebSocketGroup group = groupManager.getGroup(groupId);
if (group != null) {
group.sendMessageToAllUsers(message);
}
}
// 接收心跳消息
@OnMessage
public void onPong(PongMessage pong, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
executor.schedule(() -> {
try {
// 发送空的Ping消息
session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));
} catch (IOException e) {
// 处理发送失败的情况
log.error("Ping 用户:{} 心跳异常,关闭会话,错误原因:{}", userId, e.getMessage());
closeSession(session, groupId, userId);
}
}, 30, TimeUnit.SECONDS);
}
@OnClose
public void onClose(@PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
try {
WebSocketGroup group = groupManager.getGroup(groupId);
if (group != null) {
group.removeUser(userId);
// 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
if (group.getUserCount() == 0) {
groupManager.removeGroup(groupId);
}
}
// 更新在线用户计数器
int count = groupManager.decreaseOnlineCount();
log.info("用户:{} 退出,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId, group != null ? group.getUserCount() : 0);
} catch (Exception e) {
log.error("连接关闭时异常!用户:{},分组:{},错误原因:{}", userId, groupId, e.getMessage());
closeSession(session, groupId, userId);
}
}
@OnError
public void onError(Throwable throwable, @PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
// 向客户端发送错误消息
session.getAsyncRemote().sendText("发生了错误,请稍后再试。");
log.error("连接异常!用户:{},分组:{},错误原因:{}", userId, groupId, throwable.getMessage());
closeSession(session, groupId, userId);
}
/**
* 关闭Session
*
* @param session
*/
private void closeSession(Session session, String groupId, String userId) {
// 关闭连接
if (session != null && session.isOpen()) {
//关闭后删除掉对应用户信息
WebSocketGroup group = groupManager.getGroup(groupId);
if (group != null) {
group.removeUser(userId);
// 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
if (group.getUserCount() == 0) {
groupManager.removeGroup(groupId);
}
}
// 更新在线用户计数器
int count = groupManager.decreaseOnlineCount();
try {
session.close();
} catch (IOException e) {
log.error("关闭session会话时异常:{}", e.getMessage());
}
}
}
/**
* 获取全部在线用户数量统计
*
* @return
*/
public static int getOnlineCount() {
return groupManager.getOnlineCount();
}
/**
* 向所有分组的子目录下发命令。
*
* @param message
* @warn 由服务器统一下发,若使用多线程,存在线程安全问题。
*/
public static void sendMessageToAllGroups(String message) {
groupManager.sendMessageToAllGroups(message);
}
// 校验token的方法
private boolean isValidToken(String token) {
// 根据自己的需求,进行校验逻辑,返回校验结果
return true;
}
// 解码查询参数
private Map<String, List<String>> decodeQueryString(String queryString) {
// 根据自己的需求实现解码逻辑
//这里做简单的解析参数。
Map<String, List<String>> queryParams = new HashMap<>();
String[] pairs = queryString.split("&");
for (String pair : pairs) {
String[] parts = pair.split("=");
String name = parts[0];
String value = "";
if (parts.length > 1) {
value = parts[1];
}
queryParams.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
}
return queryParams;
}
}
四、创建WebSocket分组以及分组管理器
分组管理器
@Slf4j
@Component
public class WebSocketGroupManager {
private final Map<String, WebSocketGroup> groups;
private AtomicInteger onlineCount;
public WebSocketGroupManager() {
this.groups = new ConcurrentHashMap<>();
this.onlineCount = new AtomicInteger(0);
}
public void addGroup(WebSocketGroup group) {
groups.put(group.getGroupId(), group);
}
public void removeGroup(String groupId) {
groups.remove(groupId);
}
public WebSocketGroup getGroup(String groupId) {
return groups.get(groupId);
}
public WebSocketGroup getOrCreateGroup(String groupId) {
WebSocketGroup group = groups.get(groupId);
if (group == null) {
group = new WebSocketGroup(groupId);
groups.put(groupId, group);
}
return group;
}
public void sendMessageToAllGroups(String message) {
for (WebSocketGroup group : groups.values()) {
group.sendMessageToAllUsers(message);
}
}
public int getGroupUserCount(String groupId) {
WebSocketGroup group = groups.get(groupId);
if (group != null) {
return group.getUserCount();
}
return 0;
}
public int getOnlineCount() {
return onlineCount.get();
}
public int increaseOnlineCount() {
return onlineCount.incrementAndGet();
}
public int decreaseOnlineCount() {
return onlineCount.decrementAndGet();
}
}
分组文章来源:https://www.toymoban.com/news/detail-670006.html
@Slf4j
public class WebSocketGroup {
private String groupId;
private Map<String, WebSocketUser> users;
private int userCount;
public WebSocketGroup(String groupId) {
this.groupId = groupId;
this.users =new HashMap<>();
this.userCount = 0;
}
public void addUser(WebSocketUser user) {
users.put(user.getUserId(), user);
// 更新在线用户计数器
userCount++;
}
public WebSocketUser getUser(String userId) {
return users.get(userId);
}
public void removeUser(String userId) {
if (users.containsKey(userId)) {
WebSocketUser removedUser = users.remove(userId);
// 更新在线用户计数器
if (removedUser != null) {
userCount--;
}
} else {
// 用户不存在
}
}
public int getUserCount() {
return userCount;
}
/**
* 向当前分组下所有用户发送信息
* @param message
*/
public void sendMessageToAllUsers(String message) {
for (WebSocketUser user : users.values()) {
user.sendMessage(message);
}
}
public String getGroupId() {
return groupId;
}
}
用户文章来源地址https://www.toymoban.com/news/detail-670006.html
@Slf4j
public class WebSocketUser {
private Session session;
private String userId;
private String token;
public WebSocketUser(Session session, String userId,String token) {
this.session = session;
this.userId = userId;
this.token=token;
}
public WebSocketUser(Session session, String userId) {
this.session = session;
this.userId = userId;
}
public Session getSession() {
return session;
}
public String getUserId() {
return userId;
}
public void sendMessage(String message) {
try {
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error("发送消息异常!用户:{},错误原因:{}", userId, e.getMessage());
}
}
}
到了这里,关于SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!