filebeat->kafka>elk日志采集

这篇具有很好参考价值的文章主要介绍了filebeat->kafka>elk日志采集。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafka常用命令


查看所有topic

./kafka-topics.sh --zookeeper 10.1.10.163:2181 --list


查看kafka中指定topic的详情

./kafka-topics.sh --zookeeper 10.1.10.163:2181 --topic ai_jl_analytic --describe

查看消费者consumer的group列表

./kafka-consumer-groups.sh --bootstrap-server 10.1.10.163:9092 --list

创建topic

./kafka-topics.sh --create --zookeeper 10.1.10.163:2181 --replication-factor 1 --partitions 1 --topic mytest_topic


查看指定的group

./kafka-consumer-groups.sh --bootstrap-server 10.1.10.163:9092 --group ai-trace --describe


 模拟生产者生产消息:

./kafka-console-producer.sh --broker-list 10.1.10.163:9092 --topic mytest_topic

模拟消费者消费消息:

./kafka-console-consumer.sh --bootstrap-server 10.1.10.163:9092 --from-beginning --topic mytest_topic

启动logstash  

//启动logstash          --path.data  指定数据存储路径
./logstash -f /data/logstash/config/kafka_applog_to_es.conf --path.data=/data/logstash/data1/

例子一 logstash  input  kafka output  ES 配置文件   

input{
  kafka{
    bootstrap_servers => "xx.x.xx.xxx:9092,xx.x.xx.xxx:9092,xx.x.xx.xxx:9092"
    topics_pattern  => "tag-nginx-log"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
    client_id => "logstash-bpm-nginx-access-log1"
    group_id => "logstashbpmnginxaccesslog1"
  }
}

filter {
    grok {
        match => [
                "message","%{HTTPDATE:timestamp}"
        ]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

output {
  elasticsearch {
    hosts => ["xx.x.xx.xxx:9200"]
    index => "tag-nginx-log-%{+YYYY.MM.dd}"
  }
}

  client_id 与 group_id  保证唯一即可。

例子二  logstash  input  kafka output  ES 配置文件   

input{
  kafka{
    bootstrap_servers => "XX.X.XX.XXX:9092,XX.X.XX.XXX:9092,XX.X.XX.XXX:9092"
    topics_pattern  => "tag*"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
    client_id => "logstash1-1"
    group_id => "logstash1"
    codec => multiline {
            pattern => "^\d{4}-\d{2}-\d{2}"
            negate => true
            what => "previous"
            charset => "UTF-8"
   }
  }
}

filter{
      grok{
        match=>{
                "message"=>"(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s\[(?<appName>.*)\]\s\[(?<profile>.*)\]\s\[((?<thread>.*))\]\s\[(?<traceId>.*)\]\s\[(?<class>[A-Za-z0-9/./?]{1,50})\]\s(?<level>[A-Z]{4,5}):(?<msg>.*)"
        }
        remove_field=>"message"
      }
      ruby {
        code => "event.set('index_date', event.timestamp.time.localtime + 8*60*60)"
      }
       mutate {
        convert => ["index_date", "string"]
        gsub => ["index_date", "T([\S\s]*?)Z", ""]
        gsub => ["index_date", "-", "."]
      }
}


output {
  elasticsearch {
    hosts => ["XX.X.XX.XXX:9200"]
    index => "%{profile}-%{appName}-%{index_date}"
  }
}

启动filebeat


//可以防止日志爆盘,将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出信息。

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 & disown文章来源地址https://www.toymoban.com/news/detail-494764.html

filebeat  output  kafka 基本配置文件  filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
   - /logs/nginx/access.log
  fields:
     log_topic: tag-nginx-access

output.kafka:
  enabled: true
  hosts: ["xx.x.xx.xxx:9092","xx.x.xx.xxx:9092","xx.x.xx.xxx:9092"]
  topic: tag-nginx-log
  version: "0.10"
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10240  

processors:
  - drop_fields:
     fields: ["host","input","agent","ecs","@version","flags","log","fields"]
     ignore_missing: false
logging.level: info 
name: "xx.x.x.xx"

到了这里,关于filebeat->kafka>elk日志采集的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Filebeat、Kafka搭建ELK日志分析平台详细步骤

    写在前头:公司一直没有搭建一个支持实时查询日志的平台,平常看日志都得去服务器下载,大大降低开发效率,前段时间有大佬同事搭建了一款日志平台,支持sit,uat等各个环境的日志实时查询,大大提高bug定位速度。因对其感兴趣特向大佬请教,学习记录下搭建流程。 选

    2024年02月06日
    浏览(39)
  • filebeat采集日志数据到kafka(一)(filebeat->kafka->logstash->es)

    一、filebeat安装 filebeat-kafka版本适配 1、安装包下载 https://www.elastic.co/cn/downloads/past-releases#filebeat 解压 2、新建yml配置文件,如test.yml 3、Filebeat启动和停止 启动:./filebeat -c test.yml 停止:kill -9 PID

    2024年02月16日
    浏览(64)
  • docker搭建Elk+Kafka+Filebeat分布式日志收集系统

    目录 一、介绍 二、集群环境 三、ES集群 四、Kibana  五、Logstash 六、Zookeeper 七、Kafka 八、Filebeat 八、Nginx (一)架构图  (二)组件介绍 1.Elasticsearch 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于

    2024年02月04日
    浏览(40)
  • k8s部署elk+filebeat;springCloud集成elk+filebeat+kafka+zipkin实现多个服务日志链路追踪聚合到es

    如今2023了,大多数javaweb架构都是springboot微服务,一个前端功能请求后台可能是多个不同的服务共同协做完成的。例如用户下单功能,js转发到后台 网关gateway服务 ,然后到 鉴权spring-sercurity服务 ,然后到 业务订单服务 ,然后到 支付服务 ,后续还有发货、客户标签等等服务

    2024年02月16日
    浏览(31)
  • ZooKeeper+Kafka+ELK+Filebeat集群搭建实现大批量日志收集和展示

    大致流程:将nginx 服务器(web-filebeat)的日志通过filebeat收集之后,存储到缓存服务器kafka,之后logstash到kafka服务器上取出相应日志,经过处理后写入到elasticsearch服务器并在kibana上展示。 一、集群环境准备 二、搭建zookeeper集群 前提条件:三台机器分别修改时区、关闭防火墙

    2024年02月04日
    浏览(35)
  • springboot整合ELK+kafka采集日志

    在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些

    2024年02月15日
    浏览(35)
  • 基于Filebeat+Kafka+ELK实现Nginx日志收集并采用Elastalert2实现钉钉告警

           先准备3台Nginx服务器,用做后端服务器,(由于机器有限,也直接用这三台机器来部署ES集群),然后准备2台服务器做负载均衡器(Nginx实现负载均衡具体实现操作有机会在介绍),如果是简单学习测试,可以先使用3台Nginx服务器就可以,先告一段落。 3台Nginx服务

    2024年02月15日
    浏览(25)
  • 【ELK企业级日志分析系统】部署Filebeat+Kafka+Logstash+Elasticsearch+Kibana集群详解(EFLFK)

    参见安装与部署ELK详解 参见安装与部署EFLK详解 参见安装与部署Zookeeper集群详解 1.1.1 为什么需要消息队列(MQ) MQ(Message Queue)主要原因是由于 在高并发环境下,同步请求来不及处理,请求往往会发生阻塞 。比如大量的并发请求,访问数据库,导致行锁表锁,最后请求线程会

    2024年02月16日
    浏览(39)
  • 私有部署ELK,搭建自己的日志中心(六)-- 引入kafka对采集日志进行削峰填谷

    首先,要说明一点,elk日志中心,是可以缺少kafka组件的。 其次,如果是研发环境下,机器资源紧张的情况下,也是可不部署kafka。 最后,因为kafka的部署是可以独立的,所以本文将另行部署,不和elk一起。 本机的IP地址是192.168.8.29,请你修改为自己的IP 本机的IP地址是192.1

    2024年02月04日
    浏览(33)
  • 【运维知识大神篇】超详细的ELFK日志分析教程7(filebeat常用模块+filebeat采集固定格式日志+自定义日志格式写入ES+EFK架构转ELFK架构+两个业务实战练习)

    本篇文章继续给大家介绍ELFK日志分析,详细请见下面目录。 目录 filebeat采集nginx日志 filebeat模块使用 一、Nginx模块 二、tomcat模块 三、filebeat写数据到ES集群自定义索引 四、filebeat自定义字段之nginx写入ES 五、filebeat自定义字段之tomcat写入ES 六、indices模块实现多个input写入不同

    2024年02月05日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包