Springboot项目使用原生Websocket

这篇具有很好参考价值的文章主要介绍了Springboot项目使用原生Websocket。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.启用Websocket功能

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

}

2.封装操作websocket session的工具

package com.xxx.robot.websocket.util;

import java.util.Map;

import javax.websocket.Session;

import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;

import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;

public final class WebSocketSessionUtils {

    private WebSocketSessionUtils() {}

	public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    /**
     * websocket block 发送超时 毫秒
     */
    public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;

	/**
	 * 从 websocket session 中找到登录用户
	 * 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User
	 * LoginUser、User 从业务层自定义的类
	 * 项目中使用了spring security框架
	 */
    public static User findUser (Session session) {
        UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();
        MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();
        LoginUser loginUser = (LoginUser) userDetails.getUserData();
        return (User) loginUser.getAdditionalInfo();
    }
    
    /**
     * 给 websocket session 设置参数
     */
    public static void setProperties(Session session) {
    	//设置websocket文本消息的长度为8M,默认为8k
        session.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
        //设置websocket二进制消息的长度为8M,默认为8k
        session.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
        Map<String, Object> userProperties = session.getUserProperties();
        //设置websocket发送消息的超时时长为10秒,默认为20秒
        userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);
    }
}

3.保存websocket session的接口

package com.xxx.robot.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import javax.websocket.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface WebSocketSessionManager {
    Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);
    
    String PING = "ping";
    String PONG = "pong";
    
    Session get (String key);
    
    List<String> keys();

    void add (String key, Session session);
    
    Session remove (String key);
    
    /**
     * ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法
     */
    default void pingBatch () {
        List<String> keyList = keys();
        log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e1) {
                        }
                    } catch (Exception e) {
                        log.error("WebSocket-ping异常", e);
                    }
                }
            }
        }
    }
    
    /**
     * 消除所有websocket客户端
     */
    default void clearAllSession () {
        List<String> keyList = keys();
        int i = 0;
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        remove(key);
                        i++;
                        session.close();
                    } catch (IOException e1) {
                        log.error("WebSocket-移除并关闭session异常", e1);
                    }
                    if (i % 10 == 0) {
                        try {
                            Thread.sleep(0);
                        } catch (InterruptedException e1) {
                        }
                    }
                }
            }
        }
        log.info("WebSocket-移除并关闭session数量为:{}", i);
    }
}

4.保存websocket session的类

package com.xxx.robot.websocket.robot.manager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import javax.websocket.Session;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.xxx.robot.websocket.WebSocketSessionManager;

/**
 * 机器人模块WebSocket Session管理器
 */
@Component
public class RobotSessionManager implements WebSocketSessionManager {
    
    /**
     * key = userId + '-' + managerId
     * userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端
     * 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap
     */
    private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();
    
    public static final String joinKey (String userId, String managerId) {
        return userId + '-' + managerId;
    }

    public static final String joinKey (Long userId, String managerId) {
        return userId.toString() + '-' + managerId;
    }
    
    public static final String[] splitKey (String key) {
        return StringUtils.split(key, '-');
    }

    @Override
    public Session get(String key) {
        return SESSION_POOL.get(key);
    }
    
    /**
     * 根据用户ID查询所有websocket session的key
     * @param userId
     * @param excludeManagerId 排除的key, 可为空
     * @return
     */
    public List<String> keysByUserId(String userId, String excludeManagerId) {
    	//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图
        ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');
        NavigableSet<String> keySet = subMap.navigableKeySet();
        List<String> list = new ArrayList<>();
        if (StringUtils.isBlank(excludeManagerId)) {
            for (String key : keySet) {
                if (key != null) {
                    list.add(key);
                }
            }
        } else {
            for (String key : keySet) {
                if (key != null && !key.equals(excludeManagerId)) {
                    list.add(key);
                }
            }
        }
        return list;
    }

    @Override
    public List<String> keys() {
        NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();
        List<String> list = new ArrayList<>();
        for (String key : keySet) {
            if (key != null) {
                list.add(key);
            }
        }
        return list;
    }

    @Override
    public synchronized void add(String key, Session session) {
        removeAndClose(key);
        SESSION_POOL.put(key, session);
    }

    @Override
    public synchronized Session remove(String key) {
        return SESSION_POOL.remove(key);
    }
    
    /**
     * 必须key和value都匹配才能删除
     */
    public synchronized void remove(String key, Session session) {
        SESSION_POOL.remove(key, session);
    }
    
    private void removeAndClose (String key) {
        Session session = remove(key);
        if (session != null) {
            try {
                session.close();
            } catch (IOException e) {
            }
        }
    }

}

5.定义websocket 端点

package com.xxx.robot.websocket.robot.endpoint;

import java.util.Map;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * 机器人模块WebSocket接口
 * 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的
 * 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean
 */
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {
    
    private volatile User user;
    
    private volatile String id;
    
    private volatile Session session;
    
    private volatile Map<String, RobotCoreService> robotCoreServiceMap;

    /**
     * 所有初始化操作都写在@OnOpen注释的方法中
     * 连接成功
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) {
        WebSocketSessionUtils.setProperties(session);
        this.user = WebSocketSessionUtils.findUser(session);
        this.id = id;
        this.session = session;
        log.info("连接成功:{}, {}", id, this.user.getUserCode());
        //使用BeanUtils代替@Autowired获取bean, 
        //RobotCoreService为业务类,不必关心
        robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //保存websocket session
        robotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);
    }

    /**
     * 连接关闭
     * @param session
     */
    @OnClose
    public void onClose() {
        log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //连接关闭时,使用两个参数的remove方法,多线程下安全删除
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }
    
    @OnError
    public void onError(Throwable error) {
        log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //websocket异常时,使用两个参数的remove方法,多线程下安全删除
        //比如ping客户端超时,触发此方法,删除该客户端
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }

    /**
     * 接收到消息
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);
        if (WebSocketSessionManager.PING.equals(message)) {
        	//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口
            this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);
            return;
        }
        //用 try...catch 包裹防止抛出异常导致websocket关闭
        try {
        	//业务层,使用jackson反序列化json,不必关心具体的业务
            JsonNode root = BaseJsonUtils.readTree(message);
            String apiType = root.at("/apiType").asText();
            //业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭
            robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);
        } catch (Exception e) {
            log.error("处理消息错误", e);
        }
    }
    
}

Springboot项目使用原生Websocket文章来源地址https://www.toymoban.com/news/detail-507882.html

6.创建定时任务 ping websocket 客户端

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 启用定时任务功能
 * 因为websocket session是有状态的,只能保存在各自的服务端,
 * 所以只能使用单机式的定时任务,而不能使用分布式定时任务,
 * 因此 springboot自带的定时任务功能成为了首选
 * springboot定时任务线程池
 */
@Configuration
@EnableScheduling
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("scheduler-executor-");
        return executor;
    }

}
package com.xxx.robot.websocket;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Sunzhihua
 */
@Slf4j
@Component
public class WebSocketSchedulerTask {
    
    /**
     * 注入所有的 websocket session 管理器
     */
    @Autowired
    private List<WebSocketSessionManager> webSocketSessionManagers;

	/**
	 * initialDelay 表示 延迟60秒初始化
	 * fixedDelay 表示 上一次任务结束后,再延迟30秒执行
	 */
    @Scheduled(initialDelay = 60000, fixedDelay = 30000)
    public void clearInvalidSession() {
        try {
            log.info("pingBatch 开始。。。");
            for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {
                webSocketSessionManager.pingBatch();
            }
            log.info("pingBatch 完成。。。");
        } catch (Exception e) {
            log.error("pingBatch异常", e);
        }
    }
}

到了这里,关于Springboot项目使用原生Websocket的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring-websocket在SpringBoot(包含SpringSecurity)项目中的导入

    ✅作者简介:大家好,我是 Meteors., 向往着更加简洁高效的代码写法与编程方式,持续分享Java技术内容。 🍎个人主页:Meteors.的博客 🥭本文内容:spring-websocket在SpringBoot(包含SpringSecurity)项目中的导入 -----------------------------------------------------       目录       --------------

    2024年02月15日
    浏览(43)
  • 宝兰德BES安装及Springboot项目打包部署及websocket解决方案

    随着软件国产化越来越普及,客户在实施中选择了宝兰德,这几天终于弄好,发个文章记录一下。 BES安装JDK使用版本为Kona8.0.1_242 2.bes安装,在实施过程中,将安装步骤整合成shell脚本了,方便新服务器一键安装,脚本包含内容:一键安装参数检查、解压安装包、导入证书、指

    2024年01月19日
    浏览(49)
  • 微信小程序如何使用原生Websocket与Asp.Net Core SignalR 通信

    如题,这可能算是.net 做小程序的服务端时,绕不开的一个问题,老生常谈了。同样的问题,我记得我2018/19年的一个项目的解决方案是: 修改官方的SignalR.js的客户端 :把里面用到浏览器的Websocket改成微信小程序的官方api的。目前网上也有不少这样的方案,已经改好开源了;

    2024年02月08日
    浏览(62)
  • 基于SpringBoot+WebSocket+Spring Task的前后端分离外卖项目-订单管理(十七)

    1.1 介绍 Spring Task 是Spring框架提供的任务调度工具,可以按照约定的时间自动执行某个代码逻辑。 定位 :定时任务框架 作用 :定时自动执行某段Java代码 应用场景: 1). 信用卡每月还款提醒 2). 银行贷款每月还款提醒 3). 火车票售票系统处理未支付订单 4). 入职纪念日为用户发

    2024年02月21日
    浏览(57)
  • 微信小程序如何使用原生Websocket api与Asp.Net Core SignalR 通信

    如题,这可能算是.net 做小程序的服务端时,绕不开的一个问题,老生常谈了。同样的问题,我记得我2018/19年的一个项目的解决方案是: 修改官方的SignalR.js的客户端 :把里面用到浏览器的Websocket改成微信小程序的官方api的。目前网上也有不少这样的方案,已经改好开源了;

    2024年02月09日
    浏览(74)
  • SpringBoot整合Websocket(Java websocket怎么使用)

    WebSocket 是一种基于 TCP 协议的全双工通信协议,可以在浏览器和服务器之间建立 实时、双向的数据通信 。可以用于在线聊天、在线游戏、实时数据展示等场景。与传统的 HTTP 协议不同,WebSocket 可以保持 长连接 ,实时传输数据,避免了频繁的 HTTP 请求和响应,节省了网络带

    2024年02月10日
    浏览(41)
  • SpringBoot中使用WebSocket

    1. 在pom.xml配置文件中添加spring-boot-starter-websocket依赖。 2. 添加WebSocket配置类 WebSocketConfig.java 3. 添加WebSocket请求处理类 WebSocketServer.java 4. 通过在线工具连接测试WebSocket  在线测试工具:www.websocket-test.com

    2024年02月04日
    浏览(31)
  • SpringBoot 使用WebSocket详解

    userId根据业务需求自己定义,属于唯一标识 点关注不迷路,喜欢的朋友们关注支持一下 给点继续写的动力,感谢!!

    2024年02月11日
    浏览(34)
  • 【Spring实战项目】SpringBoot3整合WebSocket+拦截器实现登录验证!从原理到实战

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年04月17日
    浏览(58)
  • SpringBoot+Vue使用WebSocket

    一:什么是Websocket? WebSocket是HTML5下一种新的协议(websocket协议本质上是一个基于tcp的协议) 它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的 Websocket是一个 持久化 的协议 二:websocket的原理         1.websocket约定了一个通信的规范,

    2024年02月01日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包