kafka的ISR工作机制原理

这篇具有很好参考价值的文章主要介绍了kafka的ISR工作机制原理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

昨日内容回顾:
    - ES的加密及客户端的连接方式,比如logstash,filebeat,curl,es-head,postman...
    - zk单点部署
    - kafka单点部署
    - kafka的集群扩容
    - kafka集群架构
    - kafka的堆内存调优
    - kafka的集群宏观架构
    

Q1: 为什么需要配置"dvertised.listeners".
    因为没有配置主机名解析导致的问题。
    
Q2: 一个分区数写入数据是有序的,但为什么多个分区数写入就是无序的呢?
    因为消费者是从多个分区读取的,顺序无法确定。
    
Q3: leader同步follower是如何同步的?
    follower主动去leader拉取数据。

Q4: leader挂掉之后发生了哪些事,请说明?
    leader挂掉后,follower会成为新的leader,需要借助zookeeper实现。


Q5: follower挂掉之后如何解决,会发生哪些问题?
    leader会根据replica.lag.time.max.ms定义的间隔时间,超出该范围leader就会将其剔除ISR列表。

Q6: 当follower数据和leader不同时,又会发生啥事?
    leader会根据replica.lag.time.max.ms定义的间隔时间,超出该范围leader就会将其剔除ISR列表。

Q7: 当消费者数据延迟时如何解决问题?请说明原因。
    将多个消费者加入到同一个消费者组进行消费,但是要注意消费者数量不能大于分区数数量。

Q8: 请问多副本,例如一个分区3个副本,kafka会存在数据丢失的风险吗?请说明原因。
    会,还是ISR的存在。

相关术语:
    Log End Offset (简称"LEO"):
        每个partition的最后一个日志的偏移量。
        
    High Water(简称"HW"):
        所有ISR列表中最小的LEO。
        所有的消费者只能读取到HW之前的偏移量数据。
        
    ISR:
        和leader数据同步的所有副本集合。
        
    OSR:
        和leader数据不同的所有副本集合。

    AR:
        是ISR+OSR,指的是所有副本集合。

    replica.lag.time.max.ms: 30000ms  ---> 30s
        当follower节点超过30秒没有向leader发送fetch请求或者follower的LEO不等于leader的LEO时,leader会将follower踢出ISR列表。
        
        


zookeeper的基础操作:
    1.简介
zookeeper是一个分布式协调服务,其扮演的是辅助的功能。本身并不支持存储大量数据,每个znode默认可以存储大约2M的数据。

    2.什么是znode
所谓的znode指的是zookeeper node,相当于Linux的目录,只不过znode下不仅仅可以存储子znode,其本身也能够存数据。

    3.znode的基础管理
登录:
# zkCli.sh 
增:
    create /test
        创建一个"/test"的znode。
    
    create /test/b
        支持多级路径创建,但要求父路径是必须存在的。
        
    

删:
    delete /test/a 
        删除"/test/a"的znode,要求znode为空。
        
    deleteall /test 
        删除"/test"的所有内容,可以删除非空znode。

改:
    set /test/c/1111 bbbbbbbbbb
        修改"/test/c/1111"的znode存储数据。
        
    
查:            *****
    ls /
        查看/的znode下有哪些子znode。
    
    ls /test -R
        递归查看"/test"子路径下的所有znode。
        
    get /test
        查看数据。
        
        
        
kafka 0.8.0 多实例部署:
    1.解压软包包
tar xf kafka_2.8.0-0.8.0.tar.gz  -C /oldboyedu/softwares/
 
    2.修改配置文件
vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
...
broker.id=101
# kafka的监听端口
port=19092
log.dirs=/oldboyedu/data/kafka080
zookeeper.connect=10.0.0.101:2181/oldboyedu-linux82-kafka080

    3.101节点同步数据
data_rsync.sh /oldboyedu/softwares/kafka_2.8.0-0.8.0/    
    
    4.102节点修改broker.id
vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
...
broker.id=102
 
    5.103节点修改broker.id
vim /oldboyedu/softwares/kafka_2.8.0-0.8.0/config/server.properties
...
broker.id=102

    6.zookeeper创建znode
create /oldboyedu-linux82-kafka080

    
    7.启动kafka
cd /oldboyedu/softwares/kafka_2.8.0-0.8.0  && ./bin/kafka-server-start.sh config/server.properties &>/dev/null &


    8.检查zookeeper集群的ids信息
ls /oldboyedu-linux82-kafka080/brokers/ids


早期kafka 0.9之前的offset存储在zookeeper:
    1.查看topic信息
./bin/kafka-list-topic.sh --zookeeper 10.0.0.101:2181/oldboyedu-linux82-kafka080
        
    2.启动生产者
./bin/kafka-console-producer.sh  --broker-list 10.0.0.101:19092,10.0.0.102:19092,10.0.0.103:19092 --topic oldboyedu-linux82

    3.启动消费者
./bin/kafka-console-consumer.sh --zookeeper 10.0.0.101:2181/oldboyedu-linux82-kafka080 --topic oldboyedu-linux82


    4.查看偏移量,注意"console-consumer-78441"根据您的环境适当改变即可
get /oldboyedu-linux82-kafka080/consumers/console-consumer-78441/offsets/oldboyedu-linux82/0

从kafka 0.9之后的版本之后支持将offset存储在kafka的"__consumer_offsets"内置topic中:
    1.查看kafka内置"__consumer_offsets"的方式
kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --topic __consumer_offsets  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning  | grep oldboyedu-linux82

....

# [消费者组,消费的topic,分区数]::OffsetAndMetadata(偏移量,...)
[linux82-elk,oldboyedu-linux82,4]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
[linux82-elk,oldboyedu-linux82,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
[linux82-elk,oldboyedu-linux82,2]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
[linux82-elk,oldboyedu-linux82,0]::OffsetAndMetadata(offset=4, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)
[linux82-elk,oldboyedu-linux82,3]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1661498621467, expireTimestamp=None)


    2.查看消费者组信息,注意观察"linux82-elk"的消费者组是否有消息延迟!
kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --all-groups
kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --group linux82-elk

...
GROUP           TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
linux82-elk     oldboyedu-linux82 4          5               6               1               -               -               -
linux82-elk     oldboyedu-linux82 1          4               4               0               -               -               -
linux82-elk     oldboyedu-linux82 2          3               4               1               -               -               -
linux82-elk     oldboyedu-linux82 0          4               5               1               -               -               -
linux82-elk     oldboyedu-linux82 3          3               5               2               -               -               -

    3.修改配置文件
vim /oldboyedu/softwares/kafka_2.13-3.2.1/config/consumer.properties 
...
group.id=linux82-elk
 
    
    4.基于配置文件启动消费者
kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic oldboyedu-linux82 --from-beginning --consumer.config /oldboyedu/softwares/kafka_2.13-3.2.1/config/consumer.properties


    5.再次查看消费者信息
kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092  --describe --group linux82-elk


kafka生产环境应该监控哪些指标:
    (1)监控kafka的程序是否正常运行;
    (2)数据是否有延迟;
    (3)是否频繁涉及到ISR列表的变化;
    (4)iops;
    (5)其他参数可参考<<kafka权威指南>>;

安装开源工具监控kafka集群:
    1.安装MySQL数据库并授权
        1.1 部署mariadb
yum -y install mariadb-server
systemctl start mariadb

        1.2 修改配置文件
vim /etc/my.cnf
...
[mysqld]
# 跳过名称解析。
skip-name-resolve


        1.3 重启服务并测试链接
systemctl restart mariadb
mysql -u linux82 -poldboyedu -h 10.0.0.103

        1.4 创建数据库
CREATE DATABASE oldboyedu_linux82 DEFAULT CHARACTER SET utf8mb4;
        
        1.5 授权
CREATE USER linux82 IDENTIFIED BY 'oldboyedu';
GRANT ALL ON oldboyedu_linux82.* TO linux82;
SHOW GRANTS FOR linux82;

        
        
    2.修改kafka开启JMX
        2.1 修改启动脚本
vim /oldboyedu/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh

...
# 大概在30行左右
    export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
    export JMX_PORT="8888"

        2.2 同步启动脚本到其他节点
data_rsync.sh /oldboyedu/softwares/kafka_2.13-3.2.1/bin/kafka-server-start.sh

        2.3 所有重启kafka服务
kafka-server-stop.sh 
kafka-server-start.sh $KAFKA_HOME/config/server.properties &>/dev/null &
    
    

    3.启动zookeeper的JMX端口
        3.1 修改启动脚本
vim /oldboyedu/softwares/apache-zookeeper-3.8.0-bin/bin/zkServer.sh +77
...
# 大概是在77行左右
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

    
        3.2 重启zookeeper服务
zkServer.sh restart


    4.安装kafka eagle监控工具
        4.1 解压软件包
unzip kafka-eagle-bin-2.0.8.zip
tar xf efak-web-2.0.8-bin.tar.gz  -C /oldboyedu/softwares/

        4.2 修改kafka eagle启动脚本的堆内存大小
vim  /oldboyedu/softwares/efak-web-2.0.8/bin/ke.sh 
...
export KE_JAVA_OPTS="-server -Xmx256m -Xms256m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

        4.3 修改配置文件 
cat > /oldboyedu/softwares/efak-web-2.0.8/conf/system-config.properties <<EOF
efak.zk.cluster.alias=linux82,cluster2
linux82.zk.list=10.0.0.101:2181/oldboyedu-linux82-kafka3.2.1
cluster2.zk.list=10.0.0.101:2181/oldboyedu-linux82-kafka080
linux82.efak.broker.size=20
kafka.zk.limit.size=32
efak.webui.port=8048
linux82.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
linux82.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=keadmin
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://10.0.0.103:3306/oldboyedu_linux82?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=linux82
efak.password=oldboyedu
EOF

        4.4 配置环境变量
cat > /etc/profile.d/kafka_eagle.sh <<EOF
#!/bin/bash

export KE_HOME=/oldboyedu/softwares/efak-web-2.0.8
export PATH=$PATH:$KE_HOME/bin
EOF


        4.5 启动kafka eagle服务,第一次需要等一会,可能有点漫长哈!大概30s~1分钟左右
ke.sh start


kafka集群压力测试:
    1.简介 
所谓压力测试就是对一个集群的处理能力的上限做一个评估。为将来集群扩容提供有效的依据。
    
    2.为什么要进行压力测试
(1)压力测试可以了解当前集群的处理能力上限;
(2)当修改集群的配置参数后,压力测试可以协助运维人员去参考本次调优的效果;
(3)压力测试的结果以后可以作为参考扩容集群的有效依据;
    
    3.实战案例
install -d /tmp/kafka-test/
cat > oldboyedu-kafka-test.sh <<'EOF'
# 创建topic
kafka-topics.sh --bootstrap-server 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic oldboyedu-kafka-2022 --replication-factor 1 --partitions 10 --create

# 启动消费者消费数据
nohup kafka-consumer-perf-test.sh --broker-list 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic  oldboyedu-kafka-2022 --messages 100000000 &>/tmp/kafka-test/oldboyedu-kafka-consumer.log &

# 启动生产者写入数据
nohup kafka-producer-perf-test.sh --num-records 100000000 --record-size 1000 --topic  oldboyedu-kafka-2022 --throughput 1000000 --producer-props bootstrap.servers=10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 &> /tmp/kafka-test/oldboyedu-kafka-producer.log &
EOF
bash oldboyedu-kafka-test.sh 


参数说明:
kafka-consumer-perf-test.sh 
    ---messages:
        指定消费消息的数量。
    --broker-list:
        指定broker列表。
    --topic:
        指定topic主体。
        
kafka-producer-perf-test.sh 
    -num-records
        生产消息的数量。
    --record-size:
        每条消息的大小,单位是字节。
    --topic:
        指定topic主体。
    --throughput 
        设置每秒发送的消息数量,即指定最大消息的吞吐量,若设置为-1代表不限制!
    --producer-props bootstrap.servers
        指定broker列表。
        
        
温馨提示:
    本案例测试大约会生成93GB( echo  "100000000 * 1000/1024/1024/1024" | bc)的数据,如果硬盘资源不足的小伙伴可以暂时不用测试了,或者该小上面提到的参数。

今日内容回顾:
    - kafka的ISR工作机制原理   *****
    - ZK的znode基础操作  ***
    - kafka多实例部署
    - kafka eagle监控工具
    - kafka集群压力测试  ***

下次课程:
    - kafka优化
    - kafka配合elk使用案例
    - zk的ACL
    - zk的监控工具
    - zk调优
    - ES8集群环境搭建文章来源地址https://www.toymoban.com/news/detail-691046.html

到了这里,关于kafka的ISR工作机制原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka的Replication策略和ISR同步机制

    目录 replication replication复制机制 同步复制 异步复制 ISR

    2024年02月13日
    浏览(36)
  • Kafka 的 replica 同步机制(ISR与OSR列表数据相互转换)

    注:ISR与OSR列表数据是存储在Zookeeper的Partition中的(一个Topic可能会配置多个partition)。 一、Kafka副本 Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续

    2024年04月28日
    浏览(26)
  • Kafka 入门到起飞 - 生产者参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR?

    上回书我们讲了,生产者发送消息流程解析传送门 那么这篇我们来看下,生产者发送消息时几个重要的参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR? bootstrap.servers : Kafka 集群地址 host1:port1,host2:port2,host3:port3 不需要写Kafka集群中全部的broker地址,但是也不要写

    2024年02月15日
    浏览(36)
  • kafka之集群工作机制理解

            回想一下,我们搭建kafka集群是如何搭建?修改kafka得配置文件,多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。         学习服务端原理,通常我们是去读服务端的那些抽象的代码,但是Kafka为了保证高吞吐,高性能,高可扩展的三高架构,很多

    2024年01月19日
    浏览(42)
  • kafka--kafka基础概念-ISR详解

    主要是讲 主 往 从同步中的问题 当绿色P1接收到写入的数据,要同步到紫色的P1S1和P1S2 如何保证一致性呢? 使用In Sync Replicas 也就是ISR概念 为什么不一致的? 因为P1S1同步数据 可能花费 50ms P1S2可能花费60ms 同步完的进入ISR集合, 同步时间是可以设置规定时间的(容忍时间)

    2024年02月12日
    浏览(37)
  • 区块链工作原理,工作机制和详细概念

     各位小伙伴想要博客相关资料的话,关注公众号:chuanyeTry即可领取相关资料! 工作原理         区块链是一种去中心化的分布式账本技术,它的工作原理可以简单概括为以下几个步骤:         1. 交易:区块链中的交易可以是任何数字化的价值交换,例如加密货币

    2024年02月09日
    浏览(53)
  • Kafka之分区副本与ISR

    Kafka的Topic分区本质是一个用于存储Topic下的消息的日志,但是只存一份日志会因为机器损坏或其他原因导致消息丢失不可恢复, 因此需要多个相同的日志作为备份,提高系统可用性,这些备份在kafka中被称为副本(replica)。 kafka将分区的所有副本均匀的分配到所有broker上,并从

    2024年02月04日
    浏览(37)
  • 详解Kafka分区机制原理|Kafka 系列 二

    Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“ 设为星标 ”。 点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达 上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Partition)。 kafka 将 一个Topic 中的消息分成多份,分

    2024年02月14日
    浏览(39)
  • Kafka系列(一)【消息队列、Kafka的基本概念、Kafka的工作机制、Kafka可满足的需求、Kafka的特性、Kafka的应用场景】

    转自《Kafka并不难学!入门、进阶、商业实战》 1. 消息队列的来源 在高并发的应用场景中, 由于来不及同步处理请求,接收到的请求往往会发生阻塞。 例如,大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的异

    2024年02月20日
    浏览(35)
  • Kafka事务机制:原理和实践

    Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用程序。它不仅以高吞吐量、可扩展性和容错能力著称,还提供了事务支持,以确保数据的完整性和一致性。在这篇博客中,我们将深入探讨 Kafka 的事务机制,了解其原理,并通过一个实际的例子来说明

    2024年02月03日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包