一、Logstash企业级插件案例(EFLK架构)
1.常见的插件概述
gork插件:
Grok是将⾮结构化⽇志数据解析为结构化和可查询的好⽅法。底层原理是基于正则匹配任意
⽂本格式。
该⼯具⾮常适合syslog⽇志、apache和其他⽹络服务器⽇志、mysql⽇志,以及通常为⼈
类⽽⾮计算机消耗⽽编写的任何⽇志格式。
内置120种匹配模式,当然也可以⾃定义匹配模式:
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
2.使用Logstash内置的正则案例1
[root@elk101.oldboyedu.com ~]# cat config-logstash/14-beat-grok-es.conf
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
# "message" => "%{COMBINEDAPACHELOG}"
# 上⾯的""变量官⽅github上已经废弃,建议使⽤下⾯的匹配模式
# https://github.com/logstash-plugins/logstash-patterns-
core/blob/main/patterns/legacy/httpd
"message" => "%{HTTPD_COMMONLOG}"
}
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/14-beat-grok-es.conf
3.使用Logstash内置的正则案例1
[root@elk101.oldboyedu.com ~]# cat config-logstash/15-stdin-grok-stdout.conf
input {
stdin {}
}
filter {
grok {
match => {
"message" => "%{IP:oldboyedu-client} %{WORD:oldboyedu-method} %{URIPATHPARAM:oldboyedu-request} %{NUMBER:oldboyedu-bytes} %{NUMBER:oldboyedu-duration}"
}
}
}
output {
stdout {}
}
[root@elk101.oldboyedu.com ~]# logstash -f config-logstash/15-stdin-
grok-stdout.conf
温馨提示:(如下图所示,按照要求输⼊数据)
55.3.244.1 GET /index.html 15824 0.043
10.0.0.103 POST /oldboyedu.html 888888 5.20
参考地址:
https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns/legacy
4.使用logstash自定义的正则案例
[root@elk101.oldboyedu.com ~]# cat config-logstash/16-stdin-grok_custom_patterns-stdout.conf
input {
stdin {}
}
filter {
grok {
# 指定匹配模式的⽬录,可以使⽤绝对路径哟~
# 在./patterns⽬录下随便创建⼀个⽂件,并写⼊以下匹配模式
# POSTFIX_QUEUEID [0-9A-F]{10,11}
# OLDBOYEDU_LINUX80 [\d]{3}
patterns_dir => ["./patterns"]
# 匹配模式
# 测试数据为: Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
# match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
# 测试数据为: ABCDE12345678910 ---> 333FGHIJK
match => { "message" => "%{POSTFIX_QUEUEID:oldboyedu_queue_id} ---> %{OLDBOYEDU_LINUX80:oldboyedu_linux80_elk}" }
}
}
output {
stdout {}
}
[root@elk101.oldboyedu.com ~]# logstash -f config-logstash/16-stdin-grok_custom_patterns-stdout.conf
5.filter插件通用字段案例
[root@elk101.oldboyedu.com ~]# cat config-logstash/17-beat-grok-es.conf
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
# "message" => "%{COMBINEDAPACHELOG}"
# 上⾯的""变量官⽅github上已经废弃,建议使⽤下⾯的匹配模式
# https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd
"message" => "%{HTTPD_COMMONLOG}"
}
# 移除指定的字段
remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
# 添加指定的字段
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
"oldboyedu-clientip" => "clientip ---> %{clientip}"
}
# 添加tag
add_tag => [ "linux80","zookeeper","kafka","elk" ]
# 移除tag
remove_tag => [ "zookeeper", "kafka" ]
# 创建插件的唯⼀ID,如果不创建则系统默认⽣成
id => "nginx"
}
}
output {
stdout {}
# elasticsearch {
# hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
# index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
# }
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/17-beat-grok-es.conf
6.data插件修改写入file的时间
[root@elk101.oldboyedu.com ~]# cat config-logstash/18-beat-grok_date-es.conf
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
# "message" => "%{COMBINEDAPACHELOG}"
# 上⾯的""变量官⽅github上已经废弃,建议使⽤下⾯的匹配模式
# https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/httpd
"message" => "%{HTTPD_COMMONLOG}"
}
# 移除指定的字段
remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
# 添加指定的字段
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
}
}
date {
# 匹配时间字段并解析,值得注意的是,logstash的输出时间可能会错8⼩时,但写⼊es但数据是准确的!
# "13/May/2022:15:47:24 +0800", 以下2种match写法均可!
# match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
# 当然,我们也可以不对时区字段进⾏解析,⽽是使⽤"timezone"指定时区哟!
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss +0800"]
# 设置时区字段为UTC时间,写⼊ES的数据时间是不准确的
# timezone => "UTC"
# 建议⼤家设置为"Asia/Shanghai",写⼊ES的数据是准确的!
timezone => "Asia/Shanghai"
# 将匹配到到时间字段解析后存储到⽬标字段,若不指定,则默认字段为"@timestamp"字段
target => "oldboyedu-linux80-nginx-access-time"
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/18-beat-grok_date-es.conf
7.geoip分析源地址的地址位置
[root@elk101.oldboyedu.com ~]# cat config-logstash/19-beat-grok_date_geoip-es.conf
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
"message" => "%{HTTPD_COMMONLOG}"
}
remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
}
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
timezone => "Asia/Shanghai"
target => "oldboyedu-linux80-nginx-access-time"
}
geoip {
# 指定基于哪个字段分析IP地址
source => "clientip"
# 如果期望查看指定的字段,则可以在这⾥配置即可,若不设置,表示显示所有的查询字段.
fields => ["city_name","country_name","ip"]
# 指定geoip的输出字段,如果想要对多个IP地址进⾏分析,则该字段很有⽤哟~
target => "oldboyedu-linux80"
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/19-beat-grok_date_geoip-es.conf
8.useragent分析客户端的设备类型
[root@elk101.oldboyedu.com ~]# cat config-logstash/20-beat-grok_date_geoip_useragent-es.conf
input {
beats {
port => 8888
}
}
filter {
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
timezone => "Asia/Shanghai"
target => "oldboyedu-linux80-nginx-access-time"
}
mutate {
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
}
remove_field => [ "agent", "host", "@version", "ecs", "tags","input", "log" ]
}
geoip {
source => "clientip"
fields => ["city_name","country_name","ip"]
target => "oldboyedu-linux80-geoip"
}
useragent {
# 指定客户端的设备相关信息的字段
source => "http_user_agent"
# 将分析的数据存储在⼀个指定的字段中,若不指定,则默认存储在target字段中。
target => "oldboyedu-linux80-useragent"
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/20-beat-grok_date_geoip_useragent-es.conf
9.mutate组件数据准备-python脚本
cat > generate_log.py <<EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : oldboyedu-linux80
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 配置root的logging.Logger实例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a',)
actions = ["浏览⻚⾯", "评论商品", "加⼊收藏", "加⼊购物⻋", "提交订单", "使⽤优惠券", "领取优惠券", "搜索", "查看订单", "付款", "清空购物⻋"]
while True:
time.sleep(random.randint(1, 5))
user_id = random.randint(1, 10000)
# 对⽣成的浮点数保留2位有效数字.
price = round(random.uniform(15000, 30000),2)
action = random.choice(actions)
svip = random.choice([0,1])
logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id,
action,svip,price))
EOF
nohup python generate_log.py /tmp/app.log &>/dev/null &
10.mutate组件常用字段案例
[root@elk101.oldboyedu.com ~]# cat config-logstash/21-mutate.conf
input {
beats {
port => 8888
}
}
filter {
mutate {
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
}
remove_field => [ "@timestamp", "agent", "host", "@version", "ecs", "tags","input", "log" ]
}
mutate {
# 对"message"字段内容使⽤"|"进⾏切分。
split => {
"message" => "|"
}
}
mutate {
# 添加字段,其中引⽤到了变量
add_field => {
"user_id" => "%{[message][1]}"
"action" => "%{[message][2]}"
"svip" => "%{[message][3]}"
"price" => "%{[message][4]}"
}
}
mutate {
strip => ["svip"]
}
mutate {
# 将指定字段转换成相应对数据类型.
convert => {
"user_id" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
mutate {
# 将"price"字段拷⻉到"oldboyedu-linux80-price"字段中.
copy => { "price" => "oldboyedu-linux80-price" }
}
mutate {
# 修改字段到名称
rename => { "svip" => "oldboyedu-ssvip" }
}
mutate {
# 替换字段的内容
replace => { "message" => "%{message}: My new message" }
}
mutate {
# 将指定字段的字⺟全部⼤写
uppercase => [ "message" ]
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
}
}
[root@elk101.oldboyedu.com ~]#
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/21-mutate.conf
11.logstash的多if分支案列
文章来源:https://www.toymoban.com/news/detail-436919.html
[root@elk101.oldboyedu.com ~]# cat config-logstash/22-beats_tcp-filter-es.conf
input {
beats {
type => "oldboyedu-beats"
port => 8888
}
tcp {
type => "oldboyedu-tcp"
port => 9999
}
tcp {
type => "oldboyedu-tcp-new"
port => 7777
}
http {
type => "oldboyedu-http"
port => 6666
}
file {
type => "oldboyedu-file"
path => "/tmp/apps.log"
}
}
filter {
mutate {
add_field => {
"school" => "北京市昌平区沙河镇⽼男孩IT教育"
}
}
if [type] == ["oldboyedu-beats","oldboyedu-tcp-new","oldboyedu-http"]
{
mutate {
remove_field => [ "agent", "host", "@version", "ecs", "tags","input", "log" ]
}
geoip {
source => "clientip"
target => "oldboyedu-linux80-geoip"
}
useragent {
source => "http_user_agent"
target => "oldboyedu-linux80-useragent"
}
} else if [type] == "oldboyedu-file" {
mutate {
add_field => {
"class" => "oldboyedu-linux80"
"address" => "北京昌平区沙河镇⽼男孩IT教育"
"hobby" => ["LOL","王者荣耀"]
}
remove_field => ["host","@version","school"]
}
} else {
mutate {
remove_field => ["port","@version","host"]
}
mutate {
split => {
"message" => "|"
}
add_field => {
"user_id" => "%{[message][1]}"
"action" => "%{[message][2]}"
"svip" => "%{[message][3]}"
"price" => "%{[message][4]}"
}
# 利⽤完message字段后,在删除是可以等!注意代码等执⾏顺序!
remove_field => ["message"]
strip => ["svip"]
}
mutate {
convert => {
"user_id" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
}
}
output {
stdout {}
if [type] == "oldboyedu-beats" {
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-beats"
}
} else {
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-logstash-tcp"
}
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/22-beats_tcp-filter-es.conf
12.今日作业
文章来源地址https://www.toymoban.com/news/detail-436919.html
如上图所示,要求完成以下内容:
(1)收集nginx⽇志,写⼊ES集群,分⽚数量为3,副本数量为0,索引名称为"oldboyedu-linux80-nginx";
(2)收集tomcat⽇志,写⼊ES集群,分⽚数量为5,副本数量为0,索引名称为"oldboyedu-linux80-tomcat";
(3)收集app⽇志,写⼊ES集群,分⽚数量为10,副本数量为0,索引名称为"oldboyedu-linux80-app";
进阶作业:
(1)分析出nginx,tomcat的客户端ip所属城市,访问时使⽤的设备类型等。
(2)请调研使⽤logstash的pipline来替代logstash的多实例⽅案;
filebeat收集tomcat日志
[root@elk102.oldboyedu.com ~]# cat ~/config/38-tomcat-to-logstash.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /oldboyedu/softwares/apache-tomcat-10.0.20/logs/*.txt
json.keys_under_root: true
output.logstash:
hosts: ["10.0.0.101:7777"]
[root@elk102.oldboyedu.com ~]# filebeat -e -c ~/config/38-tomcat-to-logstash.yml
filebeat收集nginx日志
[root@elk102.oldboyedu.com ~]# cat ~/config/37-nginx-to-logstash.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log*
json.keys_under_root: true
output.logstash:
hosts: ["10.0.0.101:8888"]
[root@elk102.oldboyedu.com ~]#
[root@elk102.oldboyedu.com ~]# filebeat -e -c ~/config/37-nginx-to-logstash.yml --path.data /tmp/filebeat-nginx
filebeat收集apps日志
[root@elk102.oldboyedu.com ~]# cat ~/config/39-apps-to-logstash.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/app.log*
output.logstash:
hosts: ["10.0.0.101:6666"]
[root@elk102.oldboyedu.com ~]#
[root@elk102.oldboyedu.com ~]# filebeat -e -c ~/config/39-apps-to-logstash.yml --path.data /tmp/filebeat-app
logstash收集nginx日志
[root@elk101.oldboyedu.com ~]# cat config-logstash/24-homework-01-to-es.conf
input {
beats {
port => 8888
}
}
filter {
mutate {
remove_field => ["tags","log","agent","@version", "input","ecs"]
}
geoip {
source => "clientip"
target => "oldboyedu-linux80-geoip"
}
useragent {
source => "http_user_agent"
target => "oldboyedu-linux80-useragent"
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-nginx"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/24-homework-01-to-es.conf
logstash收集tomcat日志
[root@elk101.oldboyedu.com ~]# cat config-logstash/24-homework-02-to-es.conf
input {
beats {
port => 7777
}
}
filter {
mutate {
remove_field => ["tags","log","agent","@version", "input","ecs"]
}
geoip {
source => "clientip"
target => "oldboyedu-linux80-geoip"
}
useragent {
source => "AgentVersion"
target => "oldboyedu-linux80-useragent"
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-tomcat"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/24-homework-02-to-es.conf --path.data /tmp/homework-logstash-02
logstash收集apps日志
[root@elk101.oldboyedu.com ~]# cat config-logstash/24-homework-03-to-es.conf
input {
beats {
port => 6666
}
}
filter {
mutate {
remove_field => ["tags","log","agent","@version", "input","ecs"]
}
mutate {
remove_field => ["port","@version","host"]
}
mutate {
split => {
"message" => "|"
}
add_field => {
"user_id" => "%{[message][1]}"
"action" => "%{[message][2]}"
"svip" => "%{[message][3]}"
"price" => "%{[message][4]}"
}
remove_field => ["message"]
strip => ["svip"]
}
mutate {
convert => {
"user_id" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
}
output {
stdout {}
elasticsearch {
hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
index => "oldboyedu-linux80-apps"
}
}
[root@elk101.oldboyedu.com ~]# logstash -rf config-logstash/24-homework-03-to-es.conf --path.data /tmp/homework-logstash-03
到了这里,关于Logstash filter grok正则的使用及介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!