基于WebFlux的Websocket的实现,高级实现自定义功能拓展

这篇具有很好参考价值的文章主要介绍了基于WebFlux的Websocket的实现,高级实现自定义功能拓展。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于WebFlux的Websocket

一、导入XML依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- 或者引入jackson -->
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.26</version>
</dependency>

二、定义配置类,设置WebSocket拦截器

@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebFluxConfigurer {
    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", new MyWebSocketChatHandler());
        map.put("/ws/echo", new MyWebSocketEchoHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // 需要设置较高的优先级,以避免与其他处理程序冲突

        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

三、设置处理器,WebSocket的处理器

// 1. Echo的处理器
public class MyWebSocketEchoHandler implements WebSocketHandler {
    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(msg -> "Echo: " + msg.getPayloadAsText())
                        .map(session::textMessage)
        );
    }
}

设置自定义的处理器(高级处理)

public class MyWebSocketChatHandler implements WebSocketHandler {
    private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @NotNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
//        String query = session.getHandshakeInfo().getUri().getQuery();
        URI uri = session.getHandshakeInfo().getUri();
//        Map<String, String> queryMap = getQueryMap(query);
        Map<String, String> queryMap = parseQueryParams(uri);
        String userId = queryMap.getOrDefault("userId", "");
        userMap.put(userId, session);
        System.out.println("当前用户:" + userId);
        System.out.println("当前在线人数:" + userMap.size());
        return session.receive().flatMap(webSocketMessage -> {
            String payload = webSocketMessage.getPayloadAsText();
            Message message;
            try {
                message = objectMapper.readValue(payload, Message.class);
                if(Integer.parseInt(message.getCode())==CodeEnum.SUCCESS.getCode()){
                    // 执行成功模式
                    Mono<Void> targetSession = SuccessMode(message);
                    if (targetSession != null) return targetSession;
                }else if(Integer.parseInt(message.getCode())==CodeEnum.ERROR.getCode()){
                    // 执行出错模式
                    return session.send(Mono.just(session.textMessage("消发送出错了")));
                }else{
                    // 其他code的功能实现
                    return session.send(Mono.just(session.textMessage("消息格式错误")));
                }

            } catch (JsonProcessingException e) {
                e.printStackTrace();
                // 这里一定要return不然会导致线程卡死直接断开连接
                return session.send(Mono.just(session.textMessage(e.getMessage())));
            }
            return session.send(Mono.just(session.textMessage("目标用户不在线")));
        }).then().doFinally(signal -> userMap.remove(userId)); // 用户关闭连接后删除对应连接
    }

    @Nullable
    private static Mono<Void> SuccessMode(Message message) {
        String targetId = message.getTargetId();
        if (userMap.containsKey(targetId)) {
            WebSocketSession targetSession = userMap.get(targetId);
            if (null != targetSession) {
                WebSocketMessage textMessage = targetSession.textMessage(CombineMessage(targetId, message.getMessageText()));
                return targetSession.send(Mono.just(textMessage));
            }
        }
        return null;
    }

    private static String CombineMessage(String targetId, String message) {
        // 创建一个JSONObject对象
        JSONObject json = new JSONObject();

        // 将参数添加到JSONObject中
        json.put("targetId", targetId);
        json.put("message", message);

        // 将JSONObject转换为字符串
        return json.toString();
    }

    // 其他的实现
    private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (StringUtils.hasText(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }

    private static Map<String, String> parseQueryParams(URI uri) {
        return Arrays.stream(uri.getQuery().split("&"))
                .map(param -> param.split("="))
                .collect(Collectors.toMap(
                        array -> array[0],
                        array -> array.length > 1 ? array[1] : ""
                ));
    }

}

注意要先配置一个实体类映射—(注意客户端的信息一定也是要json格式不然会报错哟)

@Data
public class Message {
    @JsonProperty("code")
    private String code;
    @JsonProperty("targetId")
    private String targetId;
    @JsonProperty("messageText")
    private String messageText;
    @JsonProperty("userId")
    private String userId;
}

枚举类设置code对应的信息

public enum CodeEnum {
    SUCCESS(1),
    ERROR(2);
    // 其他枚举值...

    private final Integer code;

    CodeEnum(int code) {
        this.code = code;
    }

    public Integer getCode() {
        return code;
    }
}

最后运行即可。注意访问ws://localhost:8081/ws/chat?userId=123实现私聊的功能,访问ws://localhost:8081/ws/echo即可实现简单的服务器和客户端的回应。群聊功能可以根据自己的需求进行实现,只需要添加对应的code以及获取所有session并发送message即可。文章来源地址https://www.toymoban.com/news/detail-796179.html

到了这里,关于基于WebFlux的Websocket的实现,高级实现自定义功能拓展的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 55、基于 WebFlux 开发 WebSocKet

    两步: (1)实现WebSocketHandler开发WebSocket处理类。 实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。 (2)使用HandlerMapping和WebSocketHandlerAdapter注册WebSocket处理类。 反应式API模型下,WebSocketSession的receive()方法返回的只是Flux(消息发布者), 它并不会同步

    2024年02月08日
    浏览(35)
  • 【IT资讯速递】ChatGPT自定义功能扩展至免费用户;阿里达摩院宣布免费开放100件AI专利许可;元宇宙平台Decentraland与人工智能公司Inworld合作

    2023年8月12日 星期六 癸卯年六月廿六 第000002号 本文收录于 IT资讯速递 专栏, 本专栏 主要用于发布各种IT资讯,为大家可以省时省力的就能阅读和了解到行业的一些新资讯 8月11日,在浙江省专利公开实施政策公布会上, 阿里达摩院(湖畔实验室)宣布向社会免费开放100件AI专

    2024年02月05日
    浏览(52)
  • 微信小程序实战项目开发(天气预报项目实战):内涵开发说明文档、需求文档 && 手把手分步骤教你写出自己的小程序项目 && 天气预报小程序 && 时实请求获取天气 && 自定义功能 && 完整的源代码

    微信小程序开发实现天气预报 需求分析 静态页面设计 :要求界面美观 → 在wxss代码文件中对 wxml代码文件进行合理布局和美化,舒适的交互效果. 功能逻辑完善 :能够使用到 wx.request 请求接口实现天气预报查询的功能 主要使用到的技术栈如下: wxml:中使用了 picker 组件标签

    2024年02月02日
    浏览(47)
  • spring-webflux5 使用websocket

    换做平常springboot程序中使用websocket的话是很简单的,只需要三步就能实现前后端的实时通讯。而在spring5中则更简单了,并且支持定点推送与全推送的灵活运用。在这里就分常规编程与响应式编程两种使用,进行记录下。 WebFlux 本身就提供了对 WebSocket 协议的支持,处理 WebS

    2024年02月15日
    浏览(35)
  • 51、基于注解方式开发Spring WebFlux,实现生成背压数据,就是实现一直向客户端发送消息

    什么是背压: 这个是Reactive(反应) 的概念,当订阅者的消费能力,远低于发布者时,订阅者(也就是消费者)有通知取消或终止发布者生产数据的机制,这种机制可以称作为“背压”。 说白了就是:当消费者消费积压的时候,反向告诉推送生产者,我不需要你生产了,你

    2024年02月09日
    浏览(39)
  • 两种实现WebSocket的方式,基于Java实现WebSocket。

    首先我们声明WebSocker方便打字为ws。 WebSocker ws = new WebSocket(); 或者说启用spring框架,因为spring已经整合了ws。 配置类:把spring中的ServerEndpointEx porter对象注入进来 用iava注解来 @ServerEndpoint          监听连接、 @OnOpen          连接成功、 @OnClose        连接关闭、 @

    2024年01月21日
    浏览(34)
  • websocket基于java实现

    随着互联网的发展,传统的HTTP协议已经很难满足Web应用日益复杂的需求了。近年来,随着HTML5的诞生,WebSocket协议被提出, 它实现了浏览器与服务器的全双工通信,扩展了浏览器与服务端的通信功能,使服务端也能主动向客户端发送数据 。 我们知道,传统的HTTP协议是无状态

    2024年02月06日
    浏览(39)
  • 基于WebSocket实现的后台服务

    基于WebSocket实现的后台服务,用于接收客户端的心跳消息,并根据心跳消息来维护客户端连接。 具体实现中,服务启动后会创建一个HttpListener对象,用于监听客户端的WebSocket连接请求。当客户端连接成功后,服务会为每个连接创建一个Task实例,用于接收客户端发送的心跳消

    2024年02月16日
    浏览(27)
  • WebSocket:基于 Spring Cloud 配置注解实现 WebSocket 集群方案

    上一篇:WebSocket 的具体介绍与内部执行原理 WebSocket 大家应该是再熟悉不过了,如果是单体应用确实不会有什么问题,但是当我们的项目使用微服务架构时,就可能会存在问题 比如 服务A 有两个实例 A1 和 A2 ,前端的 WebSocket 客户端 C 通过网关的负载均衡连到了 A1 ,这个时候

    2024年02月10日
    浏览(35)
  • 基于SockJS+Stomp的WebSocket实现

    前言     之前做个一个功能,通过websocket长链接接收后台推送的数据,然后在前端动态渲染。一直没来的及输出个文档,现在输出一下。 WebSocket介绍     WebSocket 是一种在 Web 应用中实现实时通信的方法,它可以在客户端和服务器端之间建立长连接,实现实时消息传递。  

    2024年02月12日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包