WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)

这篇具有很好参考价值的文章主要介绍了WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、WebSocket简介

HTML5规范在传统的web交互基础上为我们带来了众多的新特性,随着web技术被广泛用于web APP的开发,这些新特性得以推广和使用,而websocket作为一种新的web通信技术具有巨大意义。WebSocket是HTML5新增的协议,它的目的是在浏览器和服务器之间建立一个不受限的双向通信的通道,比如说,服务器可以在任意时刻发送消息给浏览器。支持双向通信。

二、WebSocket通信原理及机制

websocket是基于浏览器端的web技术,那么它的通信肯定少不了http,websocket本身虽然也是一种新的应用层协议,但是它也不能够脱离http而单独存在。具体来讲,我们在客户端构建一个websocket实例,并且为它绑定一个需要连接到的服务器地址,当客户端连接服务端的时候,会向服务端发送一个消息报文

三、WebSocket特点和优点

1、支持双向通信,实时性更强。
2、更好的二进制支持。
3、较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
4、支持扩展。ws协议定义了扩展,用户可以扩展协议,或者实现自定义的子协议。(比如支持自定义压缩算法等)
5、建立在tcp协议之上,服务端实现比较容易
6、数据格式比较轻量,性能开销小,通信效率高
7、和http协议有着良好的兼容性,默认端口是80和443,并且握手阶段采用HTTP协议,因此握手的时候不容易屏蔽,能通过各种的HTTP代理

四、WebSocket心跳机制

在使用websocket过程中,可能会出现网络断开的情况,比如信号不好,或者网络临时性关闭,这时候websocket的连接已经断开,而浏览器不会执行websocket 的 onclose方法,我们无法知道是否断开连接,也就无法进行重连操作。如果当前发送websocket数据到后端,一旦请求超时,onclose便会执行,这时候便可进行绑定好的重连操作。

       心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了。需要重连~
 

五、在后端Spring Boot 和前端VUE中如何建立通信

1、在Spring Boot 中 pom.xml中添加 websocket依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、创建 WebSocketConfig.java 开启websocket支持


 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
/**
 * 开启WebSocket支持
 * 
 */
@Configuration
public class WebSocketConfig {
 
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
 
        return new ServerEndpointExporter();
    }
 
}

3、创建 WebSocketServer.java 链接

package com.mes.dispatch.socket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/** @Author: best_liu
 * @Description:WebSocket服务
 * @Date: 13:05 2023/8/31
 * @Param
 * @return
 **/

@ServerEndpoint("/websocket/processSocket/{userId}")
@Slf4j
@Component
public class WebSocketServer {
    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //加入set中
        } else {
            webSocketMap.put(userId, this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }

        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());

        try {
            HashMap<Object, Object> map = new HashMap<>();
            map.put("key", "连接成功");
            sendMessage(JSON.toJSONString(map));
        } catch (IOException e) {
            log.error("用户:" + userId + ",网络异常!!!!!!");
        }
    }


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + userId + ",报文:" + message);
        //可以群发消息
        //消息保存到数据库、redis
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析发送的报文
                JSONObject jsonObject = JSONObject.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("fromUserId", this.userId);
                String fromUserId = jsonObject.getString("fromUserId");
                //传送给对应toUserId用户的websocket
                if (StringUtils.isNotBlank(fromUserId) && webSocketMap.containsKey(fromUserId)) {
                    webSocketMap.get(fromUserId).sendMessage(jsonObject.toJSONString());
                    //自定义-业务处理

//                    DeviceLocalThread.paramData.put(jsonObject.getString("group"),jsonObject.toJSONString());
                } else {
                    log.error("请求的userId:" + fromUserId + "不在该服务器上");
                    //否则不在这个服务器上,发送到mysql或者redis
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 发生错误时候
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        //加入线程锁
        synchronized (session) {
            try {
                //同步发送信息
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("服务器推送失败:" + e.getMessage());
            }
        }
    }

    /** @Author: best_liu
     * @Description:发送自定义消息
     * @Date: 13:01 2023/8/31
     * @Param [message, toUserId]
     * @return void
     **/
    public static void sendInfo(String message, String toUserId) throws IOException {
        //如果userId为空,向所有群体发送
        if (StringUtils.isEmpty(toUserId)) {
            //向所有用户发送信息
            Iterator<String> itera = webSocketMap.keySet().iterator();
            while (itera.hasNext()) {
                String keys = itera.next();
                WebSocketServer item = webSocketMap.get(keys);
                item.sendMessage(message);
            }
        }
        //如果不为空,则发送指定用户信息
        else if (webSocketMap.containsKey(toUserId)) {
            WebSocketServer item = webSocketMap.get(toUserId);
            item.sendMessage(message);
        } else {
            log.error("请求的userId:" + toUserId + "不在该服务器上");
        }
    }


    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

    public static synchronized ConcurrentHashMap<String, WebSocketServer> getWebSocketMap() {
        return WebSocketServer.webSocketMap;
    }

}

4、创建一个测试调用websocket发送消息 TimerSocketMessage.java (用定时器发送推送消息


 
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
 
 
@Component
@EnableScheduling
public class TimerSocketMessage {
 
    /**
     * 推送消息到前台
     */
    @Scheduled(cron = "*/5 * * * * * ")
    public void SocketMessage(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Map<String, Object> maps = new HashMap<>();
        maps.put("type", "sendMessage");
        maps.put("data", sdf.format(new Date()));
        WebSocketServer.sendInfo(maps);
    }
}

5、在VUE中创建和后端 websocket服务的连接并建立心跳机制。

<template>
  <div>
    <el-row :gutter="$ui.layout.gutter.g10">
      <el-col :span="$ui.layout.span.one" style="background-color: #FFFFFF; padding: 10px;">
        <el-form ref="form" :model="form" label-width="80px" size="small" :inline="true">
          <el-form-item label="生成数量">
            <el-input-number v-model="form.number" :min="1" :max="999" label="描述文字" />
          </el-form-item>

          <el-form-item label="数值范围">
            <el-input-number v-model="form.start" :min="1" :max="9999999999" label="描述文字" /> ~
            <el-input-number v-model="form.end" :min="1" :max="9999999999" label="描述文字" />
          </el-form-item>

          <el-form-item>
            <el-button size="mini" type="primary" @click="spawn">生成</el-button>
          </el-form-item>
        </el-form>

      </el-col>
    </el-row>
    <h1> websocket 消息推送测试:{{data}}</h1>

  </div>
</template>

<script>
export default {
  name: 'Index',
  data() {
    return {
      form: {
        number: 1,
        start: 1,
        end: 100
      },
      data:0,
      timeout: 28 * 1000,//30秒一次心跳
      timeoutObj: null,//心跳心跳倒计时
      serverTimeoutObj: null,//心跳倒计时
      timeoutnum: null,//断开 重连倒计时
      websocket: null,

    }
  },
  created () {
    // 初始化websocket
    this.initWebSocket()
  },
  methods: {
    spawn() {
      
    },
    //socket--start
    initWebSocket() {
        let url = 'ws://localhost/dev-api/process/websocket/processSocket/zkawsystem'
        this.websocket = new WebSocket(url)
        // 连接错误
        this.websocket.onerror = this.setErrorMessage

        // 连接成功
        this.websocket.onopen = this.setOnopenMessage

        // 收到消息的回调
        this.websocket.onmessage = this.setOnmessageMessage

        // 连接关闭的回调
        this.websocket.onclose = this.setOncloseMessage

        // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = this.onbeforeunload
      },
      reconnect() { // 重新连接
        if (this.lockReconnect) return;
        this.lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多
        this.timeoutnum && clearTimeout(this.timeoutnum);
        this.timeoutnum = setTimeout(() => {
          //新连接
          this.initWebSocket();
          this.lockReconnect = false;
        }, 5000);
      },
      reset() { // 重置心跳
        // 清除时间
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        // 重启心跳
        this.start();
      },
      start() { // 开启心跳
        this.timeoutObj && clearTimeout(this.timeoutObj);
        this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
        this.timeoutObj = setTimeout(() => {
          // 这里发送一个心跳,后端收到后,返回一个心跳消息,
          if (this.websocket && this.websocket.readyState == 1) { // 如果连接正常
            let actions = { "heartbeat": "12345" };
            this.websocketsend(JSON.stringify(actions));
          } else { // 否则重连
            this.reconnect();
          }
          this.serverTimeoutObj = setTimeout(() => {
            //超时关闭
            this.websocket.close();
          }, this.timeout);

        }, this.timeout)
      },
      setOnmessageMessage(event) {
        let obj = JSON.parse(event.data);
        console.log("obj", obj)
        switch (obj.type) {
          case 'heartbeat':
            //收到服务器信息,心跳重置
            this.reset();
            break;
          case 'sendMessage':
            this.data = obj.data
            console.log("接收到的服务器消息:", obj.data)
        }

      },
      setErrorMessage() {
        //重连
        this.reconnect();
        console.log("WebSocket连接发生错误" + '   状态码:' + this.websocket.readyState)
      },
      setOnopenMessage() {
        //开启心跳
        this.start();
        console.log("WebSocket连接成功" + '   状态码:' + this.websocket.readyState)
      },
      setOncloseMessage() {
        //重连
        this.reconnect();
        console.log("WebSocket连接关闭" + '   状态码:' + this.websocket.readyState)
      },
      onbeforeunload() {
        this.closeWebSocket();
      },
      //websocket发送消息
      websocketsend(messsage) {
        this.websocket.send(messsage)
      },
      closeWebSocket() { // 关闭websocket
        this.websocket.close()
      },
      //socket--end

  }
}
</script>

6、启动项目开始测试结果

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE),websocket,spring boot,vue.js

 7、vue文件连接websocket的url地址要拼接 context-path: /demo

六、websocket不定时出现1005错误

后台抛出异常如下:

Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005
Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005

分析原因是:

spring cloud gateway 转发websocket请求无法监听到 close 事件 没有收到预期的状态码

解决方案:

在gateway进行请求拦截

代码如下:

package com.mes.gateway.filter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.*;

/**
 * @Author: best_liu
 * @Description:解决websocket关闭异常 问题
 * @Desc websocket客户端主动断开连接,网关服务报错1005
 * @Date Create in 11:15 2023/10/25
 * @Modified By:
 */
@Component
public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);

    //Sec-Websocket protocol
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
    //Sec-Websocket header
    public static final String SEC_WEBSOCKET_HEADER = "sec-websocket";
    //http header schema
    public static final String HEADER_UPGRADE_WebSocket = "websocket";
    public static final String HEADER_UPGRADE_HTTP = "http";
    public static final String HEADER_UPGRADE_HTTPS = "https";
    private final WebSocketClient webSocketClient;
    private final WebSocketService webSocketService;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
    // 不直接使用 headersFilters 用该变量代替
    private volatile List<HttpHeadersFilter> headersFilters;

    public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
        this.headersFiltersProvider = headersFiltersProvider;
    }

    /* for testing */
    //http请求转为ws请求
    static String convertHttpToWs(String scheme) {
        scheme = scheme.toLowerCase();
        return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;
    }

    @Override
    public int getOrder() {
        // Before NettyRoutingFilter since this routes certain http requests
        //修改了这里 之前是-1 降低优先级
        return Ordered.LOWEST_PRECEDENCE - 2;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        changeSchemeIfIsWebSocketUpgrade(exchange);

        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();

        if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
            return chain.filter(exchange);
        }
        ServerWebExchangeUtils.setAlreadyRouted(exchange);

        HttpHeaders headers = exchange.getRequest().getHeaders();
        HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);

        List<String> protocols = getProtocols(headers);

        return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));
    }

    /* for testing */
    //获取请求头里的协议信息
    List<String> getProtocols(HttpHeaders headers) {
        List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
        if (protocols != null) {
            ArrayList<String> updatedProtocols = new ArrayList<>();
            for (int i = 0; i < protocols.size(); i++) {
                String protocol = protocols.get(i);
                updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ",")));
            }
            protocols = updatedProtocols;
        }
        return protocols;
    }

    /* for testing */
    List<HttpHeadersFilter> getHeadersFilters() {
        if (this.headersFilters == null) {
            this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);

            // remove host header unless specifically asked not to
            this.headersFilters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                filtered.addAll(headers);
                filtered.remove(HttpHeaders.HOST);
                boolean preserveHost = exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
                if (preserveHost) {
                    String host = exchange.getRequest().getHeaders().getFirst(HttpHeaders.HOST);
                    filtered.add(HttpHeaders.HOST, host);
                }
                return filtered;
            });

            this.headersFilters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
                    if (!entry.getKey().toLowerCase().startsWith(SEC_WEBSOCKET_HEADER)) {
                        filtered.addAll(entry.getKey(), entry.getValue());
                    }
                }
                return filtered;
            });
        }

        return this.headersFilters;
    }

    static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
        // 检查版本是否适合
        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme().toLowerCase();
        String upgrade = exchange.getRequest().getHeaders().getUpgrade();
        // change the scheme if the socket client send a "http" or "https"
        if (HEADER_UPGRADE_WebSocket.equalsIgnoreCase(upgrade) && (HEADER_UPGRADE_HTTP.equals(scheme) || HEADER_UPGRADE_HTTPS.equals(scheme))) {
            String wsScheme = convertHttpToWs(scheme);
            boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);
            URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
            if (log.isTraceEnabled()) {
                log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
            }
        }
    }

    //自定义websocket处理方式
    private static class ProxyWebSocketHandler implements WebSocketHandler {

        private final WebSocketClient client;
        private final URI url;
        private final HttpHeaders headers;
        private final List<String> subProtocols;

        ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
            this.client = client;
            this.url = url;
            this.headers = headers;
            if (protocols != null) {
                this.subProtocols = protocols;
            } else {
                this.subProtocols = Collections.emptyList();
            }
        }

        @Override
        public List<String> getSubProtocols() {
            return this.subProtocols;
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return this.client.execute(this.url, this.headers, new WebSocketHandler() {
                private CloseStatus adaptCloseStatus(CloseStatus closeStatus) {
                    int code = closeStatus.getCode();
                    if (code > 2999 && code < 5000) {
                        return closeStatus;
                    }
                    switch (code) {
                        case 1000:
                            //正常关闭
                            return closeStatus;
                        case 1001:
                            //服务器挂了或者页面跳转
                            return closeStatus;
                        case 1002:
                            //协议错误
                            return closeStatus;
                        case 1003:
                            //收到了不能处理的数据类型
                            return closeStatus;
                        case 1004:
                            // 预留关闭状态码
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1005:
                            // 预留关闭状态码 期望收到状态码但是没有收到
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1006:
                            // 预留关闭状态码 连接异常关闭
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1007:
                            //收到的数据与实际的消息类型不匹配
                            return closeStatus;
                        case 1008:
                            //收到不符合规则的消息
                            return closeStatus;
                        case 1009:
                            //收到太大的不能处理的消息
                            return closeStatus;
                        case 1010:
                            //client希望server提供多个扩展,server没有返回相应的扩展信息
                            return closeStatus;
                        case 1011:
                            //server遇到不能完成的请求
                            return closeStatus;
                        case 1012:
                            // Not in RFC6455
                            // return CloseStatus.SERVICE_RESTARTED;
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1013:
                            // Not in RFC6455
                            // return CloseStatus.SERVICE_OVERLOAD;
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1015:
                            // 不能进行TLS握手 如:server证书不能验证
                            return CloseStatus.PROTOCOL_ERROR;
                        default:
                            return CloseStatus.PROTOCOL_ERROR;
                    }
                }

                /**
                 * send      发送传出消息
                 * receive   处理入站消息流
                 * doOnNext  对每条消息做什么
                 * zip       加入流
                 * then      返回接收完成时完成的Mono<Void>
                 */
                @Override
                public Mono<Void> handle(WebSocketSession proxySession) {
                    Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(session::close);
                    Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(proxySession::close);
                    // Use retain() for Reactor Netty
                    Mono<Void> proxySessionSend = proxySession
                            .send(session.receive().doOnNext(WebSocketMessage::retain));
                    Mono<Void> serverSessionSend = session
                            .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
                    // Ensure closeStatus from one propagates to the other
                    Mono.when(serverClose, proxyClose).subscribe();
                    // Complete when both sessions are done
                    return Mono.zip(proxySessionSend, serverSessionSend).then();
                }
                @Override
                public List<String> getSubProtocols() {
                    return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols;
                }
            });
        }
    }


}

七、WebSocket+Redis实现离线消息推送

1、WebSocket消息机制redis工具类


import com.mes.process.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;


/** @Author: best_liu
 * @Description:WebSocket消息推送redis工具类
 * @Date: 13:50 2023/11/28
 * @Param
 * @return
 **/
@Component
@Slf4j
public class WebSocketRedisUtil {

    /**
     * 功能描述:将JavaBean对象的信息缓存进Redis
     *
     * @param message 信息JavaBean
     * @return 是否保存成功
     */
    public static boolean saveCacheChatMessage(String key, String message) {
        //判断key是否存在
        if (RedisUtil.hasKey(key)) {
            //将javabean对象添加到缓存的list中
            long redisSize = RedisUtil.lGetListSize(key);
            System.out.println("redis当前数据条数" + redisSize);
            Long index = RedisUtil.rightPushValue(key, message);
            System.out.println("redis执行rightPushList返回值:" + index);
            return redisSize<index;
        } else {
            //不存在key时,将chatVO存进缓存,并设置过期时间
//            JSONArray jsonArray=new JSONArray();
//            jsonArray.add(message);
            boolean isCache = RedisUtil.lSet(key, message);
            //保存成功,设置过期时间   暂不设置失效时间
            if (isCache) {
//                RedisUtil.expire(key, 3L, TimeUnit.DAYS);
                System.out.println("存储成功"+message);
            }
            return isCache;
        }
    }

    /**
     * 功能描述:从缓存中读取信息
     *
     * @param key 缓存信息的键
     * @return 缓存中信息list
     */
    public static List<Object> getCacheChatMessage(String key) {
        List<Object> chatList = null;
        //判断key是否存在
        if (RedisUtil.hasKey(key)) {
            chatList = RedisUtil.getOpsForList(key);
        } else {
            log.info("redis缓存中无此键值:" + key);
        }
        return chatList;
    }

    /**
     * 功能描述: 在缓存中删除信息
     *
     * @param key 缓存信息的键
     */
    public static void deleteCacheChatMessage(String key) {
        //判断key是否存在
        if (RedisUtil.hasKey(key)) {
            RedisUtil.del(key);
        }
    }

}
2、RedisUtil普通redis工具类



import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** @Author: best_liu
 * @Description:
 * @Date: 13:50 2023/11/28
 * @Param 
 * @return 
 **/
@Slf4j
@Component
public class RedisUtil {

    private static RedisTemplate redisTemplate;

    @Autowired
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }


    //=============================common============================

    /**
     * 指定缓存失效时间
     *
     * @param key  键
     * @param time 时间(秒)
     * @return
     */
    public static boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            log.error("设置redis指定key失效时间错误:", e);
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效 失效时间为负数,说明该主键未设置失效时间(失效时间默认为-1)
     */
    public static Long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     *
     * @param key 键
     * @return true 存在 false 不存在
     */
    public static Boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            log.error("redis判断key是否存在错误:", e);
            return false;
        }
    }

    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public static void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(Arrays.asList(key));
            }
        }
    }

    //============================String=============================

    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    @SuppressWarnings("unchecked")
    public static <T> T get(String key) {
        return key == null ? null : (T) redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */
    public static boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            log.error("设置redis缓存错误:", e);
            return false;
        }

    }

    /**
     * 普通缓存放入并设置时间
     *
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public static boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }



//    ===================================自定义工具扩展===========================================

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public Object hget ( String key, String item ) {
        return redisTemplate.opsForHash().get(key, item);
    }

//    /**
//     * 获取hashKey对应的所有键值
//     *
//     * @param key 键
//     * @return 对应的多个键值
//     */
//    public static Map<Object, Object> hmget (String key ) {
//        return redisTemplate.opsForHash().entries(key);
//    }
    /**
     * 获取hashKey对应的所有键值
     *
     * @param key 键
     * @return 对应的多个键值
    //     */
//    public static List<Object> hmget (String key ) {
//        return redisTemplate.opsForList().ge;
//    }

    /**
     * 获取list缓存的长度
     *
     * @param key 键
     * @return
     */
    public static   long lGetListSize ( String key ) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 功能描述:在list的右边添加元素
     * 如果键不存在,则在执行推送操作之前将其创建为空列表
     *
     * @param key 键
     * @return value 值
     * @author RenShiWei
     * Date: 2020/2/6 23:22
     */
    public static Long rightPushValue ( String key, Object value ) {

        return redisTemplate.opsForList().rightPush(key, value);
    }
    /**
     * 功能描述:获取缓存中所有的List key
     *
     * @param key 键
     */
    public static   List<Object> getOpsForList ( String key) {
        return redisTemplate.opsForList().range(key, 0, redisTemplate.opsForList().size(key));
    }

    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public static boolean lSet ( String key, Object value ) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 将list放入缓存
     *
     * @param key   键
     * @param value 值
     * @return
     */
    public boolean lSet ( String key, List<Object> value ) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
3、WebSocket操作类中加入一下两个方法

方法1:查询是否有离线消息并推送

/**
     * 查询是否有离线消息并推送
     *
     */
    public void cacheMessageContains(String userId){
        //是否有暂存的消息,如果有则发送消息
        String user = "socket-"+userId.split("-")[0];
        List<Object> Strings = WebSocketRedisUtil.getCacheChatMessage(user);

        if (Strings!=null) {
            //取出消息列表
            List<Object> list = Strings;
            if (list == null) {
                log.info("暂无缓存的消息");
            }
            list.forEach(message -> {
                //暂时群发消息
                WebSocketServer item = webSocketMap.get(userId);
                try {
                    item.sendMessage(message.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            log.info("用户缓存的消息发送成功一共:"+list.size()+"条");
            list = null;
            WebSocketRedisUtil.deleteCacheChatMessage(user);
        }
    }

方法2:暂存离线消息

/**
     * 暂存离线消息
     *
     */
    public static void cacheMessagePut(String userId, String message){
//        //把新消息添加到消息列表
        if (!StringUtils.isEmpty(message)){
            boolean isCache = WebSocketRedisUtil.saveCacheChatMessage("socket-"+userId, message);
            if (isCache){
                log.info("消息暂存成功" + message);
            }else{
                log.error("消息暂存失败" + message);
            }
        }
    }

在用户上线的时候调用方法1;在消息推送判断用户不在线是调用方法2.文章来源地址https://www.toymoban.com/news/detail-654577.html

到了这里,关于WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Vue中WebSocket链接中断、心跳机制防止自动断开

    1、WebSocket链接中断原因 WebSocket断开的原因有很多,最好在WebSocket断开时,将错误打印出来。 二、心跳机制防止自动断开 WebSocket在一段时间内没有进行通讯便会自读断开链接,可以每隔30秒或一分钟向服务器发送一次通讯防止链接终端

    2024年02月16日
    浏览(45)
  • WebSocket心跳机制/服务器端开连接(JS前端)

    情景: 前端使用 WebSocket 的时候,后端长时间没有推送数据,导致 WebSocket 连接经常断开,后端也会报错。 解决方法: 通过 心跳机制 让前端和后端始终保持连接。 代码: 使用方法: 注意: 后端收到以后需要给前端返回数据,否则还是无法保持连接 代码参考了:https://bl

    2024年02月12日
    浏览(36)
  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(44)
  • Spring Boot进阶(48):【实战教程】SpringBoot集成WebSocket轻松实现实时消息推送

            WebSocket是一种新型的通信协议,它可以在客户端与服务器端之间实现双向通信,具有低延迟、高效性等特点,适用于实时通信场景。在SpringBoot应用中,集成WebSocket可以方便地实现实时通信功能,如即时聊天、实时数据传输等。         本文将介绍如何在Sprin

    2024年02月09日
    浏览(56)
  • WebSocket心跳机制

    WebSocket是HTML5开始提供的一种浏览器与服务器进行全双工通讯的网络技术,属于应用层协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。 1、创建webSocket 2、websocket事件 事件 事件处理程序 描述 open Socket.onopen 连接建立时触发

    2024年02月15日
    浏览(55)
  • WebSocket心跳机制(笔记大全)

    一、WebSocket心跳机制前端 前端实现WebSocket心跳机制的方式主要有两种: 使用setInterval定时发送心跳包。 在前端监听到WebSocket的onclose()事件时,重新创建WebSocket连接。 第一种方式会对服务器造成很大的压力,因为即使WebSocket连接正常,也要定时发送心跳包,从而消耗服务器资

    2024年02月15日
    浏览(42)
  • websocket 心跳机制

    WebSocket 是一种在客户端和服务器之间创建持久连接的技术。为了保持连接的稳定性,就需要通过发送心跳消息来维持 WebSocket 连接。 1、创建一个webscoket基本的使用 2、在客户端连接到 WebSocket 服务器之后,通过 setInterval 方法定时发送心跳消息 这边的代码会每隔5秒向服务器发

    2024年02月11日
    浏览(36)
  • uniapp websocket机制 心跳 重连

    在开发程序过程中通信功能还是比较常用到的,本文主要介绍的是uniapp中websocket的使用 websocket建立连接后,断开、心跳机制重新链接的一个过程。 关于uni.connectSocket可仔细阅读uniapp官网中的uni.connetSocket以及连接socket创建的实例 SocketTask   具体代码如下:内有代码详细注解,

    2024年02月12日
    浏览(37)
  • websocket以及心跳机制的实现

            在浏览器与服务器通信之间,传统的http请求在某些场景下并不理想,比如实时聊天,实时性的小游戏等等, 其中面临主要的两个缺点: 无法做到消息的实时性 服务器无法主动推送信息 其基于http的主要解决方案有: 基于ajax的轮询:客户端定时或者动态相隔短时

    2024年02月04日
    浏览(37)
  • WebSocket断开原因和心跳机制

    WebSocket断开的原因有很多,最好在WebSocket断开时,将错误打印出来。 错误状态码: WebSocket断开时,会触发CloseEvent, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段

    2024年02月04日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包