电商大数据日志收集系统之EFK

这篇具有很好参考价值的文章主要介绍了电商大数据日志收集系统之EFK。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

日志管理的挑战:

  • 关注点很多,任何一个点都有可能引起问题

  • 日志分散在很多机器,出了问题时,才发现日志被删了

  • 很多运维人员是消防员,哪里有问题去哪里

efk,kafka,elasticsearch,Powered by 金山文档

集中化日志管理思路: 日志收集 ——》格式化分析 ——》检索和可视化 ——》 风险告警

ELK架构:

ELK架构分为两种,一种是经典的ELK,另外一种是加上消息队列(Redis或Kafka或RabbitMQ)和Nginx结构。

经典的ELK

经典的ELK主要是由Filebeat + Logstash + Elasticsearch + Kibana组成,如下图:(早期的ELK只有Logstash + Elasticsearch + Kibana)

efk,kafka,elasticsearch,Powered by 金山文档

弊端:此架构主要适用于数据量小的开发环境,存在数据丢失的危险。

整合消息队列+Nginx架构

这种架构,主要加上了Redis或Kafka或RabbitMQ做消息队列,保证了消息的不丢失。

efk,kafka,elasticsearch,Powered by 金山文档

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的存储库中。

应用:ETL工具 / 数据采集处理引擎

https://www.elastic.co/cn/logstash/

efk,kafka,elasticsearch,Powered by 金山文档

Logstash核心概念

Pipeline

  • 包含了input—filter-output三个阶段的处理流程

  • 插件生命周期管理

  • 队列管理

Logstash Event

  • 数据在内部流转时的具体表现形式。数据在input 阶段被转换为Event,在 output被转化成目标格式数据

  • Event 其实是一个Java Object,在配置文件中,对Event 的属性进行增删改查

Codec (Code / Decode)

将原始数据decode成Event;将Event encode成目标数据

efk,kafka,elasticsearch,Powered by 金山文档

Logstash数据传输原理

  1. 数据采集与输入:Logstash支持各种输入选择,能够以连续的流式传输方式,轻松地从日志、指标、Web应用以及数据存储中采集数据。

  1. 实时解析和数据转换:通过Logstash过滤器解析各个事件,识别已命名的字段来构建结构,并将它们转换成通用格式,最终将数据从源端传输到存储库中。

  1. 存储与数据导出:Logstash提供多种输出选择,可以将数据发送到指定的地方。

Logstash通过管道完成数据的采集与处理,管道配置中包含input、output和filter(可选)插件,input和output用来配置输入和输出数据源、filter用来对数据进行过滤或预处理。

efk,kafka,elasticsearch,Powered by 金山文档

Logstash配置文件结构

参考:https://www.elastic.co/guide/en/logstash/7.17/configuration.html

Logstash的管道配置文件对每种类型的插件都提供了一个单独的配置部分,用于处理管道事件。

efk,kafka,elasticsearch,Powered by 金山文档

每个配置部分可以包含一个或多个插件。例如,指定多个filter插件,Logstash会按照它们在配置文件中出现的顺序进行处理。

#运行:

bin/logstash -f logstash-demo.conf

Input Plugins

https://www.elastic.co/guide/en/logstash/7.17/input-plugins.html

一个 Pipeline可以有多个input插件

  • Stdin / File

  • Beats / Log4J /Elasticsearch / JDBC / Kafka /Rabbitmq /Redis

  • JMX/ HTTP / Websocket / UDP / TCP

  • Google Cloud Storage / S3

  • Github / Twitter

Output Plugins

https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html

将Event发送到特定的目的地,是 Pipeline 的最后一个阶段。

常见 Output Plugins:

  • Elasticsearch

  • Email / Pageduty

  • Influxdb / Kafka / Mongodb / Opentsdb / Zabbix

  • Http / TCP / Websocket

Filter Plugins

https://www.elastic.co/guide/en/logstash/7.17/filter-plugins.html

处理Event

内置的Filter Plugins:

  • Mutate 一操作Event的字段

  • Metrics — Aggregate metrics

  • Ruby 一执行Ruby 代码

Codec Plugins

https://www.elastic.co/guide/en/logstash/7.17/codec-plugins.html

将原始数据decode成Event;将Event encode成目标数据

内置的Codec Plugins:

  • Line / Multiline

  • JSON / Avro / Cef (ArcSight Common Event Format)

  • Dots / Rubydebug

Logstash Queue

  • In Memory Queue

进程Crash,机器宕机,都会引起数据的丢失

  • Persistent Queue

机器宕机,数据也不会丢失; 数据保证会被消费; 可以替代 Kafka等消息队列缓冲区的作用

queue.type: persisted (默认是memory)

queue.max_bytes: 4gb

efk,kafka,elasticsearch,Powered by 金山文档

Logstash安装

logstash官方文档: https://www.elastic.co/guide/en/logstash/8.6/installing-logstash.html

1)下载并解压logstash

下载地址: https://www.elastic.co/cn/downloads/past-releases#logstash

选择版本:7.17

efk,kafka,elasticsearch,Powered by 金山文档

测试:运行最基本的logstash管道

cd logstash-7.17

#linux

#-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试

bin/logstash -e 'input { stdin { } } output { stdout {} }'

#windows

.\bin\logstash.bat -e "input { stdin { } } output { stdout {} }"

Codec Plugin测试

# single line

bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"

bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"

Codec Plugin —— Multiline

设置参数:

  • pattern: 设置行匹配的正则表达式

  • what : 如果匹配成功,那么匹配行属于上一个事件还是下一个事件

  • previous / next

  • negate : 是否对pattern结果取反

  • true / false

efk,kafka,elasticsearch,Powered by 金山文档

Input Plugin —— File

  • 支持从文件中读取数据,如日志文件

  • 文件读取需要解决的问题:只被读取一次。重启后需要从上次读取的位置继续(通过sincedb 实现)

  • 读取到文件新内容,发现新文件

  • 文件发生归档操作(文档位置发生变化,日志rotation),不能影响当前的内容读取

Filter Plugin

Filter Plugin可以对Logstash Event进行各种处理,例如解析,删除字段,类型转换

  • Date: 日期解析

  • Dissect: 分割符解析

  • Grok: 正则匹配解析

  • Mutate: 处理字段。重命名,删除,替换

  • Ruby: 利用Ruby 代码来动态修改Event

Filter Plugin - Mutate

对字段做各种操作:

  • Convert : 类型转换

  • Gsub : 字符串替换

  • Split / Join /Merge: 字符串切割,数组合并字符串,数组合并数组

  • Rename: 字段重命名

  • Update / Replace: 字段内容更新替换

  • Remove_field: 字段删除

Logstash导入数据到ES

  1. 测试数据集下载:https://grouplens.org/datasets/movielens/

efk,kafka,elasticsearch,Powered by 金山文档

2.准备logstash-movie.conf配置文件

efk,kafka,elasticsearch,Powered by 金山文档

3.运行logstash

# linux

bin/logstash -f logstash-movie.conf

  • --config.test_and_exit : 解析配置文件并报告任何错误

  • --config.reload.automatic: 启用自动配置加载

同步数据库数据到Elasticsearch

需求: 将数据库中的数据同步到ES,借助ES的全文搜索,提高搜索速度

  • 需要把新增用户信息同步到Elasticsearch中

  • 用户信息Update 后,需要能被更新到Elasticsearch

  • 支持增量更新

  • 用户注销后,不能被ES所搜索到

实现思路

  • 基于canal同步数据

  • 借助JDBC Input Plugin将数据从数据库读到Logstash

  • 需要自己提供所需的 JDBC Driver;

  • JDBC Input Plugin 支持定时任务 Scheduling,其语法来自 Rufus-scheduler,其扩展了 Cron,使用 Cron 的语法可以完成任务的触发;

  • JDBC Input Plugin 支持通过 Tracking_column / sql_last_value 的方式记录 State,最终实现增量的更新;

  • https://www.elastic.co/cn/blog/logstash-jdbc-input-plugin

JDBC Input Plugin实现步骤

1)拷贝jdbc依赖到logstash-7.17/drivers目录下

2)准备mysql-demo.conf配置文件

efk,kafka,elasticsearch,Powered by 金山文档

3.运行logstash

bin/logstash -f mysql-demo.conf

什么是Beats

轻量型数据采集器,文档地址: https://www.elastic.co/guide/en/beats/libbeat/7.17/index.html

Beats 是一个免费且开放的平台,集合了多种单一用途的数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。

efk,kafka,elasticsearch,Powered by 金山文档

FileBeat简介

FileBeat专门用于转发和收集日志数据的轻量级采集工具。它可以作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。

FileBeat的工作原理

启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester(收割机)。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据收集到一起,并将数据发送给输出(Output)。

efk,kafka,elasticsearch,Powered by 金山文档

logstash vs FileBeat

  • Logstash是在jvm上运行的,资源消耗比较大。而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级。

  • Logstash 和Filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少

  • Logstash 具有Filter功能,能过滤分析日志

  • 一般结构都是Filebeat采集日志,然后发送到消息队列、Redis、MQ中,然后Logstash去获取,利用Filter功能过滤分析,然后存储到Elasticsearch中

  • FileBeat和Logstash配合,实现背压机制。当将数据发送到Logstash或 Elasticsearch时,Filebeat使用背压敏感协议,以应对更多的数据量。如果Logstash正在忙于处理数据,则会告诉Filebeat 减慢读取速度。一旦拥堵得到解决,Filebeat就会恢复到原来的步伐并继续传输数据。

下载并解压Filebeat

下载地址:https://www.elastic.co/cn/downloads/past-releases#filebeat

选择版本:7.17.3

efk,kafka,elasticsearch,Powered by 金山文档

编辑配置

修改 filebeat.yml 以设置连接信息:

output.elasticsearch:

hosts: ["192.168.65.174:9200","192.168.65.192:9200","192.168.65.204:9200"]

username: "elastic"

password: "123456"

setup.kibana:

host: "192.168.65.174:5601"

启用和配置数据收集模块

从安装目录中,运行:

efk,kafka,elasticsearch,Powered by 金山文档

启动 Filebeat

efk,kafka,elasticsearch,Powered by 金山文档

ELK整合

案例:采集tomcat服务器日志

Tomcat服务器运行过程中产生很多日志信息,通过Logstash采集并存储日志信息至ElasticSearch中

使用FileBeats将日志发送到Logstash

1)创建配置文件filebeat-logstash.yml,配置FileBeats将数据发送到Logstash

efk,kafka,elasticsearch,Powered by 金山文档
  • pattern:正则表达式

  • negate:true 或 false;默认是false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行

  • match:after 或 before,合并到上一行的末尾或开头

启动FileBeat,并指定使用指定的配置文件

./filebeat -e -c filebeat-logstash.yml

可能出现的异常:

异常1:Exiting: error loading config file: config file ("filebeat-logstash.yml") can only be writable by the owner but the permissions are "-rw-rw-r--" (to fix the permissions use: 'chmod go-w /home/es/filebeat-7.17.3-linux-x86_64/filebeat-logstash.yml')

因为安全原因不要其他用户写的权限,去掉写的权限就可以了

chmod 644 filebeat-logstash.yml

异常2:Failed to connect to backoff(async(tcp://192.168.65.204:5044)): dial tcp 192.168.65.204:5044: connect: connection refused

FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

配置Logstash接收FileBeat收集的数据并打印

vim config/filebeat-console.conf

# 配置从FileBeat接收数据

input {

beats {

port => 5044

}

}

output {

stdout {

codec => rubydebug

}

}

测试logstash配置是否正确

bin/logstash -f config/filebeat-console.conf --config.test_and_exit

启动logstash

# reload.automatic:修改配置文件时自动重新加载

bin/logstash -f config/filebeat-console.conf --config.reload.automatic

测试访问tomcat,logstash是否接收到了Filebeat传过来的tomcat日志

Logstash输出数据到Elasticsearch

如果我们需要将数据输出值ES而不是控制台的话,我们修改Logstash的output配置。

vim config/filebeat-elasticSearch.conf

input {

beats {

port => 5044

}

}

output {

elasticsearch {

hosts => ["http://localhost:9200"]

user => "elastic"

password => "123456"

}

stdout{

codec => rubydebug

}

}

启动logstash

bin/logstash -f config/filebeat-elasticSearch.conf --config.reload.automatic

ES中会生成一个以logstash开头的索引,测试日志是否保存到了ES。

思考:日志信息都保证在message字段中,是否可以把日志进行解析一个个的字段?例如:IP字段、时间、请求方式、请求URL、响应结果。

利用Logstash过滤器解析日志

从日志文件中收集到的数据包含了很多有效信息,比如IP、时间等,在Logstash中可以配置过滤器Filter对采集到的数据进行过滤处理,Logstash中有大量的插件可以供我们使用。

查看Logstash已经安装的插件

bin/logstash-plugin list

Grok插件

Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html

Grok语法

Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

grok模式的语法是:

%{SYNTAX:SEMANTIC}

SYNTAX(语法)指的是Grok模式名称,SEMANTIC(语义)是给模式匹配到的文本字段名。例如:

%{NUMBER:duration} %{IP:client}

duration表示:匹配一个数字,client表示匹配一个IP地址。

默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

常用的Grok模式

https://help.aliyun.com/document_detail/129387.html?scm=20140722.184.2.173

用法

filter {

grok {

match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

}

}

比如,tomacat日志

192.168.65.103 - -[15/Mar/2023 00:28:45.120] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780

解析后的字段

efk,kafka,elasticsearch,Powered by 金山文档

grok模式:

%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status} %{INT:length}

为了方便测试,我们可以使用Kibana来进行Grok开发

修改Logstash配置文件

efk,kafka,elasticsearch,Powered by 金山文档

启动logstash测试:

bin/logstash -f config/filebeat-console.conf --config.reload.automatic

使用mutate插件过滤掉不需要的字段

mutate {

enable_metric => "false"

remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]

}

要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html

用法如下:

efk,kafka,elasticsearch,Powered by 金山文档

将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段

match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]

target => "date"

}

输出到Elasticsearch指定索引

index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

output {

elasticsearch {

index => "tomcat_web_log_%{+YYYY-MM}"

hosts => ["http://localhost:9200"]

user => "elastic"

password => "123456"

}

stdout{

codec => rubydebug

}

}

注意:index名称中,不能出现大写字符

完整的Logstash配置文件

efk,kafka,elasticsearch,Powered by 金山文档

启动logstash

bin/logstash -f config/filebeat-filter-es.conf --config.reload.automatic文章来源地址https://www.toymoban.com/news/detail-725373.html

到了这里,关于电商大数据日志收集系统之EFK的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 部署ELK+Kafka+Filebeat日志收集分析系统

    ELK是三个软件的统称,即Elasticsearch、Logstash和Kibana三个开源软件的缩写。这三款软件都是开源软件,通常配合使用,并且都先后归于Elastic.co企业名下,故被简称为ELK协议栈。ELK主要用于部署在企业架构中,收集多台设备上多个服务的日志信息,并将其统一整合后提供给用户。

    2024年02月16日
    浏览(29)
  • docker搭建Elk+Kafka+Filebeat分布式日志收集系统

    目录 一、介绍 二、集群环境 三、ES集群 四、Kibana  五、Logstash 六、Zookeeper 七、Kafka 八、Filebeat 八、Nginx (一)架构图  (二)组件介绍 1.Elasticsearch 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于

    2024年02月04日
    浏览(40)
  • 搭建EFK(Elasticsearch+Filebeat+Kibana)日志收集系统[windows]

    EFK简介 Elasticsearch 是一个实时的、分布式的可扩展的搜索引擎,允许进行全文、结构化搜索,它通常用于索引和搜索大量日志数据,也可用于搜索许多不同类型的文档。 FileBeats 是数据采集的得力工具。将 Beats 和您的容器一起置于服务器上,或者将 Beats 作为函数加以部署,然

    2024年02月08日
    浏览(44)
  • 基于Elasticsearch + Fluentd + Kibana(EFK)搭建日志收集管理系统

    目录 1、EFK简介 2、EFK框架 2.1、Fluentd系统架构 2.2、Elasticsearch系统架构

    2024年02月11日
    浏览(28)
  • K8s部署轻量级日志收集系统EFK(elasticsearch + filebeat + kibana)

    目录 K8s部署EFK(elasticsear + filebeat + kibana)日志收集 一.准备镜像 二.搭建Elasticsearch + kibana 1.在可执行kubectl命令的服务器准备安装的yml文件 2.在elasticsearch-kibana目录下创建配置文件elasticsearch.yml 3.创建kibana配置文件kibana.yml 4.在k8s中创建elasticsearch和kibana的配置文件configmap 5.检查

    2024年02月08日
    浏览(54)
  • docker搭建最新ELFK分布式日志收集系统(elasticsearch+logstash+filebeats+kibana7.16.1)

    随着分布式项目的集群部署,日志的存储也分散开来,在日后出现问题进行日志定位时就会出现很困难,服务器很多会做负载均衡,这样最终请求所落在的服务器也随机起来,所以好的方式就是集中收集起来,不需要一台一台服务器去查,方便查看。 ELFK是Elasticsearch+Logstash+F

    2024年02月08日
    浏览(37)
  • 大数据技术之flume——日志收集系统

    大数据需要解决的三个问题:采集、存储、计算。 Apache flume是一个分布式、可靠的、高可用的 海量日志数据采集、聚合和传输系统 ,将海量的日志数据从不同的数据源移动到一个中央的存储系统中。用一句话总结:Flume不生产数据,它只是数据的搬运工。 flume最主要的作用

    2024年02月06日
    浏览(27)
  • 使用Filebeat+Kafka+Logstash+Elasticsearch构建日志分析系统

            随着时间的积累,日志数据会越来越多,当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到Elasticsearch中,并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。 Kafka是一种分布式、高吞吐、可扩展的消息队列服务,

    2024年02月04日
    浏览(36)
  • ELK日志收集平台部署(kafka)

    正文:ELK日志收集平台部署 Kafka: 数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 基于zookeeper协调的分布式消息系统,它的最大的特

    2024年01月25日
    浏览(29)
  • Elk+Filebeat+Kafka实现日志收集

    1.实验组件 2.安装前准备 3.安装Zookeeper 1.安装Kafka 2.命令行测试  1.安装Filebeat 2.时间同步 3.配置filebeat 4.配置logstash 1.安装配置filebeat 2.配置logstash

    2024年02月05日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包