1. 启用 WebSocket 功能
package com.xxx.robot.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that turns on WebSocket support.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * @return a newly created exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
2.封装操作websocket session的工具
package com.xxx.robot.websocket.util;
import java.util.Map;
import javax.websocket.Session;
import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;
/**
 * Static helpers for working with a WebSocket {@link Session}.
 */
public final class WebSocketSessionUtils {

    /** Maximum text message buffer size applied to a session: 8 MiB. */
    public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;

    /** Maximum binary message buffer size applied to a session: 8 MiB. */
    public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;

    /** Timeout for blocking WebSocket sends, in milliseconds. */
    public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;

    /** Utility class; not meant to be instantiated. */
    private WebSocketSessionUtils() {
    }

    /**
     * Resolves the logged-in business user attached to a WebSocket session.
     * <p>
     * The session's user principal is cast to a Spring Security
     * {@link UsernamePasswordAuthenticationToken}; its principal is cast to
     * {@code MyUserDetails}, whose user data is cast to {@code LoginUser}, whose
     * additional info is finally cast to {@code User}. According to the original
     * author's note, {@code MyUserDetails} extends Spring Security's
     * {@code User}, while {@code LoginUser} and {@code User} are business-layer classes.
     *
     * @param session the WebSocket session to inspect
     * @return the business user stored in the session's authentication
     */
    public static User findUser (Session session) {
        final UsernamePasswordAuthenticationToken authentication =
                (UsernamePasswordAuthenticationToken) session.getUserPrincipal();
        final LoginUser loginUser =
                (LoginUser) ((MyUserDetails) authentication.getPrincipal()).getUserData();
        final Object additionalInfo = loginUser.getAdditionalInfo();
        return (User) additionalInfo;
    }

    /**
     * Applies this project's buffer-size and send-timeout settings to a session.
     *
     * @param session the WebSocket session to configure
     */
    public static void setProperties(Session session) {
        // Text messages: raise the limit to 8M (the original note gives 8k as the default).
        session.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
        // Binary messages: raise the limit to 8M (the original note gives 8k as the default).
        session.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
        // Blocking send timeout: 10 seconds (the original note gives 20 seconds as the default).
        final Map<String, Object> properties = session.getUserProperties();
        properties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);
    }
}
3.保存websocket session的接口
package com.xxx.robot.websocket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import javax.websocket.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Contract for a registry of WebSocket sessions addressed by a string key,
 * plus default helpers to ping or close every registered session.
 */
public interface WebSocketSessionManager {

    /** Logger shared by the default methods and by implementations. */
    Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);

    /** Payload of the ping frame; also the text of the custom "ping" message. */
    String PING = "ping";

    /** Text reply sent for the custom "ping" message. */
    String PONG = "pong";

    /**
     * @param key session key
     * @return the session registered under {@code key}
     */
    Session get (String key);

    /**
     * @return the keys of the registered sessions
     */
    List<String> keys();

    /**
     * Registers a session under the given key.
     *
     * @param key     session key
     * @param session session to register
     */
    void add (String key, Session session);

    /**
     * Unregisters the session stored under the given key.
     *
     * @param key session key
     * @return the session returned by the implementation for that key
     */
    Session remove (String key);

    /**
     * Sends a ping frame to every registered session, pausing 10 ms after each
     * successful ping.
     * <p>
     * According to the original author's note, a ping that times out triggers the
     * endpoint's {@code @OnError} method. A failure on one session is logged and
     * does not stop the batch. If the calling thread is interrupted during the
     * pause, the interrupt status is restored and the batch stops early.
     */
    default void pingBatch () {
        List<String> keyList = keys();
        log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());
        for (String key : keyList) {
            if (key == null) {
                continue;
            }
            Session session = get(key);
            if (session == null) {
                continue;
            }
            try {
                // Explicit charset so the payload does not depend on the platform default.
                byte[] payload = PING.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                session.getBasicRemote().sendPing(ByteBuffer.wrap(payload));
            } catch (Exception e) {
                log.error("WebSocket-ping异常", e);
                // As before, there is no pause after a failed ping.
                continue;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop the batch.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Removes and closes every registered session.
     * <p>
     * Each session is removed from the registry before it is closed; an
     * {@link IOException} on close is logged and the loop continues. After every
     * tenth session the thread briefly gives up the CPU. An interrupt received
     * during that step does not abort the cleanup; the interrupt status is
     * restored once all sessions have been processed.
     */
    default void clearAllSession () {
        List<String> keyList = keys();
        int i = 0;
        boolean interrupted = false;
        try {
            for (String key : keyList) {
                if (key == null) {
                    continue;
                }
                Session session = get(key);
                if (session == null) {
                    continue;
                }
                try {
                    remove(key);
                    i++;
                    session.close();
                } catch (IOException e1) {
                    log.error("WebSocket-移除并关闭session异常", e1);
                }
                if (i % 10 == 0) {
                    try {
                        Thread.sleep(0);
                    } catch (InterruptedException e1) {
                        // Remember the interrupt and keep cleaning up.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        log.info("WebSocket-移除并关闭session数量为:{}", i);
    }
}
4.保存websocket session的类
package com.xxx.robot.websocket.robot.manager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import javax.websocket.Session;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import com.xxx.robot.websocket.WebSocketSessionManager;
/**
 * WebSocket session manager for the robot module.
 */
@Component
public class RobotSessionManager implements WebSocketSessionManager {

    /**
     * Session pool keyed by {@code userId + '-' + managerId}.
     * <p>
     * Per the original author's note: {@code userId} comes from the logged-in user
     * and {@code managerId} is supplied by the client when it connects. A sorted
     * map is used (instead of a hash map) because the business needs to look up
     * both a single client and all clients of one user, which
     * {@link #keysByUserId(String, String)} does with a key-range view.
     */
    private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();

    /**
     * Builds a pool key from a user id and a manager id.
     *
     * @param userId    user id
     * @param managerId manager id
     * @return {@code userId + '-' + managerId}
     */
    public static final String joinKey (String userId, String managerId) {
        return userId + '-' + managerId;
    }

    /**
     * Builds a pool key from a numeric user id and a manager id.
     *
     * @param userId    user id, must not be {@code null}
     * @param managerId manager id
     * @return {@code userId + '-' + managerId}
     */
    public static final String joinKey (Long userId, String managerId) {
        return userId.toString() + '-' + managerId;
    }

    /**
     * Splits a pool key on {@code '-'}.
     * <p>
     * Every {@code '-'} is a separator, so a manager id that itself contains
     * {@code '-'} yields more than two elements.
     *
     * @param key pool key
     * @return the parts of the key
     */
    public static final String[] splitKey (String key) {
        return StringUtils.split(key, '-');
    }

    @Override
    public Session get(String key) {
        return SESSION_POOL.get(key);
    }

    /**
     * Lists the keys of all sessions that belong to one user.
     *
     * @param userId           user id whose session keys are wanted
     * @param excludeManagerId entry to leave out, may be blank; accepted either as
     *                         a bare manager id or as a full pool key
     * @return the matching keys in map order, never {@code null}
     */
    public List<String> keysByUserId(String userId, String excludeManagerId) {
        // '-' is ASCII 45 and '.' is ASCII 46, so this half-open range is exactly
        // the set of keys that start with userId + '-'.
        ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');
        NavigableSet<String> keySet = subMap.navigableKeySet();
        if (StringUtils.isBlank(excludeManagerId)) {
            return new ArrayList<>(keySet);
        }
        // Pool keys have the form userId-managerId, so comparing them with a bare
        // manager id never matched. Compare against the joined key, and still
        // honour callers that already pass a full key.
        String excludedKey = joinKey(userId, excludeManagerId);
        List<String> list = new ArrayList<>();
        for (String key : keySet) {
            if (!key.equals(excludedKey) && !key.equals(excludeManagerId)) {
                list.add(key);
            }
        }
        return list;
    }

    @Override
    public List<String> keys() {
        // ConcurrentSkipListMap rejects null keys, so no null filtering is needed.
        return new ArrayList<>(SESSION_POOL.navigableKeySet());
    }

    /**
     * Registers a session; a session already stored under the same key is removed
     * and closed first.
     */
    @Override
    public synchronized void add(String key, Session session) {
        removeAndClose(key);
        SESSION_POOL.put(key, session);
    }

    @Override
    public synchronized Session remove(String key) {
        return SESSION_POOL.remove(key);
    }

    /**
     * Removes the entry only when both the key and the session match, so a newer
     * session registered under the same key is left untouched.
     *
     * @param key     pool key
     * @param session session expected to be stored under {@code key}
     */
    public synchronized void remove(String key, Session session) {
        SESSION_POOL.remove(key, session);
    }

    /**
     * Removes the session stored under {@code key}, if any, and closes it.
     * A close failure is logged and otherwise ignored.
     */
    private void removeAndClose (String key) {
        Session session = remove(key);
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (IOException e) {
            // Best effort: the entry is already gone from the pool.
            log.warn("WebSocket-移除并关闭session异常", e);
        }
    }
}
5.定义websocket 端点
package com.xxx.robot.websocket.robot.endpoint;
import java.util.Map;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;
import lombok.extern.slf4j.Slf4j;
/**
* 机器人模块WebSocket接口
* 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的
* 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean
*/
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {
private volatile User user;
private volatile String id;
private volatile Session session;
private volatile Map<String, RobotCoreService> robotCoreServiceMap;
/**
* 所有初始化操作都写在@OnOpen注释的方法中
* 连接成功
* @param session
*/
@OnOpen
public void onOpen(@PathParam("id") String id, Session session) {
WebSocketSessionUtils.setProperties(session);
this.user = WebSocketSessionUtils.findUser(session);
this.id = id;
this.session = session;
log.info("连接成功:{}, {}", id, this.user.getUserCode());
//使用BeanUtils代替@Autowired获取bean,
//RobotCoreService为业务类,不必关心
robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);
RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
//保存websocket session
robotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);
}
/**
* 连接关闭
* @param session
*/
@OnClose
public void onClose() {
log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());
RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
//连接关闭时,使用两个参数的remove方法,多线程下安全删除
robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
}
@OnError
public void onError(Throwable error) {
log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);
RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
//websocket异常时,使用两个参数的remove方法,多线程下安全删除
//比如ping客户端超时,触发此方法,删除该客户端
robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
}
/**
* 接收到消息
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);
if (WebSocketSessionManager.PING.equals(message)) {
//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口
this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);
return;
}
//用 try...catch 包裹防止抛出异常导致websocket关闭
try {
//业务层,使用jackson反序列化json,不必关心具体的业务
JsonNode root = BaseJsonUtils.readTree(message);
String apiType = root.at("/apiType").asText();
//业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭
robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);
} catch (Exception e) {
log.error("处理消息错误", e);
}
}
}
文章来源地址https://www.toymoban.com/news/detail-507882.html
6.创建定时任务 ping websocket 客户端
package com.xxx.robot.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Enables Spring's scheduled-task support and declares a thread-pool executor bean.
 * <p>
 * Rationale from the original author's comment: WebSocket sessions are stateful
 * and exist only on the server instance that accepted them, so the timer must
 * be a single-node one rather than a distributed one. Spring Boot's built-in
 * scheduling was chosen for that reason.
 * <p>
 * NOTE(review): the original comment calls this bean the scheduled-task thread
 * pool, but {@code @Scheduled} methods are normally run by a TaskScheduler, not
 * a TaskExecutor. Confirm this bean is actually used for the scheduled job.
 */
@Configuration
@EnableScheduling
public class TaskExecutorConfig {

    /**
     * Builds the executor: 5 core threads, 5 max threads, a queue of 10, 60 s
     * keep-alive and thread names starting with {@code scheduler-executor-}.
     *
     * @return the configured executor
     */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("scheduler-executor-");
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(5);
        pool.setQueueCapacity(10);
        pool.setKeepAliveSeconds(60);
        return pool;
    }
}
package com.xxx.robot.websocket;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
 * Scheduled job that periodically pings the clients of every WebSocket session manager.
 *
 * @author Sunzhihua
 */
@Slf4j
@Component
public class WebSocketSchedulerTask {

    /** All {@link WebSocketSessionManager} beans, injected by Spring. */
    @Autowired
    private List<WebSocketSessionManager> webSocketSessionManagers;

    /**
     * Runs {@link WebSocketSessionManager#pingBatch()} on every injected manager.
     * <p>
     * The first run starts 60 seconds after startup ({@code initialDelay}); each
     * later run starts 30 seconds after the previous one finished
     * ({@code fixedDelay}). Any exception is logged and not rethrown.
     */
    @Scheduled(initialDelay = 60000, fixedDelay = 30000)
    public void clearInvalidSession() {
        try {
            log.info("pingBatch 开始。。。");
            webSocketSessionManagers.forEach(WebSocketSessionManager::pingBatch);
            log.info("pingBatch 完成。。。");
        } catch (Exception e) {
            log.error("pingBatch异常", e);
        }
    }
}
文章来源:https://www.toymoban.com/news/detail-507882.html
到了这里,关于 Spring Boot 项目使用原生 WebSocket 的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!