常见的都是springboot应用做服务,前端页面做客户端,进行websocket通信进行数据传输交互。但其实springboot服务也能做客户端去连接别的webSocket服务提供者。
刚好最近在项目中就使用到了,需求背景大概就是我们作为一个java段应用需要和一个C语言应用进行通信。在项目需求及环境等多方面的考量之下,最后放了使用http协议和C程序进行通信转而使用webSocket,然后在C侧开发人员的要求下,由他们做服务端,我们做客户端。
引入pom依赖
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
配置类
ws连接配置
cancan:
websocket:
client:
config:
- wsUrl: ws://127.0.0.1:8080/websocket/${cancan.websocket.client.config[0].wsName}
wsName: ws-01
enableHeartbeat: true
heartbeatInterval: 20000
enableReconnection: true
# - wsUrl: ws://localhost:8083
# wsName: ws-02
# enableHeartbeat: true
# heartbeatInterval: 20000
# enableReconnection: true
server:
port: 8099
WebsocketClientConfiguration读取配置文件配置
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 14:11
* @Description:
*/
@Configuration
@ConfigurationProperties(prefix = "cancan.websocket.client")
public class WebsocketClientConfiguration {
private List<ServerProperties> config;
public static class ServerProperties {
/**
* websocket server ws://ip:port
*/
private String wsUrl;
/**
* websocket server name,用于区分不同的服务端
*/
private String wsName;
/**
* 是否启用心跳监测 默认开启
*/
private Boolean enableHeartbeat;
/**
* 心跳监测间隔 默认20000毫秒
*/
private Integer heartbeatInterval;
/**
* 是否启用重连接 默认启用
*/
private Boolean enableReconnection;
public String getWsUrl() {
return wsUrl;
}
public void setWsUrl(String wsUrl) {
this.wsUrl = wsUrl;
}
public Boolean getEnableHeartbeat() {
return enableHeartbeat;
}
public void setEnableHeartbeat(Boolean enableHeartbeat) {
this.enableHeartbeat = enableHeartbeat;
}
public Integer getHeartbeatInterval() {
return heartbeatInterval;
}
public void setHeartbeatInterval(Integer heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public Boolean getEnableReconnection() {
return enableReconnection;
}
public void setEnableReconnection(Boolean enableReconnection) {
this.enableReconnection = enableReconnection;
}
public String getWsName() {
return wsName;
}
public void setWsName(String wsName) {
this.wsName = wsName;
}
}
public List<ServerProperties> getConfig() {
return config;
}
public void setConfig(List<ServerProperties> config) {
this.config = config;
}
}
WebsocketMultipleBeanConfig连接服务端并保存到bean中
import com.example.demo.service.WebsocketRunClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 14:14
* @Description: Websocket多客户端配置
*/
@Slf4j
@Configuration
public class WebsocketMultipleBeanConfig {
@Bean
public Map<String, WebsocketRunClient> websocketRunClientMap(WebsocketClientConfiguration websocketClientConfiguration){
Map<String, WebsocketRunClient> retMap = new HashMap<>(5);
List<WebsocketClientConfiguration.ServerProperties> config = websocketClientConfiguration.getConfig();
for (WebsocketClientConfiguration.ServerProperties serverProperties : config) {
String wsUrl = serverProperties.getWsUrl();
String wsName = serverProperties.getWsName();
Boolean enableReconnection = serverProperties.getEnableReconnection();
Boolean enableHeartbeat = serverProperties.getEnableHeartbeat();
Integer heartbeatInterval = serverProperties.getHeartbeatInterval();
try {
WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl),wsName);
websocketRunClient.connect();
websocketRunClient.setConnectionLostTimeout(0);
new Thread(()->{
while (true){
try {
Thread.sleep(heartbeatInterval);
if(enableHeartbeat){
websocketRunClient.send("[websocket "+wsName+"] 心跳检测");
log.info("[websocket {}] 心跳检测",wsName);
}
} catch (Exception e) {
log.error("[websocket {}] 发生异常{}",wsName,e.getMessage());
try {
if(enableReconnection){
log.info("[websocket {}] 重新连接",wsName);
websocketRunClient.reconnect();
websocketRunClient.setConnectionLostTimeout(0);
}
}catch (Exception ex){
log.error("[websocket {}] 重连异常,{}",wsName,ex.getMessage());
}
}
}
}).start();
retMap.put(wsName,websocketRunClient);
} catch (URISyntaxException ex) {
log.error("[websocket {}] 连接异常,{}",wsName,ex.getMessage());
}
}
return retMap;
}
}
客户端
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.nio.ByteBuffer;
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 14:13
* @Description:
*/
@Slf4j
public class WebsocketRunClient extends WebSocketClient {
/**
* websocket连接名称
*/
private String wsName;
public WebsocketRunClient(URI serverUri,String wsName) {
super(serverUri);
this.wsName = wsName;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("[websocket {}] Websocket客户端连接成功",wsName);
}
@Override
public void onMessage(String msg) {
log.info("[websocket {}] 收到String消息:{}",wsName,msg);
}
@Override
public void onMessage(ByteBuffer bytes) {
log.info("[websocket {}] 收到ByteBuffer消息:{}",wsName);
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.info("[websocket {}] Websocket客户端关闭",wsName);
System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
}
@Override
public void onError(Exception e) {
log.info("[websocket {}] Websocket客户端出现异常, 异常原因为:{}",wsName,e.getMessage());
}
}
发送消息
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/13 11:55
* @Description:
*/
@RestController
public class ctrl {
@Value("${cancan.websocket.client.config[0].wsName}")
private String ws1Name;
@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;
@RequestMapping("/send")
public String send(String text){
System.out.println(ws1Name);
WebsocketRunClient websocketRunClient = websocketRunClientMap.get(ws1Name);
websocketRunClient.send(text);
return "发送成功";
}
@RequestMapping("/close")
public String close(){
WebsocketRunClient websocketRunClient = websocketRunClientMap.get("ws-01");
websocketRunClient.close();
return "关闭成功";
}
}
改造
上面是一个客户端的例子,但是这次的需求有些特殊。ws的连接需要在数据库表中读取,而且这个表的数据用户在系统页面上可以随时进行增删改查的操作。这个时候上面读配置文件,在项目启动时就发起ws连接,然后保存连接的方法就行不通了。
根据需求改造之后的设计是这样的,ws连接依旧保存再config注册bean的map中。但是注册bean哪里我们只是将这个map初始化,并不初始化ws连接。ws连接使用定时任务初始,定时读取表中的数据,如果表中的ws连接再map中没有,则初始化后放入map,如果存在表中没有而map中有的ws连接,则移除map中的这条ws连接。
WebsocketMultipleBeanConfig配置类
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/18 9:20
* @Description: websocket配置,用于开启websocket支持
*/
@Configuration
@Slf4j
public class WebsocketMultipleBeanConfig {
@Bean
public Map<String, WebsocketRunClient> websocketRunClientMap(){
Map<String, WebsocketRunClient> retMap = new HashMap<>(8);
return retMap;
}
}
ws初始化定时任务
/**
* @author: tanghaizhi
* @CreateTime: 2022/10/18 11:26
* @Description: 读取配置,保持ws连接
*/
@Component
@Slf4j
public class KeepConnectTask {
@Value("${signaling.tracking.webSocket.uri:/websocket/}")
private String uri;
@Autowired
private ServerConfigService serverConfigService;
@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;
// @Scheduled(cron = "0/20 * * * * ? ")
public void execute() throws Exception {
InetAddress adds = InetAddress.getLocalHost();
String hostIp = adds.getHostAddress();
//查询服务器配置
//这里就是查询表的方法 测试方便先用固定数据代替了
// List<ServerConfigModel> serverConfigs = serverConfigService.selectAll();
List<ServerConfigModel> serverConfigs = new ArrayList<>();
ServerConfigModel serverConfigModel = new ServerConfigModel();
serverConfigModel.setServerIp("127.0.0.1");
serverConfigModel.setServerPort("8080");
serverConfigs.add(serverConfigModel);
Map<String,String> configs = new HashMap<>();
for(ServerConfigModel serverConfig:serverConfigs){
String key = "ws://" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
String wsName = "ws-" + serverConfig.getServerIp() + ":" + serverConfig.getServerPort();
String value = key + uri + wsName;
configs.put(wsName,value);
}
//删除、关闭已不在配置中的连接
//注意使用keySet边遍历边删除会报错,建议使用迭代器遍历删除
Iterator<Map.Entry<String, WebsocketRunClient>> it = websocketRunClientMap.entrySet().iterator();
while(it.hasNext()){
Map.Entry<String, WebsocketRunClient> enter = it.next();
if(!configs.containsKey(enter.getKey())){
enter.getValue().close();
it.remove();
}
}
//已有连接心跳发送
for(String key:websocketRunClientMap.keySet()){
WebsocketRunClient client = websocketRunClientMap.get(key);
try{
client.send("websocket[" + hostIp + "]心跳检测");
log.info("websocket[" + hostIp + "]心跳检测");
} catch (Exception e){
log.error("websocket[{}] 发生异常{}",hostIp,e.getLocalizedMessage());
e.printStackTrace();
try {
client.reconnect();
client.setConnectionLostTimeout(0);
} catch (Exception ex){
log.error("websocket[{}] 重连异常,{}",hostIp,ex.getMessage());
}
}
}
//新配置增加连接
for(String key:configs.keySet()){
if(!websocketRunClientMap.containsKey(key)){
String url = configs.get(key);
try{
WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(url),key);
websocketRunClient.connect();
websocketRunClient.setConnectionLostTimeout(0);
websocketRunClientMap.put(key,websocketRunClient);
} catch (Exception e){
log.error("websocket[{}] 新增连接异常,{}",key,e.getMessage());
}
}
}
}
}
发送消息
发送消息仍然和之前一样先注入文章来源:https://www.toymoban.com/news/detail-512516.html
@Autowired
private Map<String, WebsocketRunClient> websocketRunClientMap;
然后map.get拿到对应想发送消息的WebsocketRunClient,调用其中的send方法即可文章来源地址https://www.toymoban.com/news/detail-512516.html
到了这里,关于SpringBoot WebSocket做客户端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!