Collecting API Access Logs with Spring Cloud Gateway + RocketMQ
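Background
The product manager came to me out of the blue: our product apparently never re-implemented the platform operation log that the old system had, and he wanted it built quickly so that we could pass some audits. With little time and too few people, this ran counter to what I had originally planned.
Draft approaches
1. Write an AOP logging starter, pull it into every module that needs it, and annotate the relevant methods. That is a lot of work, because every existing module would have to be updated one by one. I am lazy, so this stays as the fallback.
2. Intercept every request at the gateway with a global filter (GlobalFilter), publish the log record to MQ, and have an MQ listener write it to the database. The existing architecture already includes MQ, so this looked like the faster route, and I rolled up my sleeves and got to work.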
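The second approach boils down to a producer/consumer split: the gateway only publishes a log record, and a separate listener persists it. The sketch below illustrates that shape with JDK classes only. The in-memory queue is a stand-in for the MQ topic, the list is a stand-in for the log table, and the AccessLog class and all method names are hypothetical; none of this is the article's actual RocketMQ code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: an in-memory queue stands in for the MQ topic.
class AccessLogPipelineSketch {

    // Hypothetical, trimmed-down log record (the real GatewayLog carries more fields).
    static final class AccessLog {
        final String method;
        final String path;
        final int status;

        AccessLog(String method, String path, int status) {
            this.method = method;
            this.path = path;
            this.status = status;
        }
    }

    // Stand-in for the MQ topic the gateway publishes to.
    private final BlockingQueue<AccessLog> topic = new LinkedBlockingQueue<>();
    // Stand-in for the log table written by the MQ listener.
    private final List<AccessLog> table = new ArrayList<>();

    // Gateway side: called once per request; offer() does not block the caller.
    boolean publish(AccessLog log) {
        return topic.offer(log);
    }

    // Listener side: drains whatever is queued, "persists" it, and returns the count stored.
    int consumeAll() {
        List<AccessLog> batch = new ArrayList<>();
        topic.drainTo(batch);
        table.addAll(batch);
        return batch.size();
    }

    List<AccessLog> stored() {
        return table;
    }

    public static void main(String[] args) {
        AccessLogPipelineSketch pipeline = new AccessLogPipelineSketch();
        pipeline.publish(new AccessLog("GET", "/api/users", 200));
        pipeline.publish(new AccessLog("POST", "/api/orders", 500));
        System.out.println("stored " + pipeline.consumeAll() + " log records");
    }
}
```

The point of the split is that the request path only pays for an enqueue; a slow or unavailable database affects the listener, not the gateway.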
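Implementation (Approach 1 is recommended)
I spent a long time working out how to read the request body: branching on MediaType and Method to pick a read strategy, parsing the body, and rebuilding the request to pass downstream. All sorts of problems came up along the way. One case I never solved: several endpoints take base64-encoded images as parameters, and my earlier version, which used a BodyInserter to rebuild the request body and capture the response body, threw an error on every one of them.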
private Mono<Void> writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
gatewayLog.setRequestBody(body);
return Mono.just(body);
});
// Re-insert the body through a BodyInserter, because the request body can only be read once
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
// Re-wrap the request
ServerHttpRequest decoratedRequest = requestDecorate(exchange,headers,outputMessage);
// Decorate the response so that it can be logged
ServerHttpResponseDecorator decoratedResponse =recordResponse(exchange,gatewayLog);
return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog)));
}));
}
This fails with an IllegalReferenceCountException (io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1). The full log:
[TID:N/A] 2023-03-27 11:38:34.767 ERROR 30056 --- [ctor-http-nio-4] r.n.channel.ChannelOperationsHandler : [id: 0x53e73793, L:/192.168.1.53:6868 ! R:/192.168.1.62:56218] Error was received while reading the incoming data. The connection will be closed.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.51.Final.jar:4.1.51.Final]
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92) ~[netty-codec-http-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:344) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:487) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.10.RELEASE.jar:0.9.10.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.51.Final.jar:4.1.51.Fina
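The "refCnt: 0, decrement: 1" message in the trace above means that something tried to release a buffer whose reference count had already dropped to zero. The sketch below reproduces that rule with a tiny hand-written counter. CountedBuffer is a hypothetical stand-in, not Netty's ByteBuf, and it is deliberately single-threaded; it only illustrates why a second release fails and why an extra owner has to retain first.

```java
// Illustrative stand-in for a reference-counted buffer; this is not Netty's ByteBuf.
class RefCountSketch {

    static final class CountedBuffer {
        // A freshly allocated buffer starts with exactly one owner.
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        // A second owner must retain the buffer before it is allowed to release it.
        CountedBuffer retain() {
            if (refCnt <= 0) {
                throw new IllegalStateException("refCnt: " + refCnt + ", increment: 1");
            }
            refCnt++;
            return this;
        }

        // Returns true when the last reference is dropped (the memory would be freed here).
        boolean release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("refCnt: " + refCnt + ", decrement: 1");
            }
            refCnt--;
            return refCnt == 0;
        }
    }

    // Releases the same buffer twice and returns the message of the resulting failure.
    static String doubleReleaseMessage() {
        CountedBuffer buffer = new CountedBuffer();
        buffer.release(); // first release: count goes 1 -> 0
        try {
            buffer.release(); // second release: nothing left to decrement
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(doubleReleaseMessage());
    }
}
```

In the gateway, a double release of this kind typically happens when two parties both believe they own the inbound body buffer, for example a filter that consumes the body and the server pipeline that cleans it up afterwards.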
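One explanation for this problem is that DataBufferUtils.release(buffer) is buggy in older versions of spring-core; see https://github.com/spring-projects/spring-framework/issues/26060. If your spring-cloud-starter-gateway version is old, you can upgrade spring-core on its own to 5.2.13.RELEASE or later. [I tried this and still got an error, though not the one above; I did not track it down any further.]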
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.2.13.RELEASE</version>
</dependency>
Later I switched to reading request.getBody() directly, which also gets the logging done. The complete code:
@Component
@Slf4j
@RequiredArgsConstructor
public class GatewayLogFilterBak230329 implements GlobalFilter, Ordered {
private final ApplicationEventPublisher applicationEventPublisher;
private static final String CONTENT_TYPE = "application/json";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// Request path
String requestPath = request.getPath().pathWithinApplication().value();
// Route information
Route route = getGatewayRoute(exchange);
String ipAddress = IpUtils.getIp(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setDevice(IpUtils.getServerDevices(request));
gatewayLog.setProtocol(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getUri().toString());
gatewayLog.setStartTime(new Date().getTime());
gatewayLog.setIp(ipAddress);
Map<String, Object> headers = new HashMap<>();
for (String key : request.getHeaders().keySet()) {
headers.put(key, request.getHeaders().getFirst(key));
}
gatewayLog.setHeaders(JSON.toJSONString(headers));
MediaType mediaType = request.getHeaders().getContentType();
if (request.getHeaders().getContentType() != null) {
gatewayLog.setRequestContentType(request.getHeaders().getContentType().toString());
}
if (mediaType != null && (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType))) {
return writeBodyLog(exchange, chain, gatewayLog);
} else {
return writeBasicLog(exchange, chain, gatewayLog);
}
}
@Override
public int getOrder() {
// Must run before NettyWriteResponseFilter in the filter chain (its default order is -1)
return -2;
}
/**
* Gets the route information.
*
* @param exchange
* @return
*/
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
private Mono<Void> writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
gatewayLog.setRequestBody(getUrlParamsByMap(queryParams));
// Capture the response body
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// Publish the access log
writeAccessLog(gatewayLog);
}));
}
/**
* Works around the request body being readable only once.
* See: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
*
* @param exchange
* @param chain
* @param gatewayLog
* @return
*/
private Mono<Void> writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerHttpRequest request = exchange.getRequest();
return DataBufferUtils.join(request.getBody())
.flatMap(d -> Mono.just(Optional.of(d))).defaultIfEmpty(Optional.empty())
.flatMap(optional -> {
try {
URI uri = request.getURI();
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
byte[] bodyBytes = null;
if (optional.isPresent()) {
byte[] oldBytes = new byte[optional.get().readableByteCount()];
optional.get().read(oldBytes);
bodyBytes = oldBytes;
}
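// No body: pass the request through with the copied headers
if (ArrayUtils.isEmpty(bodyBytes)) {
return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(request.mutate().uri(uri).build()) {
@Override
public HttpHeaders getHeaders() {
return headers;
}
}).response(recordResponseLog(exchange, gatewayLog)).build())
// Publish the access log here as well; otherwise requests without a body are never logged
.then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog)));
}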
String body = new String(bodyBytes, StandardCharsets.UTF_8);
gatewayLog.setRequestBody(body);
final byte[] finalBodyBytes = bodyBytes;
return chain.filter(exchange.mutate().request(new ServerHttpRequestDecorator(request.mutate().uri(uri).build()) {
@Override
public Flux<DataBuffer> getBody() {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(finalBodyBytes);
DataBufferUtils.retain(buffer);
return Flux.just(buffer);
}
@Override
public HttpHeaders getHeaders() {
return headers;
}
}).response(recordResponseLog(exchange, gatewayLog)).build()).then(Mono.fromRunnable(() -> {
writeAccessLog(gatewayLog);
}));
} catch (Exception ex) {
return chain.filter(exchange);
} finally {
if (optional.isPresent()) {
DataBufferUtils.release(optional.get());
}
}
});
}
/**
* Publishes the access log.
*
* @param gatewayLog the gateway log record
*/
private void writeAccessLog(GatewayLog gatewayLog) {
applicationEventPublisher.publishEvent(new GatewayLogEvent(this, gatewayLog));
}
/**
* Records the response log.
* Joins the buffers through a DataBufferFactory to handle response bodies that arrive in several chunks.
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setEndTime(responseTime.getTime());
// Compute the execution time
long executeTime = (responseTime.getTime() - gatewayLog.getStartTime());
gatewayLog.setExecuteTime(executeTime);
gatewayLog.setStatus(response.getStatusCode().value() == 200 ? "成功" : "失败");
// Read the response content type; log the body only if it is JSON
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (Objects.equals(this.getStatusCode(), HttpStatus.OK)
&& StringUtils.isNotBlank(originalResponseContentType)
&& originalResponseContentType.contains(CONTENT_TYPE)) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// Join all buffers into one, so that a chunked response body is logged as a whole
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// Release the joined buffer
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
}
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
/**
* Converts a parameter map into a URL query string.
*
* @param map
* @return
*/
private String getUrlParamsByMap(MultiValueMap<String, String> map) {
if (ObjectUtils.isEmpty(map)) {
return "";
}
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue().get(0));
sb.append("&");
}
String s = sb.toString();
if (s.endsWith("&")) {
s = StringUtils.substringBeforeLast(s, "&");
}
return s;
}
}
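The core idea in writeBodyLog above is "drain once, replay many times": copy the body bytes out of the read-once source, then hand every later reader a fresh buffer over the cached bytes. The following sketch shows the same idea with java.nio.ByteBuffer instead of Spring's DataBuffer. The class and method names are hypothetical, and reference counting is left out on purpose, since plain ByteBuffer has none.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Illustrative only: java.nio.ByteBuffer stands in for Spring's DataBuffer.
class ReplayableBodySketch {

    // Drains a read-once buffer into a byte array; afterwards the source has nothing left to read.
    static byte[] drain(ByteBuffer source) {
        byte[] bytes = new byte[source.remaining()];
        source.get(bytes);
        return bytes;
    }

    // Every call to get() yields an independent read-only view over the cached bytes,
    // so one reader advancing its position does not affect the next reader.
    static Supplier<ByteBuffer> replayable(byte[] cached) {
        return () -> ByteBuffer.wrap(cached).asReadOnlyBuffer();
    }

    // Decodes whatever is left in the buffer as UTF-8 text.
    static String readAsText(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    public static void main(String[] args) {
        ByteBuffer inbound = ByteBuffer.wrap("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        byte[] cached = drain(inbound);
        Supplier<ByteBuffer> body = replayable(cached);
        System.out.println("for the log:      " + readAsText(body.get()));
        System.out.println("for the upstream: " + readAsText(body.get()));
    }
}
```

This is why the decorator's getBody() wraps the saved byte array on each call rather than returning the original buffer: the original has already been consumed and released.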
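Later I read quite a few articles by more experienced developers and spent a long time digging through the gateway source code. In the utility class ServerWebExchangeUtils I finally found a better hook: the cacheRequestBody() method. Its English Javadoc says, roughly, that it caches the request body in an exchange attribute, from which it can be read back later. Searching for this method on Baidu confirmed that other people solve the request-body problem the same way, so here is the code.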
Approach 1: two filters that finally get the logging done
@Slf4j
@Component
// Highest-precedence filter that caches the request body
public class CacheGlobalRequestBodyFilter implements Ordered, GatewayFilter, GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return ServerWebExchangeUtils
.cacheRequestBody(
exchange,
(request) -> chain.filter(
exchange.mutate().request(request).build()));
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class GatewayLogFilter implements GlobalFilter, Ordered {
private final ApplicationEventPublisher applicationEventPublisher;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// Request path
String requestPath = request.getPath().pathWithinApplication().value();
// Route information
Route route = getGatewayRoute(exchange);
String ipAddress = IpUtils.getIp(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setDevice(IpUtils.getServerDevices(request));
gatewayLog.setProtocol(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getUri().toString());
gatewayLog.setStartTime(new Date().getTime());
gatewayLog.setIp(ipAddress);
Map<String, Object> headers = new HashMap<>();
for (String key : request.getHeaders().keySet()) {
headers.put(key, request.getHeaders().getFirst(key));
}
gatewayLog.setHeaders(JSON.toJSONString(headers));
if (request.getHeaders().getContentType() != null) {
gatewayLog.setRequestContentType(request.getHeaders().getContentType().toString());
}
// GatewayUtil.getRequestBodyContent(exchange) simply reads the exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR) attribute
gatewayLog.setRequestBody(GatewayUtil.getRequestBodyContent(exchange));
// Capture the response body
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
return chain.filter(exchange.mutate().response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// Publish the access log
writeAccessLog(gatewayLog);
}));
}
@Override
public int getOrder() {
return 0;
}
/**
* Gets the route information.
*
* @param exchange
* @return
*/
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
/**
* Publishes the access log.
*
* @param gatewayLog the gateway log record
*/
private void writeAccessLog(GatewayLog gatewayLog) {
applicationEventPublisher.publishEvent(new GatewayLogEvent(this, gatewayLog));
}
/**
* Records the response log.
* Joins the buffers through a DataBufferFactory to handle response bodies that arrive in several chunks.
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setEndTime(responseTime.getTime());
// Execution time
long executeTime = (responseTime.getTime() - gatewayLog.getStartTime());
gatewayLog.setExecuteTime(executeTime);
gatewayLog.setStatus(response.getStatusCode().value() == 200 ? "成功" : "失败");
// Read the response content type; log the body only if it is JSON
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (Objects.equals(this.getStatusCode(), HttpStatus.OK) && StringUtils.isNotBlank(originalResponseContentType)
&& originalResponseContentType.contains(MediaType.APPLICATION_JSON_VALUE)) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// Join all buffers into one, so that a chunked response body is logged as a whole
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// Release the joined buffer
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
}
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
}
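recordResponseLog above joins every chunk of the response before turning it into a String. A short sketch of why that matters: a chunk boundary can fall in the middle of a multi-byte UTF-8 character, so decoding chunk by chunk can corrupt the text, while joining first cannot. The example uses plain byte arrays instead of DataBuffer, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Illustrative only: byte arrays stand in for the DataBuffer chunks of a response body.
class ChunkJoinSketch {

    // Joins all chunks first and decodes once, as the response decorator does.
    static String joinAndDecode(List<byte[]> chunks) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            joined.write(chunk, 0, chunk.length);
        }
        return new String(joined.toByteArray(), StandardCharsets.UTF_8);
    }

    // Naive alternative: decodes every chunk on its own and concatenates the results.
    static String decodePerChunk(List<byte[]> chunks) {
        StringBuilder text = new StringBuilder();
        for (byte[] chunk : chunks) {
            text.append(new String(chunk, StandardCharsets.UTF_8));
        }
        return text.toString();
    }

    // Splits a payload into two chunks at the given byte offset.
    static List<byte[]> splitAt(byte[] payload, int offset) {
        return Arrays.asList(
                Arrays.copyOfRange(payload, 0, offset),
                Arrays.copyOfRange(payload, offset, payload.length));
    }

    public static void main(String[] args) {
        // The two escaped characters each take three bytes in UTF-8.
        String original = "{\"msg\":\"\u6210\u529f\"}";
        // Offset 9 lands inside the first three-byte character (it occupies bytes 8 to 10).
        List<byte[]> chunks = splitAt(original.getBytes(StandardCharsets.UTF_8), 9);
        System.out.println("joined matches:    " + original.equals(joinAndDecode(chunks)));
        System.out.println("per-chunk matches: " + original.equals(decodePerChunk(chunks)));
    }
}
```

The trade-off is memory: joining buffers the whole response before it is written, which is acceptable for small JSON payloads but is one reason the filter skips non-JSON responses.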
Approach 2: cache the request body via AdaptCachedBodyGlobalFilter
@Component
@RequiredArgsConstructor
public class GatewayCommonConfig{
private final GatewayProperties gatewayProperties;
private final ApplicationContext applicationContext;
@PostConstruct
public void init(){
// Publish an EnableBodyCachingEvent for each configured route
gatewayProperties.getRoutes().forEach(e->{
EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), e.getId());
// Publish the event
applicationContext.publishEvent(enableBodyCachingEvent);
});
}
}
After that, a custom filter can read the request body through request.getBody().