SpringBoot WebSocket做客户端

这篇具有很好参考价值的文章主要介绍了SpringBoot WebSocket做客户端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

常见的都是springboot应用做服务,前端页面做客户端,进行websocket通信进行数据传输交互。但其实springboot服务也能做客户端去连接别的webSocket服务提供者。
刚好最近在项目中就使用到了,需求背景大概就是我们作为一个java段应用需要和一个C语言应用进行通信。在项目需求及环境等多方面的考量之下,最后放了使用http协议和C程序进行通信转而使用webSocket,然后在C侧开发人员的要求下,由他们做服务端,我们做客户端。

引入pom依赖

<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.5.2</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

配置类

ws连接配置

cancan:
  websocket:
    client:
      config:
        - wsUrl: ws://127.0.0.1:8080/websocket/${cancan.websocket.client.config[0].wsName}
          wsName: ws-01
          enableHeartbeat: true
          heartbeatInterval: 20000
          enableReconnection: true
#        - wsUrl: ws://localhost:8083
#          wsName: ws-02
#          enableHeartbeat: true
#          heartbeatInterval: 20000
#          enableReconnection: true
server:
  port: 8099

WebsocketClientConfiguration读取配置文件配置

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:11
 * @Description:
 */
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {

    private List<ServerProperties> config;

    public static class ServerProperties {
        /**
         * websocket server ws://ip:port
         */
        private String wsUrl;
        /**
         * websocket server name,用于区分不同的服务端
         */
        private String wsName;
        /**
         * 是否启用心跳监测 默认开启
         */
        private Boolean enableHeartbeat;
        /**
         * 心跳监测间隔 默认20000毫秒
         */
        private Integer heartbeatInterval;
        /**
         * 是否启用重连接 默认启用
         */
        private Boolean enableReconnection;

        public String getWsUrl() {
            return wsUrl;
        }

        public void setWsUrl(String wsUrl) {
            this.wsUrl = wsUrl;
        }

        public Boolean getEnableHeartbeat() {
            return enableHeartbeat;
        }

        public void setEnableHeartbeat(Boolean enableHeartbeat) {
            this.enableHeartbeat = enableHeartbeat;
        }

        public Integer getHeartbeatInterval() {
            return heartbeatInterval;
        }

        public void setHeartbeatInterval(Integer heartbeatInterval) {
            this.heartbeatInterval = heartbeatInterval;
        }

        public Boolean getEnableReconnection() {
            return enableReconnection;
        }

        public void setEnableReconnection(Boolean enableReconnection) {
            this.enableReconnection = enableReconnection;
        }

        public String getWsName() {
            return wsName;
        }

        public void setWsName(String wsName) {
            this.wsName = wsName;
        }
    }

    public List<ServerProperties> getConfig() {
        return config;
    }

    public void setConfig(List<ServerProperties> config) {
        this.config = config;
    }
}

WebsocketMultipleBeanConfig连接服务端并保存到bean中

import com.example.demo.service.WebsocketRunClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:14
 * @Description: Websocket多客户端配置
 */
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {

    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){

        Map<String, WebsocketRunClient> retMap = new HashMap<>(5);

        List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();

        for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {

            String wsUrl = serverProperties.getWsUrl();
            String wsName = serverProperties.getWsName();
            Boolean enableReconnection = serverProperties.getEnableReconnection();
            Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
            Integer heartbeatInterval = serverProperties.getHeartbeatInterval();

            try {
                WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
                websocketRunClient.connect();
                websocketRunClient.setConnectionLostTimeout(0);

                new Thread(()->{
                    while (true){
                        try {
                            Thread.sleep(heartbeatInterval);
                            if(enableHeartbeat){
                                websocketRunClient.send("[websocket "+wsName+"] 心跳检测");
                                log.info("[websocket {}] 心跳检测",wsName);
                            }
                        } catch (Exception e) {
                            log.error("[websocket {}] 发生异常{}",wsName,e.getMessage());
                            try {
                                if(enableReconnection){
                                    log.info("[websocket {}] 重新连接",wsName);
                                    websocketRunClient.reconnect();
                                    websocketRunClient.setConnectionLostTimeout(0);
                                }
                            }catch (Exception ex){
                                log.error("[websocket {}] 重连异常,{}",wsName,ex.getMessage());
                            }
                        }
                    }
                }).start();

                retMap.put(wsName,websocketRunClient);
            } catch (URISyntaxException ex) {
                log.error("[websocket {}] 连接异常,{}",wsName,ex.getMessage());
            }
        }
        return retMap;

    }

}

客户端

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.nio.ByteBuffer;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 14:13
 * @Description:
 */
@Slf4j
public class WebsocketRunClient extends WebSocketClient {
    /**
     * websocket连接名称
     */
    private String wsName;


    public WebsocketRunClient(URI serverUri,String wsName) {
        super(serverUri);
        this.wsName = wsName;
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocket {}] Websocket客户端连接成功",wsName);
    }

    @Override
    public void onMessage(String msg) {
        log.info("[websocket {}] 收到String消息:{}",wsName,msg);
    }

    @Override
    public void onMessage(ByteBuffer bytes) {
        log.info("[websocket {}] 收到ByteBuffer消息:{}",wsName);
    }


    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocket {}] Websocket客户端关闭",wsName);
        System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
    }

    @Override
    public void onError(Exception e) {
        log.info("[websocket {}] Websocket客户端出现异常, 异常原因为:{}",wsName,e.getMessage());
    }


}

发送消息

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/13 11:55
 * @Description:
 */
@RestController
public class ctrl {

    @Value("${cancan.websocket.client.config[0].wsName}")
    private String ws1Name;


    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

    @RequestMapping("/send")
    public String send(String text){
        System.out.println(ws1Name);
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
        websocketRunClient.send(text);
        return "发送成功";
    }

    @RequestMapping("/close")
    public String close(){
        WebsocketRunClient websocketRunClient = websocketRunClientMap.get("ws-01");
        websocketRunClient.close();
        return "关闭成功";
    }
    
}

改造

上面是一个客户端的例子,但是这次的需求有些特殊。ws的连接需要在数据库表中读取,而且这个表的数据用户在系统页面上可以随时进行增删改查的操作。这个时候上面读配置文件,在项目启动时就发起ws连接,然后保存连接的方法就行不通了。
根据需求改造之后的设计是这样的,ws连接依旧保存再config注册bean的map中。但是注册bean哪里我们只是将这个map初始化,并不初始化ws连接。ws连接使用定时任务初始,定时读取表中的数据,如果表中的ws连接再map中没有,则初始化后放入map,如果存在表中没有而map中有的ws连接,则移除map中的这条ws连接。

WebsocketMultipleBeanConfig配置类

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/18 9:20
 * @Description: websocket配置,用于开启websocket支持
 */
@Configuration
@Slf4j
public class WebsocketMultipleBeanConfig {


    @Bean
    public Map<String, WebsocketRunClient> websocketRunClientMap(){
        Map<String, WebsocketRunClient> retMap = new HashMap<>(8);
        return retMap;
    }


}

ws初始化定时任务

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/10/18 11:26
 * @Description: 读取配置,保持ws连接
 */
@Component
@Slf4j
public class KeepConnectTask {

    @Value("${signaling.tracking.webSocket.uri:/websocket/}")
    private String uri;

    @Autowired
    private ServerConfigService serverConfigService;


    @Autowired
    private Map<String, WebsocketRunClient> websocketRunClientMap;

//    @Scheduled(cron = "0/20 * * * * ? ")
    public void execute() throws Exception {
        InetAddress adds = InetAddress.getLocalHost();
        String hostIp = adds.getHostAddress();
        //查询服务器配置
        //这里就是查询表的方法 测试方便先用固定数据代替了
//        List<ServerConfigModel> serverConfigs = serverConfigService.selectAll();
        List<ServerConfigModel> serverConfigs = new ArrayList<>();
        ServerConfigModel serverConfigModel = new ServerConfigModel();
        serverConfigModel.setServerIp("127.0.0.1");
        serverConfigModel.setServerPort("8080");
        serverConfigs.add(serverConfigModel);
        Map<String,String> configs = new HashMap<>();
        for(ServerConfigModel serverConfig:serverConfigs){
            String key = "ws://" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
            String wsName = "ws-" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
            String value = key + uri + wsName;
            configs.put(wsName,value);
        }
        //删除、关闭已不在配置中的连接
        //注意使用keySet边遍历边删除会报错,建议使用迭代器遍历删除
        Iterator<Map.Entry<String, WebsocketRunClient>> it = websocketRunClientMap.entrySet().iterator();
        while(it.hasNext()){
            Map.Entry<String, WebsocketRunClient> enter = it.next();
            if(!configs.containsKey(enter.getKey())){
                enter.getValue().close();
                it.remove();
            }
        }
        //已有连接心跳发送
        for(String key:websocketRunClientMap.keySet()){
            WebsocketRunClient client = websocketRunClientMap.get(key);
            try{
                client.send("websocket[" + hostIp + "]心跳检测");
                log.info("websocket[" + hostIp + "]心跳检测");
            } catch (Exception e){
                log.error("websocket[{}] 发生异常{}",hostIp,e.getLocalizedMessage());
                e.printStackTrace();
                try {
                    client.reconnect();
                    client.setConnectionLostTimeout(0);
                } catch (Exception ex){
                    log.error("websocket[{}] 重连异常,{}",hostIp,ex.getMessage());
                }
            }
        }
        //新配置增加连接
        for(String key:configs.keySet()){
            if(!websocketRunClientMap.containsKey(key)){
                String url = configs.get(key);
                try{
                    WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(url),key);
                    websocketRunClient.connect();
                    websocketRunClient.setConnectionLostTimeout(0);
                    websocketRunClientMap.put(key,websocketRunClient);
                } catch (Exception e){
                    log.error("websocket[{}] 新增连接异常,{}",key,e.getMessage());
                }
            }
        }
    }

}

发送消息

发送消息仍然和之前一样先注入

@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;

然后map.get拿到对应想发送消息的WebsocketRunClient,调用其中的send方法即可文章来源地址https://www.toymoban.com/news/detail-512516.html

到了这里,关于SpringBoot WebSocket做客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot集成WebSocket实现客户端与服务端长连接通信

    场景: 1、WebSocket协议是用于前后端长连接交互的技术,此技术多用于交互不断开的场景。特点是连接不间断、更轻量,只有在关闭浏览器窗口、或者关闭浏览器、或主动close,当前会话对象才会关闭。 2、相较于 Http/Https 通信只能由客户端主动发起请求,而 Socket 通信不仅能

    2024年02月02日
    浏览(59)
  • Springboot 集成WebSocket作为客户端,含重连接功能,开箱即用

    使用演示 只需要init后调用sendMessage方法即可,做到开箱即用。内部封装了失败重连接、断线重连接等功能。 基于Springboot工程 引入websocket依赖 开箱即用的工具类

    2024年02月04日
    浏览(60)
  • Java:SpringBoot整合WebSocket实现服务端向客户端推送消息

    思路: 后端通过websocket向前端推送消息,前端统一使用http协议接口向后端发送数据 本文仅放一部分重要的代码,完整代码可参看github仓库 websocket 前端测试 :http://www.easyswoole.com/wstool.html 依赖 项目目录 完整依赖 配置 WebSocketServer.java 前端页面 websocket.html 前端逻辑 index.js 参

    2024年02月04日
    浏览(53)
  • 【Java】SpringBoot快速整合WebSocket实现客户端服务端相互推送信息

    目录 什么是webSocket? webSocket可以用来做什么? WebSocket操作类 一:测试客户端向服务端推送消息 1.启动SpringBoot项目 2.打开网站 3.进行测试消息推送 4.后端进行查看测试结果 二:测试服务端向客户端推送消息 1.接口代码 2.使用postman进行调用 3.查看测试结果         WebSocke

    2024年01月20日
    浏览(61)
  • springboot RabbitMQ客户端连接故障恢复

    最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。 一、添加自动恢复配置automaticRecovery 通过上述配置如果RabbitMQ服务器发生故

    2024年02月11日
    浏览(56)
  • SpringBoot3 响应式网络请求客户端

     SpringBoot是一个基于Spring的快速开发框架,它可以帮助我们快速构建、部署和运行Java应用程序。HTTP接口是Web应用程序与外部系统进行通信的一种方式,通过HTTP协议,我们可以实现客户端与服务器之间的数据交互。 SpringBoot 整合提供了很多方式进行远程调用 轻量级客户端方

    2024年02月15日
    浏览(33)
  • 二、springboot集成CAS客户端实现单点登录

    pom中引入依赖 yml中添加cas配置 读取CAS相关配置 cas配置类 单点登录接口demo 访问loingCas接口时,若未在CASserver登录,则会被拦截跳转到CAS的登陆页面,登陆成功后放行继续访问loginCas接口

    2024年02月15日
    浏览(54)
  • SpringBoot集成Elasticsearch客户端(新旧版本)(2023-01-28)

    第一章 SpringBoot集成ElasticSearch(2023-01-28) 例如:业务中需要使用es,所以做一些客户端选型,熟悉一下基本的操作,所以记录这篇博客,有关概念理论性的文章还在整理过程中,后续会整理个系列 Spring认证中国教育管理中心-Spring Data Elasticsearch教程一 SpringData集成Elasticsearch Sp

    2024年02月07日
    浏览(74)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(62)
  • 高版本springboot3.1配置Eureka客户端问题

    只需要按上面配置好,然后高版本的Eureka,不需要@EnableEurekaClient这个注解了,直接SpringBoot启动,就可以注册到注册中心。 /*********************************************************/ /**  * 开启eureka客户端功能  */ //@EnableEurekaClient /**  * @EnableEurekaClient和@EnableDiscoveryClient都让注册中心能够发

    2024年02月10日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包