【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka)

这篇具有很好参考价值的文章主要介绍了【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本篇文章继续给大家介绍ELFK日志分析,我们先前介绍了ELFK架构,zookeeper部署使用,kafka的部署,仅差kafka使用就将整个体系融汇贯通了。我们本篇文章将以kafka为核心,详细介绍kafka使用,最终将kafka融入ELFK架构中,大致内容见下面目录。

目录

kafka集群原理

一、专业术语

二、为什么kafka会丢数据

kafka集群基本使用

一、启动kafka

二、topic管理

三、生产者和消费者

四、消费者组管理

zookeeper堆内存调优

kafka堆内存调优

kafka开源监控组件kafka-eagle

一、前期准备

二、部署监控

kafka集群压力测试

filebeat对接kafka

一、filebeat作为生产者

二、filebeat作为消费者

logstash对接kafka

一、logstash作为生产者

二、logstash作为消费者


kafka集群原理

kafka学习使用中涉及许多原理,了解这些原理会让学习事半功倍。

一、专业术语

kafka cluster是分布式消息传递系统,与MQ cluster(消息队列)类似,由broker list(kafka运行的节点)和多个topic(主题)组成,在Kafka中,每个topic被细分为多个partition,而每个partition又可以被副本到多个kafka broker上实现高可用性。因此,kafka cluster是由多个broker节点和多个topic partition组成的。

producer是生产者角色,主要负责生产数据,是向kafka cluster写入数据的一方,数据的写入有两种常见策略,要么是rr算法,要么是基于key的hash值和分区数取余。

consumer是消费者角色,主要是负责消费数据,是从kafka cluster拉取数据的一方。

topic是主题,是数据存储的逻辑单元。

replica是副本,实际存储数据的地方,一个topic最少要有一个副本。

partition是分区,一个topic最少要有一个分区,正常情况下有多个分区编号的。副本是分区的实际载体。

consumer group是消费者组,一个消费者组里面最少有一个消费者,同一个消费者组的消费者不能同时消费同一个topic的partition,以免造成数据重复消费;当一个消费者组的消费者数据发生变化时,会触发rebalance(重平衡)机制,即重新分配分区消费。

ISR是和leader数据同步的所有副本集合

OSR是和leader数据不同步的所有副本集合

AR是ISR和OSR的集合,就是所有的副本集合

二、为什么kafka会丢数据

假如30秒内leader和follower数据的LEO(Log End Offset)一致,则认为数据一致,当follower的数据还没有与leader完全同步时,leader节点宕机了,此时,follower选举出新的leader,其他的follower会跟随这个leader数据继续工作,如果之前leader恢复了,会从之前的HW(高水位线,ISR中最后一个副本最小的LEO)开始重新写数据,与新的leader同步,之前follower没同步的就丢失了,若30秒内,副本没有和leader的LEO相同,会直接踢出ISR,进入OSR列表。

kafka集群基本使用

一、启动kafka

1、先启动zookeeper

[root@ELK103 ~]# manager_zk.sh start
启动服务
========== elk101 zkServer.sh start ================
Starting zookeeper ... STARTED
========== elk102 zkServer.sh start ================
Starting zookeeper ... STARTED
========== elk103 zkServer.sh start ================
Starting zookeeper ... STARTED
[root@ELK103 ~]# manager_zk.sh status
查看状态
========== elk101 zkServer.sh status ================
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
========== elk102 zkServer.sh status ================
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
========== elk103 zkServer.sh status ================
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

2、模仿zookeeper启动脚本写一个kafka启动脚本,并给予执行权限

[root@ELK101 ~]# cat /usr/local/sbin/manager_kafka.sh
#!/bin/bash

#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop|restart|status}"
    exit
fi

#获取用户输入的命令
cmd=$1

#定义函数功能
function kafkaManger(){
    case $cmd in
    start)
        echo "启动服务"        
        remoteExecution start
        ;;
    stop)
        echo "停止服务"
        stopKafka stop
        ;;
    restart)
        echo "重启服务"
        remoteExecution restart
        ;;
    status)
        echo "查看状态"
        remoteExecution status
        ;;
    *)
        echo "无效参数,用法为: $0  {start|stop|restart|status}"
        ;;
    esac
}


#定义执行的命令
function remoteExecution(){
    for (( i=101 ; i<=103 ; i++ )) ; do
            tput setaf 2
            echo ========== elk${i} kafaka-server-start.sh   $1 ================
            tput setaf 9
            ssh elk${i} "kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties" 
    done
}

function stopKafka(){
    for (( i=101 ; i<=103 ; i++ )) ; do
            tput setaf 2
            echo ========== elk${i} kafaka-server-stop.sh   $1 ================
            tput setaf 9
            ssh elk${i} "kafka-server-stop.sh -daemon $KAFKA_HOME/config/server.properties" 
    done

}


#调用函数
kafkaManger
[root@ELK101 ~]# chmod +x /usr/local/sbin/manager_kafka.sh
[root@ELK101 ~]# ll /usr/local/sbin/manager_kafka.sh
-rwxr-xr-x 1 root root 1323 Jun  5 11:29 /usr/local/sbin/manager_kafka.sh

3、通过启动脚本启动kafka

[root@ELK101 ~]# manager_kafka.sh start
启动服务
========== elk101 kafaka-server-start.sh start ================
========== elk102 kafaka-server-start.sh start ================
========== elk103 kafaka-server-start.sh start ================

二、topic管理

1、增

[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten
Created topic koten.      #创建一个名为koten的topic

[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten-3 --partitions 3
Created topic koten-3.    #创建一个名为koten-3的topic,分区数为3

[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --create --topic koten-3-2 --partitions 3 --replication-factor 2
Created topic koten-3-2.  #创建一个名为koten-3-2的topic,分区数为3,副本数为2 

2、查

#查看topic列表
[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --list
koten
koten-3
koten-3-2

#查看所有topic的详细信息
[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe
Topic: koten-3-2	TopicId: 1l4P-Tv_Q3aTasKediU3MQ	PartitionCount: 3	ReplicationFactor: 2   Configs: segment.bytes=1073741824
	Topic: koten-3-2	Partition: 0	Leader: 103	Replicas: 103,102	Isr: 103
	Topic: koten-3-2	Partition: 1	Leader: 102	Replicas: 102,101	Isr: 102,101
	Topic: koten-3-2	Partition: 2	Leader: 101	Replicas: 101,103	Isr: 101,103
Topic: koten	TopicId: eXxgjWBySxe_WAx-fv83ZA	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: koten	Partition: 0	Leader: 103	Replicas: 103	Isr: 103
Topic: koten-3	TopicId: l7L3SY63QV-ayXOD46bViQ	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: koten-3	Partition: 0	Leader: 103	Replicas: 103	Isr: 103
	Topic: koten-3	Partition: 1	Leader: 102	Replicas: 102	Isr: 102
	Topic: koten-3	Partition: 2	Leader: 101	Replicas: 101	Isr: 101

#查看指定topic的详细信息
oot@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --describe --topic koten
Topic: koten	TopicId: eXxgjWBySxe_WAx-fv83ZA	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: koten	Partition: 0	Leader: 103   Replicas: 103	Isr: 103

3、改

主分片数可以用命令行直接修改,副本数修改比较麻烦,需要用json格式,可以参考这个大神的链接:https://www.cnblogs.com/yinzhengjie/p/9808125.html

#修改koten的topic分区为5个
[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --alter --topic koten --partitions 5

4、删

[root@ELK101 ~]# kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --delete --topic koten

三、生产者和消费者

1、启动生产者

[root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic koten-3

2、启动消费者(在同一主机)

表示从最新的offset拉取数据

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic koten-3

表示从头开始拉取数据

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.102:9092 --topic koten-3 --from-beginning

四、消费者组管理

1、查看现有的消费者组

[root@ELK101 ~]# kafka-consumer-groups.sh --bootstraerver  10.0.0.101:9092 --list
console-consumer-24702
console-consumer-58734
console-consumer-41114

2、列出所有消费者组的详细信息,包括偏移量,消费者ID,LEO等信息

[root@ELK101 ~]# kafka-consumer-groups.sh --bootstrap-server  10.0.0.101:9092 --describe --all-groups 

Consumer group 'console-consumer-24702' has no active members.

Consumer group 'console-consumer-41114' has no active members.

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-58734 koten-3         0          -               1               -               console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101     console-consumer
console-consumer-58734 koten-3         1          -               2               -               console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101     console-consumer
console-consumer-58734 koten-3         2          -               3               -               console-consumer-f92294eb-383c-402b-9f9e-9a7ac5773b7d /10.0.0.101     console-consumer

3、查看内置topic的offset数据,了解即可,我这边运行没有显示内容

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"   --from-beginning

4、基于配置文件指定消费组

[root@ELK101 ~]# cat $KAFKA_HOME/config/consumer.properties
......
group.id=koten-consumer-group
......
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic koten-topic --consumer.config $KAFKA_HOME/config/consumer.properties
[2023-06-05 20:25:33,829] WARN [Consumer clientId=console-consumer, groupId=koten-consumer-group] Error while fetching metadata with correlation id 2 : {koten-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

5、基于命令行参数指定消费者组

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic koten-topic --consumer-property group.id=koten-consumer-group

zookeeper堆内存调优

生产环境建议调到2G~4G

1、查zookeeper的堆内存大小

[root@ELK101 ~]#  jmap -heap `jps | awk '/QuorumPeerMain/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 1048576000 (1000.0MB)
[root@ELK101 ~]# 

2、修改堆内存的大小

[root@ELK101 ~]# cat > /koten/softwares/apache-zookeeper-3.8.1-bin/conf/java.env <<'EOF'
#!/bin/bash
# 指定JDK的按住路径
export JAVA_HOME=/koten/softwares/jdk1.8.0_291

# 指定zookeeper的堆内存大小
export JVMFLAGS="-Xms128m -Xmx128m $JVMFLAGS"
EOF

3、将配置文件同步到集群的其他zk节点上

[root@ELK101 ~]# data_rsync.sh /koten/softwares/apache-zookeeper-3.8.1-bin/conf/java.env 
===== rsyncing elk102: java.env =====
命令执行成功!
===== rsyncing elk103: java.env =====
命令执行成功!

4、重启zk集群,注意一定要重启后堆内存才生效,manager_zk脚本不好用,有时候停止了,进程还存在,需要手动挨个运行zkServer.sh stop

[root@ELK101 ~]# manager_zk.sh restart
重启服务
========== elk101 zkServer.sh restart ================
Stopping zookeeper ... STOPPED
Starting zookeeper ... STARTED
========== elk102 zkServer.sh restart ================
Stopping zookeeper ... STOPPED
Starting zookeeper ... STARTED
========== elk103 zkServer.sh restart ================
Stopping zookeeper ... STOPPED
Starting zookeeper ... STARTED

5、验证堆内存

[root@ELK102 ~]# jmap -heap `jps | awk '/QuorumPeerMain/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 268435456 (256.0MB)

kafka堆内存调优

1、查看堆内存大小

[root@ELK101 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 1073741824 (1024.0MB)

2、修改堆内存(生产环境5~6G最佳)

捎带启动了JXM端口

[root@ELK101 ~]# cat `which kafka-server-start.sh`
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xmx256M -Xms256M -
XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 
-XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:Initiat
ingHeapOccupancyPercent=70"
    export JMX_PORT="8888"
fi
......

3、单点重启kafka,查看堆内存大小

[root@ELK101 ~]# kafka-server-stop.sh 
[root@ELK101 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@ELK101 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 268435456 (256.0MB)

4、将启动脚本同步到其他节点

[root@ELK101 ~]# data_rsync.sh `which kafka-server-start.sh`
===== rsyncing elk102: kafka-server-start.sh =====
命令执行成功!
===== rsyncing elk103: kafka-server-start.sh =====
命令执行成功!

5、其他节点重启kafka环境,查看堆内存是否生效

[root@ELK101 ~]# manager_kafka.sh stop
停止服务
========== elk101 kafaka-server-stop.sh stop ================
========== elk102 kafaka-server-stop.sh stop ================
========== elk103 kafaka-server-stop.sh stop ================
[root@ELK101 ~]# manager_kafka.sh start
启动服务
========== elk101 kafaka-server-start.sh start ================
========== elk102 kafaka-server-start.sh start ================
========== elk103 kafaka-server-start.sh start ================
[root@ELK102 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 268435456 (256.0MB)
[root@ELK103 ~]# jmap -heap `jps | awk '/Kafka/{print $1}'` | grep MaxHeapSize
   MaxHeapSize              = 268435456 (256.0MB)

kafka开源监控组件kafka-eagle

图形化的方式管理kafka

一、前期准备

1、启动kafka的JMX端口

与上面修改堆内存步骤相同,略。

2、启动zookeeper的JMX端口

[root@ELK101 ~]# cat /koten/softwares/apache-zookeeper-3.8.1-bin/bin/zkEnv.sh
# zookeeper JMX
JMXLOCALONLY=false
JMXHOSTNAME=10.0.0.101
JMXPORT=9999
JMXSSL=false
JMXLOG4J=false

3、安装mysql,启动服务并设置开机自启动

[root@ELK101 ~]# yum -y install mariadb-server
[root@ELK101 ~]# cat /etc/my.cnf    
[mysqld]
......
skip-name-resolve=1 #跳过名称解析,不跳过再进行登录的时候,可能会进行反向解析
[root@ELK101 ~]# systemctl enable --now mariadb
Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.

4、创建数据库与授权用户

[root@ELK101 ~]# mysql
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 2
Server version: 5.5.68-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> CREATE DATABASE koten_kafka DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> CREATE USER admin IDENTIFIED BY 'koten'; 
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> GRANT ALL ON koten_kafka.* TO admin;
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> SHOW GRANTS FOR admin;
+------------------------------------------------------------------------------------------------------+
| Grants for admin@%                                                                                   |
+------------------------------------------------------------------------------------------------------+
| GRANT USAGE ON *.* TO 'admin'@'%' IDENTIFIED BY PASSWORD '*87F5F6FF9376D7C33FEB4C2AA1F7F99E9853F2DB' |
| GRANT ALL PRIVILEGES ON `koten_kafka`.* TO 'admin'@'%'                                               |
+------------------------------------------------------------------------------------------------------+
2 rows in set (0.00 sec)

MariaDB [(none)]> quit
Bye

5、测试用户

[root@ELK101 ~]# mysql -u admin -pkoten -h 10.0.0.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 3
Server version: 5.5.68-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| koten_kafka        |
| test               |
+--------------------+
3 rows in set (0.00 sec)

MariaDB [(none)]> quit
Bye

二、部署监控

1、下载kafka-eagle软件,下面的链接下载的慢可以用我分享在文末的链接

[root@ELK101 ~]# wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz

2、解压软件包

[root@ELK101 ~]# unzip kafka-eagle-bin-2.0.8.zip 
Archive:  kafka-eagle-bin-2.0.8.zip
  inflating: efak-web-2.0.8-bin.tar.gz  
  inflating: system-config.properties  
[root@ELK101 ~]# tar xf efak-web-2.0.8-bin.tar.gz -C /koten/softwares/

3、修改配置文件

[root@ELK101 ~]# cat /koten/softwares/efak-web-2.0.8/conf/system-config.properties
efak.zk.cluster.alias=kafka
kafka.zk.list=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/kafka-3.2.1  #注意chroot与kafka配置文件保持一致
kafka.efak.broker.size=20
kafka.zk.limit.size=16
efak.webui.port=8048
kafka.efak.offset.storage=zk
kafka.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=koten
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://10.0.0.101:3306/koten_kafka?useUnicode=true&characterEncodi
ng=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=admin
efak.password=koten   #数据库密码

4、配置环境变量

[root@ELK101 ~]# cat >> /etc/profile.d/kafka.sh <<'EOF'
export KE_HOME=/koten/softwares/efak-web-2.0.8
export PATH=$PATH:$KE_HOME/bin
EOF
[root@ELK101 ~]# source /etc/profile.d/kafka.sh

5、修改配置启动脚本的堆内存大小

[root@ELK101 ~]# sed -i '/KE_JAVA_OPTS/s#2g#256m#g'  $KE_HOME/bin/ke.sh | grep KE_JAVA_OPTS
[root@ELK101 ~]# grep KE_JAVA_OPTS $KE_HOME/bin/ke.sh
export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

6、启动服务

[root@ELK101 ~]# ke.sh start
......
[2023-06-05 22:36:46] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version 2.0.8 -- Copyright 2016-2021
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.0.0.101:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

7、登录eagle

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

8、忘记密码后可以进入数据库去找

MariaDB [koten_kafka]> SELECT * FROM koten_kafka.ke_users;
+----+-------+----------+-----------+-----------------+---------------+
| id | rtxno | username | password  | email           | realname      |
+----+-------+----------+-----------+-----------------+---------------+
|  1 |  1000 | admin    | 123456    | admin@email.com | Administrator |
+----+-------+----------+-----------+-----------------+---------------+
1 row in set (0.00 sec)

MariaDB [koten_kafka]> 

9、登陆进去后可以看到监控的kafka与zookeeper的一些数据信息

查看仪表盘

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

查看zookeeper与kafka的监控信息

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

10、不止可以监控数据,还可以创建topic,对kafka集群进行一些操作

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

kafka集群压力测试 

对kafka集群进行压力测试,方便我们了解集群的处理上限,可以作为集群调优和扩容的依据。

测试之前要先搞懂链路,如果你的生产者与kafka集群不在一个地方,那么你在一个主机进行压力测试也没有意义,确定好链路后,在实际的生产者和消费者执行压测,修改对应的主机参数即可。

1、通过下面脚本进行压测

mkdir /tmp/kafka-test

cat > kafka-test.sh <<'EOF'
# 创建topic
kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-2023 --replication-factor 1 --partitions 10 --create

# 启动消费者消费数据
nohup kafka-consumer-perf-test.sh --broker-list 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic  kafka-2023 --messages 100000 &>/tmp/kafka-test/kafka-consumer.log &
​
# 启动生产者写入数据
nohup kafka-producer-perf-test.sh --num-records 100000 --record-size 1000 --topic  kafka-2023 --throughput 1000 --producer-props bootstrap.servers=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 &> /tmp/kafka-test/kafka-producer.log &
EOF

bash kafka-test.sh 


#可以根据自己要测的实际生产情况调整以下参数

kafka-consumer-perf-test.sh 

---messages: 指定消费消息的数量。
--broker-list: 指定broker列表。
--topic: 指定topic主体。
        
kafka-producer-perf-test.sh 

-num-records:生产消息的数量。
--record-size: 每条消息的大小,单位是字节。
--topic: 指定topic主体。
--throughput: 设置每秒发送的消息数量,即指定最大消息的吞吐量,若设置为-1代表不限制!
--producer-props bootstrap.servers: 指定broker列表

 2、可以通过efak查看实施进度

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

3、也可以通过脚本输出的日志观察生产和消费速度

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka),运维知识分享,# 大神运维知识,kafka,运维,java,zookeeper,efak,apache,filebeat

filebeat对接kafka

filebeat对接kafka,如果filebeat作为生产者,kafka作为消费者,可以经过kafka后再写入到es集群;filebeat作为消费者,可以读取到kafka的数据,将kafka的数据展示出来,或者写入es集群。下面给大家展示下示例。

一、filebeat作为生产者

filebeat作为生产者,所以是output到kafka,input我们就用stdin去测试。

1、kafka启动消费者

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic filebeat-to-kafka

2、执行filebeat

[root@ELK101 config]# cat 01-stdin-to-kafka.yaml 
filebeat.inputs:
- type: stdin

output.kafka:
  hosts: ["10.0.0.101:9092", "10.0.0.102:9092", "10.0.0.103:9092"]
  topic: 'filebeat-to-kafka'

# 执行filebeat并输入测试数据
[root@ELK101 config]# filebeat -e -c config/01-stdin-to-kafka.yaml
...
1234567

# kafka会消费到数据
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic filebeat-to-kafka
[2023-12-18 15:51:33,774] WARN [Consumer clientId=console-consumer, groupId=console-consumer-60747] Error while fetching metadata with correlation id 2 : {filebeat-to-kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"@timestamp":"2023-12-18T07:54:06.389Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.5"},"ecs":{"version":"1.12.0"},"log":{"offset":0,"file":{"path":""}},"message":"1234567","input":{"type":"stdin"},"host":{"name":"ELK101"},"agent":{"id":"8fa0a9d7-f6d8-45b8-9355-ddf800e337fa","name":"ELK101","type":"filebeat","version":"7.17.5","hostname":"ELK101","ephemeral_id":"cef5e36c-ef9b-4c38-91d9-54f7c4db48fe"}}

二、filebeat作为消费者

1、启动filebeat进行消费

[root@ELK101 config]# cat 02-kafka-to-filebeat.yaml 
filebeat.inputs:
- type: kafka
  hosts:
    - 10.0.0.101:9092
    - 10.0.0.102:9092
    - 10.0.0.103:9092
  topics: ["kafka-to-filebeat"]
  group_id: "filebeat"

output.console:
  pretty: true

[root@ELK101 config]# filebeat -e -c config/02-kafka-to-filebeat.yaml

2、启动kafka进行生产

[root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-to-filebeat
>123456ceshi   
>


[root@ELK101 config]# filebeat -e -c config/02-kafka-to-filebeat.yaml 
...
{
  "@timestamp": "2023-12-18T09:06:52.226Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.17.5"
  },
  "ecs": {
    "version": "1.12.0"
  },
  "host": {
    "name": "ELK101"
  },
  "agent": {
    "version": "7.17.5",
    "hostname": "ELK101",
    "ephemeral_id": "d4c29b42-d892-4532-bbe6-cff5f5a243f9",
    "id": "8fa0a9d7-f6d8-45b8-9355-ddf800e337fa",
    "name": "ELK101",
    "type": "filebeat"
  },
  "kafka": {
    "partition": 0,
    "offset": 0,
    "key": "",
    "headers": [],
    "topic": "kafka-to-filebeat"
  },
  "message": "123456ceshi",
  "input": {
    "type": "kafka"
  }
}

logstash对接kafka

一、logstash作为生产者

1、kafaka开始消费

[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic logstash-to-kafka

2、启动logstash开始生产

[root@ELK101 logstash_cofig]# cat 01-logstash-to-kafka.yaml
input {
  stdin {
  }
}

output {
  kafka {
    bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
    topic_id => "logstash-to-kafka"
  }
}

# 启动logstash并手动写入数据
[root@ELK101 logstash_cofig]# logstash -r -f 01-logstash-to-kafka.yaml
...
ceshi123

# kafka可以消费到数据
[root@ELK101 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic logstash-to-kafka
...
2023-12-18T08:04:25.969Z ELK101 
2023-12-18T08:05:03.169Z ELK101 ceshi123

二、logstash作为消费者

1、启动logstash进行消费

[root@ELK101 logstash_cofig]# cat 02-kafka-to-logstasg.yaml
input {
  kafka {
    bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
    topics => "kafka-to-logstash"
  }
}

output {
  stdout {}
}
[root@ELK101 logstash_cofig]# logstash -r -f 02-kafka-to-logstasg.yaml

2、启动kafka进行生产

[root@ELK101 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic kafka-to-logstash
>123
>

[root@ELK101 logstash_cofig]# logstash -r -f 02-kafka-to-logstasg.yaml
...
{
      "@version" => "1",
    "@timestamp" => 2023-12-18T09:19:51.170Z,
       "message" => "123"
}

kafka图形化管理工具下载链接:https://pan.baidu.com/s/1_xciM_6OC0383f11phRycQ?pwd=j7ki 

我是koten,10年运维经验,持续分享运维干货,感谢大家的阅读和关注!文章来源地址https://www.toymoban.com/news/detail-761445.html

到了这里,关于【运维知识大神篇】超详细的ELFK日志分析教程10(kafka集群原理+基本使用+zookeeper和kafka堆内存调优+kafka监控和压力测试+filebeat和logstash对接kafka)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【运维知识大神篇】运维人必学的Docker教程6(Docker-Compose使用详解+Linux特性管理+macvlan技术+consul实现大规模跨主机通信overlay+私有仓库harbor)

    本篇文章继续给大家介绍Docker的有关内容,包括docker启动特权容器及利用特权修改内核参数,Attach和Exec区别,Docker-compose使用详解,Linux特性管理,macvlan技术实现docker跨主机通信,使用consul工具实现大规模跨主机通信overlay(基于vxlan实现),Docker相关参数说明,测试使用的

    2024年02月20日
    浏览(49)
  • ELK、ELFK日志分析系统

    ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 1.1.1 ElasticSearch ElasticSearch是基于Lucene(一个全文检索引擎的架构)开发的分布式存储检索引擎,用来存储各类日志。 E

    2024年02月13日
    浏览(50)
  • ELK日志分析系统和ELFK

    目录 什么要做日志分析平台 一、 Elasticsearch elasticsearch核心概念 二、Logstash Logstash 的主要组件 三、Kibana 介绍 功能 ELK搭建 四、Filebeat Filebeat工作方式 Filebeat工作原理 五、Filebeat和Logstash 为什么使用filebeat收集日志更好 Filebeat结合Logstash的好处 六、ELK+Filebeat+Kafka+Zookeeper ELK 是

    2023年04月08日
    浏览(35)
  • ELK 企业级日志分析系统 ELFK

    ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 使用ELK日志文件系统的原因:日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了

    2024年02月07日
    浏览(42)
  • ELFK日志分析系统并使用Filter对日志数据进行处理

    Node1节点(2C/4G):node1/192.168.154.10 Elasticsearch Node2节点(2C/4G):node2/192.168.154.11 Elasticsearch Apache节点:apache/192.168.154.12 Logstash Kibana Apache Filebeat节点:filebeat/192.168.154.13 Filebeat 先在Filebeat节点添加网页文件 浏览器访问http://192.168.154.13/test.html获取日志 1.安装 Filebeat 2.设置 f

    2024年02月06日
    浏览(43)
  • ELFK——ELK结合filebeat日志分析系统(2)

    目录   一、filebeat  二、ELFK 1.原理简介 2.在ELK基础上部署filebeat         Filebeat,轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat,并指定目录与日志格式,Filebeat 就能快速收集数据,并发送给 logstash 进行解析,或是直接发给 Elasticsearch 存储

    2024年02月13日
    浏览(40)
  • 从小白到大神之路之学习运维第37天---第三阶段---mysql数据库之拓展知识

    拓展知识 目录 一、MySQL数据库目录结构以及存放位置 二、MySQL Enterprise Backup 三、MySQL读写分离器 四、进程和线程 五、CentOS 7 中配置静态 IP     1. 数据库存储目录: MySQL数据库的数据文件存储在指定的数据目录下。MySQL安装时默认数据目录在Linux系统中为/var/lib/mysql,Windows系

    2024年02月08日
    浏览(52)
  • 【超详细~KVM】KVM概述、安装及简单操作-------从小白到大神之路之学习运维第91天

    第四阶段提升 时  间:2023年8月30日 参加人:全班人员 内  容: KVM概述、安装及简单操作 目录 一、KVM 概述 二、KVM工作原理 三、KVM应用场景 四、centos7 下安装部署 五、新建虚拟机步骤 1、创建存储池并创建存储卷 2、点击+号创建KVM存储池 3、创建存储卷 4、创建ISO存储池

    2024年02月03日
    浏览(45)
  • 从小白到大神之路之学习运维第44天---第三阶段----拓展知识-----文件管理命令(find+sed+awk)、pycharm工具

    第三阶段基础 时  间:2023年6月20日 参加人:全班人员 内  容: 目录 一、文件管理命令 find 1. 根据文件名查找文件 2. 根据文件类型查找文件 3. 根据文件大小查找文件 4. 根据时间戳查找文件 5. 组合多个条件查找文件 Sed 1. 替换文本 2. 插入和删除行 3. 格式化输出 总 结: a

    2024年02月09日
    浏览(52)
  • 【运维知识高级篇】超详细的Jenkins教程4(参数化构建+脚本传参+代码秒级发布+秒级回滚)

    之前我们介绍的大多是测试环境的推送,在生产环境中,我们不会用到那么多次的构建测试,但是会涉及稳定版本的发布和回滚,我们也通过jenkins配合gitlab去实现,通过远程仓库的tag传递参数,我们把这种方式称为参数化构建,在参数化构建的过程中,我们省略了代码质量检

    2024年02月07日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包