WebSocket 实现长连接及通过WebSocket获取客户端IP
WebSocket 是一种支持双向通讯的网络通信协议。
实现过程:
1 添加ServerEndpointExporter配置bean
@Configuration
public class WebSocketConfig {
// 自动注册使用了@ServerEndpoint**注解声明的Websocket endpoint
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
2 实现过程
需求是通过WebSocket,建立长连接,并获取当前在线的人数。通过Websocket 不断发送消息,建立长连接,给Session续命。我是通过MAC地址,区分不同的设备,因为我的需求中需要一个账号能够登录多台机器。所以我通过MAC地址用于标识不同的设备信息。(若是一个账号只能登陆一次,采用用户ID)
1 . 添加配置
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}")
2. 主要方法
-
@OnOpen
- 首次建立连接时,运行该注解下的方法。
- 在此方法中可以获取websocket的session
- 并将该用户的Mac 以及 Session存放到
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>(); @OnOpen public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException { log.info("【Ame websocket 链接成功】,Ame mac:"+ mac); session.setMaxIdleTimeout(sessionTimeout); // 获取客户端的Ip if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){ log.error("并未上传设备信息"); } setMap(session,mac); } private void setMap(Session session,String mac){ sessionMap.put(mac,session); log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size()); }
-
@OnMessage
-
该方法是客户端与服务端进行通讯。
-
每次客户端与服务端建立通讯时,会给Session续命,延长Session的时常
@OnMessage public void onMessage(Session session, String msg) { session.setMaxIdleTimeout(sessionTimeout); if(StringUtils.isBlank(msg)){ return; } // 判断 MAC 地址 是否 是正在上线 String mac = getMACBySession(session); if(StringUtils.isBlank(mac)){ return; } // 将上传的msg转化为 AmeServicePack handleAmeMsg(mac,ame); } private String getMACBySession(Session session){ String mac = getUserIdBySession(session); if(ObjectUtil.isNull(mac)){ return null; } return mac; } private String getUserIdBySession(Session session){ for (String mac : sessionMap.keySet()) { /*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */ if(sessionMap.get(mac).getId().equals(session.getId())){ return mac; } } return null; }
-
-
@OnClose
@OnClose public void onClose(Session session,@PathParam(value = "Mac") String mac) { removeMap(session); log.info("【websocket退出成功】该设备退出:"+mac); }
private void removeMap(Session session){ String mac = getUserIdBySession(session); if(ObjectUtil.isNull(mac)){ return; } sessionMap.remove(mac); //userMap.remove(userId); removeAme(mac); } private void removeAme(String mac){ ameHashMap.remove(mac); sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac); }
-
@OnError
@OnError public void onError(Session session,Throwable throwable) { log.error("websocket: 发生了错误"); removeMap(session); throwable.printStackTrace(); }
-
向某一个用户发送消息文章来源:https://www.toymoban.com/news/detail-700094.html
/* 发送自定义消息 向某一个用户发送,消息*/ public static void sendInfo(String message,String toMac){ log.info("发送消息:{},内容是:{}",message,toMac); if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){ log.error("消息不完整"); return; } // 包含就发送 //System.out.println(sessionMap.containsKey(toUserId)); if(sessionMap.containsKey(toMac)){ try { sendMessage(sessionMap.get(toMac),message); }catch (Exception e){ log.error("发送给用户{}的消息出错",toMac); } } // 用户不在线 else { log.error("设备{}不在线",toMac); } } public static void sendMessage(Session session,String message) throws IOException { session.getBasicRemote().sendText(message); }
3 通过WebSocket获取 请求Ip地址
Websocket 中的request中并没有header 中并没有客户端的Ip地址,但是在SpringCloud中,是通过网关,路由转发。在网关中的请求的request中存在Ip地址,可以通过拦截器,获取网关的ip然后将request放到websocket的request中。文章来源地址https://www.toymoban.com/news/detail-700094.html
3.1 拦截器
package com.mam.gateway.filter;
import jdk.nashorn.internal.runtime.regexp.joni.Config;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* 获取 WebSocket 上传Session的 Ip信息
*/
@Component
public class SessionFilter extends AbstractGatewayFilterFactory<SessionFilter.Config> {
public SessionFilter()
{
super(SessionFilter.Config.class);
}
@Override
public String name()
{
return "SessionFilter";
}
@Override
public GatewayFilter apply(SessionFilter.Config config) {
return new GatewayFilter() {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(exchange.getRequest()).build();
return chain.filter(mutatedServerWebExchange);
}
};
}
static class Config
{
private Integer order;
public Integer getOrder()
{
return order;
}
public void setOrder(Integer order)
{
this.order = order;
}
}
}
3.2 ServerEndpointConfig 的配置
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator{
private static final Logger log = LoggerFactory.getLogger(WebSocketConfigurator.class);
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
Map<String, Object> attributes = sec.getUserProperties();
try{
String clientIp = IpUtils.getIpAddrByHandshakeRequest(request.getHeaders());
attributes.put("clientIp",clientIp);
log.info("websocker拦截器X-Real_IP{}header{}",request.getHeaders().get("X-Real_IP"),request.getHeaders().toString());
}catch (Exception e){
e.printStackTrace();
}
super.modifyHandshake(sec,request,response);
}
}
public static String getIpAddrByHandshakeRequest(Map<String, List<String>> map)
{
if (map == null)
{
return null;
}
String ip = null;
// X-Forwarded-For:Squid 服务代理
String ipAddresses = Convert.toStr(map.get("X-Forwarded-For"));
if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
{
// Proxy-Client-IP:apache 服务代理
ipAddresses = Convert.toStr(map.get("Proxy-Client-IP"));
}else {
ipAddresses = ipAddresses.substring(1,ipAddresses.length()-1);
}
if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
{
// WL-Proxy-Client-IP:weblogic 服务代理
ipAddresses = Convert.toStr(map.get("WL-Proxy-Client-IP"));
}
if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
{
// HTTP_CLIENT_IP:有些代理服务器
ipAddresses = Convert.toStr(map.get("HTTP_CLIENT_IP"));
}
if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))
{
// X-Real-IP:nginx服务代理
ipAddresses = Convert.toStr(map.get("X-Real-IP"));
}
// 有些网络通过多层代理,那么获取到的ip就会有多个,一般都是通过逗号(,)分割开来,并且第一个ip为客户端的真实IP
if (ipAddresses != null && ipAddresses.length() != 0)
{
ip = ipAddresses.split(",")[0];
}
return ip.equals("0:0:0:0:0:0:0:1") ? "127.0.0.1" : ip;
}
3.3 WebSocket 获取Ip
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");
4 完整代码
@Component
@ServerEndpoint(value = "/websocket/onlineAme/{Mac}",configurator = WebSocketConfigurator.class)
public class AmeLoginWebSocket {
static Logger log = LoggerFactory.getLogger(AmeLoginWebSocket.class);
/* 存储session */
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
/* 存储 在线 ame服务 信息*/
private ConcurrentHashMap<String,AmeServicePack> ameHashMap = new ConcurrentHashMap<>();
/* 存储 Ip */
private static final long sessionTimeout = 600000;
/** 链接成功后发送消息**/
@OnMessage
public void onMessage(Session session, String msg) {
session.setMaxIdleTimeout(sessionTimeout);
log.info("【websocket 接收成功】内容为"+msg);
if(StringUtils.isBlank(msg)){
return;
}
// 判断 MAC 地址 是否 是正在上线
String mac = getMACBySession(session);
if(StringUtils.isBlank(mac)){
return;
}
AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);
Map<String, Object> userProperties = session.getUserProperties();
String clientip = (String) userProperties.get("clientIp");
// 将上传的msg转化为 AmeServicePack
handleAmeMsg(mac,ame);
}
private void handleAmeMsg(String mac, AmeServicePack ameInfo) {
log.info("Ame:MAC{}Ip{}:",mac,ameInfo.getAmeip());
if(ameHashMap.containsKey(mac)){
log.info("该设备{}Ip{}已上线",mac,ameInfo.getAmeip());
sendInfo("该用户Ip"+ameInfo.getAmeip()+"已存在",mac);
}else {
ameHashMap.put(mac,ameInfo);
}
}
private boolean updateOnline(AmeServicePack ameInfo){
AjaxResult isonline = SpringUtils.getBean(IAmePackService.class).update(ameInfo,SecurityConstants.INNER);
if((Integer)isonline.get("code")== 200){
return true;
}else {
return false;
}
}
/**
* ameIp 为上线 但 任务表中仍有 正在运行的任务,并将其修改为 -2
* @param ameIp
*/
private void updateErrorTaskStatus(String ameIp){
SpringUtils.getBean(IAmePackService.class).updateErrorAMEStatus(ameIp,SecurityConstants.INNER);
}
private void handlePCMsg(LoginUser loginUser, String msg) {
log.info("系统用户:{},消息{}:",loginUser.getUsername(),msg);
}
private String getMACBySession(Session session){
String mac = getUserIdBySession(session);
if(ObjectUtil.isNull(mac)){
return null;
}
return mac;
}
/*
*成功建立连接后调用
* @param [session, username]
* @return void
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {
log.info("【Ame websocket 链接成功】,Ame mac:"+ mac);
session.setMaxIdleTimeout(sessionTimeout);
// 获取客户端的Ip
if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){
log.error("并未上传设备信息");
}
setMap(session,mac);
}
private void setMap(Session session,String mac){
sessionMap.put(mac,session);
log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size());
}
/*
*关闭连接时调用
* @param [userId]
* @return void
*/
@OnClose
public void onClose(Session session,@PathParam(value = "Mac") String mac) {
removeMap(session);
log.info("【websocket退出成功】该设备退出:"+mac);
}
private void removeMap(Session session){
String mac = getUserIdBySession(session);
if(ObjectUtil.isNull(mac)){
return;
}
sessionMap.remove(mac);
//userMap.remove(userId);
removeAme(mac);
}
private String getUserIdBySession(Session session){
for (String mac : sessionMap.keySet()) {
/*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */
if(sessionMap.get(mac).getId().equals(session.getId())){
return mac;
}
}
return null;
}
private void removeAme(String mac){
ameHashMap.remove(mac);
log.info("{}:下线成功",ameInfo.getAmeip());
sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);
}
/*
*发生错误时调用
* @param [session, throwable]
* @return void
*/
@OnError
public void onError(Session session,Throwable throwable) {
log.error("websocket: 发生了错误");
removeMap(session);
throwable.printStackTrace();
}
/*
发送自定义消息
向某一个用户发送,消息
*/
public static void sendInfo(String message,String toMac){
log.info("发送消息:{},内容是:{}",message,toMac);
if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){
log.error("消息不完整");
return;
}
// 包含就发送
//System.out.println(sessionMap.containsKey(toUserId));
if(sessionMap.containsKey(toMac)){
try {
sendMessage(sessionMap.get(toMac),message);
}catch (Exception e){
log.error("发送给用户{}的消息出错",toMac);
}
}
// 用户不在线
else {
log.error("设备{}不在线",toMac);
}
}
public static void sendMessage(Session session,String message) throws IOException {
session.getBasicRemote().sendText(message);
}
}
到了这里,关于WebSocket 实现长连接及通过WebSocket获取客户端IP的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!