Spring Boot 集成 WebSocket(原生注解与Spring封装)

这篇具有很好参考价值的文章主要介绍了Spring Boot 集成 WebSocket(原生注解与Spring封装)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spring Boot 集成 WebSocket

本章节将介绍 Spring Boot 集成 WebSocket 的两种主要方式:原生注解与Spring封装。

在线WebSocket测试工具

🤖 Spring Boot 2.x 实践案例(代码仓库)

原生注解

引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置文件

@Configuration
public class WebSocketConfiguration {
    /**
     * 	注入ServerEndpointExporter,
     * 	这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

处理消息

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {

    /**
     * 线程安全的无序的集合
     */
    private static final CopyOnWriteArraySet<Session> SESSIONS = new CopyOnWriteArraySet<>();

    /**
     * 存储在线连接数
     */
    private static final Map<String, Session> SESSION_POOL = new HashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            SESSIONS.add(session);
            SESSION_POOL.put(userId, session);
            log.info("【WebSocket消息】有新的连接,总数为:" + SESSIONS.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @OnClose
    public void onClose(Session session) {
        try {
            SESSIONS.remove(session);
            log.info("【WebSocket消息】连接断开,总数为:" + SESSIONS.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @OnMessage
    public void onMessage(String message) {
        log.info("【WebSocket消息】收到客户端消息:" + message);
    }

    /**
     * 此为广播消息
     *
     * @param message 消息
     */
    public void sendAllMessage(String message) {
        log.info("【WebSocket消息】广播消息:" + message);
        for (Session session : SESSIONS) {
            try {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息
     *
     * @param userId  用户编号
     * @param message 消息
     */
    public void sendOneMessage(String userId, String message) {
        Session session = SESSION_POOL.get(userId);
        if (session != null && session.isOpen()) {
            try {
                synchronized (session) {
                    log.info("【WebSocket消息】单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息(多人)
     *
     * @param userIds 用户编号列表
     * @param message 消息
     */
    public void sendMoreMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            Session session = SESSION_POOL.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    log.info("【WebSocket消息】单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • @ServerEndpoint:将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
  • @OnOpen:当WebSocket建立连接成功后会触发这个注解修饰的方法。
  • @OnClose:当WebSocket建立的连接断开后会触发这个注解修饰的方法。
  • @OnMessage:当客户端发送消息到服务端时,会触发这个注解修改的方法。
  • @OnError:当WebSocket建立连接时出现异常会触发这个注解修饰的方法。

Spring封装

引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

自定义处理器

处理器作用类似于@RequestMapping注解,用于处理某一个路径的WebSocket连接,自定义处理器需要实现WebSocketHandler接口。

WebSocket操作类
public interface WebSocket {
    /**
     * 会话开始回调
     *
     * @param session 会话
     */
    void handleOpen(WebSocketSession session);

    /**
     * 会话结束回调
     *
     * @param session 会话
     */
    void handleClose(WebSocketSession session);

    /**
     * 处理消息
     *
     * @param session 会话
     * @param message 接收的消息
     */
    void handleMessage(WebSocketSession session, String message);

    /**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */
    void sendMessage(WebSocketSession session, String message) throws IOException;

    /**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */
    void sendMessage(String userId, TextMessage message) throws IOException;

    /**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */
    void sendMessage(String userId, String message) throws IOException;

    /**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */
    void sendMessage(WebSocketSession session, TextMessage message) throws IOException;

    /**
     * 广播
     *
     * @param message 字符串消息
     * @throws IOException 异常
     */
    void broadCast(String message) throws IOException;

    /**
     * 广播
     *
     * @param message 文本消息
     * @throws IOException 异常
     */
    void broadCast(TextMessage message) throws IOException;

    /**
     * 处理会话异常
     *
     * @param session 会话
     * @param error   异常
     */
    void handleError(WebSocketSession session, Throwable error);

    /**
     * 获得所有的 websocket 会话
     *
     * @return 所有 websocket 会话
     */
    Set<WebSocketSession> getSessions();

    /**
     * 得到当前连接数
     *
     * @return 连接数
     */
    int getConnectionCount();
}
WebSocket操作实现类
@Slf4j
public class WebSocketImpl implements WebSocket {
    /**
     * 在线连接数(线程安全)
     */
    private final AtomicInteger connectionCount = new AtomicInteger(0);

    /**
     * 线程安全的无序集合(存储会话)
     */
    private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    @Override
    public void handleOpen(WebSocketSession session) {
        sessions.add(session);
        int count = connectionCount.incrementAndGet();
        log.info("a new connection opened,current online count:{}", count);
    }

    @Override
    public void handleClose(WebSocketSession session) {
        sessions.remove(session);
        int count = connectionCount.decrementAndGet();
        log.info("a new connection closed,current online count:{}", count);
    }

    @Override
    public void handleMessage(WebSocketSession session, String message) {
        // 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息
        log.info("received a message:{}", message);
    }

    @Override
    public void sendMessage(WebSocketSession session, String message) throws IOException {
        this.sendMessage(session, new TextMessage(message));
    }

    @Override
    public void sendMessage(String userId, TextMessage message) throws IOException {
        Optional<WebSocketSession> userSession = sessions.stream().filter(session -> {
            if (!session.isOpen()) {
                return false;
            }
            Map<String, Object> attributes = session.getAttributes();
            if (!attributes.containsKey("uid") {
                return false;
            }
            String uid = (String) attributes.get("uid");
            return uid.equals(userId);
        }).findFirst();
        if (userSession.isPresent()) {
            userSession.get().sendMessage(message);
        }
    }

    @Override
    public void sendMessage(String userId, String message) throws IOException {
        this.sendMessage(userId, new TextMessage(message));
    }

    @Override
    public void sendMessage(WebSocketSession session, TextMessage message) throws IOException {
        session.sendMessage(message);
    }

    @Override
    public void broadCast(String message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            this.sendMessage(session, message);
        }
    }

    @Override
    public void broadCast(TextMessage message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            session.sendMessage(message);
        }
    }

    @Override
    public void handleError(WebSocketSession session, Throwable error) {
        log.error("websocket error:{},session id:{}", error.getMessage(), session.getId());
        log.error("", error);
    }

    @Override
    public Set<WebSocketSession> getSessions() {
        return sessions;
    }

    @Override
    public int getConnectionCount() {
        return connectionCount.get();
    }
}
自定义WebSocket处理器
public class DefaultWebSocketHandler implements WebSocketHandler {

    @Autowired
    private WebSocket webSocket;

    /**
     * 建立连接
     *
     * @param session Session
     */
    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) {
        webSocket.handleOpen(session);
    }

    /**
     * 接收消息
     *
     * @param session Session
     * @param message 消息
     */
    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            webSocket.handleMessage(session, textMessage.getPayload());
        }
    }

    /**
     * 发生错误
     *
     * @param session   Session
     * @param exception 异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        webSocket.handleError(session, exception);
    }

    /**
     * 关闭连接
     *
     * @param session     Session
     * @param closeStatus 关闭状态
     */
    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) {
        webSocket.handleClose(session);
    }

    /**
     * 是否支持发送部分消息
     *
     * @return false
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}
自定义拦截器

自定义处理器需要实现HandshakeInterceptor接口

public class WebSocketInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(@NonNull ServerHttpRequest request, @NonNull ServerHttpResponse response, @NonNull WebSocketHandler wsHandler, @NonNull Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;
            // 模拟用户(通常利用JWT令牌解析用户信息)
            String userId = servletServerHttpRequest.getServletRequest().getParameter("uid");
            // TODO 判断用户是否存在
            attributes.put("uid", userId);
            return true;
        }
        return false;
    }

    @Override
    public void afterHandshake(@NonNull ServerHttpRequest request, @NonNull ServerHttpResponse response, @NonNull WebSocketHandler wsHandler, Exception exception) {

    }
}

WebSocket 无法使用 header 传递参数,因此这里使用 url params 携带参数。

WebSocket配置项

将自定义处理器、拦截器以及WebSocket操作类依次注入到IOC容器中。

@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {

    @Bean
    public DefaultWebSocketHandler defaultWebSocketHandler() {
        return new DefaultWebSocketHandler();
    }

    @Bean
    public WebSocket webSocket() {
        return new WebSocketImpl();
    }

    @Bean
    public WebSocketInterceptor webSocketInterceptor() {
        return new WebSocketInterceptor();
    }

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry.addHandler(defaultWebSocketHandler(), "ws/message")
                    .addInterceptors(webSocketInterceptor())
                    .setAllowedOrigins("*");
    }
}
  • @EnableWebSocket:开启WebSocket功能
  • addHandler:添加处理器
  • addInterceptors:添加拦截器
  • setAllowedOrigins:设置允许跨域(允许所有请求来源)

WebSocket测试

原生注解

springboot集成websocket,后端笔记,websocket,spring,spring boot
springboot集成websocket,后端笔记,websocket,spring,spring boot

Spring封装

springboot集成websocket,后端笔记,websocket,spring,spring boot
springboot集成websocket,后端笔记,websocket,spring,spring boot文章来源地址https://www.toymoban.com/news/detail-781073.html

到了这里,关于Spring Boot 集成 WebSocket(原生注解与Spring封装)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot笔记37】SpringBoot基于@ServerEndpoint、@OnMessage等注解的方式集成WebSocket

    这篇文章,主要介绍SpringBoot基于@ServerEndpoint、@OnMessage等注解的方式集成WebSocket。 目录 一、基于注解集成WebSocket 1.1、WebSocket常见注解 1.2、创建WebSocket服务端

    2024年02月14日
    浏览(25)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(34)
  • Spring Boot 集成 Redisson分布式锁(注解版)

            Redisson 是一种基于 Redis 的 Java 驻留集群的分布式对象和服务库,可以为我们提供丰富的分布式锁和线程安全集合的实现。在 Spring Boot 应用程序中使用 Redisson 可以方便地实现分布式应用程序的某些方面,例如分布式锁、分布式集合、分布式事件发布和订阅等。本篇

    2024年02月09日
    浏览(34)
  • Spring Boot集成WebSocket Demo,简单明了

    如果是初次搭建Spring Boot+WebSocket项目,不需要太复杂,只需要快速上手,那么你搜到的大部分文章可能都不适合你,我的这篇文章以最精简的方式搭建一个可以运行并通信的Spring Boot+WebSocket的Demo项目,有了根基之后再进行复杂化就不是难事了。 搭建Spring Boot项目都会吧,下面

    2024年02月09日
    浏览(31)
  • Spring Boot 集成 WebSocket 实现服务端推送消息到客户端

          假设有这样一个场景:服务端的资源经常在更新,客户端需要尽量及时地了解到这些更新发生后展示给用户,如果是 HTTP 1.1,通常会开启 ajax 请求询问服务端是否有更新,通过定时器反复轮询服务端响应的资源是否有更新。                         在长时间不更新

    2024年02月16日
    浏览(40)
  • 实现实时互动:用Spring Boot原生WebSocket打造你的专属聊天室

    😊 @ 作者: 一恍过去 💖 @ 主页: https://blog.csdn.net/zhuocailing3390 🎊 @ 社区: Java技术栈交流 🎉 @ 主题: 实现实时互动:用Spring Boot原生WebSocket打造你的专属聊天室 ⏱️ @ 创作时间: 2023年08月04日 WebSocket 实现聊天室的原理包括建立 WebSocket 连接的握手过程、保持连接状态以

    2024年02月10日
    浏览(41)
  • 【SpringBoot】| Spring Boot 常见的底层注解剖析

    目录 一:Spring Boot 常见的底层注解 1. 容器功能 1.1 组件添加 方法一:使用@Configuration注解+@Bean注解 方法二:使用@Configuration注解+@Import注解  方法三:使用@Configuration注解+@Conditional注解  1.2 原生xml配置文件引入 @ImportResource注解 1.3 配置绑定 方法一:@Component注解 + @Configu

    2024年02月17日
    浏览(33)
  • Spring Boot学习随笔- 集成JSP模板(配置视图解析器)、整合Mybatis(@MapperScan注解的使用)

    学习视频:【编程不良人】2021年SpringBoot最新最全教程 在main创建webapp,然后创建index.jsp进行测试,在访问之前需要进行一个设置,否则springboot是找不到jsp页面的 修改jsp无需重启应用 数据库访问框架:hibernate、jpa、mybatis【主流】 SpringBoot(微框架) = Spring(工厂) + SpringMV

    2024年02月05日
    浏览(35)
  • Spring Boot 集成 WebSocket 实例 | 前端持续打印远程日志文件更新内容(模拟 tail 命令)

    这个是我在 CSDN 的第一百篇原则博文,留念😎 先说下项目结构,后端基于 Spring Boot 3,前端为 node.js 开发的控制台程序。现在希望能够在前端模拟 tail 命令,持续输出后端的日志文件。 这个方案实施较为简单,通过前端不断(定时)发起请求,并携带已读的内容坐标(posi

    2024年03月18日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包