ELK日志收集平台部署(kafka)

这篇具有很好参考价值的文章主要介绍了ELK日志收集平台部署(kafka)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

正文:ELK日志收集平台部署

Kafka和zookeeper简介

Kafka:

数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

Kafka的特性:

  • 高吞吐量:kafka每秒可以处理几十万条消息。

  • 可扩展性:kafka集群支持热扩展

  • 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

  • 高并发:支持数千个客户端同时读写

    它主要包括以下组件

    话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。) 生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务). 消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。 服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。 zookeeper:kafka 通过 zookeeper 来存储集群的信息。

ZooKeeper的特性:

ZooKeeper是一个分布式协调服务,Kafka的运行依赖ZooKeeper。ZooKeeper主要用来协调Kafka的各个broker,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

在Kafka中集群中broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。

=========================================================================

系统类型:Centos7

节点IP:192.168.246.234,192.168.246.231、192.168.246.235

软件版本:jdk-8u121-linux-x64.tar.gz、kafka_2.11-2.1.0.tgz

示例节点:172.16.246.231

1.安装配置jdk8

1)Kafka、Zookeeper(简称:ZK)运行依赖jdk8

tar zxvf /usr/local/package/jdk-8u121-linux-x64.tar.gz -C /usr/local/
echo '
JAVA_HOME=/usr/local/jdk1.8.0_121
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
' >>/etc/profile
source /etc/profile

 2.安装配置ZK

Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额下载ZK程序。

配置相互解析---三台机器

[root@es-2-zk-log ~]# vim /etc/hosts
192.168.246.234 mes-1
192.168.246.231 es-2-zk-log
192.168.246.235 es-3-head-kib

1)安装

[root@es-2-zk-log ~]# tar xzvf kafka_2.11-2.1.0.tgz -C /usr/local/

2)配置

[root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties  #添加如下配置

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.246.231:2888:3888             //kafka集群IP:Port
server.2=192.168.246.234:2888:3888
server.3=192.168.246.235:2888:3888


#创建data、log目录
[root@mes-1 ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@mes-1 ~]# echo 1 > /opt/data/zookeeper/data/myid     #myid号按顺序排

[root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.246.231:2888:3888
server.2=192.168.246.234:2888:3888
server.3=192.168.246.235:2888:3888


#创建data、log目录
[root@es-2-zk-log ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@es-2-zk-log ~]# echo 2 > /opt/data/zookeeper/data/myid

[root@es-3 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.246.231:2888:3888
server.2=192.168.246.234:2888:3888
server.3=192.168.246.235:2888:3888


#创建data、log目录
[root@es-3-head-kib ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@es-3-head-kib ~]# echo 3 > /opt/data/zookeeper/data/myid

3.配置Kafka

1)配置

节点1:

[root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties  #在最后添加

broker.id=1
listeners=PLAINTEXT://192.168.246.231:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0


[root@mes-1 ~]# mkdir -p /opt/data/kafka/logs

节点2:

[root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties

broker.id=2
listeners=PLAINTEXT://192.168.246.234:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0


[root@es-2-zk-log ~]# mkdir -p /opt/data/kafka/logs

节点3:

[root@es-3-head-kib ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties

broker.id=3
listeners=PLAINTEXT://192.168.246.235:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0


[root@es-3-head-kib ~]# mkdir -p /opt/data/kafka/logs

4、其他节点配置

只需把配置好的安装包直接分发到其他节点,Kafka的broker.id和listeners就可以了。

5、启动、验证ZK集群

1)启动

在三个节点依次执行:

[root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/
[root@mes-1 kafka_2.11-2.1.0]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

2)验证

查看端口

[root@mes-1 ~]# netstat -lntp | grep 2181
tcp6       0      0 :::2181                 :::*                    LISTEN      1226/java

6、启动、验证Kafka

1)启动

在三个节点依次执行:

[root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/
[root@mes-1 kafka_2.11-2.1.0]# nohup bin/kafka-server-start.sh config/server.properties &

2)验证

在192.168.246.231上创建topic

[root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
Created topic "testtopic".

在246.235上面查询192.168.246.231上的topic

[root@es-3-head-kib kafka_2.11-2.1.0]# bin/kafka-topics.sh --zookeeper 192.168.246.231:2181 --list
testtopic

模拟消息生产和消费 发送消息到192.168.246.231

[root@mes-1 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 192.168.246.231:9092 --topic testtopic
>hello
 

​从192.168.246.234接受消息

[root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.246.234:9092 --topic testtopic --from-beginning
hello

kafka配置完成

kafka没有问题之后,回到logstash服务器:
#安装完kafka之后的操作:
[root@es-2-zk-log ~]# cd /usr/local/logstash-6.5.4/etc/conf.d/
[root@es-2-zk-log conf.d]# cp input.conf input.conf.bak
[root@es-2-zk-log conf.d]# vim input.conf

input {
kafka {               #指定kafka服务
   type => "nginx_log"
   codec => "json"        #通用选项,用于输入数据的编解码器
   topics => "nginx"        #这里定义的topic
   decorate_events => true  #会将当前topic信息也带到message中
   bootstrap_servers => "192.168.246.234:9092, 192.168.246.231:9092, 192.168.246.235:9092"
  }
}  


启动 logstash
[root@es-2-zk-log conf.d]# cd /usr/local/logstash-6.5.4/
[root@es-2-zk-log logstash-6.5.4]# nohup bin/logstash -f etc/conf.d/  --config.reload.automatic &文章来源地址https://www.toymoban.com/news/detail-822549.html

到了这里,关于ELK日志收集平台部署(kafka)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【ELK 使用指南 1】ELK + Filebeat 分布式日志管理平台部署

    在运维中, 日志是非常重要的工具 ,用于记录系统、应用程序或设备的运行状态、事件和异常情况。 1)故障排除和问题诊断 日志是排查故障和诊断问题的关键信息源。 通过分析日志文件,可以查找和定位系统故障、错误和异常,帮助运维人员迅速找出问题的根本原因,并

    2024年02月07日
    浏览(53)
  • 【分布式技术】ELK大型日志收集分析系统

    目录 步骤一:完成JAVA环境部署 步骤二:部署ES节点(三台主机) 步骤三:内核参数修改 步骤四:web端查看验证 步骤五:yum安装nginx  步骤六:完成logstash部署 步骤七:部署kibana  步骤八:测试ELK

    2024年01月20日
    浏览(46)
  • Elk+Filebeat+Kafka实现日志收集

    1.实验组件 2.安装前准备 3.安装Zookeeper 1.安装Kafka 2.命令行测试  1.安装Filebeat 2.时间同步 3.配置filebeat 4.配置logstash 1.安装配置filebeat 2.配置logstash

    2024年02月05日
    浏览(47)
  • ELK+Kafka+Zookeeper日志收集系统

    节点IP 节点规划 主机名 192.168.112.3 Elasticsearch + Kibana + Logstash + Zookeeper + Kafka + Nginx elk-node1 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka elk-node2 192.168.112.3 Elasticsearch + Logstash + Zookeeper + Kafka + Nginx elk-node3 修改主机名 配置映射 安装Elasticserach 三台主机都需安装java及elasticserach 启动

    2024年04月18日
    浏览(67)
  • 分布式技术--------------ELK大规模日志实时收集分析系统

    目录 一、ELK日志分析系统 1.1ELK介绍 1.2ELK各组件介绍 1.2.1ElasticSearch 1.2.2Kiabana 1.2.3Logstash 1.2.4可以添加的其它组件 1.2.4.1Filebeat filebeat 结合logstash 带来好处 1.2.4.2缓存/消息队列(redis、kafka、RabbitMQ等) 1.2.4.3Fluentd 二、为什么要使用 ELK 三、完整日志系统基本特征 四、ELK 的工作

    2024年04月17日
    浏览(49)
  • ELK (一)部署ELK+Filebeat日志收集分析系统

    说明:此安装流程只适用于8.0.0以下的版本 1.1 下载ElasticSearch的wget指令: 1.2 解压安装包到指定目录 指定解压缩到 /usr/local 目录下 1.3 修改配置文件 (1)elasticsearch.yml 分别创建 path.data、path.logs 对应的 data、logs文件夹。 详细配置: (2)limits.conf 末尾追加以下内容: (3)s

    2024年02月08日
    浏览(48)
  • ELK群集部署日志收集

    ELK平台是一套完整的日志集中处理解决方案 由ElasticSearch、Logstash、Kiabana三个开源工具配合使用 是用户对日志的查询、排序、统计的强大工具组合 一般用于大型企业,中小型企业一般会选择(rsyslog+日志服务器或者shell+Python收集日志) logstash进行日志数据收集并且格式化之后

    2023年04月20日
    浏览(71)
  • 从小白到大神之路之学习运维第54天--------ELK日志收集分析

    第三阶段基础 时  间:2023年7月6日 参加人:全班人员 内  容: ELK技术堆栈 目录 服务器设置: 部署elasticsearch集群: 配置elasticsearch集群: 配置收集系统日志: elk_cluster集群 主机名 huyang1 huyang2 huyang3 IP地址 192.168.59.137 192.168.59.138 192.168.59.140 环境配置:(三台服务器都配置)

    2024年02月13日
    浏览(59)
  • SpringBoot+Kafka+ELK 完成海量日志收集(超详细)

    SpringBoot项目准备 引入log4j2替换SpringBoot默认log,demo项目结构如下: pom IndexController 测试Controller,用以打印日志进行调试 InputMDC 用以获取log中的 [%X{hostName}] 、 [%X{ip}] 、 [%X{applicationName}] 三个字段值 NetUtil 启动项目,访问 /index 和 /ero 接口,可以看到项目中生成了 app-collector.

    2024年04月16日
    浏览(37)
  • ZooKeeper+Kafka+ELK+Filebeat集群搭建实现大批量日志收集和展示

    大致流程:将nginx 服务器(web-filebeat)的日志通过filebeat收集之后,存储到缓存服务器kafka,之后logstash到kafka服务器上取出相应日志,经过处理后写入到elasticsearch服务器并在kibana上展示。 一、集群环境准备 二、搭建zookeeper集群 前提条件:三台机器分别修改时区、关闭防火墙

    2024年02月04日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包