基于Webflux的Websocket的高级和全生命周期完整版讲解,包含代码

这篇具有很好参考价值的文章主要介绍了基于Webflux的Websocket的高级和全生命周期完整版讲解,包含代码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用WebSocket处理器构建实时应用程序

WebSocket是一种在Web应用程序中实现实时双向通信的技术。它允许服务器主动向客户端推送消息,而不需要客户端发起请求。在Spring WebFlux中,我们可以使用WebSocketHandler接口来处理WebSocket连接和消息。

在本篇博客中,我们将介绍如何使用MyWebSocketHandler2类来构建一个简单的WebSocket处理器,实现实时聊天和文件上传功能。

1. 创建WebSocket处理器

首先,我们创建一个名为MyWebSocketHandler2的Java类,并实现WebSocketHandler接口。它是一个Spring组件,用于处理WebSocket连接和消息。

@Component
@Slf4j
public class MyWebSocketHandler2 implements WebSocketHandler {
    // ...

    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 实现WebSocket连接和消息的处理逻辑
        // ...
    }

    // ...
}

MyWebSocketHandler2类使用@Component注解将其声明为Spring组件,以便能够在应用程序中自动进行依赖注入。

2. 处理WebSocket连接

handle方法中,我们首先处理WebSocket连接的逻辑。当有新的WebSocket连接建立时,会调用handle方法,并将WebSocketSession作为参数传递进来。

@NotNull
@Override
public Mono<Void> handle(WebSocketSession session) {
    // 生成唯一的会话ID
    UUID uuid = UUID.randomUUID();
    String uuidStr = uuid.toString();

    // 获取连接的URI和查询参数
    URI uri = session.getHandshakeInfo().getUri();
    Map<String, String> queryMap = getQueryMap(uri.getQuery());
    String group = queryMap.get("group");
    String username = queryMap.get("username");

    // 处理连接逻辑
    // ...

    return Mono.empty();
}

在上述代码中,我们首先生成一个唯一的会话ID,并从WebSocket连接的URI中获取查询参数。查询参数可以包含groupusername,用于标识连接所属的组和用户名。

3. 处理WebSocket消息

接下来,我们将处理WebSocket消息的逻辑添加到handle方法中。我们使用session.receive()方法来接收来自客户端的消息,并根据消息类型进行不同的处理。

@NotNull
@Override
public Mono<Void> handle(WebSocketSession session) {
    // ...

    return session.receive()
        .flatMap(message -> {
            if (message.getType().equals(WebSocketMessage.Type.TEXT)) {
                // 处理文本消息
                String payload = message.getPayloadAsText();
                // ...
            } else if (message.getType().equals(WebSocketMessage.Type.BINARY)) {
                // 处理二进制消息
                // ...
            } else if (message.getType().equals(WebSocketMessage.Type.PING)) {
                // 处理PING消息
                // ...
            } else if (message.getType().equals(WebSocketMessage.Type.PONG)) {
                // 处理PONG消息
                // ...
            }
            return Mono.empty();
        })
        .then();
}

在上述代码中,我们使用flatMap操作符处理接收到的消息。根据消息类型的不同,我们可以执行不同的逻辑,例如处理文本消息、处理二进制消息、处理PING消息或处理PONG消息。

4. 发送WebSocket消息

除了接收消息外,我们还可以使用session.send()方法向客户端发送消息。在处理完接收到的消息后,我们可以使用session.send(Flux)方法将响应消息发送给客户端。

@NotNull
@Override
public Mono<Void> handle(WebSocketSession session) {
    // ...

    return session.receive()
        .flatMap(message -> {
            if (message.getType().equals(WebSocketMessage.Type.TEXT)) {
                // 处理文本消息
                String payload = message.getPayloadAsText();
                // ...
                // 发送响应消息
                return session.send(Flux.just(session.textMessage("Response")));
            }
            return Mono.empty();
        })
        .then();
## 5. 注册WebSocket处理器

要在Spring WebFlux应用程序中使用WebSocket处理器,我们需要将其注册到`WebSocketHandlerAdapter`中。

```java
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private MyWebSocketHandler2 myWebSocketHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler, "/websocket")
                .setAllowedOrigins("*");
    }
}

在上述代码中,我们创建了一个名为WebSocketConfig的配置类,并实现了WebSocketConfigurer接口。在registerWebSocketHandlers方法中,我们使用registry.addHandler()方法将MyWebSocketHandler2注册为WebSocket处理器,并指定了处理的URL路径为/websocket。同时,我们使用setAllowedOrigins("*")设置允许的来源,以便允许跨域访问。

6. 客户端使用WebSocket

在客户端,可以使用JavaScript或其他编程语言来连接WebSocket并发送/接收消息。以下是一个简单的JavaScript示例:

// 创建WebSocket对象
const socket = new WebSocket('ws://localhost:8080/websocket?group=mygroup&username=myuser');

// 打开WebSocket连接
socket.onopen = function() {
    console.log('WebSocket连接已打开');
};

// 接收服务器发送的消息
socket.onmessage = function(event) {
    const message = event.data;
    console.log('收到消息:', message);
};

// 发送消息给服务器
socket.send('Hello, server!');

// 关闭WebSocket连接
socket.onclose = function() {
    console.log('WebSocket连接已关闭');
};

在上述代码中,我们首先创建了一个WebSocket对象,指定了服务器的URL和查询参数。然后,我们定义了onopenonmessageonclose等事件处理程序,以处理与服务器的连接、消息发送和关闭。


以下是socket全生命周期的代码案例,可以实现自定义类型映射,可以通过不同的json格式映射,解析message的类型实现全生命周期的不同的操作

package com.example.webfluxdemo.handler;

import com.example.webfluxdemo.Entity.SocketEntity.PrivateMassage;
import com.example.webfluxdemo.protocol.MessageType;
import com.example.webfluxdemo.protocol.UriProtocol;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;


/**
 * @Author: kingdol
 * @Description: a test to websockthandler
 */
@Component
@Slf4j
public class MyWebSocketHandler2 implements WebSocketHandler {
    /**
     * @Description: 第一个Sting是组号, 第二个是session列表
     * @Query ws://localhost:8081/ws/test?group=1&username=123
     */
    private final Map<String, List<WebSocketSession>> allGroup = new HashMap<>();

    private final Map<String, WebSocketSession> allUserSessionMap = new HashMap<>();

    private final List<WebSocketSession> allSession = new ArrayList<>();

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {

        UUID uuid = UUID.randomUUID();
        String uuidStr = uuid.toString();
        AtomicReference<String> fileName = new AtomicReference<>(uuidStr + ".txt");
        URI uri = session.getHandshakeInfo().getUri();
        Map<String, String> queryMap2 = getQueryMap(uri.getQuery());
        String group = queryMap2.get("group");
        String username = queryMap2.get("username");
        UriProtocol uriProtocol = new UriProtocol(username, group);
        return session.receive().doOnSubscribe(s -> {
            session.getHandshakeInfo().getHeaders().forEach((k, v) -> {
                log.info("header:{}", k + ":" + v);
            });
            System.out.println("queryMap2 = " + queryMap2);

            allGroup.computeIfAbsent(group, k -> new ArrayList<>());
            allGroup.get(group).add(session);
            System.out.println("allGroup = " + allGroup);
            allUserSessionMap.put(username, session);
            allSession.add(session);
            ConnectMessage(uriProtocol);
            log.info("发起连接:{}", s);
        }).flatMap(message -> {
            if (message.getType().equals(WebSocketMessage.Type.BINARY)) {
                log.info("收到二进制消息");
                BinaryMessageHandler(session, message, fileName.get());
            } else if (message.getType().equals(WebSocketMessage.Type.TEXT)) {
                MessageType messageType;
                String payload = message.getPayloadAsText();
                try {
                    messageType = objectMapper.readValue(payload, MessageType.class);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    // 发送错误消息给客户端
                    return session.send(Flux.just(session.textMessage("Error: " + e.getMessage())));
                    // 抛出异常,或者进行其他错误处理
                }
                String content = messageType.getContent();
                PrivateMassage privateMassage = messageType.getPrivateMassage();
                switch (messageType.getCode()) {
                    // 广播信息
                    case "1" -> {
                        BroadToAllSession(session, content, username);
                        log.info("收到文本消息:{}", messageType.getContent());
                    }
                    // 组聊信息
                    case "2" -> {
                        log.info("收到组发信息" + messageType.getContent() + "-> 发送到第" + group + "组!");
                        GroupSendMessage(session, content, group, username);
                    }
                    // 私聊信息
                    case "3" -> {
                        try {
                            PrivateSendMassage(session, privateMassage, username);
                            log.info("收到私聊信息: " + content);
                        } catch (JsonProcessingException e) {
                            return session.send(Flux.just(session.textMessage("Error: " + e.getMessage())));
                        }
                    }
                }
            } else if (message.getType().equals(WebSocketMessage.Type.PING)) {
                log.info("收到ping消息");
                PingTypeHandler(session, message);
            } else if (message.getType().equals(WebSocketMessage.Type.PONG)) {
                log.info("收到pong消息");
                PongTypeHandler(session, message);
            }
            return session.send(Mono.empty());
        }).doOnTerminate(() -> {
            log.info("doOnTerminate");
        }).doOnComplete(() -> {
            allUserSessionMap.remove(username);
            log.info("doOnComplete");
        }).publishOn(Schedulers.boundedElastic()).doOnCancel(() -> {
            session.close().subscribe();
            log.info("doOnCancel");
        }).doOnError(e -> {
            e.printStackTrace();
            log.error("doOnError");
        }).doOnRequest(r -> {
            log.info("doOnRequest");
        }).then();
    }

    /*
     * 示例json
     * {"code": "3", "privateMassage": {"targetname": "123","content":"111"}}
     * */
    private void PrivateSendMassage(WebSocketSession session, PrivateMassage privateMassage, String username) throws JsonProcessingException {
        String targetname = privateMassage.getTargetname();
        String message = privateMassage.getContent();
        if (allUserSessionMap.containsKey(targetname)) {
            WebSocketSession webSocketSession = allUserSessionMap.get(targetname);
            webSocketSession.send(Flux.just(session.textMessage(username + "对你说: " + message))).subscribe();
        } else {
            session.send(Flux.just(session.textMessage("该用户未上线!"))).subscribe();
        }
    }

    /*
     * 示例json
     * {"code": "1", "content": "111"}
     * */
    private void BroadToAllSession(WebSocketSession session, String content, String username) {
        for (var sessions : allSession) {
            sessions.send(Flux.just(session.textMessage(username + "说: " + content))).subscribe();
        }
    }

    private void ConnectMessage(UriProtocol uriProtocol) {
        allSession.forEach(s -> {
            s.send(Flux.just(s.textMessage(uriProtocol.toString()))).subscribe();
        });
    }


    /*
     * 示例json
     * {"code": "2", "content": "111"}
     * */
    private void GroupSendMessage(WebSocketSession session, String content, String group, String username) {
        List<WebSocketSession> webSocketSessions = allGroup.get(group);
        for (var sessions : webSocketSessions) {
            sessions.send(Flux.just(session.textMessage(username + "说:" + content))).subscribe();
        }
    }

    private void BinaryMessageHandler(WebSocketSession session, WebSocketMessage message, String fileName) {
        DataBuffer dataBuffer = message.getPayload();

        // 获取字节数组
        ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
        byte[] byteArray = new byte[byteBuffer.remaining()];
        byteBuffer.get(byteArray);
        // 将字节数组写入文件
        try (FileOutputStream fileOutputStream = new FileOutputStream("files/" + fileName)) {
            FileChannel fileChannel = fileOutputStream.getChannel();
            fileChannel.write(ByteBuffer.wrap(byteArray));
        } catch (IOException e) {
            // 处理文件写入错误
            e.printStackTrace();
            session.send(Flux.just(session.textMessage("fail to upload file"))).subscribe();
        }
    }

    private void PongTypeHandler(WebSocketSession session, WebSocketMessage message) {
    }

    private void PingTypeHandler(WebSocketSession session, WebSocketMessage message) {
    }

    private void TestMessageHandler(WebSocketSession session, WebSocketMessage message) {
        session.send(Flux.just(session.textMessage(message.getPayloadAsText()))).subscribe();
    }

    private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (StringUtils.hasText(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}
package com.example.webfluxdemo.protocol;

import lombok.Data;

@Data
public class UriProtocol {
    private String username;
    private String group;

    public UriProtocol(String username, String group) {
        this.username = username;
        this.group = group;
    }
}

package com.example.webfluxdemo.protocol;

import com.example.webfluxdemo.Entity.SocketEntity.PrivateMassage;
import jakarta.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageType {
    private String code;
    private String content;
    @Nullable
    private PrivateMassage privateMassage;
}
package com.example.webfluxdemo.Entity.SocketEntity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* code对应3
* */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PrivateMassage {
    private String content;
    // 目标用户name
    private String targetname;
}

(运行之后生命周期的流程(控制台输出结果)
webflux websocket,springboot,前后端技术,websocket,网络协议,spring boot

(附带两个实体映射类)
该文章仅供参考,实际项目开发中的映射类和根据不同的code写不同业务,更加繁琐,但基础就是要理解webflux的输入输出流Flux和Mono的转换以及全生命周期执行的流程文章来源地址https://www.toymoban.com/news/detail-826961.html

到了这里,关于基于Webflux的Websocket的高级和全生命周期完整版讲解,包含代码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一文读懂 Spring Bean 的生命周期,unity高级工程师面试题

    实例化 该对象不再被使用时通过垃圾回收机制进行回收 而对于 Spring Bean 的生命周期来说: 实例化 Instantiation 属性赋值 Populate 初始化 Initialization 销毁 Destruction 实例化 - 属性赋值 - 初始化 - 销毁 只有四个步骤,这样拆解的话是不是感觉也不难?不像其他人写的那样直接一上

    2024年04月09日
    浏览(45)
  • 高级进阶多线程——多任务处理、线程状态(生命周期)、三种创建多线程的方式

    Java中的多线程是一个同时执行多个线程的进程。线程是一个轻量级的子进程,是最小的处理单元。多进程和多线程都用于实现多任务处理。 但是,一般使用多线程而不是多进程,这是因为线程使用共享内存区域。它们不分配单独的内存区域以节省内存,并且线程之间的上下

    2024年02月13日
    浏览(39)
  • 最强的单点登录认证系统,基于RBAC统一权限控制,实现用户生命周期管理,开源、安全

    MaxKey 单点登录认证系统,谐音马克思的钥匙寓意是最大钥匙,是 业界领先的IAM-IDaas身份管理和认证产品 ,支持OAuth 2.x/OpenID Connect、SAML 2.0、JWT、CAS、SCIM等标准协议,提供 安全、标准和开放 的用户身份管理(IDM)、身份认证(AM)、单点登录(SSO)、RBAC权限管理和资源管理等。 MaxKey注

    2024年02月03日
    浏览(55)
  • spring-webflux5 使用websocket

    换做平常springboot程序中使用websocket的话是很简单的,只需要三步就能实现前后端的实时通讯。而在spring5中则更简单了,并且支持定点推送与全推送的灵活运用。在这里就分常规编程与响应式编程两种使用,进行记录下。 WebFlux 本身就提供了对 WebSocket 协议的支持,处理 WebS

    2024年02月15日
    浏览(44)
  • WebFlux中使用WebSocket的拓展功能分析

    摘要:本文将介绍如何在Spring WebFlux中使用WebSocket实现高级功能,包括连接建立和断开时的操作、消息收发和广播等。 继WebFlux使用案例后拓展讲解 在现代的Web应用程序中,实时性和即时通信变得越来越重要。WebSocket是一种在Web应用程序中实现实时双向通信的协议,允许服务

    2024年01月17日
    浏览(43)
  • 微信小程序——生命周期,生命周期的分类,页面生命周期,生命周期函数的分类,应用的生命周期函数,页面的生命周期函数,wxs脚本概述

    生命周期( Life Cycle )是指一个对象从创建-运行-销毁的整个阶段,强调的是一个时间段。 例如: .张三出生,表示这个人生命周期的开始 .张三离世,表示这个人生命周期的结束 .中间张三的一生,就是张三的生命周期 我们可以把每个小程序运行的过程,也概括为生命周

    2024年02月01日
    浏览(60)
  • #Uniapp:页面生命周期&应用生命周期应用

    创建-运行-销毁 应用的生命周期 App.vue 页面的生命周期

    2024年01月23日
    浏览(40)
  • 微信小程序全局生命周期和页面生命周期

    目录 前言  小程序的生命周期 页面生命周期

    2024年02月11日
    浏览(51)
  • 微信小程序的全局生命周期和页面生命周期

            生命周期是指一个程序或者软件从创建、到开始、暂停、唤起、停止、卸载的过程,由于微信小程序分为全局和页面两部分,所有从这两部分来讲解微信小程序的生命周期            全局生命周期指的是使用App() 函数注册一个小程序,接受一个object参数,其指定

    2024年02月16日
    浏览(45)
  • 软件生命周期阶段有几个?常见软件生命周期模型有哪些?

    软件生命周期阶段及常见的软件生命周期模型,软件生命周期是指一个计算机软件从功能确定、设计,到开发 成功投入使用,并在使用中不断地修改、增补和完善,直到停止该软件的使用的全过程。 生命周期 从收到应用软件开始算起,到该软件不再使用为止。 它有以下几方

    2024年02月03日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包