55、基于 WebFlux 开发 WebSocKet

这篇具有很好参考价值的文章主要介绍了55、基于 WebFlux 开发 WebSocKet。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

★ 基于Web Flux开发WebSocket

两步:
(1)实现WebSocketHandler开发WebSocket处理类。

实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。

(2)使用HandlerMapping和WebSocketHandlerAdapter注册WebSocket处理类。

★ 反应式的WebSocket处理类

反应式API模型下,WebSocketSession的receive()方法返回的只是Flux(消息发布者),
它并不会同步获取消息,也不会阻塞。

类似的,WebSocketSession的send()方法发送的也只是Flux(消息发布者)

因此WebSocket处理类receive()消息之后,程序依然使用map()等方法对Flux中的数据项进行处理。

★ 配置基于WebFlux的WebSocket

要配置两个Bean:

  1. HandlerMapping(通常使用SimpleUrlHandlerMapping实现类即可)Bean,它定义URL与WebSocketHandler Bean的映射关系。

  2. WebSocketHandlerAdapter:它负责管理对WebSocketHandler Bean进行适配。
    它会自动对容器中所有的WebSocketHandler Bean进行适配,
    因此,意味着无论容器中有多少个WebSocketHandler ,该WebSocketHandlerAdapter只要配置一个即可。

可直接使用www.websocket.org/echo.html页面来测试WebSocket

代码演示

1、创建项目
55、基于 WebFlux 开发 WebSocKet,Spring Boot,WebSocket,websocket,网络协议,网络,webflux

MyWebSocketHandler

实现 WebSocKet 处理类

package cn.ljh.webflux_websocket.websockethandler;


import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

//实现 WebSocKet 处理类

@Component
public class MyWebSocketHandler implements WebSocketHandler
{
    //实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux
    //这个方法并不需要处理具体的数据,它面向的是Flux编程
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession)
    {
        //接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道
        Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();
        //map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式
        Flux<WebSocketMessage> resultFlux = sourceFlux.map(message ->
        {
            //textMessage() 方法负责将 String 转换成 WebSocketMessage
            //这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成String
            WebSocketMessage webSocketMessage = webSocketSession.textMessage("回复:" + message.getPayloadAsText());
            return webSocketMessage;
        });

        //发送消息
        Mono<Void> sendMessage = webSocketSession.send(resultFlux);
        return sendMessage;
    }
}

WebSocketConfig

配置基于WebFlux的WebSocket

package cn.ljh.webflux_websocket.config;


import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

//配置基于WebFlux的WebSocket

@Configuration
public class WebSocketConfig
{
    //这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter()
    {
        return new WebSocketHandlerAdapter();
    }

    //MyWebSocketHandler 会接受容器中的依赖注入
    @Bean
    public HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler)
    {
        //定义 URL 与 WebSocketHandler Bean 之间的映射关系
        //就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理
        Map map = Map.of("/myWebSocket",myWebSocketHandler);
        //参数1:指定 URL 和 Handler 之间的映射关系  ,  参数2:就是优先级
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);
        return simpleUrlHandlerMapping;
    }

}

前端没有写,www.websocket.org/echo.html页面已经没法测试WebSocket了。
现在简单的websocket 就完成了

55、基于 WebFlux 开发 WebSocKet,Spring Boot,WebSocket,websocket,网络协议,网络,webflux

通过 webFlux 弄一个webSocket 的聊天室。

完整代码:

client.html

这个客户端页面,先放在static 静态路径下面,就可以直接访问

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title> 基于WebSocket的多人聊天 </title>
	<script type="text/javascript">
		// 定义Web Socket对象
		var webSocket = null;
		let sendMsg = function()
		{
			if (webSocket == null || webSocket.readyState != 1)
			{
				document.getElementById('show').innerHTML
					+= "还未连接服务器,请先连接WebSocket服务器<br>";
				return;
			}
			let inputElement = document.getElementById('msg');
			// 发送消息
			webSocket.send(inputElement.value);
			// 清空单行文本框
			inputElement.value = "";
		}
		let connect = function()
		{
			let name = document.getElementById('name').value.trim();
			if (name == null || name == "")
			{
				document.getElementById('show').innerHTML
					+= "用户名不能为空<br>";
				return;
			}
			if (webSocket && webSocket.readyState == 1)
			{
				webSocket.close();
			}
			webSocket = new WebSocket("ws://127.0.0.1:8080/myWebSocket/" + name);
			webSocket.onopen = function()
			{
				document.getElementById('show').innerHTML
					+= "恭喜您,连接服务器成功!<br>";
				document.getElementById('name').value = "";
				// 为onmessage事件绑定监听器,接收消息
				webSocket.onmessage= function(event)
				{
					// 接收、并显示消息
					document.getElementById('show').innerHTML
						+= event.data + "<br>";
				}
			};
		}
	</script>
</head>
<body>
<input type="text" size="20" id="name" name="name"/>
<input type="button" value="连接" onclick="connect();"/>
<div style="width:600px;height:240px;
	overflow-y:auto;border:1px solid #333;" id="show"></div>
<input type="text" size="80" id="msg" name="msg"/>
<input type="button" value="发送" onclick="sendMsg();"/>
</body>
</html>

MyWebSocketHandler

实现 WebSocKet 处理类

package cn.ljh.webflux_websocket.websockethandler;


import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

//实现 WebSocKet 处理类

@Component
public class MyWebSocketHandler implements WebSocketHandler
{
    //创建一个线程安全的map来存聊天信息,  FluxSink 代表了发送消息的通道
    public static final Map<WebSocketSession, FluxSink<WebSocketMessage>> myClients = new ConcurrentHashMap<>();


    //实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux
    //这个方法并不需要处理具体的数据,它面向的是Flux编程
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession)
    {

        //1、获取连接路径:   得到 WenSocket 的连接路径
        String path = webSocketSession.getHandshakeInfo().getUri().getPath();
        //2、获取用户名:     获取path路径字符串最后一个斜杠之后的内容,也就是获取聊天的用户名
        String name = path.substring(path.lastIndexOf("/") + 1);


        //3、接收消息:     接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道
        Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();
        //map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式
        Mono<Void> mono1 = sourceFlux.map(message ->
        {
            //这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成String
            //获取用户发送的消息
            String payloadAsText = message.getPayloadAsText();
            //返回 用户名+消息
            String nameAndMessage = name + " : " + payloadAsText;
            return nameAndMessage;
        })
                //4、实现消息广播:   把消息发给每一个用户
                //此时的 message 已经是转换之后的 message 了,这时候是 String 类型
                .doOnNext(message ->
                {
                    //此处做消息广播,      keySet()用于遍历map中的所有key,存在一个set集合中
                    for (WebSocketSession session : myClients.keySet())
                    {
                        //通过session这个key , 获取消息通道FluxSink
                        FluxSink<WebSocketMessage> fluxSink = myClients.get(session);
                        //调用 fluxSink 的 next() 方法向 Flux 发送消息
                        //textMessage() 方法负责将 String 转换成 WebSocketMessage,把string类型的消息转回 WebSocketMessage
                        fluxSink.next(session.textMessage(message));
                    }
                    //.then() 方法 讲解的时候说是合并上面的消息操作,百度说是异步执行
                }).then();


        //创建要发送消息的 outFlux
        Flux<WebSocketMessage> outFlux = Flux.create(fluxSink ->
        {
            //Flux 真正发布消息用的是Flux 底层的 fluxSink
            myClients.put(webSocketSession, fluxSink);
        });

        //发送消息
        Mono<Void> mono2 = webSocketSession.send(outFlux);
        //把两个mono 的消息汇总起来 再返回
        Mono<Void> allMono = Mono.zip(mono1, mono2).then();
        return allMono;
    }
}

WebSocketConfig

配置基于WebFlux的WebSocket

package cn.ljh.webflux_websocket.config;


import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

//配置基于WebFlux的WebSocket

@Configuration
public class WebSocketConfig
{
    //这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter()
    {
        return new WebSocketHandlerAdapter();
    }

    //MyWebSocketHandler 会接受容器中的依赖注入
    @Bean
    public HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler)
    {
        //定义 URL 与 WebSocketHandler Bean 之间的映射关系
        //就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理
        Map map = Map.of("/myWebSocket/{name}",myWebSocketHandler);
        //参数1:指定 URL 和 Handler 之间的映射关系  ,  参数2:就是优先级
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);
        return simpleUrlHandlerMapping;
    }

}

测试结果

成功

55、基于 WebFlux 开发 WebSocKet,Spring Boot,WebSocket,websocket,网络协议,网络,webflux文章来源地址https://www.toymoban.com/news/detail-712538.html

到了这里,关于55、基于 WebFlux 开发 WebSocKet的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Webflux的Websocket的高级和全生命周期完整版讲解,包含代码

    WebSocket是一种在Web应用程序中实现实时双向通信的技术。它允许服务器主动向客户端推送消息,而不需要客户端发起请求。在Spring WebFlux中,我们可以使用 WebSocketHandler 接口来处理WebSocket连接和消息。 在本篇博客中,我们将介绍如何使用 MyWebSocketHandler2 类来构建一个简单的

    2024年02月19日
    浏览(39)
  • 实时通信应用的开发:Vue.js、Spring Boot 和 WebSocket 整合实践

    目录 1. 什么是webSocket  2. webSocket可以用来做什么? 3. webSocket协议 4. 服务器端 5. 客户端 6. 测试通讯 WebSocket 是一种在单个 TCP连接 上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允 许服务端主动向客户端推送数据 。在WebSocket API中,浏览

    2024年02月11日
    浏览(55)
  • 大华摄像头实时预览(spring boot+websocket+flv.js)Java开发

    1.大华NetSDK_JAVA; 这里使用的是 Linux64的架包 2.websocket 前端使用的vue框架    3.flv.js的播放插件     4.大华摄像头提供的平台(后面称为官方平台) 根据大华《NetSDK_JAVA编程指导手册》的流程图 根据图可以得知关键流程为: 初始化sdk——登录设备——打开实时预览——设置视

    2024年02月04日
    浏览(104)
  • Spring Boot整合WebSocket

    在HTTP协议中,所有的请求都是由客户端发起的,由服务端进行响应,服务端无法向客户端推送消息,但是在一些需要即时通信的应用中,又不可避免地需要服务端向客户端推送消息,传统的解决方案主要有如下几种。 轮询 轮询是最简单的一种解决方案,所谓轮询,就是客户

    2024年02月05日
    浏览(84)
  • spring boot 项目整合 websocket

            负责的项目有一个搜索功能,搜索的范围几乎是全表扫,且数据源类型贼多。目前对搜索的数据量量级未知,但肯定不会太少,不仅需要搜索还得点击下载文件。           关于搜索这块类型 众多,未了避免有个别极大数据源影响整个搜索效率,我采用多线程异步

    2024年02月11日
    浏览(39)
  • Spring Boot 实现 WebSocket 示例

    WebSocket协议提供了一种标准化的方法,通过单个TCP连接在客户机和服务器之间建立全双工、双向的通信通道。它是一种不同于HTTP的TCP协议,但被设计为在HTTP上工作,使用端口80和443,并允许重用现有的防火墙规则。 WebSocket 协议是独立的基于 TCP 协议。它与 HTTP 的唯一关系是

    2024年02月14日
    浏览(35)
  • 4 Spring Boot与WebSocket实战

    作者:禅与计算机程序设计艺术 WebSocket(Web Socket)是一种双向通讯协议,使得客户端和服务器之间可以进行实时通信。在WebSocket出现之前,开发者通常采用轮询或Comet的方式来实现Web应用中的实时更新功能。轮询方式是通过浏览器定时向服务器发送请求,来检查是否有新的消

    2024年02月06日
    浏览(40)
  • WebFlux中使用WebSocket的拓展功能分析

    摘要:本文将介绍如何在Spring WebFlux中使用WebSocket实现高级功能,包括连接建立和断开时的操作、消息收发和广播等。 继WebFlux使用案例后拓展讲解 在现代的Web应用程序中,实时性和即时通信变得越来越重要。WebSocket是一种在Web应用程序中实现实时双向通信的协议,允许服务

    2024年01月17日
    浏览(43)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(44)
  • 在 Spring Boot 中整合、使用 WebSocket

    WebSocket 是一种基于 TCP 协议的全双工通信协议,它允许客户端和服务器之间建立持久的、双向的通信连接。相比传统的 HTTP 请求 - 响应模式,WebSocket 提供了实时、低延迟的数据传输能力。通过 WebSocket,客户端和服务器可以在任意时间点互相发送消息,实现实时更新和即时通

    2024年04月13日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包