Elk+Filebeat+Kafka实现日志收集

这篇具有很好参考价值的文章主要介绍了Elk+Filebeat+Kafka实现日志收集。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Elk+Filebeat+Kafka实现日志收集(本机nginx)

部署Zookeeper

1.实验组件

#准备3台服务器做Zookeeper集群
20.0.0.10
20.0.0.20
20.0.0.30

2.安装前准备

#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0

#安装JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

#将apache-zookeeper-3.5.7-bin.tar.gz压缩包上传至/opt目录

3.安装Zookeeper

#三台服务器一齐操作
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper

#修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--2--
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒

--5--
initLimit=10
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s

--8--
syncLimit=5
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer

--12--修改
dataDir=/opt/zookeeper/data
#指定保存Zookeeper中的数据的目录,目录需要单独创建

--添加--
dataLogDir=/opt/zookeeper/logs
#指定存放日志的目录,目录需要单独创建

--15--
clientPort=2181
#客户端连接端口

--末尾添加集群信息--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

#在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs

#在每个节点的dataDir指定的目录下创建一个 myid 的文件,不同节点分配1、2、3
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid

#配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper

#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
	echo "---------- zookeeper 启动 ------------"
	$ZK_HOME/bin/zkServer.sh start
;;
stop)
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;;
restart)
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

#分别启动 Zookeeper
service zookeeper start

#查看当前状态
service zookeeper status

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

部署Kafka(3.4.1版本)

1.安装Kafka

cd /opt
--上传kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--24--
broker.id=1
#broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=2、broker.id=3

--34--
listeners=PLAINTEXT://20.0.0.10:9092
#每台服务器分别为10、20、30,不用加地址映射

--62--
log.dirs=/var/log/kafka
#kafka运行日志存放的路径,也是数据存放的路径

--125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181
#配置连接Zookeeper集群地址

#修改全局配置
vim /etc/profile
--添加--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
#配置Zookeeper启动脚本
vim /etc/init.d/kafka

#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka

#分别启动Kafka
service kafka start

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

2.命令行测试

#创建topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1

#查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092

#发布消息
kafka-console-producer.sh --broker-list 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092  --topic test1

#消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning

#修改分区数
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6

#删除 topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

部署Filebeat 

 1.安装Filebeat

#10
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
vim /etc/logstash/logstash.yml
--64--
path.config: /opt/log

systemctl restart logstash

2.时间同步

#所有节点
yum -y install ntpdate
ntpdate ntp.aliyun.com 
date

3.配置filebeat

#给nginx日志文件赋权
cd /var/log/nginx/
chmod 777 access.log error.log

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

#配置filebeat
cd /opt/filebeat
vim filebeat.yml
filebeat.inputs:

- type: log
  enabled: true	
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log
  tags: ["nginx"]
  fields:
    service_name: 20.0.0.10_nginx
    log_type: nginx
    from: 20.0.0.10

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "nginx"


--------------Elasticsearch output-------------------
(全部注释掉)

----------------Logstash output---------------------
(全部注释掉)

nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

4.配置logstash

cd /opt/log/
vim kafka.conf

input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics  => "nginx"
        type => "nginx_kafka"
        codec => "json"
                auto_offset_reset => "earliest"
                decorate_events => true
    }
}

output {
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

logstash -f kafka.conf --path.data /opt/test1

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

日志收集(远程Apache+Mysql)

部署Filebeat

1.安装配置filebeat

#收集81服务器上的mysql和apache日志
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.yml
 
filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /etc/httpd/logs/access_log
    - /etc/httpd/logs/error_log
  tags: ["httpd_81"]
  fields:
    service_name: 20.0.0.81_httpd
    log_type: httpd
    from: 20.0.0.81
 
- type: log
  enabled: true
  paths:
    - /usr/local/mysql/data/mysql_general.log
  tags: ["mysql_81"]
  fields:
    service_name: 20.0.0.81_mysql
    log_type: mysql
    from: 20.0.0.81

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "httpdmysql"
 
--------------Elasticsearch output-------------------
(全部注释掉)

----------------Logstash output---------------------
(全部注释掉)

nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式

2.配置logstash

10:
cd /opt/log/
vim 81_a+m.conf 
 
input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics  => "httpdmysql"
        type => "httpd+mysql_kafka"
        codec => "json"
                auto_offset_reset => "earliest"
                decorate_events => true
    }
}

output {
  if "httpd_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }


  if "mysql_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

logstash -f 81_a+m.conf --path.data /opt/test2

Elk+Filebeat+Kafka实现日志收集,elk,kafka,分布式文章来源地址https://www.toymoban.com/news/detail-754479.html

到了这里,关于Elk+Filebeat+Kafka实现日志收集的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ELK分布式日志收集快速入门-(一)-kafka单体篇

    JDK 安装教程自行百度-这个比较简单。 zookeeper zookeeper安装参考地址((2条消息) 快速搭建-分布式远程调用框架搭建-dubbo+zookper+springboot demo 演示_康世行的博客-CSDN博客) 修改zookeeper配合文件 启动成功 开放端口号 下载kafka安装包 安装遇到的问题(由于网站证书不安全导致) 解

    2023年04月08日
    浏览(32)
  • k8s部署elk+filebeat;springCloud集成elk+filebeat+kafka+zipkin实现多个服务日志链路追踪聚合到es

    如今2023了,大多数javaweb架构都是springboot微服务,一个前端功能请求后台可能是多个不同的服务共同协做完成的。例如用户下单功能,js转发到后台 网关gateway服务 ,然后到 鉴权spring-sercurity服务 ,然后到 业务订单服务 ,然后到 支付服务 ,后续还有发货、客户标签等等服务

    2024年02月16日
    浏览(31)
  • filebeat->kafka>elk日志采集

    kafka常用命令 查看所有topic ./kafka-topics.sh --zookeeper 10.1.10.163:2181 --list 查看kafka中指定topic的详情 ./kafka-topics.sh --zookeeper 10.1.10.163:2181 --topic ai_jl_analytic --describe 查看消费者consumer的group列表 ./kafka-consumer-groups.sh --bootstrap-server 10.1.10.163:9092 --list 创建topic ./kafka-topics.sh --create --zooke

    2024年02月10日
    浏览(31)
  • 【分布式应用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。 我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队

    2024年02月16日
    浏览(28)
  • Elasticsearch实践:ELK+Kafka+Beats对日志收集平台的实现

    可以在短时间内搜索和分析大量数据。 Elasticsearch 不仅仅是一个全文搜索引擎,它还提供了分布式的多用户能力,实时的分析,以及对复杂搜索语句的处理能力,使其在众多场景下,如企业搜索,日志和事件数据分析等,都有广泛的应用。 本文将介绍 ELK+Kafka+Beats 对日志收集

    2024年02月08日
    浏览(26)
  • Filebeat+Kafka+ELK日志采集(五)——Elasticsearch

    1、下载 2、解压: 3、配置 启动Elasticsearch,进入/bin目录下 ./elasticsearch ,不出意外的外会报以下错误: 报错1:能打开的文件或进程数太低。 解决方法: 修改/etc/security/limits.conf 配置文件,添加配置如下: 报错2: 解决方法: 修改/etc/sysctl.conf 配置文件,添加配置如下: 修

    2024年02月05日
    浏览(42)
  • EFLFK——ELK日志分析系统+kafka+filebeat架构

    node1节点 192.168.40.16 elasticsearch 2c/4G node2节点 192.168.40.17 elasticsearch 2c/4G Apache节点 192.168.40.170 logstash/Apache/kibana 2c/4G filebeat节点 192.168.40.20 filebeat 2c/4G https://blog.csdn.net/m0_57554344/article/details/132059066?spm=1001.2014.3001.5501 接上期elk部署我们这次加一个filebeat节点   //准备 3 台服务器做

    2024年02月14日
    浏览(27)
  • 分布式运用之Filebeat+Kafka+ELK 的服务部署

    Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。 topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,

    2024年02月06日
    浏览(33)
  • 基于Filebeat、Kafka搭建ELK日志分析平台详细步骤

    写在前头:公司一直没有搭建一个支持实时查询日志的平台,平常看日志都得去服务器下载,大大降低开发效率,前段时间有大佬同事搭建了一款日志平台,支持sit,uat等各个环境的日志实时查询,大大提高bug定位速度。因对其感兴趣特向大佬请教,学习记录下搭建流程。 选

    2024年02月06日
    浏览(39)
  • ELK日志收集平台部署(kafka)

    正文:ELK日志收集平台部署 Kafka: 数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 基于zookeeper协调的分布式消息系统,它的最大的特

    2024年01月25日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包