基于elk的springboot web日志收集存储方案

这篇具有很好参考价值的文章主要介绍了基于elk的springboot web日志收集存储方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


WEB日志分类处理方案

web系统的日志按照价值排序分类

  1. controller层的接口访问日志(debug日志)

  2. 自定义包下的其他日志(debug日志)

  3. 全局日志

    ① 全局错误日志

    ② 部分组件的debug日志

    ③ 部分组件的Info日志

一般来讲,1和2的价值最大,2可基于实际业务情况,进一步划分,如component目录下日志,service层日志等。

ELK安装

Elasticsearch安装

版本:7.6.2

安装前

## 创建数据目录
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
mkdir -p /mydata/elasticsearch/plugins
## 限定不限制访问ip
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
## 修改目录权限
chmod -R 777 /mydata/

部署

docker run --name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e "cluster.name=elasticsearch" \
-e ES_JAVA_OPTS="-Xms64m -Xmx256m" \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.6.2

# 设置重启
docker update --restart=always elasticsearch

部署后

docker exec -it elasticsearch /bin/bash
## 安装分词插件
#此命令需要在容器中运行
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
docker restart elasticsearch

## 防火墙加端口
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --reload

Logstash安装

版本:7.6.2

部署前

## 创建数据目录
mkdir -p /mydata/logstash
## 修改目录权限
chmod -R 777 /mydata/
# 修改配置文件
vim /mydata/logstash/logstash.conf

修改配置文件logstash.conf

  • input:根据不同的日志类型,指定不同的输入端口
  • filter:如果是接口日志,去除掉几个属性
  • output:指定输出的host和index
input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    codec => json_lines
    type => "debug"
  }
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4561
    codec => json_lines
    type => "error"
  }
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4562
    codec => json_lines
    type => "business"
  }
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 4563
    codec => json_lines
    type => "record"
  }
}
filter{
  if [type] == "record" {
    mutate {
      remove_field => "port"
      remove_field => "host"
      remove_field => "@version"
    }
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  elasticsearch {
    hosts => "es:9200"
    index => "mall-%{type}-%{+YYYY.MM.dd}"
  }
}

部署

docker run --name logstash -p 4560:4560 -p 4561:4561 -p 4562:4562 -p 4563:4563 \
--link elasticsearch:es \
-v /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-d logstash:7.6.2

# 设置重启
docker update --restart=always logstash

部署后

docker exec -it logstash /bin/bash
# 安装json_lines插件
logstash-plugin install logstash-codec-json_lines

## 防火墙加端口
firewall-cmd --zone=public --add-port=4560/tcp --permanent
firewall-cmd --zone=public --add-port=4561/tcp --permanent
firewall-cmd --zone=public --add-port=4562/tcp --permanent
firewall-cmd --zone=public --add-port=4563/tcp --permanent
firewall-cmd --reload

kibana

版本:7.6.2

部署

docker run --name kibana -p 5601:5601 \
--link elasticsearch:es \
-e "elasticsearch.hosts=http://es:9200" \
-d kibana:7.6.2

# 设置重启
docker update --restart=always kibana

部署后

## 防火墙加端口
firewall-cmd --zone=public --add-port=5601/tcp --permanent
firewall-cmd --reload

导入依赖

    <dependencies>
        <!-- web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AOP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!--logstash-->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.3</version>
        </dependency>
        <!--提供Sli4j日志打印-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
            <scope>provided</scope>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

  <!--版本管理-->
    <dependencyManagement>
        <dependencies>
            <!--SpringBoot-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.7.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

AOP做接口日志输出

日志entity类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Controller层的日志封装类
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WebLog {
    /**
     * 操作描述
     */
    private String description;

    /**
     * 操作用户
     */
    private String username;

    /**
     * 操作时间
     */
    private Long startTime;

    /**
     * 消耗时间
     */
    private Integer spendTime;

    /**
     * 根路径
     */
    private String basePath;

    /**
     * URI
     */
    private String uri;

    /**
     * URL
     */
    private String url;

    /**
     * 请求类型
     */
    private String method;

    /**
     * IP地址
     */
    private String ip;

    /**
     * 请求参数
     */
    private Object parameter;

    /**
     * 请求返回的结果
     */
    private Object result;

}

AOP Aspect类

/**
 * 统一日志处理切面
 */
@Slf4j
@Aspect
@Component
@Order(1)
public class WebLogAspect {

    @Pointcut("execution(public * com.example.testproject.controller.*.*(..))")
    public void webLog() {
    }

    @Before("webLog()")
    public void doBefore(JoinPoint joinPoint) throws Throwable {
    }

    @AfterReturning(value = "webLog()", returning = "ret")
    public void doAfterReturning(Object ret) throws Throwable {
    }

    @Around("webLog()")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        //获取当前请求对象
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        // 记录请求信息
        WebLog webLog = new WebLog();
        // 运行原始方法
        Object result = joinPoint.proceed();
        // 拿方法swagger注释信息
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method method = methodSignature.getMethod();
        if (method.isAnnotationPresent(ApiOperation.class)) {
            ApiOperation apiOperation = method.getAnnotation(ApiOperation.class);
            webLog.setDescription(apiOperation.value());
        }
        // 记录运行时间
        long endTime = System.currentTimeMillis();
        webLog.setSpendTime((int) (endTime - startTime));
        String urlStr = request.getRequestURL().toString(); // url
        webLog.setUrl(urlStr); //URL
        webLog.setBasePath(StrUtil.removeSuffix(urlStr, URLUtil.url(urlStr).getPath())); //baseurl
        webLog.setMethod(request.getMethod()); // GET/POST/PUT/DELETE
        webLog.setParameter(getParameter(method, joinPoint.getArgs())); // 方法参数
        webLog.setResult(result); // 运行结果
        webLog.setStartTime(startTime); //开始时间
        webLog.setUri(request.getRequestURI()); // URI
        webLog.setIp(request.getRemoteAddr()); // ip
        webLog.setUsername(request.getRemoteUser()); // 登录用户名
        // 打印日志信息
        log.info("{}", JSONUtil.parse(webLog));
        // 正常返回方法运行结果
        return result;
    }

    /**
     * 根据方法和传入的参数获取请求参数
     */
    private Object getParameter(Method method, Object[] args) {
        List<Object> argList = new ArrayList<>();
        Parameter[] parameters = method.getParameters();
        for (int i = 0; i < parameters.length; i++) {
            //将RequestBody注解修饰的参数作为请求参数,json参数
            RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
            if (requestBody != null) {
                argList.add(args[i]);
            }
            //将RequestParam注解修饰的参数作为请求参数,其他参数
            RequestParam requestParam = parameters[i].getAnnotation(RequestParam.class);
            if (requestParam != null) {
                Map<String, Object> map = new HashMap<>();
                String key = parameters[i].getName();
                if (StringUtils.hasLength(requestParam.value())) {
                    key = requestParam.value();
                }
                map.put(key, args[i]);
                argList.add(map);
            }
        }

        if (argList.size() == 0) {
            return null;
        } else if (argList.size() == 1) {
            return argList.get(0);
        } else {
            return argList;
        }
    }
}

logback-spring.xml配置

按照日志分类思路

  1. 全局日志:全局日志输出到console、debug日志输出到es中mall-debug index中,error日志输出到es中mall-error index中
  2. 接口日志:由于接口日志通过AOP打印,所以拦截AOP Aspect类的日志信息,输出到es mall-record index中
  3. 业务日志:其他自定义包下的日志,统一打印到es mall-bussiness index中

另限制部分全局日志模块,只打印info信息

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
    <!--读取基本信息-->
    <!--引用默认日志配置-->
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <!--使用默认的控制台日志输出实现-->
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <!--application.yml中读取信息-->
    <!--应用名沿用springboot的-->
    <springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="springBoot"/>
    <!--logstash地址-->
    <springProperty name="LOG_STASH_HOST" scope="context" source="logstash.host" defaultValue="localhost"/>

    <!--自定义属性-->
    <!--项目名-->
    <property name="PROJECT_NAME" value="mall"/>

    <!--全局DEBUG日志,到LogStash-->
    <appender name="LOG_STASH_DEBUG" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <!--日志级别过滤-->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>DEBUG</level>
        </filter>
        <!--logstash地址-->
        <destination>${LOG_STASH_HOST}:4560</destination>
        <!--控制编码格式-->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>Asia/Shanghai</timeZone>
                </timestamp>
                <!--自定义日志输出格式-->
                <pattern>
                    <pattern>
                        {
                        "level": "%level",
                        "project": "${PROJECT_NAME:-}",
                        "service": "${APP_NAME:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger",
                        "message": "%message",
                        "stack_trace": "%exception{20}"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!--当有多个LogStash服务时,设置访问策略为轮询-->
        <connectionStrategy>
            <roundRobin>
                <connectionTTL>5 minutes</connectionTTL>
            </roundRobin>
        </connectionStrategy>
    </appender>

    <!--全局ERROR日志,到LogStash-->
    <appender name="LOG_STASH_ERROR" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <!--日志级别过滤,只接收error级别-->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
        <!--logstash地址-->
        <destination>${LOG_STASH_HOST}:4561</destination>
        <!--控制编码格式-->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>Asia/Shanghai</timeZone>
                </timestamp>
                <!--自定义日志输出格式-->
                <pattern>
                    <pattern>
                        {
                        "level": "%level",
                        "project": "${PROJECT_NAME:-}",
                        "service": "${APP_NAME:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger",
                        "message": "%message",
                        "stack_trace": "%exception{20}"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!--当有多个LogStash服务时,设置访问策略为轮询-->
        <connectionStrategy>
            <roundRobin>
                <connectionTTL>5 minutes</connectionTTL>
            </roundRobin>
        </connectionStrategy>
    </appender>

    <!--controller层日志输出,到LogStash-->
    <appender name="LOG_STASH_RECORD" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <!--logstash地址-->
        <destination>${LOG_STASH_HOST}:4563</destination>
        <!--控制编码格式-->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>Asia/Shanghai</timeZone>
                </timestamp>
                <!--自定义日志输出格式-->
                <pattern>
                    <pattern>
                        {
                        "level": "%level",
                        "project": "${PROJECT_NAME:-}",
                        "service": "${APP_NAME:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger",
                        "message": "%message",
                        "stack_trace": "%exception{20}"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!--当有多个LogStash服务时,设置访问策略为轮询-->
        <connectionStrategy>
            <roundRobin>
                <connectionTTL>5 minutes</connectionTTL>
            </roundRobin>
        </connectionStrategy>
    </appender>

    <!--业务日志输出,到LogStash-->
    <appender name="LOG_STASH_BUSINESS" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <!--logstash地址-->
        <destination>${LOG_STASH_HOST}:4562</destination>
        <!--控制编码格式-->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>Asia/Shanghai</timeZone>
                </timestamp>
                <!--自定义日志输出格式-->
                <pattern>
                    <pattern>
                        {
                        "level": "%level",
                        "project": "${PROJECT_NAME:-}",
                        "service": "${APP_NAME:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger",
                        "message": "%message",
                        "stack_trace": "%exception{20}"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
        <!--当有多个LogStash服务时,设置访问策略为轮询-->
        <connectionStrategy>
            <roundRobin>
                <connectionTTL>5 minutes</connectionTTL>
            </roundRobin>
        </connectionStrategy>
    </appender>


    <!--控制日志输出级别,只有在info以上的日志,才会调用root的日志-->
    <!--日志组件的日志-->
    <logger name="org.slf4j" level="INFO"/>
    <!--swagger的日志-->
    <logger name="springfox" level="INFO"/>
    <logger name="io.swagger" level="INFO"/>
    <!--spring的日志-->
    <logger name="org.springframework" level="INFO"/>
    <!--controller层日志-->
    <logger name="org.hibernate.validator" level="INFO"/>


    <!--全局日志配置-->
    <root level="DEBUG">
        <!--输出日志到控制台-->
        <appender-ref ref="CONSOLE"/>
        <!--输出debug日志es-->
        <appender-ref ref="LOG_STASH_DEBUG"/>
        <!--输出错误日志到es-->
        <appender-ref ref="LOG_STASH_ERROR"/>
    </root>

    <!--controller层的日志,打在WebLogAspect里了,所以记录WebLogAspect里的日志,覆盖root定义-->
    <logger name="com.example.testproject.component.WebLogAspect" level="DEBUG">
        <appender-ref ref="LOG_STASH_RECORD"/>
    </logger>

    <!--自定义总包下其他的日志,覆盖root定义-->
    <logger name="com.example.testproject" level="DEBUG">
        <appender-ref ref="LOG_STASH_BUSINESS"/>
    </logger>

</configuration>

YML配置

配置logstash地址信息,同时,要打开全局的debug日志,不然即便在xml中配置了debug日志输出,依然拦截不到debug日志文章来源地址https://www.toymoban.com/news/detail-697926.html

logstash:
  host: 192.168.179.133

logging:
  level:
    root: debug

到了这里,关于基于elk的springboot web日志收集存储方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot应用整合并使用Docker安装ELK实现日志收集

    ELK即Elasticsearch、Logstash、Kibana,组合起来可以搭建线上日志系统,本文主要讲解使用ELK来收集SpringBoot应用产生的日志。 Elasticsearch:用于存储收集到的日志信息; Logstash:用于收集日志,SpringBoot应用整合了Logstash以后会把日志发送给Logstash,Logstash再把日志转发给Elasticsearch; Kiban

    2023年04月08日
    浏览(42)
  • 基于Filebeat+Kafka+ELK实现Nginx日志收集并采用Elastalert2实现钉钉告警

           先准备3台Nginx服务器,用做后端服务器,(由于机器有限,也直接用这三台机器来部署ES集群),然后准备2台服务器做负载均衡器(Nginx实现负载均衡具体实现操作有机会在介绍),如果是简单学习测试,可以先使用3台Nginx服务器就可以,先告一段落。 3台Nginx服务

    2024年02月15日
    浏览(32)
  • 【ELK日志收集系统】

    目录 一、概述 1.作用 2.为什么使用? 二、组件 1.elasticsearch 1.1 作用 1.2 特点 2.logstash 2.1 作用 2.2 工作过程 2.3 INPUT 2.4 FILETER 2.5 OUTPUTS 3.kibana 三、架构类型 1.ELK 2.ELKK 3.ELFK 4.ELFKK 四、案例 - 构建ELK集群 1.环境配置 2.安装node1与node2节点的elasticsearch 2.1 安装 2.2 配置 2.3 启动ela

    2024年02月10日
    浏览(38)
  • ELK---日志收集系统

    1.要收集哪些日志? ①系统日志–为监控做准备 ②服务日志–数据库–MySQL–慢查询日志、错误日志、普通日志 ③业务日志–log4j( 必须要收集的是业务日志 ) 注:log4j—Java类的数据业务日志 (1)要有针对性的去收集 (2)调整日志级别 2.日志收集后,如何展示?(可视化

    2023年04月08日
    浏览(38)
  • ELK日志收集系统

    ELK由三个组件构成 作用:日志收集 日志分析 日志可视化     日志分析     开源的日志收集、分析、存储程序     特点         分布式         零配置         自动发现         索引自动分片         索引副本机制         Restful风格接口         多数据源

    2024年02月10日
    浏览(37)
  • ELK日志收集记录

    logstash在需要收集日志的服务器里运行,将日志数据发送给es 在kibana页面查看es的数据 es和kibana安装: Install Elasticsearch with RPM | Elasticsearch Guide [8.8] | Elastic Configuring Elasticsearch | Elasticsearch Guide [8.8] | Elastic Install Kibana with RPM | Kibana Guide [8.8] | Elastic 需要收集日志的服务器里安装

    2024年02月09日
    浏览(55)
  • ELK群集部署日志收集

    ELK平台是一套完整的日志集中处理解决方案 由ElasticSearch、Logstash、Kiabana三个开源工具配合使用 是用户对日志的查询、排序、统计的强大工具组合 一般用于大型企业,中小型企业一般会选择(rsyslog+日志服务器或者shell+Python收集日志) logstash进行日志数据收集并且格式化之后

    2023年04月20日
    浏览(69)
  • ELK日志收集系统(四十九)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、概述 二、组件 1. elasticsearch 2. logstash 2.1 工作过程 2.2 INPUT 2.3  FILETER 2.4  OUTPUTS 3. kibana 三、架构类型 3.1 ELK 3.2 ELKK 3.3 ELFK 3.5 EFK 四、案例 ELK日志收集系统是一种常用的开源系统,由三个主

    2024年02月10日
    浏览(41)
  • ELK (一)部署ELK+Filebeat日志收集分析系统

    说明:此安装流程只适用于8.0.0以下的版本 1.1 下载ElasticSearch的wget指令: 1.2 解压安装包到指定目录 指定解压缩到 /usr/local 目录下 1.3 修改配置文件 (1)elasticsearch.yml 分别创建 path.data、path.logs 对应的 data、logs文件夹。 详细配置: (2)limits.conf 末尾追加以下内容: (3)s

    2024年02月08日
    浏览(46)
  • ELK日志收集平台部署(kafka)

    正文:ELK日志收集平台部署 Kafka: 数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 基于zookeeper协调的分布式消息系统,它的最大的特

    2024年01月25日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包