WebFlux中使用WebSocket的拓展功能分析

这篇具有很好参考价值的文章主要介绍了WebFlux中使用WebSocket的拓展功能分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

WebFlux中使用WebSocket的高级功能

摘要:本文将介绍如何在Spring WebFlux中使用WebSocket实现高级功能,包括连接建立和断开时的操作、消息收发和广播等。


继WebFlux使用案例后拓展讲解

在现代的Web应用程序中,实时性和即时通信变得越来越重要。WebSocket是一种在Web应用程序中实现实时双向通信的协议,允许服务器主动向客户端推送消息。在Spring WebFlux中,我们可以使用WebFlux的强大功能和响应式编程模型来实现WebSocket,并且还可以利用其高级功能来满足更复杂的需求。

本文将介绍如何在Spring WebFlux中使用WebSocket的高级功能,包括连接建立和断开时的操作、消息收发和广播等。让我们逐步深入了解这些功能。

1. 连接建立和断开时的操作

在WebSocket连接建立时,我们可以执行一些操作来处理连接的初始化。例如,我们可以进行身份验证、订阅特定主题或加载初始数据。在Spring WebFlux中,我们可以通过实现WebSocketHandler接口,并在handle()方法中重写相应的逻辑来实现这些操作。

以下是一个示例:

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 连接建立时执行的操作
        System.out.println("WebSocket连接建立");

        // 处理接收到的消息...

        return session.send(/* 响应消息给客户端 */)
                .doFinally(signalType -> {
                    // 连接关闭时执行的操作
                    System.out.println("WebSocket连接关闭");
                });
    }
}

在上述示例中,我们在handle()方法中打印一条消息表示连接已建立。在连接关闭时,我们使用doFinally()操作符注册一个回调函数,在连接关闭时执行相应的操作。

您可以根据实际需求扩展这些操作,例如执行身份验证、订阅主题或加载初始数据。

2. 消息收发

在WebSocket连接建立后,客户端和服务器之间可以相互发送消息。在Spring WebFlux中,我们可以使用WebSocketSession对象来处理接收到的消息,并使用send()方法将响应消息发送给客户端。

以下是一个示例:

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 连接建立时执行的操作...

        // 处理接收到的消息
        Flux<WebSocketMessage> messageFlux = session.receive()
                .doOnNext(message -> {
                    // 处理接收到的消息
                    System.out.println("接收到消息:" + message.getPayloadAsText());
                });

        // 发送消息给客户端
        Flux<WebSocketMessage> outputMessageFlux = /* 构造要发送的消息 */

        return session.send(messageFlux.concatWith(outputMessageFlux))
                .doFinally(signalType -> {
                    // 连接关闭时执行的操作...
                });
    }
}

在上述示例中,我们通过session.receive()来接收客户端发送的消息,并使用doOnNext()操作符处理接收到的消息。您可以根据需求执行相应的业务逻辑。

我们还创建了一个Flux来构造要发送给客户端的消息,并使用session.send()将消息发送给客户端。

根据实际需求,您可以进一步扩展消息的处理和发送逻辑。

3. 广播消息

在某些场景下,需要将消息广播给多个连接或订阅者。例如,在聊天室或实时通知应用中,您可能希望将消息发送给所有在线用户。在Spring WebFlux中,我们可以使用外部容器(如MapSet)来维护连接或订阅者列表,并在接收到消息时遍历列表,将消息发送给每个连接或订阅者。

以下是一个示例:

public class MyWebSocketHandler implements WebSocketHandler {

    private static Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 连接建立时执行的操作...
        sessions.add(session);

        // 处理接收到的消息...

        return session.send(/* 响应消息给客户端 */)
                .doFinally(signalType -> {
                    // 连接关闭时执行的操作...
                    sessions.remove(session);
                });
    }

    public void broadcastMessage(String message) {
        sessions.forEach(session -> {
            // 发送消息给每个连接或订阅者
            session.send(/* 构造要发送的消息 */);
        });
    }
}

在上述示例中,我们使用一个静态的Set来维护所有连接的会话。在连接建立时,我们将会话添加到集合中。在连接关闭时,我们从集合中移除会话。

我们还定义了一个broadcastMessage()方法,用于将消息广播给所有连接。在该方法中,我们遍历所有会话,并使用session.send()将消息发送给每个会话。

您可以根据需求扩展广播逻辑,例如只广播给特定的订阅者或根据条件过滤消息。


WebSocket全生命周期的配置

首先,让我们创建一个WebSocket拦截器,用于进行身份验证和日志记录:

public class WebSocketInterceptor implements WebSocketHandlerInterceptor {

    @Override
    public boolean beforeHandshake(ServerWebExchange exchange, WebSocketHandler handler, Map<String, Object> attributes) {
        // 在握手之前执行的操作,例如身份验证
        // 如果验证失败,可以通过返回false来拒绝连接
        String token = exchange.getRequest().getHeaders().getFirst("Authorization");
        if (isValidToken(token)) {
            attributes.put("userId", extractUserIdFromToken(token));
            return true;
        }
        return false;
    }

    @Override
    public void afterHandshake(ServerWebExchange exchange, WebSocketHandler handler, Exception exception) {
        // 在握手之后执行的操作,例如记录日志
        String userId = (String) exchange.getAttributes().get("userId");
        log.info("WebSocket连接建立,用户ID: {}", userId);
    }

    private boolean isValidToken(String token) {
        // 验证令牌的逻辑...
    }

    private String extractUserIdFromToken(String token) {
        // 从令牌中提取用户ID的逻辑...
    }
}

在上述示例中,我们实现了WebSocketHandlerInterceptor接口,并重写了beforeHandshake()afterHandshake()方法。在beforeHandshake()方法中,我们执行身份验证逻辑,并将验证通过的用户ID存储在attributes映射中。在afterHandshake()方法中,我们记录了连接建立的日志,包含了用户ID信息。

接下来,让我们创建一个自定义的消息编解码器,用于处理自定义的消息格式:

public class CustomMessageCodec implements WebSocketMessageCodec {

    @Override
    public List<WebSocketMessage<?>> decode(DataBuffer buffer, ResolvableType messageType,
                                           @Nullable String mimeType, @Nullable Map<String, Object> hints) {
        // 解码消息的逻辑...
    }

    @Override
    public DataBuffer encode(WebSocketMessage<?> message, DataBufferFactory bufferFactory,
                             ResolvableType messageType, @Nullable String mimeType, @Nullable Map<String, Object> hints) {
        // 编码消息的逻辑...
    }
}

在上述示例中,我们实现了WebSocketMessageCodec接口,并重写了decode()encode()方法。在decode()方法中,我们根据自定义的消息格式解码消息。在encode()方法中,我们根据自定义的消息格式编码消息。您可以根据实际需求自定义消息的格式和编解码逻辑。

最后,让我们创建一个WebSocket处理程序,用于处理连接和广播消息:

public class MyWebSocketHandler implements WebSocketHandler {

    private static Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 连接建立时执行的操作...
        sessions.add(session);

        // 处理接收到的消息
        Flux<WebSocketMessage<?>> messageFlux = session.receive()
                .doOnNext(message -> {
                    // 处理接收到的消息
                    String userId = (String) session.getAttributes().get("userId");
                    log.info("收到来自用户ID为 {} 的消息:{}", userId, message.getPayload());
                });

        // 发送消息给客户端
        Flux<WebSocketMessage<?>> outputMessageFlux = /* 构造要发送的消息 */

        return session.send(messageFlux.concatWith(outputMessageFlux))
                .doFinally(signalType -> {
                    // 连接关闭时执行的操作...
                    sessions.remove(session);
                });
    }

    public void broadcastMessage(String message) {
        sessions.forEach(session -> {
            // 发送消息给每个连接
            session.send(/* 构造要发送的消息 */);
        });
    }
}

在上述示例中,我们维护了一个静态的Set来存储所有连接的会话。在连接建立时,我们将会话添加到集合中。在连接关闭时,我们从集合中移除会话。在handle()方法中,我们处理接收到的消息,并根据需要发送消息给客户端。在broadcastMessage()方法中,我们遍历所有连接的会话,并向每个会话发送广播消息。文章来源地址https://www.toymoban.com/news/detail-796011.html

到了这里,关于WebFlux中使用WebSocket的拓展功能分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot业务功能实战(四)告别轮询,websocket的集成使用

    org.springframework.boot spring-boot-starter-websocket 加入配置类 @Configuration public class WebSocketConfig { /** 注入ServerEndpointExporter, 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 加入连接发送消

    2024年04月15日
    浏览(40)
  • web网页端使用webSocket实现语音通话功能(SpringBoot+VUE)

    最近在写一个web项目,需要实现web客户端之间的语音通话,期望能够借助webSocket全双工通信的方式来实现,但是网上没有发现可以正确使用的代码。网上能找到的一个代码使用之后 只能听到“嘀嘀嘀”的杂音 解决方案: 使用Json来传递数据代替原有的二进制输入输出流 技术

    2024年02月02日
    浏览(95)
  • 55、基于 WebFlux 开发 WebSocKet

    两步: (1)实现WebSocketHandler开发WebSocket处理类。 实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。 (2)使用HandlerMapping和WebSocketHandlerAdapter注册WebSocket处理类。 反应式API模型下,WebSocketSession的receive()方法返回的只是Flux(消息发布者), 它并不会同步

    2024年02月08日
    浏览(35)
  • 如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?

    本文将详细介绍如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能。 WebSocket是一种基于TCP的协议,它允许客户端和服务器之间进行双向通信,而不需要像HTTP那样进行请求和响应。Netty是一个Java网络编程框架,它提供了强大的异步事件驱动网络编程能

    2024年02月16日
    浏览(31)
  • SpringBoot项目整合WebSocket+netty实现前后端双向通信(同时支持前端webSocket和socket协议哦)

    目录   前言 技术栈 功能展示 一、springboot项目添加netty依赖 二、netty服务端 三、netty客户端 四、测试 五、代码仓库地址   专属小彩蛋:前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站(前言 - 床长人工智能教程

    2024年02月12日
    浏览(39)
  • 基于WebFlux的websocket的分组和群发实现

    在WebFlux中实现分组发送数据和群发数据给所有客户端发送,可以借助 Sinks.Many 来管理消息流,并使用 Flux 进行订阅和发送消息。以下是一个示例代码,演示如何实现这两个功能: 在上述代码中,我们使用 Sinks.Many 来管理分组消息流和群发消息流。当有用户加入分组时,我们

    2024年01月16日
    浏览(61)
  • 基于SpringBoot+WebSocket+Spring Task的前后端分离外卖项目-订单管理(十七)

    1.1 介绍 Spring Task 是Spring框架提供的任务调度工具,可以按照约定的时间自动执行某个代码逻辑。 定位 :定时任务框架 作用 :定时自动执行某段Java代码 应用场景: 1). 信用卡每月还款提醒 2). 银行贷款每月还款提醒 3). 火车票售票系统处理未支付订单 4). 入职纪念日为用户发

    2024年02月21日
    浏览(38)
  • Spring Boot进阶(49):SpringBoot之集成WebSocket实现前后端通信 | 超级详细,建议收藏

            在上一期,我对WebSocket进行了基础及理论知识普及学习,WebSocket是一种基于TCP协议实现的全双工通信协议,使用它可以实现实时通信,不必担心HTTP协议的短连接问题。Spring Boot作为一款微服务框架,也提供了轻量级的WebSocket集成支持,本文将介绍如何在Spring Boot项

    2024年02月14日
    浏览(29)
  • Spring Boot进阶(49):实时通信不再是梦想,SpringBoot+WebSocket助你轻松实现前后端即时通讯!

            在上一期,我对WebSocket进行了基础及理论知识普及学习,WebSocket是一种基于TCP协议实现的全双工通信协议,使用它可以实现实时通信,不必担心HTTP协议的短连接问题。Spring Boot作为一款微服务框架,也提供了轻量级的WebSocket集成支持,本文将介绍如何在Spring Boot项

    2024年02月11日
    浏览(41)
  • Vue 中前后端使用WebSocket

    什么是websocket WebSocket 是一种网络通信协议。RFC6455定义了它的通信标准。 WebSocket是HTML5下一种新的协议(websocket协议本质上是一个基于tcp的协议) 它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的 http是一种无状态,无连接,单向的

    2024年01月24日
    浏览(88)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包