在 《Elasticsearch搜索引擎系统入门》中简单描述了Logstah的安装,本篇文章将较为详细的讲解Logstash的基本配置和用法。
Logstash介绍
Logstash是一个开源的、服务端的数据处理pipeline(管道),它可以接收多个源的数据、然后对它们进行转换、最终将它们发送到指定类型的目的地。Logstash是通过插件机制实现各种功能的,可以在 https://github.com/logstash-plugins
下载各种功能的插件,也可以自行编写插件。
Logstash实现的功能主要分为接收数据、解析过滤并转换数据、输出数据
三个部分,对应的插件依次是input插件、filter插件、output插件
,其中,filter插件
是可选的,其它两个是必须插件。也就是说 在一个完整的Logstash配置文件中,必须有input插件和output插件。
input {
#输入插件
}
filter {
#过滤匹配插件
}
output {
#输出插件
}
简单启动
./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
解释下这条命令的含义:
- -e代表执行的意思。
- input即输入的意思,input里面即是输入的方式,这里选择了stdin,就是标准输入(从终端输入)。
- output即输出的意思,output里面是输出的方式,这里选择了stdout,就是标准输出(输出到终端)。
- 这里的codec是个插件,表明格式。这里放在stdout中,表示输出的格式,rubydebug是专门用来做测试的格式,一般用来在终端输出JSON格式。
Logstash在输出内容中会给事件添加一些额外信息。比如"@version"、"host"、"@timestamp"
都是新增的字段, 而最重要的是@timestamp
,用来标记事件的发生时间。由于这个字段涉及到Logstash内部流转,如果给一个字符串字段重命名为@timestamp
的话,Logstash就会直接报错。另外,也不能删除这个字段。
基本配置
在logstash的输出中,常见的字段还有type,表示事件的唯一类型、tags,表示事件的某方面属性,我们可以随意给事件添加字段或者从事件里删除字段。
使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里,这就是logstash事件配置文件,如果把上面在命令行执行的logstash命令,写到一个配置文件logstash-simple.conf
中,就变成如下内容:
input { stdin { }
}
output {
stdout { codec => rubydebug }
}
这就是最简单的Logstash事件配置文件。此时,可以使用logstash的-f参数来读取配置文件,然后启动logstash进程,操作如下:bin/logstash -f logstash-simple.conf
,如果要放到后台启动: nohup bin/logstash -f logstash-simple.conf &
logstash读取文件配置示例
# logstash-demo.conf
input {
file {
path => "/var/log/syslog"
}
}
output {
stdout {
codec => rubydebug #这里仍然采用rubydebug的JSON输出格式,这对于调试logstash输出信息是否正常非常有用
}
}
# 如果需要监控多个文件,可以通过逗号分隔即可,例如:
# path => ["/var/log/*.log","/var/log/message","/var/log/secure"]
logstash和kafka
这个配置文件中,输入input仍然是file,重点看输出插件,这里定义了output的输出源为kafka,通过bootstrap_servers选项指定了kafka集群的IP地址和端口。特别注意这里IP地址的写法,每个IP地址之间通过逗号分隔。另外,output输出中的topic_id选项,是指定输出到kafka中的哪个topic下,这里是osmessages,如果无此topic,会自动重建topic。
输出到kafka:
input {
file {
path => "/var/log/messages"
}
}
output {
kafka {
bootstrap_servers => "172.16.213.51:9092,172.16.213.75:9092,172.16.213.109:9092"
topic_id => "osmessages"
}
}
从kafka中读取数据并输出到ES:
input {
kafka {
bootstrap_servers => "172.16.213.51:9092,172.16.213.75:9092,172.16.213.109:9092"
topics => ["osmessages"]
}
}
output {
elasticsearch {
hosts => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
index => " osmessageslog-%{+YYYY-MM-dd}"
}
}
Logstash的输入插件
input插件主要用于接收数据,Logstash支持接收多种数据源,常用的有如下几种:
-
file
: 读取一个文件,这个读取功能有点类似于linux下面的tail命令,一行一行的实时读取。 -
syslog
: 监听系统514端口的syslog messages,并使用RFC3164格式进行解析。 -
redis
: Logstash可以从redis服务器读取数据,此时redis类似于一个消息缓存组件。 -
kafka
:Logstash也可以从kafka集群中读取数据,kafka加Logstash的架构一般用在数据量较大的业务场景,kafka可用作数据的缓冲和存储。 -
filebeat
:filebeat是一个文本日志收集器,性能稳定,并且占用系统资源很少,Logstash可以接收filebeat发送过来的数据。
Stdin输入
Stdin输入就是把输入的内容直接输出到控制台,不做存储。配置示例:
input {
stdin {
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
}
进入 logstash的安装目录:
mkdir config/input_config/
cd config/input_config/
vim Stdin.conf #然后写入上面的配置内容
再次回到logstash的安装目录:
sudo chmod -R 777 data/
# 使用刚才的配置文件启动
./bin/logstash -r -f ./config/input_config/Stdin.conf
启动后,在控制台输入内容就可以收集到:
文件内容输入
logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个sincedb数据文件的默认路径在 logstash/data/plugins/inputs/file
下面,文件名类似于.sincedb_0776ae3d702d482a9bdd8900c6550225
:
配置示例:
input {
file {
path => "/home/rx/tmp/test.log"
tags => "123"
type => "syslog"
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
}
- path里面支持日志路径的通配符,比如:/tmp/log*/*.log
- 这个配置是监听并接收本机的 /home/rx/tmp/test.log 文件内容,start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat命令,默认情况下,logstash会从文件的结束位置开始读取数据,也就是说logstash进程会以类似tail -f命令的形式逐行获取数据。type用来标记事件类型,通常会在输入区域通过type标记事件类型。
将上面的内容保存到 logstash目录下面的 config/input_config/File.conf
然后启动:./bin/logstash -r -f ./config/input_config/File.conf
接下来,给 /home/rx/tmp/test.log 里面写入内容,查看logstash控制台:
读取TCP网络数据
使用了logstash的TCP/UDP插件读取网络数据,其中5514端口是logstash启动的tcp监听端口。
input {
tcp {
port => "5514"
}
}
filter {
}
output {
stdout{
codec=>rubydebug
}
}
Logstash编码插件(Codec)
其实我们就已经用过编码插件codec了,也就是这个rubydebug,它就是一种codec,虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。
编码插件(Codec)可以在logstash输入或输出时处理不同类型的数据,因此,Logstash不只是一个 input-->filter-->output
的数据流,而是一个input-->decode-->filter-->encode-->output
的数据流。
Codec支持的编码格式常见的有plain、json、json_lines
等。下面依次介绍。
codec插件之plain
plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。下面是一个包含plain编码的事件配置文件:
input{
stdin{
}
}
output{
stdout{
codec => "plain"
}
}
codec插件之json、json_lines
如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json
来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让logstash输出为json格式,可以在output字段加入codec=>json
,下面是一个包含json编码的事件配置文件:
input {
stdin {
}
}
output {
stdout {
codec => json
}
}
这就是json格式的输出,可以看出,json每个字段是key:values格式,多个字段之间通过逗号分隔。有时候,如果json文件比较长,需要换行的话,那么就要用json_lines编码格式了。
Logstash过滤器插件(Filter)
filter插件主要用于数据的过滤、解析和格式化,也就是将非结构化的数据解析成结构化的、可查询的标准化数据。常见的filter插件有如下几个:
-
grok
:grok是Logstash最重要的插件,可解析并结构化任意数据,支持正则表达式,并提供了很多内置的规则和模板可供使用。此插件使用最多,但也最复杂。 -
mutate
: 此插件提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。 -
date
:此插件可以用来转换你的日志记录中的时间字符串。 -
GeoIP
:此插件可以根据IP地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
filter基本配置
配置示例:
input {
stdin {
}
}
filter {
json {
source => "message"
target => "content"
}
}
output {
stdout {
codec => rubydebug
}
}
以上内容 filter 中的 source表示对
message
部分进行解析,target 表示目标字段,可以将message
里面的内容解析出来后放到content
中。
保存上面配置文件到 Stdin_filter.conf
中,然后运行:./bin/logstash -r -f ./config/input_config/Stdin_filter.conf
在控制台输入一个json,示例:{"ip":"127.0.0.1","method":"POST","url":"/user/info?id=1"}
。
可以看到 json的内容被解析到了 content
里面,而message
里面是原始的json字符串内容。
Grok 正则捕获
grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前logstash 中解析非结构化日志数据最好的方式。
Grok 的语法规则是:%{语法: 语义}
,“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址。
例如输入的内容为:
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
那么:%{IP:clientip}
匹配模式将获得的结果为:clientip: 172.16.213.132
%{HTTPDATE:timestamp}
匹配模式将获得的结果为:timestamp: 07/Feb/2018:16:24:19 +0800
%{QS:referrer}
匹配模式将获得的结果为:referrer: "GET / HTTP/1.1"
下面是一个组合匹配模式,它可以获取上面输入的所有内容:
%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
通过上面这个组合匹配模式,我们将输入的内容分成了五个部分(即五个字段),将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用grok的目的。
Logstash默认提供了近200个匹配模式(其实就是定义好的正则表达式)让我们来使用,可以在logstash安装目录下 vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
目录里面查看,基本定义在grok-patterns文件中。
从这些定义好的匹配模式中,可以查到上面使用的四个匹配模式对应的定义规则:
NUMBER (?:%{BASE10NUM})
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
IP (?:%{IPV6}|%{IPV4})
QS %{QUOTEDSTRING}
举例说明,写入下面的配置文件
// Grok.conf
input{
stdin{}
}
filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
}
}
output{
stdout{
codec => "rubydebug"
}
}
启动 ./bin/logstash -r -f ./config/my_config/Grok.conf
输入内容:
127.10.12.13 [30/Aug/2023:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
可以看到已经对输入的内容按照相应的正则解析到了指定的字段
时间处理(Date)
date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp
对象,然后转存到@timestamp
字段里。下面是date插件的一个配置示例(这里仅仅列出filter部分):
filter {
grok {
match => ["message", "%{HTTPDATE:timestamp}"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
数据修改(Mutate)
(1)正则表达式替换匹配字段gsub
可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于mutate插件中gsub的示例(仅列出filter部分):
//这个示例表示将filed_name_1字段中所有"/"字符替换为"_"。
filter {
mutate {
gsub => ["filed_name_1", "/" , "_"]
}
}
(2)分隔符分割字符串为数组split
可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于mutate插件中split的示例(仅列出filter部分):
//这个示例表示将filed_name_2字段以"|"为区间分隔为数组。
filter {
mutate {
split => ["filed_name_2", "|"]
}
}
(3)重命名字段rename
可以实现重命名某个字段的功能,下面是一个关于mutate插件中rename的示例(仅列出filter部分):
//这个示例表示将字段old_field重命名为new_field。
filter {
mutate {
rename => { "old_field" => "new_field" }
}
}
(4)删除字段remove_field
可以实现删除某个字段的功能,下面是一个关于mutate插件中remove_field的示例(仅列出filter部分):
//这个示例表示将字段timestamp删除。
filter {
mutate {
remove_field => ["timestamp"]
}
}
综合案例:
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
remove_field => [ "message" ]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
rename => { "response" => "response_new" }
convert => [ "response","float" ]
gsub => ["referrer","\"",""]
remove_field => ["timestamp"]
split => ["clientip", "."]
}
}
output {
stdout {
codec => "rubydebug"
}
}
GeoIP 地址查询归类
GeoIP是最常见的免费IP地址归类查询库,当然也有收费版可以使用。GeoIP库可以根据IP 地址提供对应的地域信息,包括国别,省市,经纬度等,此插件对于可视化地图和区域统计非常有用。下面是一个关于GeoIP插件的简单示例(仅列出filter部分):
filter {
geoip {
source => "ip_field" //ip_field字段是输出IP地址的一个字段。
}
}
filter插件综合应用实例
下面给出一个业务系统输出的日志格式,由于业务系统输出的日志格式无法更改,因此就需要我们通过logstash的filter过滤功能以及grok插件来获取需要的数据格式,此业务系统输出的日志内容以及原始格式如下:
2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202 Safari/604.1|~|http://m.sina.cn/cm/ads_ck_wap.html|~|1460709836200|~|DF0184266887D0E
可以看出,这段日志都是以 |~|
为区间进行分隔的,那么刚好我们就以 |~|
为区间分隔符,将这段日志内容分割为6个字段。这里通过grok插件进行正则匹配组合就能完成这个功能。
完整的grok正则匹配组合语句如下:
%{TIMESTAMP_ISO8601:localtime}\|\~\|%{IPORHOST:clientip}\|\~\|(%{GREEDYDATA:http_user_agent})\|\~\|(%{DATA:http_referer})\|\~\|%{GREEDYDATA:mediaid}\|\~\|%{GREEDYDATA:osid}
调试grok正则表达式工具:http://grokdebug.herokuapp.com
Logstash的输出插件
output插件用于数据的输出,一个Logstash事件可以穿过多个output,直到所有的output处理完毕,这个事件才算结束。输出插件常见的有如下几种:
- file:表示将日志数据写入磁盘上的文件。
- elasticsearch:表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。
- redis:发送数据到redis中,从这里可以看出,redis插件既可以用在input插件中,也可以用在output插件中。
- kafka:发送数据到kafka中,与redis插件类似,此插件也可以用在Logstash的输入和输出插件中。
- graphite:表示将日志数据发送给graphite,graphite是一种流行的开源工具,用于存储和绘制数据指标。此外,Logstash还支持输出到nagios、hdfs、email(发送邮件)和Exec(调用命令执行)。
更多支持的输出方式,可以在 https://www.elastic.co/guide/en/logstash/current/output-plugins.html
查看:
输出到标准输出(stdout)
stdout与之前介绍过的stdin插件一样,它是最基础和简单的输出插件,主要的功能和用途就是用于调试。下面是一个配置实例:
output {
stdout {
codec => rubydebug
}
}
保存为文件(file)
file插件可以将输出保存到一个文件中,比如下面这个配置中,使用了变量匹配,用于自动匹配时间和主机名,这在实际使用中很有帮助。配置实例如下:
output {
file {
path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
}
}
输出到elasticsearch
Logstash将过滤、分析好的数据输出到elasticsearch中进行存储和查询,是最经常使用的方法。下面是一个配置实例:
output {
elasticsearch {
host => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
index => "logstash-%{+YYYY.MM.dd}"
manage_template => false
template_name => "template-web_access_log"
}
}
上面配置中每个配置项含义如下:
- host:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
- index:写入elasticsearch的索引的名称,这里可以使用变量。Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。
- manage_template:用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为false。
- template_name:这个配置项用来设置在Elasticsearch中模板的名称。
输出到elasticsearch示例
下面测试对系统日志/var/log/syslog
和 /var/log/auth.log
进行收集,配置logstash如下的配置文件:vim /home/rx/soft/elastic/logstash-7.1.0/config/output_config/logstash-to-es.conf
写入内容:
input {
file {
path => ["/var/log/syslog"]
type => "system"
tags => ["syslog","test"]
start_position => "beginning"
}
file {
path => ["/var/log/auth.log"]
type => "system"
tags => ["auth","test"]
start_position => "beginning"
}
}
filter {
}
output {
if [type] == "system" {
if [tags][0] == "syslog" {
elasticsearch {
hosts => ["http://192.168.0.211:9200","http://192.168.0.212:9200","http://192.168.0.213:9200"]
index => "logstash-system-syslog-%{+YYYY.MM.dd}"
}
stdout { codec=> rubydebug }
}
else if [tags][0] == "auth" {
elasticsearch {
hosts => ["http://192.168.0.211:9200","http://192.168.0.212:9200","http://192.168.0.213:9200"]
index => "logstash-system-auth-%{+YYYY.MM.dd}"
}
stdout { codec=> rubydebug }
}
}
}
- 以上配置的hosts部分使用了集群配置,如果是单个ES服务,直接使用单个IP和端口即可,例如:hosts => [“http://127.0.0.1:9200”]
- start_position => “beginning” 表示从文件头部开始收集
启动 logstash:./logstash-7.1.0/bin/logstash -r -f ./logstash-7.1.0/config/output_config/logstash-to-es.conf
,
启动后就开始收集日志了:
启动kibana
修改 kibana/config/kibana.yml 里面关于ES的内容:
elasticsearch.hosts: ["http://localhost:9200"]
然后启动ES 和 kibana:
./elasticsearch-7.1.0/bin/elasticsearch #启动ES
./kibana-7.1.0-linux-x86_64/bin/kibana #启动kibana
浏览器访问 http://localhost:5601/
打开kibana的页面,进入 Management > Index management 即可看到最新收集到的日志索引:
在kibana中配置索引数据
进入 kibana 管理页面的 Management > Index Patterns > Create index pattern:
下一步:
创建完成:
然后在 Discover 中就可以看到最新收集的日志了:
补充内容:Logstash的条件判断语法
比较操作符
相等: ==, !=, <, >, <=, >=
正则: =~(匹配正则), !~(不匹配正则)
包含: in(包含), not in(不包含)
布尔操作符文章来源:https://www.toymoban.com/news/detail-666377.html
and(与), or(或), nand(非与), xor(非或)
一元运算符:文章来源地址https://www.toymoban.com/news/detail-666377.html
!(取反)
()(复合表达式), !()(对复合表达式结果取反)
到了这里,关于ELK中Logstash的基本配置和用法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!