SpringBoot2.0集成WebSocket,多客户端

这篇具有很好参考价值的文章主要介绍了SpringBoot2.0集成WebSocket,多客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

适用于单客户端,一个账号登陆一个客户端,登陆多个客户端会报错

The remote endpoint was in state [TEXT_FULL_WRITING] 

这是因为此时的session是不同的,只能锁住一个session,解决此问题的方法把全局静态对象锁住,因为账号是唯一的文章来源地址https://www.toymoban.com/news/detail-689135.html

/**
 * @Description 开启springboot对websocket的支持
 * @Author WangKun
 * @Date 2023/8/14 17:21
 * @Version
 */
@ConditionalOnProperty(name = "spring.profiles.active", havingValue = "dev")
@Configuration
public class WebSocketConfig{

    /**
     * @Description 注入一个ServerEndpointExporter, 会自动注册使用@ServerEndpoint注解
      * @param
     * @Throws
     * @Return org.springframework.web.socket.server.standard.ServerEndpointExporter
     * @Date 2023-08-14 17:26:31
     * @Author WangKun
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
/**
 * @Description websocket服务,不考虑分组
 * @Author WangKun
 * @Date 2023/8/14 17:29
 * @Version
 */
@ConditionalOnClass(value = WebSocketConfig.class)
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocket {

    private static final long SESSION_TIMEOUT = 60000;

    //存放每个客户端对应的WebSocket对象。
    private static final ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocket>> WEB_SOCKET_MAP = new ConcurrentHashMap<>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String userId;

    /**
     * @param o
     * @Description 重写防止session重复
     * @Throws
     * @Return boolean
     * @Date 2023-09-01 10:02:51
     * @Author WangKun
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebSocket that = (WebSocket) o;
        return Objects.equals(session, that.session);
    }

    @Override
    public int hashCode() {
        return Objects.hash(session);
    }

    /**
     * @param session
     * @param userId
     * @Description 建立连接
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:52:08
     * @Author WangKun
     */
    @SneakyThrows
    @OnOpen
    public void onOpen(final Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        session.setMaxIdleTimeout(SESSION_TIMEOUT);
        //先查找是否有uniCode
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (users == null) {
            //处理多个同时连接并发
            synchronized (WEB_SOCKET_MAP) {
                if (!WEB_SOCKET_MAP.contains(userId)) {
                    users = new CopyOnWriteArraySet<>();
                    WEB_SOCKET_MAP.put(userId, users);
                }
            }
        }
        users.add(this);
        sendMessage(String.valueOf(ResponseCode.CONNECT_SUCCESS.getCode()));
        log.info("用户--->{} 连接成功,当前在线人数为--->{}", userId, WEB_SOCKET_MAP.size());
    }

    /**
     * @param message
     * @Description 向客户端发送消息 session.getBasicRemote()与session.getAsyncRemote()的区别
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:51:07
     * @Author WangKun
     */
    @SneakyThrows
    public void sendMessage(String message) {
        // 加锁避免阻塞
        // 如果有多个客户端的话,亦或者同一个用户,或者打开了多个浏览器(同一个用户打开多个客户端或者多个界面),开了多个页面,此时Session是不同的,只能锁住一个session,所以锁住全局静态对象
//        synchronized(session) {
//            this.session.getBasicRemote().sendText(message);
//        }
        synchronized (WEB_SOCKET_MAP) {
            CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
            if (users != null) {
                for (WebSocket user : users) {
                    // 判断当前客户端的用户是否打开连接
                    if (user.session.isOpen()) {
                        user.session.getBasicRemote().sendText(message);
                        log.info("向客户端发送数据--->{} 数据为--->{}", userId, message);
                    }
                }
            }
        }
    }

    /**
     * @param
     * @Description 关闭连接
     * @Throws
     * @Return void
     * @Date 2023-08-14 17:52:30
     * @Author WangKun
     */
    @SneakyThrows
    @OnClose
    public void onClose(Session session) {
        // 避免多人同时在线直接关闭通道。
        CopyOnWriteArraySet<WebSocket> copyOnWriteArraySet = WEB_SOCKET_MAP.get(this.userId);
        if (!copyOnWriteArraySet.isEmpty()) {
            Object[] objects = copyOnWriteArraySet.toArray();
            for (Object object : objects) {
                if (((WebSocket) object).session.equals(session)) {
                    //删除当前用户
                    WEB_SOCKET_MAP.get(this.userId).remove((WebSocket) object);
                    // 如果有一个客户端登陆 下线清除用户
                    if (WEB_SOCKET_MAP.get(this.userId).isEmpty()) {
                        WEB_SOCKET_MAP.remove(this.userId);
                    }
                    CloseReason close = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "关闭客户端,下线!");
                    session.close(close);
                    log.info("用户--->{} 关闭连接!", userId);
                }
            }
        }
    }

    /**
     * @param message
     * @param session
     * @Description 收到客户端消息
     * @Throws
     * @Return void
     * @Date 2023-08-15 10:54:55
     * @Author WangKun
     */
    @SneakyThrows
    @OnMessage
    public void onMessage(String message, Session session) {
        //枷锁避免多个资源互抢
        //这一块可以操作数据,比如存到数据

        // 同一个用户,多个地方登录(多个session),循环发送消息,
        // 如果有多个客户端的话,亦或者同一个用户,或者打开了多个浏览器,开了多个页面,此时Session是不同的,只能锁住一个session,所以锁住全局静态对象
        synchronized (WEB_SOCKET_MAP) {
            CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
            if (users != null) {
                for (WebSocket user : users) {
                    if (user.session.isOpen()) {
                        user.session.getBasicRemote().sendText("pong");
                        log.info("收到客户端发送的心跳的数据--->{} 数据为--->{}", userId, message);
                    }
                }
            }
        }
    }

    /**
     * @param session
     * @param error
     * @Description 发生错误时
     * @Throws
     * @Return void
     * @Date 2023-08-15 10:55:27
     * @Author WangKun
     */
    @SneakyThrows
    @OnError
    public void onError(Session session, Throwable error) {
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (!users.isEmpty()) {
            Object[] objects = users.toArray();
            for (Object object : objects) {
                if (((WebSocket) object).session.equals(session)) {
                    //删除当前用户
                    WEB_SOCKET_MAP.get(this.userId).remove((WebSocket) object);
                    // 如果有一个客户端登陆 下线清除用户
                    if (WEB_SOCKET_MAP.get(this.userId).isEmpty()) {
                        WEB_SOCKET_MAP.remove(this.userId);
                    }
                    CloseReason close = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "异常,下线!");
                    session.close(close);
                    log.error("用户--->{} 错误!" + userId, "原因--->{}" + error.getMessage(), error);
                }
            }
        }
//        WEB_SOCKET_MAP.remove(userId);
//        log.error("用户--->{} 错误!" + userId, "原因--->{}" + error.getMessage(), error);
    }


    /**
     * @param userId
     * @param message
     * @Description 通过userId向客户端发送消息(指定用户发送)
     * @Throws
     * @Return void
     * @Date 2023-08-14 18:01:35
     * @Author WangKun
     */
    public static void sendTextMessageByUserId(String userId, String message) {
        CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(userId);
        if (users != null) {
            for (WebSocket user : users) {
                user.sendMessage(message);
                log.info("服务端发送消息到用户{},消息:{}", userId, message);
            }
        }
    }

    /**
     * @param message
     * @Description 群发自定义消息
     * @Throws
     * @Return void
     * @Date 2023-08-14 18:03:38
     * @Author WangKun
     */
    public static void sendTextMessage(String message) {
        // 如果在线一个就广播
        if (!WEB_SOCKET_MAP.isEmpty()) {
            for (String item : WEB_SOCKET_MAP.keySet()) {
                CopyOnWriteArraySet<WebSocket> users = WEB_SOCKET_MAP.get(item);
                if (users != null) {
                    for (WebSocket user : users) {
                        user.sendMessage(message);
                        log.info("服务端发送消息到用户{},消息:{}", item, message);
                    }
                }
            }
        }
    }
}

到了这里,关于SpringBoot2.0集成WebSocket,多客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 快速搭建springboot websocket客户端

    WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 HTML5 定义的 WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。 HTML5 定义的 WebSocket 协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。 浏览器通过 JavaSc

    2024年02月06日
    浏览(57)
  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(53)
  • SpringBoot+WebSocket实现服务端、客户端

    小编最近一直在使用springboot框架开发项目,毕竟现在很多公司都在采用此框架,之后小编也会陆续写关于springboot开发常用功能的文章。 什么场景下会要使用到websocket的呢? websocket主要功能就是实现网络通讯,比如说最经典的客服聊天窗口、您有新的消息通知,或者是项目与

    2024年02月13日
    浏览(47)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(57)
  • 二、springboot集成CAS客户端实现单点登录

    pom中引入依赖 yml中添加cas配置 读取CAS相关配置 cas配置类 单点登录接口demo 访问loingCas接口时,若未在CASserver登录,则会被拦截跳转到CAS的登陆页面,登陆成功后放行继续访问loginCas接口

    2024年02月15日
    浏览(50)
  • SpringBoot集成Elasticsearch客户端(新旧版本)(2023-01-28)

    第一章 SpringBoot集成ElasticSearch(2023-01-28) 例如:业务中需要使用es,所以做一些客户端选型,熟悉一下基本的操作,所以记录这篇博客,有关概念理论性的文章还在整理过程中,后续会整理个系列 Spring认证中国教育管理中心-Spring Data Elasticsearch教程一 SpringData集成Elasticsearch Sp

    2024年02月07日
    浏览(72)
  • Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

    思路: 后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据 本文仅放一部分重要的代码,完整代码可参看github仓库 websocket 前端测试 :http://www.easyswoole.com/wstool.html 依赖 项目目录 完整依赖 配置 WebSocketServer.java 前端页面 websocket.html 前端逻辑 index.js 参

    2024年02月04日
    浏览(51)
  • 【Java】SpringBoot快速整合WebSocket实现客户端服务端相互推送信息

    目录 什么是webSocket? webSocket可以用来做什么? WebSocket操作类 一:测试客户端向服务端推送消息 1.启动SpringBoot项目 2.打开网站 3.进行测试消息推送 4.后端进行查看测试结果 二:测试服务端向客户端推送消息 1.接口代码 2.使用postman进行调用 3.查看测试结果         WebSocke

    2024年01月20日
    浏览(57)
  • springboot集成webstock实战:服务端数据推送数据到客户端实现实时刷新

        之前介绍过springboot集成webstock方式,具体参考: springboot集成websocket实战:站内消息实时推送 这里补充另外一个使用webstock的场景,方便其他同学理解和使用,废话不多说了,直接开始!简单介绍一下业务场景:     现在有一个投票活动,活动详情中会显示投票活动的参与人数、访

    2024年02月08日
    浏览(96)
  • SpringBoot集成Milo库实现OPC UA客户端:连接、遍历节点、读取、写入、订阅与批量订阅

    前面我们搭建了一个本地的 PLC 仿真环境,并通过 KEPServerEX6 读取 PLC 上的数据,最后还使用 UAExpert 作为OPC客户端完成从 KEPServerEX6 这个OPC服务器的数据读取与订阅功能。在这篇文章中,我们将通过 SpringBoot 集成 Milo 库实现一个 OPC UA 客户端,包括连接、遍历节点、读取、写入

    2024年02月09日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包