SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等

这篇具有很好参考价值的文章主要介绍了SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

        WebSocket PING-PONG心跳机制,只需要服务端发送PING,客户端会自动回应PONG,本文中使用了两个@OnMassage注解一个用于接收Text消息,一个用于接收PONG响应消息,此外还有二进制格式(InputStream ,byte[],ByteBuffer 等)。
          

说明:      

        记录一下,自己使用的WebSocket方式。

        性能可能不是最优,也有可能有其他隐患。

        (作者逻辑可能也点问题,有大佬发现问题还请不用口下留情!)

一、引入依赖

 还有Lombok等自行导入

<!-- websocket -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

二、创建WebSocket配置类

@Configuration
public class WebSocketConfig {

    /**
     * ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
     * 把带有@ServerEndpoint 注解的所有类都添加进来
     *
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

}

三、创建WebSocket服务类

这类token并没有做过期相关的处理,可以根据个人需求添加 

/**
 * webSocket服务
 */
@Slf4j
@Component
@ServerEndpoint("/wsserver/{groupId}/{userId}")
public class WebSocketServer {

    private static WebSocketGroupManager groupManager;
    private ScheduledExecutorService executor= Executors.newSingleThreadScheduledExecutor();
    @Autowired
    public void setWebSocketGroupManager(WebSocketGroupManager manager) {
        groupManager = manager;
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        try {
            //包含token校验
            String queryString = session.getQueryString();
            Map<String, List<String>> queryParams = decodeQueryString(queryString);
            // 获取token参数
            List<String> tokenValues = queryParams.get("token");
            if (tokenValues != null && !tokenValues.isEmpty()) {
                //根据自己的param顺序,实现不同的业务逻辑
                String token = tokenValues.get(0);
                // 进行校验操作
                if (isValidToken(token)) {
                    WebSocketGroup group = groupManager.getOrCreateGroup(groupId);
                    WebSocketUser user = new WebSocketUser(session, userId,token);
                    group.addUser(user);
                    // 更新在线用户计数器
                    int count = groupManager.increaseOnlineCount();

                    session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));

                    log.info("校验通过!用户:{} 上线,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId,group != null ? group.getUserCount() : 0);
                    // 校验通过,进行其他操作
                } else {
                    // 校验失败,关闭连接
                    closeSession(session,groupId,userId);
                    log.warn("校验失败!用户:{} token 错误!",userId);
                }
            } else {
                // 没有提供token参数,直接过滤,关闭连接
                closeSession(session,groupId,userId);
            }
        } catch (Exception e) {
            log.error("用户:{} ,连接时发送异常!异常信息:{}", userId, e.getMessage());
            closeSession(session, groupId, userId);
        }
    }

    // 接收普通消息
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        WebSocketGroup group = groupManager.getGroup(groupId);
        if (group != null) {
            group.sendMessageToAllUsers(message);
        }
    }

    // 接收心跳消息
    @OnMessage
    public void onPong(PongMessage pong, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        executor.schedule(() -> {
            try {
                // 发送空的Ping消息
                session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));
            } catch (IOException e) {
                // 处理发送失败的情况
                log.error("Ping 用户:{} 心跳异常,关闭会话,错误原因:{}", userId, e.getMessage());
                closeSession(session, groupId, userId);
            }
        }, 30, TimeUnit.SECONDS);
    }

    @OnClose
    public void onClose(@PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
        try {
            WebSocketGroup group = groupManager.getGroup(groupId);
            if (group != null) {
                group.removeUser(userId);
                // 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
                if (group.getUserCount() == 0) {
                    groupManager.removeGroup(groupId);
                }
            }
            // 更新在线用户计数器
            int count = groupManager.decreaseOnlineCount();

            log.info("用户:{} 退出,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId, group != null ? group.getUserCount() : 0);
        } catch (Exception e) {
            log.error("连接关闭时异常!用户:{},分组:{},错误原因:{}", userId, groupId, e.getMessage());
            closeSession(session, groupId, userId);
        }
    }

    @OnError
    public void onError(Throwable throwable, @PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
        // 向客户端发送错误消息
        session.getAsyncRemote().sendText("发生了错误,请稍后再试。");
        log.error("连接异常!用户:{},分组:{},错误原因:{}", userId, groupId, throwable.getMessage());
        closeSession(session, groupId, userId);
    }

    /**
     * 关闭Session
     *
     * @param session
     */
    private void closeSession(Session session, String groupId, String userId) {
        // 关闭连接
        if (session != null && session.isOpen()) {
            //关闭后删除掉对应用户信息
            WebSocketGroup group = groupManager.getGroup(groupId);
            if (group != null) {
                group.removeUser(userId);
                // 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
                if (group.getUserCount() == 0) {
                    groupManager.removeGroup(groupId);
                }
            }
            // 更新在线用户计数器
            int count = groupManager.decreaseOnlineCount();
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭session会话时异常:{}", e.getMessage());
            }
        }
    }

    /**
     * 获取全部在线用户数量统计
     *
     * @return
     */
    public static int getOnlineCount() {
        return groupManager.getOnlineCount();
    }

    /**
     * 向所有分组的子目录下发命令。
     *
     * @param message
     * @warn 由服务器统一下发,若使用多线程,存在线程安全问题。
     */
    public static void sendMessageToAllGroups(String message) {
        groupManager.sendMessageToAllGroups(message);
    }

    // 校验token的方法
    private boolean isValidToken(String token) {
        // 根据自己的需求,进行校验逻辑,返回校验结果
        return true;
    }

    // 解码查询参数
    private Map<String, List<String>> decodeQueryString(String queryString) {
        // 根据自己的需求实现解码逻辑
        //这里做简单的解析参数。
        Map<String, List<String>> queryParams = new HashMap<>();
        String[] pairs = queryString.split("&");
        for (String pair : pairs) {
            String[] parts = pair.split("=");
            String name = parts[0];
            String value = "";
            if (parts.length > 1) {
                value = parts[1];
            }
            queryParams.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
        }
        return queryParams;
    }
}

四、创建WebSocket分组以及分组管理器

 分组管理器

@Slf4j
@Component
public class WebSocketGroupManager {
    private final Map<String, WebSocketGroup> groups;

    private AtomicInteger onlineCount;

    public WebSocketGroupManager() {
        this.groups = new ConcurrentHashMap<>();
        this.onlineCount = new AtomicInteger(0);
    }

    public void addGroup(WebSocketGroup group) {
        groups.put(group.getGroupId(), group);
    }

    public void removeGroup(String groupId) {
        groups.remove(groupId);
    }

    public WebSocketGroup getGroup(String groupId) {
        return groups.get(groupId);
    }

    public WebSocketGroup getOrCreateGroup(String groupId) {
        WebSocketGroup group = groups.get(groupId);
        if (group == null) {
            group = new WebSocketGroup(groupId);
            groups.put(groupId, group);
        }
        return group;
    }

    public void sendMessageToAllGroups(String message) {
        for (WebSocketGroup group : groups.values()) {
            group.sendMessageToAllUsers(message);
        }
    }

    public int getGroupUserCount(String groupId) {
        WebSocketGroup group = groups.get(groupId);
        if (group != null) {
            return group.getUserCount();
        }
        return 0;
    }
    public int getOnlineCount() {
        return onlineCount.get();
    }

    public int increaseOnlineCount() {
        return onlineCount.incrementAndGet();
    }

    public int decreaseOnlineCount() {
        return onlineCount.decrementAndGet();
    }

}

 分组

@Slf4j
public class WebSocketGroup {
    private String groupId;
    private Map<String, WebSocketUser> users;
    private int userCount;

    public WebSocketGroup(String groupId) {
        this.groupId = groupId;
        this.users =new HashMap<>();
        this.userCount = 0;
    }

    public void addUser(WebSocketUser user) {
        users.put(user.getUserId(), user);

        // 更新在线用户计数器
        userCount++;
    }

    public WebSocketUser getUser(String userId) {
        return users.get(userId);
    }

    public void removeUser(String userId) {
        if (users.containsKey(userId)) {
            WebSocketUser removedUser = users.remove(userId);

            // 更新在线用户计数器
            if (removedUser != null) {
                userCount--;
            }
        } else {
            // 用户不存在

        }
    }

    public int getUserCount() {
        return userCount;
    }

    /**
     * 向当前分组下所有用户发送信息
     * @param message
     */
    public void sendMessageToAllUsers(String message) {
        for (WebSocketUser user : users.values()) {
            user.sendMessage(message);
        }
    }

    public String getGroupId() {
        return groupId;
    }
}

 用户文章来源地址https://www.toymoban.com/news/detail-670006.html

@Slf4j
public class WebSocketUser {
    private Session session;
    private String userId;

    private String token;

    public WebSocketUser(Session session, String userId,String token) {
        this.session = session;
        this.userId = userId;
        this.token=token;
    }
    public WebSocketUser(Session session, String userId) {
        this.session = session;
        this.userId = userId;
    }

    public Session getSession() {
        return session;
    }

    public String getUserId() {
        return userId;
    }

    public void sendMessage(String message) {
        try {
            session.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            log.error("发送消息异常!用户:{},错误原因:{}", userId, e.getMessage());
        }
    }
}

到了这里,关于SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot+Netty+Websocket实现消息推送

    这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。 添加依赖 定义netty端口号 netty服务器 Netty配置 管理全局Channel以及用户对应的channel(推送消息) 管道配置 自定义CustomChannelHandler 推送消息接口及

    2024年02月04日
    浏览(34)
  • SpringBoot整合Netty+Websocket实现消息推送

           Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。以下是Netty的主要优势: 高性能 :Netty基于NIO(非阻塞IO)模型,采用事件驱动的设计,具有高性能的特点。它通过零拷贝技术、内存池化技术等手段,进一步提高

    2024年01月20日
    浏览(30)
  • Springboot整合WebSocket实现主动向前端推送消息

            在上篇文章tcp编程中,我们实现了C++客户端与java服务器之间的通信,客户端发送了一个消息给服务器,今天我们要实现基于WebSocket实现服务器主动向前端推送消息,并且以服务器接收到C++客户端的消息主动向前端推送消息的触发条件。 WebSocket 的诞生背景       

    2024年03月16日
    浏览(31)
  • Springboot集成websocket实现消息推送和在线用户统计

    在启动类上添加一个bean 核心代码 实现消息推送只要在业务代码中调用sendMessageSpecial()方法即可。 然后调用刚才的业务接口测试:http://localhost:8080/websocket/t1 调用成功后可以看到三个窗口中都收到了消息

    2023年04月08日
    浏览(33)
  • SpringBoot集成WebSocket实现消息实时推送(提供Gitee源码)

    前言:在最近的工作当中,客户反应需要实时接收消息提醒,这个功能虽然不大,但不过也用到了一些新的技术,于是我这边写一个关于我如何实现这个功能、编写、测试到部署服务器,归纳到这篇博客中进行总结。 目录 一、什么是WebSocket 二、后端实现 2.1、引入pom.xml依赖

    2024年02月11日
    浏览(33)
  • Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

    思路: 后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据 本文仅放一部分重要的代码,完整代码可参看github仓库 websocket 前端测试 :http://www.easyswoole.com/wstool.html 依赖 项目目录 完整依赖 配置 WebSocketServer.java 前端页面 websocket.html 前端逻辑 index.js 参

    2024年02月04日
    浏览(35)
  • Spring Boot进阶(48):【实战教程】SpringBoot集成WebSocket轻松实现实时消息推送

            WebSocket是一种新型的通信协议,它可以在客户端与服务器端之间实现双向通信,具有低延迟、高效性等特点,适用于实时通信场景。在SpringBoot应用中,集成WebSocket可以方便地实现实时通信功能,如即时聊天、实时数据传输等。         本文将介绍如何在Sprin

    2024年02月09日
    浏览(37)
  • websocket 心跳机制

    WebSocket 是一种在客户端和服务器之间创建持久连接的技术。为了保持连接的稳定性,就需要通过发送心跳消息来维持 WebSocket 连接。 1、创建一个webscoket基本的使用 2、在客户端连接到 WebSocket 服务器之后,通过 setInterval 方法定时发送心跳消息 这边的代码会每隔5秒向服务器发

    2024年02月11日
    浏览(24)
  • WebSocket心跳机制(笔记大全)

    一、WebSocket心跳机制前端 前端实现WebSocket心跳机制的方式主要有两种: 使用setInterval定时发送心跳包。 在前端监听到WebSocket的onclose()事件时,重新创建WebSocket连接。 第一种方式会对服务器造成很大的压力,因为即使WebSocket连接正常,也要定时发送心跳包,从而消耗服务器资

    2024年02月15日
    浏览(29)
  • WebSocket心跳机制

    WebSocket是HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。 1、创建webSocket 2、websocket事件 事件 事件处理程序 描述 open Socket.onopen 连接建立时触发

    2024年02月15日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包