【Spring Boot 实现 WebSocket实时数据推送-服务端】

这篇具有很好参考价值的文章主要介绍了【Spring Boot 实现 WebSocket实时数据推送-服务端】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、WebSocket配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author HFL
 * @date 2022/5/16 14:49
 * 配置类
 */
@Configuration
public class WebSocketConfiguration {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
    
}
二、WebSocket服务端类
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

/**
 * @author HFL
 * @date 2022/5/16 15:17
 * Websocket应用实现:
 *  1.建立连接,连接放入连接池
 *  2.关闭连接,连接移出连接池
 *  3.接收客户端发送的消息,并做出相应处理
 *  4.注入业务层的service
 *  [注意:Spring管理的Bean是单例模式的,而WebSocket不是单例,注入时需要处理一下]
 *  5.异常处理,连接移除连接池
*/
@Slf4j
@Component
@ServerEndpoint("/endPoint/{screen}")
public class WebSocketServer {

    /**
     * 建立连接成功调用 (Session + 场景)
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("screen") String screen) throws IOException {
        log.info("[onOpen][session({}) 接入, [screen: {}]", session, screen);
        WebSocketServerPool.addDataConnect(session, screen);
        //WebSocketServerPool.sendMessage(session, configurationScreenService.queryAllJsonById(screen));
    }

    /**
     * 关闭连接时调用
     * @param session 连接
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        log.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
        WebSocketServerPool.removeConnect(session);
    }

    /**
     * 错误时调用
     * @param session 连接
     * @param throwable 异常
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        log.info("[onClose][session({}) 发生异常]", session, throwable);
        WebSocketServerPool.removeConnect(session);
    }

    /**
     * 收到客户端信息后,根据接收到的信息进行处理
     * @param session 连接
     * @param message 数据消息
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        log.info("[onOpen][session({}) 接收到一条消息({})]", session, message);
        // TODO: 2022/5/18 对于客户端发送的指令信息,解析后进行对应的逻辑处理
    }

}
三、WebSocket的连接池类
import javax.websocket.Session;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * @author HFL
 * @date 2022/5/16 9:39
 * Websocket连接池、对连接池内连接操作 和数据推送方法
 */
public class WebSocketServerPool {

    /**
     * WebSocket连接池
     */
    private static ConcurrentMap<Session, String> dataConnect = new ConcurrentHashMap<>();

    /**
     * 将websocket连接,放入连接池
     * @param session websocket连接
     * @param screen 场景ID
     */
    public static void addDataConnect(Session session, String screen){
        dataConnect.put(session, screen);
        Iterator<Map.Entry<Session, String>> iterator = dataConnect.entrySet().iterator();
        synchronized (iterator){
            //移除失效连接
            while(iterator.hasNext()){
                Map.Entry<Session, String> entry = iterator.next();
                Session sessionNew = entry.getKey();
                Map<String, Object> userProperties = sessionNew.getUserProperties();
                if(null != userProperties && null != userProperties.get("ReadyState") && "0" != String.valueOf(userProperties.get("ReadyState"))){
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 将websocket连接从连接池中移除
     * @param session websocket连接
     */
    public static void removeConnect(Session session){
        Iterator<Map.Entry<Session, String>> iterator = dataConnect.entrySet().iterator();
        synchronized (iterator){
            //主动移除连接
            while (iterator.hasNext()){
                if(session.equals(iterator.next().getKey())){
                    iterator.remove();
                }
            }
        }
    }

    /**
     * 获取连接池中所有连接
     * @return 连接池所有数据
     */
    public static ConcurrentMap<Session, String> getDataConnect(){
        return dataConnect;
    }

    /**
     * Websocket消息推送
     * @param session 连接
     * @param message 消息主体
     * @throws IOException I/O异常
     */
    public static void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }
}
四、启动Spring Boot服务
import com.ljgk.ems.maitreya.user.annotation.EnableLoginArgResolver;
import com.ljgk.ems.maitreya.validator.config.EnableFormValidator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * @author HFL
 * @date 2022/5/13 8:40
 * 仅做测试,只需要@SpringBootApplication注释即可,
 * 其他注释,用于其他的功能,例如:
 *   @EnableScheduling用于扫描开启定时任务即@Scheduled注解的方法
 */
@SpringBootApplication
@EnableDiscoveryClient
@Configuration
@EnableFeignClients(value = {"com.ljgk.ems.maitreya"})
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
@Slf4j
@EnableLoginArgResolver
@EnableFormValidator
@EnableScheduling
public class ServerApplication {

    public static void main(String[] args) throws UnknownHostException {
        ConfigurableApplicationContext application = SpringApplication.run(ConfigurationServerApplication.class, args);
    }
    
}
五、测试WebSocket连接
  1. WebSocket在线测试工具:
    http://www.easyswoole.com/wstool.html
  2. 测试连接
    服务地址:ws://172.18.42.29:14785/endPoint/1
    服务启动的IP:172.18.42.29
    服务端口:14785
    WS的URl:/endPoint
    入参:1
    【Spring Boot 实现 WebSocket实时数据推送-服务端】
六、遇到的问题
  1. 服务启动报错:
    【Spring Boot 实现 WebSocket实时数据推送-服务端】

    2022-06-09 10:31:27.616:[ERROR] [main:18941] [org.springframework.boot.SpringApplication.reportFailure:826] --> Application run failed 
    java.lang.IllegalStateException: Failed to register @ServerEndpoint class: class com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer$$EnhancerBySpringCGLIB$$8a624780
    	at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:159)
    	at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoints(ServerEndpointExporter.java:134)
    	at org.springframework.web.socket.server.standard.ServerEndpointExporter.afterSingletonsInstantiated(ServerEndpointExporter.java:112)
    	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:896)
    	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878)
    	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550)
    	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    	at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    	at com.ljgk.ems.maitreya.ConfigurationServerApplication.main(ConfigurationServerApplication.java:36)
    Caused by: javax.websocket.DeploymentException: UT003027: Class class com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer$$EnhancerBySpringCGLIB$$8a624780 was not annotated with @ClientEndpoint or @ServerEndpoint
    	at io.undertow.websockets.jsr.ServerWebSocketContainer.addEndpointInternal(ServerWebSocketContainer.java:735)
    	at io.undertow.websockets.jsr.ServerWebSocketContainer.addEndpoint(ServerWebSocketContainer.java:628)
    	at org.springframework.web.socket.server.standard.ServerEndpointExporter.registerEndpoint(ServerEndpointExporter.java:156)
    	... 12 common frames omitted
    

    原因:
    WebSocketServer类被代理了,我出现这个问题的原因是做了整个项目的接口拦截,然后做了接口的日志切面处理,导致这个类被代理了,而@ServerEndpoint注解在处理WebSocketServer时,取到了代理的类,无法处理导致的异常。

    解决办法:

    ​ 将WebSocketServer类从日志处理的切面中排除掉即可。

  2. 加入业务处理时报错:
    【Spring Boot 实现 WebSocket实时数据推送-服务端】

    java.lang.NullPointerException: null
    	at com.ljgk.ems.maitreya.configuration.websocket.WebSocketServer.onOpen(WebSocketServer.java:47)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:497)
    	at io.undertow.websockets.jsr.annotated.BoundMethod.invoke(BoundMethod.java:87)
    	at io.undertow.websockets.jsr.annotated.AnnotatedEndpoint$3.run(AnnotatedEndpoint.java:158)
    	at io.undertow.websockets.jsr.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:170)
    	at io.undertow.websockets.jsr.ServerWebSocketContainer$1.call(ServerWebSocketContainer.java:167)
    

    原因:
    通过注解@Resource自动注入的ConfigurationScreenService的Bean为空,即Spring容器中,ConfigurationScreenService没有注入进去,因为Spring管理的Bean的作用域和WebSocket的作用域不同,Spring管理的Bean都是单例,WebSocket不是。
    解决办法:

    ​ 将截图中通过@Resource注解注入ConfigurationScreenService的方式换成下面方式:

    /**
     * @author HFL
     * @date 2022/5/16 15:17
     * Websocket应用实现:
     *  1.建立连接,连接放入连接池
     *  2.关闭连接,连接移出连接池
     *  3.接收客户端发送的消息,并做出相应处理
     *  4.注入业务层的service
     *  [注意:Spring管理的Bean是单例模式的,而WebSocket不是单例,注入时需要处理一下]
     *  5.异常处理,连接移除连接池
    */
    @Slf4j
    @Component
    @ServerEndpoint("/endPoint/{screen}")
    public class WebSocketServer {
    
        private static ConfigurationScreenService configurationScreenService;
    
        @Resource
        public void setConfigurationScreenService(ConfigurationScreenService configurationScreenService){
            WebSocketServer.configurationScreenService = configurationScreenService;
        }
    
        /**
         * 建立连接成功调用 (Session + 场景ID)
         */
        @OnOpen
        public void onOpen(Session session,@PathParam("screen") String screen) throws IOException {
            log.info("[onOpen][session({}) 接入, [screen: {}]", session, screen);
            WebSocketServerPool.addDataConnect(session, screen);
            WebSocketServerPool.sendMessage(session, configurationScreenService.queryAllJsonById(screen));
        }
    	
    	...
    
    }
    
七、扩展(实现实时推送数据)
  • 定时任务,轮询连接池中的连接,并取到对于的场景、绑定的设备,即可查询最新数据,最后推送至客户端。(最简单实现)

    import cn.hutool.json.JSONUtil;
    import com.ljgk.ems.maitreya.configuration.entity.Content;
    import com.ljgk.ems.maitreya.configuration.service.ConfigurationScreenDataService;
    import com.ljgk.ems.maitreya.configuration.vm.ScreenDataJsonVm;
    import com.ljgk.ems.maitreya.configuration.websocket.WebSocketServerPool;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.websocket.Session;
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ConcurrentMap;
    
    /**
     * @author HFL
     * @date 2022/6/88:35
     */
    @Slf4j
    @Component
    public class ScreenPushScheduledTask {
    
        @Resource
        private ConfigurationScreenDataService configurationScreenDataService;
    
        /**
         * 5秒一次
         * 定时场景数据推送
         */
        @Scheduled(cron = "0/5 * * * * ? ")
        public void executeScreenDataPush(){
            try {
                ConcurrentMap<Session, String> dataConnect = WebSocketServerPool.getDataConnect();
                //查询待推送场景
                dataConnect.keySet().forEach(session -> {
                    try {
                        String screen = dataConnect.get(session);
                        //查询需要的场景对应的元件需要的值,并按规则组装成JSON
                        List<ScreenDataJsonVm> screenDataJsonVms = configurationScreenDataService.queryDataJson(screen);
                        WebSocketServerPool.sendMessage(session, JSONUtil.toJsonStr(screenDataJsonVms));
                    } catch (IOException e) {
                        log.error("WebSocket SendMessage Error, Session:[{}], Exception : [{}]", session, e.getMessage());
                    }
                });
            }catch (Exception e){
                log.error("WebSocket Scheduler Executor Error : [{}]", e.getMessage());
            }
        }
    
    }
    
  • 监听Binlog日志,将MySql中变化数据取出,推送至客户端。

  • RocketMq实现,将变化数据写入队列,WS服务端消费队列中数据时,推送至客户端。

八、参考

1. WebSocket原理及技术简介
2. Java实现WebSocket服务端
3. Java Websocket——服务器端
4.【WebSocket】SpringBoot整合WebSocket 注入Bean的方式文章来源地址https://www.toymoban.com/news/detail-424830.html

到了这里,关于【Spring Boot 实现 WebSocket实时数据推送-服务端】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot集成WebSocket实现消息推送

    项目中经常会用到消息推送功能,关于推送技术的实现,我们通常会联想到轮询、comet长连接技术,虽然这些技术能够实现,但是需要反复连接,对于服务资源消耗过大,随着技术的发展,HtML5定义了WebSocket协议,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。

    2023年04月08日
    浏览(44)
  • webSocket实现数据的实时推送(附:前后端代码)

            之前开发的一个管理系统项目中,首页是数据大屏展示,一开始我是用JS的 setInterval() 方法,设置一个时间,每过时间发起一次 ajax 请求。虽然也能凑活着用,但总感觉不是最优的方法,而且还比较占用资源,所以学习 WebSocke ,以下是本人的一些学习心得及前后端的

    2024年02月02日
    浏览(51)
  • Spring Boot整合WebSocket实现实时通信,前端实时通信,前后端实时通信

    实时通信在现代Web应用中扮演着越来越重要的角色,无论是在线聊天、股票价格更新还是实时通知,WebSocket都是实现这些功能的关键技术之一。Spring Boot作为一个简化企业级应用开发的框架,其对WebSocket的支持也非常友好。本文将详细介绍如何在Spring Boot中整合WebSocket,实现一

    2024年04月27日
    浏览(38)
  • WebSocket:实现实时互动、数据推送的利器,你了解多少

    WebSocket技术是一种基于TCP协议的全双工通信协议,它允许浏览器和服务器之间进行实时、双向的通信。相比传统的HTTP请求-响应模式,WebSocket提供了持久连接,可以实时地推送数据,减少了通信的延迟。 WebSocket的工作原理是通过建立一条持久连接来实现实时通信。首先,浏览

    2024年01月18日
    浏览(43)
  • vue和node使用websocket实现数据推送,实时聊天

    需求:node做后端根据websocket,连接数据库,数据库的字段改变后,前端不用刷新页面也能更新到数据,前端也可以发送消息给后端,后端接受后把前端消息做处理再推送给前端展示 使用node ./app.js运行项目 在需要使用websocket连接的页面引入 默认如下: id为243 在数据库改为

    2024年02月15日
    浏览(46)
  • Vue使用WebSocket实现实时获取后端推送的数据。

    Vue可以使用WebSocket实现实时获取后端推送的数据。 1.在Vue项目中安装WebSocket库 可以使用npm或yarn安装WebSocket库: 2.创建WebSocket连接 在Vue组件中创建WebSocket连接,连接到后端WebSocket服务器,代码如下: 上面的代码中,使用WebSocket连接到后端WebSocket服务器,通过监听onmessage事件,

    2024年02月08日
    浏览(48)
  • 实现实时互动:用Spring Boot原生WebSocket打造你的专属聊天室

    😊 @ 作者: 一恍过去 💖 @ 主页: https://blog.csdn.net/zhuocailing3390 🎊 @ 社区: Java技术栈交流 🎉 @ 主题: 实现实时互动:用Spring Boot原生WebSocket打造你的专属聊天室 ⏱️ @ 创作时间: 2023年08月04日 WebSocket 实现聊天室的原理包括建立 WebSocket 连接的握手过程、保持连接状态以

    2024年02月10日
    浏览(50)
  • Spring Boot 3 + Vue 3 整合 WebSocket (STOMP协议) 实现广播和点对点实时消息

    🚀 作者主页: 有来技术 🔥 开源项目: youlai-mall 🍃 vue3-element-admin 🍃 youlai-boot 🌺 仓库主页: Gitee 💫 Github 💫 GitCode 💖 欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请纠正! WebSocket是一种在Web浏览器与Web服务器之间建立双向通信的协议,而Spring Boot提供了便捷的WebSocket支持

    2024年02月02日
    浏览(48)
  • Spring Boot进阶(49):实时通信不再是梦想,SpringBoot+WebSocket助你轻松实现前后端即时通讯!

            在上一期,我对WebSocket进行了基础及理论知识普及学习,WebSocket是一种基于TCP协议实现的全双工通信协议,使用它可以实现实时通信,不必担心HTTP协议的短连接问题。Spring Boot作为一款微服务框架,也提供了轻量级的WebSocket集成支持,本文将介绍如何在Spring Boot项

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包