基于WebFlux的websocket的分组和群发实现

这篇具有很好参考价值的文章主要介绍了基于WebFlux的websocket的分组和群发实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一,分组发送

在WebFlux中实现分组发送数据和群发数据给所有客户端发送,可以借助Sinks.Many来管理消息流,并使用Flux进行订阅和发送消息。以下是一个示例代码,演示如何实现这两个功能:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Map<String, Many<ChatMessage>> groupChatMessages = new HashMap<>();
    private final Many<ChatMessage> allChatMessages = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String username = session.getId(); // 使用WebSocket连接的唯一标识符作为用户名

        // 处理新用户加入聊天室
        Mono<Void> join = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(group -> joinGroup(username, group))
                .then();

        // 处理接收到的消息
        Mono<Void> chat = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(message -> {
                    ChatMessage chatMessage = new ChatMessage();
                    chatMessage.setSender(username);
                    chatMessage.setMessage(message);
                    chatMessage.setTimestamp(LocalDateTime.now());
                    return chatMessage;
                })
                .doOnNext(this::sendMessageToGroup)
                .then();

        // 关闭连接时,将用户从所有分组中移除
        Mono<Void> leave = session
                .close()
                .doFinally(signal -> leaveAllGroups(username));

        return Mono.zip(join, chat, leave).then();
    }

    private void joinGroup(String username, String group) {
        Many<ChatMessage> groupMessages = groupChatMessages.getOrDefault(group, Sinks.many().multicast().onBackpressureBuffer());
        groupChatMessages.put(group, groupMessages);
        groupMessages.asFlux().subscribe(); // 强制激活订阅以开始接收消息
    }

    private void sendMessageToGroup(ChatMessage chatMessage) {
        String group = chatMessage.getGroup();
        if (group != null && groupChatMessages.containsKey(group)) {
            Many<ChatMessage> groupMessages = groupChatMessages.get(group);
            groupMessages.tryEmitNext(chatMessage);
        }
        allChatMessages.tryEmitNext(chatMessage); // 群发给所有客户端
    }

    private void leaveAllGroups(String username) {
        groupChatMessages.values().forEach(groupMessages -> groupMessages.tryEmitNext(new ChatMessage(username, "left the chat room.", LocalDateTime.now())));
    }
}

在上述代码中,我们使用Sinks.Many来管理分组消息流和群发消息流。当有用户加入分组时,我们使用joinGroup()方法创建一个新的Many实例,并将其存储在groupChatMessages中。当收到消息时,我们通过sendMessageToGroup()方法选择性地将消息发送给特定分组的所有成员,并使用tryEmitNext()方法来发送消息到对应的消息流。同时,我们还维护了一个allChatMessages的消息流,用于群发给所有客户端。

通过使用groupChatMessagesallChatMessages管理消息流,你可以实现分组发送数据和群发数据给所有客户端发送的功能。

二, 广播发送

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;

@Component
public class BroadcastWebSocketHandler implements WebSocketHandler {

    private final UnicastProcessor<String> messagePublisher;
    private final Flux<String> outputMessages;

    public BroadcastWebSocketHandler() {
        this.messagePublisher = UnicastProcessor.create();
        this.outputMessages = messagePublisher.replay(25).autoConnect();
    }

    public void broadcastMessage(String message) {
        messagePublisher.onNext(message);
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> input = session.receive();
        Flux<WebSocketMessage> output = outputMessages.map(session::textMessage);

        return session.send(output).and(input.then());
    }
}

上述代码中的BroadcastWebSocketHandler类实现了广播消息给所有连接的WebSocket客户端的功能。让我们逐行解释代码的实现:

  • messagePublisher是一个UnicastProcessor,用于发布新的消息。
  • outputMessages是一个Flux,用于保存最新的25条消息,并在每个新的WebSocket连接时自动发送这些消息。
  • broadcastMessage()方法用于向所有连接的WebSocket客户端广播消息。调用该方法时,新的消息将被发布到messagePublisher
  • handle()方法中,我们创建了一个input流来接收从客户端发送的消息。
  • 我们使用outputMessages流来创建一个output流,将最新的消息转换为WebSocketMessage对象。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。

通过这样的实现,当有新的消息到达时,所有连接的WebSocket客户端都会收到该消息。

三. 向特定用户发送消息:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class UserWebSocketHandler implements WebSocketHandler {

    private final Map<String, Sinks.Many<String>> userSinks = new ConcurrentHashMap<>();

    public void sendMessageToUser(String userId, String message) {
        Sinks.Many<String> userSink = userSinks.get(userId);
        if (userSink != null) {
            userSink.tryEmitNext(message);
        }
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String userId = session.getHandshakeInfo().getUri().getQuery();
        Sinks.Many<String> userSink = Sinks.many().unicast().onBackpressureBuffer();
        userSinks.put(userId, userSink);

        Flux<String> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .subscribeOn(Schedulers.boundedElastic())
                .doFinally(signalType -> userSinks.remove(userId));

        Flux<String> output = userSink.asFlux().doOnNext(message ->
                session.send(Mono.just(session.textMessage(message))));

        return session.send(output).and(input.then());
    }
}

上述代码中的UserWebSocketHandler类实现了向特定用户发送消息的功能。让我们逐行解释代码的实现:

  • userSinks是一个ConcurrentHashMap,用于存储特定用户的Sinks.Many实例。
  • sendMessageToUser()方法用于向特定用户发送消息。我们通过用户ID从userSinks中获取相应的Sinks.Many实例,并使用tryEmitNext()方法发送消息。
  • handle()方法中,我们首先从WebSocket会话中获取用户ID,并创建一个新的Sinks.Many实例,并将其放入userSinks中。
  • 我们创建一个input流来接收从客户端发送的消息。使用subscribeOn(Schedulers.boundedElastic())将其放入弹性调度器中,以避免阻塞WebFlux线程。
  • output流中,我们将userSink转换为Flux,并在每个新的消息到达时发送给客户端。
  • 最后,我们将output流发送到客户端,并在input流完成时结束WebSocket会话。
    很抱歉,我无法直接为您编写一篇博客。然而,我可以为您提供一个关于基于WebFlux的WebSocket分组和群发实现的简要代码案例分析,您可以根据此信息撰写自己的博客。

总结:

本文将介绍如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。WebSocket提供了一种双向通信的机制,能够在服务器和客户端之间实现实时的数据传输。在本文中,我们将使用WebFlux的响应式编程模型和相关的类库来实现WebSocket的分组和群发功能。

通过本文的介绍和代码案例分析,我们了解了如何使用Spring WebFlux框架实现WebSocket的分组和群发功能。这种实现方式能够满足实时通信和群聊等需求,并且借助WebFlux的响应式编程模型,能够处理大量并发连接和高吞吐量的场景。文章来源地址https://www.toymoban.com/news/detail-792903.html

到了这里,关于基于WebFlux的websocket的分组和群发实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 整合 WebSocket 基于 STOMP 协议实现广播

    SpringBoot 实战 (十六) | 整合 WebSocket 基于 STOMP 协议实现广播 如题,今天介绍的是 SpringBoot 整合 WebSocket 实现广播消息。 什么是 WebSocket ? WebSocket 为浏览器和服务器提供了双工异步通信的功能,即浏览器可以向服务器发送信息,反之也成立。 WebSocket 是通过一个 socket 来实现双

    2024年01月21日
    浏览(52)
  • 持久双向通信网络协议-WebSocket-入门案例实现demo

    1 介绍 WebSocket 是基于 TCP 的一种新的 网络协议 。它实现了浏览器与服务器全双工通信——浏览器和服务器只需要完成一次握手,两者之间就可以创建 持久性 的连接, 并进行 双向 数据传输。 HTTP协议和WebSocket协议对比: HTTP是 短连接 (一次响应完即消除) WebSocket是 长连接

    2024年01月16日
    浏览(47)
  • 针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

    前言: 嗨咯铁汁们,很久不见,我还是你们的老朋友凡叔,这里也感谢各位小伙伴的点赞和关注,你们的三连是我最大的动力哈,我也不会辜负各位的期盼,这里呢给大家出了一个针对 WebSocket 协议的 Locust 压测脚本   Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也

    2023年04月08日
    浏览(47)
  • WebFlux中使用WebSocket的拓展功能分析

    摘要:本文将介绍如何在Spring WebFlux中使用WebSocket实现高级功能,包括连接建立和断开时的操作、消息收发和广播等。 继WebFlux使用案例后拓展讲解 在现代的Web应用程序中,实时性和即时通信变得越来越重要。WebSocket是一种在Web应用程序中实现实时双向通信的协议,允许服务

    2024年01月17日
    浏览(51)
  • spring-webflux5 使用websocket

    换做平常springboot程序中使用websocket的话是很简单的,只需要三步就能实现前后端的实时通讯。而在spring5中则更简单了,并且支持定点推送与全推送的灵活运用。在这里就分常规编程与响应式编程两种使用,进行记录下。 WebFlux 本身就提供了对 WebSocket 协议的支持,处理 WebS

    2024年02月15日
    浏览(46)
  • WebSocket 网络协议(实时更新 )

    WebSocket 是一种在客户端和服务器之间建立双向通信信道的网络协议。它在客户端和服务器之间建立一个持久的、全双工的连接,允许数据在两个方向上实时传输,而不需要像HTTP一样进行多次请求和响应。  WebSocket 的主要优势是减少了服务器和客户端之间的通信延迟,因为数

    2024年01月17日
    浏览(50)
  • 51、基于注解方式开发Spring WebFlux,实现生成背压数据,就是实现一直向客户端发送消息

    什么是背压: 这个是Reactive(反应) 的概念,当订阅者的消费能力,远低于发布者时,订阅者(也就是消费者)有通知取消或终止发布者生产数据的机制,这种机制可以称作为“背压”。 说白了就是:当消费者消费积压的时候,反向告诉推送生产者,我不需要你生产了,你

    2024年02月09日
    浏览(51)
  • 【spring(六)】WebSocket网络传输协议

    🌈键盘敲烂,年薪30万🌈 目录 核心概要: 概念介绍: 对比HTTP协议:⭐ WebSocket入门案例:⭐ websocket对比http         WebSocket是Web服务器的一个组件,WebSocket是一种基于TCP的新的 网络传输协议 ,它实现了浏览器与服务器全双工通信——浏览器只需要完成 一次握手 ,两者之

    2024年02月05日
    浏览(44)
  • 使用JMeter测试基于WebSocket协议的服务

    示例:WebSocket是一种双向网络通信协议,与HTTP不同,它以ws://或wss://开头。它是一个有状态协议,这意味着客户端和服务器之间的连接将保持活动状态,直到被客户端或服务器中的任何一方关闭连接之后,连接将从两端终止。 初次接触 WebSocket的人,都会问同样的问题:我们

    2024年02月06日
    浏览(54)
  • 网络通信协议-HTTP、WebSocket、MQTT的比较与应用

    在今天的数字化世界中,各种通信协议起着关键的作用,以确保信息的传递和交换。HTTP、WebSocket 和 MQTT 是三种常用的网络通信协议,它们各自适用于不同的应用场景。本文将比较这三种协议,并探讨它们的主要应用领域。 HTTP (超文本传输协议) HTTP  是最常见的协议之一

    2024年02月05日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包