Kafka - Broker 详解

这篇具有很好参考价值的文章主要介绍了Kafka - Broker 详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

零、前置

一、Kafka Broker 工作流程

1.Zookeeper 存储的 Kafka 信息

2.Kafka Broker 总体工作流程

模拟 Kafka 上下线,Zookeeper 中数据变化

3.Broker 重要参数

二、生产经验 节点服役和退役

1.服役新节点

新节点准备

执行负载均衡操作

生成负载均衡的计划

执行副本存储计划

验证副本存储计划

2.退役旧节点

执行负载均衡操作

创建一个要均衡的主题

创建执行计划

执行副本存储计划

三、Kafka 副本

1.副本基本信息

ISR

OSR

2.Leader 选举流程

创建一个新的 topic,4 个分区,4 个副本

查看 Leader 分布情况

3.Leader 和 Follower 故障处理细节

Follower故障处理细节

Leader故障处理细节

4.分区副本分配

创建 16 个分区,3 个副本

5.手动调整分区副本存储

创建一个新的 topic

查看分区副本存储情况

创建副本存储计划

执行副本存储计划

验证副本存储计划

查看分区副本存储情况

6.Leader Partition 负载平衡

7.增加副本因子

创建 topic

手动增加副本存储

执行副本存储计划

四、文件存储

1.文件存储机制

Topic 数据的存储机制

Topic 数据到底存储在什么位置 

index 文件和 log 文件详解

2.文件清理策略

delete 日志删除:将过期数据删除

compact 日志压缩

五、高效读写数据

Kafka 本身是分布式集群,可以采用分区技术,并行度高

读数据采用稀疏索引,可以快速定位要消费的数据

顺序写磁盘

页缓存 + 零拷贝技术


零、前置

Kafka集群搭建:Kafka + Zookeeper + Hadoop 集群配置

Kafka Topic命令:Kafka - Topic命令 & 命令行操作


一、Kafka Broker 工作流程

1.Zookeeper 存储的 Kafka 信息

启动Zookeeper客户端(注意目录):

cd /opt/module/zookeeper-3.5.7/
bin/zkCli.sh

通过 ls 进行查看:

ls /

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

可以看到除了默认的Zookeeper节点外还有一个节点叫做kafka(之所以叫这个是因为咱们在配置文件中写的名字就是/kafka)。

查看 kafka节点 内的信息:

ls /kafka

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

可以再往里面看看:

ls /kafka/brokers/ids

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

可以看到在 ids 下面有着 0 1 2 三个数字,这个其实就是我们Kafka集群中每台机器的 brokerid

当然如果你觉得这样一个一个看太慢了,你也可以使用 ‘漂亮的公园’ 进行查看( 软件名称:prettyZoo ):

连接时输入IP和端口号即可,非常方便。在如此之多的节点中,最重要的有三个,分别如下:

/kafka/brokers/ids

#用于记录有哪些服务器 

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

/kafka/brokers/topics/first/partitions/0/state

#用于记录谁是Leader,有哪些服务器可用

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

/kafka/controller

#用于辅助选举Leader。每一个节点都会去抢着注册controller,谁先抢到注册权,谁就有权利去确定一个Leader

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

还有一个比较特殊的节点是 consumers 节点。0.9版本之前用于保存 offset 信息。0.9版本之后 offset 存储在kafka主题中。


2.Kafka Broker 总体工作流程

第一步:broker启动后在zk中注册,争抢controller

第二步:谁先注册Controller,谁说了算

第三步:由选举出来的Controller,监听brokers节点变化

第四步:Controller决定Leader选举

第五步:Controller将节点信息上传到ZK

第六步:其他contorller从zk同步相关信息,万一挂了他们好上位

第七步:假设Broker1中Leader挂了

第八步:Controller监听到节点变化

第九步:获取ISR

第十步:选举新的Leader(在 isr中存活为前提,按照 AR中排在前面的优先)

第十一步:更新Leader及ISR

选举规则:在isr中存活为前提,按 照AR中排在前面的优先。例如 ar[1,0,2], isr [1,0,2],那么leader 就会按照1,0,2的顺序轮询

模拟 Kafka 上下线,Zookeeper 中数据变化

查看/kafka/brokers/ids 路径上的节点:

ls /kafka/brokers/ids

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

查看/kafka/controller 路径上的数据:

get /kafka/controller

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据:

get /kafka/brokers/topics/first/partitions/0/state

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

停止 test3 上的 kafka:

bin/kafka-server-stop.sh

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

再次查看/kafka/brokers/ids 路径上的节点:

ls /kafka/brokers/ids

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

再次查看/kafka/controller 路径上的数据:

get /kafka/controller

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据:

get /kafka/brokers/topics/first/partitions/0/state

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

重新启动 test3 上的 kafka:

bin/kafka-server-start.sh -daemon ./config/server.properties

3.Broker 重要参数

参数名称 描述
replica.lag.time.max.ms ISR中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分成块的大小,默认值 1G
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引
log.retention.hours Kafka 中数据保存的时间,默认 7 天
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的 50%的2/3
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理

二、生产经验 节点服役和退役

1.服役新节点

新节点准备

关闭机器test3,然后对其进行克隆。

修改新机器的IP,和主机名称:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

 记得把test4的IP添加到test1的hosts文件中去。 

记得去把zk和kafka的配置也改一下,修改 test4 中 kafka 的 broker.id 为 3,删除 test4 中 kafka 下的 datas 和 logs(注意目录):

cd /opt/module/kafka
rm -rf datas/* logs/*

启动test1、test2和test3上的zk和kafka集群:

zk.sh start
kf.sh start

单独启动 test4 中的 kafka:

cd /opt/module/kafka/
bin/kafka-server-start.sh -daemon ./config/server.properties

执行负载均衡操作

接下来要做的是把咱们原本集群里的数据,分摊一部分到新创建的test4中,以达到负载均衡的作用。创建一个要均衡的主题(回到test1机器上):

cd /opt/module/kafka
vim topics-to-move.json

咱们针对 first 主题进行操作,把以下内容复制进去: 

{
     "topics": [
         {"topic": "first"}
     ],
     "version": 1
}

生成负载均衡的计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

复制 下面一行 下来, 在 kafka目录 下创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中):

vim increase-replication-factor.json

把上面的内容复制进去即可,就像这样:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --verify

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

此时可以去确认一下:

bin/kafka-topics.sh --bootstrap-server test1:9092 --topic first --describe

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式


2.退役旧节点

执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

创建一个要均衡的主题

 这里其实不用改,只是强调一下步骤 

vim topics-to-move.json

创建执行计划

 这里看到这里比之前少了一个3,从前面的"0,1,2,3"变成了"0,1,2",因为咱们要把3退役掉

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

依旧还是把下面的一行复制下来,然后创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中):

vim increase-replication-factor.json

把刚刚复制的粘贴进去:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

验证副本存储计划:

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --verify

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式


三、Kafka 副本

1.副本基本信息

  • Kafka 副本作用:提高数据可靠性。
  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会 增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)

 AR = ISR + OSR 

ISR

表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader

OSR

表示 Follower 与 Leader 副本同步时,延迟过多的副本


2.Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。Controller 的信息同步工作是依赖于 Zookeeper 的。

具体流程在上面已经写过了,这里不多说。

创建一个新的 topic,4 个分区,4 个副本

(注意这里是4个,所以得把之前的test4开启)

bin/kafka-topics.sh --bootstrap-server test1:9092 --create --topic van --partitions 4 --replication-factor 4

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

查看 Leader 分布情况

bin/kafka-topics.sh --bootstrap-server test1:9092 --describe --topic second

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

停止掉 test4 的 kafka 进程,并查看 Leader 分区情况:

bin/kafka-server-stop.sh

回到 test1 上重新看一眼:

bin/kafka-topics.sh --bootstrap-server test1:9092 --describe --topic second

可以看到两次的 Leader 分区发生的变化:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

停止掉 test3 的 kafka 进程,并查看 Leader 分区情况:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

启动 test4 test5 的 kafka 进程,并查看 Leader 分区情况(可以看到没有变回去): 

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式


3.Leader 和 Follower 故障处理细节

Follower故障处理细节

 LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。用下图的 broker0 来看,最后一个是7,但是因为它从0开始(有点类似于索引 index ),所以最后一个其实就是 7 + 1 = 8

 HW(High Watermark):所有副本中最小的 LEO 。还是以下图来看, HW = 最小的LEO ----> HW = 4 + 1 = 5  

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

咱们这里假设有三台kafka,最上面的是 Leader 下面两个为 Follower 。因为咱们kafka是由 Leader 节点跟客户做交互,所以他里面的数据比较多是正常的(有一个时间先后顺序)。上面的 ISR 表示三台服务器都正常启动。

消费者能够看到的最大的offset其实是 HW 下面的一个,也就是上图中的 4

此时其中的一个Follower也就是 broker2 挂了!

第一步:发生故障后,会被临时踢出ISR:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

第二步:这个期间剩下的 broker0 broker1 会继续接收数据。它们不会因为 broker3 的掉线而停止工作,也正是因为这样 LEO HW 会继续往后添加:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

 第三步:当挂掉的Follower恢复后,它会读取本地磁盘记录的上次的HW(5),并将高于HW的部分截取掉(因为它自身认为这是没有验证过的数据)。从HW,开始向Leader进行同步:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

等该Follower追上Leader之后(准确上来说是最上新的 HW ),就可以重新加入ISR了。 

Leader故障处理细节

第一步:发生故障后,还是会被临时踢出ISR:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

 第二步:Leader发生故障之后,会从ISR中选出一个新的Leader:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

第三步:这时候问题来了。 broker2 里面的数据比 broker1 里面的还要多(说白了就是这个小弟的实力比老大还厉害)。此时为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据(这里的 broker2 会把多出来的 5 6 给扔了,向 broker1 看齐):

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

 注意:这只能保证副本之间的数据一致性,并不能保证数据 不丢失 或者 不重复 


4.分区副本分配

 如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?

创建 16 个分区,3 个副本

先把之前的 second 给删了,重新创建一个:

bin/kafka-topics.sh --bootstrap-server test1:9092 --delete --topic second

创建一个新的 topic,名称为 second 

bin/kafka-topics.sh --bootstrap-server test1:9092 --create --partitions 16 --replication-factor 3 --topic second

查看分区和副本情况:

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

看一下细节(颜色标好了,找规律即可):

Topic: second                Partition: 0                Leader: 2                  Replicas: 2 , 3 , 1                 Isr: 2,3,1
Topic: second                Partition: 1                Leader: 3                  Replicas: 3 , 1 , 0                 Isr: 3,0
Topic: second                Partition: 2                Leader: 1                  Replicas: 1 , 0 , 2                 Isr: 1,0,2
Topic: second                Partition: 3                Leader: 0                  Replicas: 0 , 2 , 3                 Isr: 0,2,3

——————————————————————————————————————————————

Topic: second                Partition: 4               Leader: 2                   Replicas: 2 , 1 , 0                 Isr: 2,1,0
Topic: second                Partition: 5               Leader: 3                   Replicas: 3 , 0 , 2                 Isr: 3,0
Topic: second                Partition: 6               Leader: 1                   Replicas: 1 , 2 , 3                 Isr: 1,2,3
Topic: second                Partition: 7               Leader: 0                   Replicas: 0 , 3 , 1                 Isr: 0,3,1

——————————————————————————————————————————————

Topic: second                Partition: 8               Leader: 2                  Replicas: 2 , 0 , 3                Isr: 2,0,3
Topic: second                Partition: 9               Leader: 3                  Replicas: 3 , 2 , 1                Isr: 3
Topic: second                Partition: 10             Leader: 1                  Replicas: 1 , 3 , 0                Isr: 1,3,0
Topic: second                Partition: 11             Leader: 0                  Replicas: 0 , 1 , 2                Isr: 0,1,2

——————————————————————————————————————————————

Topic: second                Partition: 12             Leader: 2                  Replicas: 2 , 3 , 1                Isr: 2,3,1
Topic: second                Partition: 13             Leader: 3                  Replicas: 3 , 1 , 0                Isr: 3,0
Topic: second                Partition: 14             Leader: 1                  Replicas: 1 , 0 , 2                Isr: 1,0,2
Topic: second                Partition: 15             Leader: 0                  Replicas: 0 , 2 , 3                Isr: 0,2,3


5.手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

需求: 创建一个新的topic,4个分区,两个副本,名称为three。将 该topic的所有副本都存储到 broker0 和 broker1 两台服务器上。

下图中左侧为默认环境, L (Leader)被均匀的分布在了每一台服务器上。但是这里的 broker0 broker1 的性能远高于 broker2 broker3 ,所以咱们的目的如下图右侧:将 L (Leader)固定分配在性能较高的 broker0 broker1 中。

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

手动调整分区副本存储的步骤如下:

创建一个新的 topic

bin/kafka-topics.sh --bootstrap-server test1:9092 --create --partitions 4 --replication-factor 2 --topic three

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

查看分区副本存储情况

bin/kafka-topics.sh --bootstrap-server test1:9092 --describe --topic three

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

创建副本存储计划

所有副本都指定存储在 broker0、broker1 中:

vim increase-replication-factor.json

输入如下内容:

{
    "version":1,
    "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
        {"topic":"three","partition":1,"replicas":[0,1]},
        {"topic":"three","partition":2,"replicas":[1,0]},
        {"topic":"three","partition":3,"replicas":[1,0]}]
}

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --verify

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

查看分区副本存储情况

bin/kafka-topics.sh --bootstrap-server test1:9092 --describe --topic three

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

操作完成! 


6.Leader Partition 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

 auto.leader.rebalance.enable ,默认是true。自动 Leader Partition 平衡

 leader.imbalance.per.broker.percentage , 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡 

 leader.imbalance.check.interval.seconds , 默认值300秒。检查leader负载是否平衡的间隔时间 

下面举个栗子: 

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

可以看到上图中先创建了一个topic,然后进行两次查看(第一次查看之后我手动把一台broker关掉再重启)。 

第一次查看:ISR全部存活,对Replicas进行对比,谁靠前谁就是Leader。

然后去关掉一个broker,当它下线的时候,集群会选举出新的Leader。之后重启它,让它重新上线。

第二次查看:因为中途Leader的变更,所以可以看到就算Replicas中2是靠前的,但是Leader变成了3。

broker0 来说,原本的Leader应该是2,但是在重新选举后变成了3,所以不平衡数加1。AR的副本数是4,所以 broker0 节点不平衡率为 1/4>10% ,需要再平衡。

 Broker1 2 3 的不平衡数为0,不需要再平衡。

在生产环境中,就算发生了broker重启的情况,也不会导致kafka集群不可用。此时再去进行平衡会浪费大量的系统资源,这是没必要的,所以生产环境中一般会把 auto.leader.rebalance.enable 的true改成false。若这里不改的话也可以讲后面 leader.imbalance.per.broker.percentage 的10%提高到20-30%左右。


7.增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

创建 topic

bin/kafka-topics.sh --bootstrap-server test1:9092 --create --partitions 3 --replication-factor 1 --topic four

手动增加副本存储

创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中):

vim increase-replication-factor.json

内容如下: 

{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute

四、文件存储

1.文件存储机制

Topic 数据的存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。

可以用Linux的文件目录来抽象的表示一下

Topic/
├── Partition-0
│   └── log
│       ├── *.index
│       ├── *.log
│       └── *.timeindex
├── Partition-1
│   └── log
└── Partition-2
    └── log

Topic 数据到底存储在什么位置 

查看 test1(或者 test2、test3)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件:

cd /opt/module/kafka/datas/first-0/

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

这里的日志是无法直接用cat查看的,因为它是序列化文件。通过工具查看 index 和 log 信息:

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

index 文件和 log 文件详解

  • Index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引.参数log.index.interval.bytes默认4kb。
  • Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。

如何在log文件中定位到offset所对应的Record:

  • 根据目标offset定位Segment文件
  • 找到小于等于目标offset的最大offset对应的索引项
  • 定位到log文件
  • 向下遍历找到目标Record

日志存储参数配置:

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就 往 index 文件里面记录一个索引。 稀疏索引

2.文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

  •  log.retention.hours ,最低优先级小时,默认 7 天。
  •  log.retention.minutes ,分钟。
  •  log.retention.ms ,最高优先级毫秒。
  •  log.retention.check.interval.ms ,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢? Kafka 中提供的日志清理策略有 delete 和 compact 两种。

delete 日志删除:将过期数据删除

  •  log.cleanup.policy = delete 所有数据启用删除策略

基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大。

compact 日志压缩

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。 

  •  log.cleanup.policy = compact 所有数据启用压缩策略。

kafka broker,Kafka,Zookeeper,kafka,大数据,服务器,云原生,分布式

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息 集里就保存了所有用户最新的资料。 


五、高效读写数据

Kafka 本身是分布式集群,可以采用分区技术,并行度高

读数据采用稀疏索引,可以快速定位要消费的数据

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高

PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用。文章来源地址https://www.toymoban.com/news/detail-821370.html

参数 描述
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改, 交给系统自己管理

到了这里,关于Kafka - Broker 详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 实战 - Kafka Broker工作流程

    Apache Kafka Broker 在 Kafka 集群中扮演着核心角色,负责接收、存储、复制及分发消息。以下是 Kafka Broker 的工作流程概览: 1. 启动与初始化 加载配置 :Kafka Broker 从 server.properties 文件加载配置参数,包括 Broker ID、监听地址、日志目录、ZooKeeper 连接信息等。 注册到 ZooKeeper :

    2024年04月15日
    浏览(38)
  • 「Kafka」Broker篇

    主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。 Zookeeper 存储的 Kafka 信息 启动 Zookeeper 客户端: 通过 ls 命令可以查看 kafka 相关信息: Kafka Broker 总体工作流程 模拟 Kafka 上下线,Zookeeper 中数据变化: 查看 /kafka/brokers/ids 路径上的节

    2024年01月18日
    浏览(32)
  • 深入Kafka broker

    颗粒度, PRODUCE和FETCH中支持topic,partion等层级的颗粒度; 测试友好, 基于session_id和epoch确定一条拉取链路的fetch session; 全量增量结合, FetchRequest中的全量拉取和增量拉取; 基本结构: header+body。 常见header: api_key, api_version, corelation_id, client_id。与网络协议类似, Kafka本身的协议也是分

    2024年01月22日
    浏览(71)
  • 【kafka】——Broker

    1 /kafka/brokers/dis 存储broker的id,记录有哪些服务器 2 /kafka/brokers/topics 存储topic 相关信息 3 /kafka/consumers Kafka 0.9 版本之前 用于保存offset信息 Kafka 0.9 版本之后offser存储在Kafka主题中 4 /kafka/controller 辅助选举Leader 1 Broker 启动后在Zookeeper中注册 2 每个节点中的Contoller 抢先在Zookeepe

    2024年02月09日
    浏览(33)
  • 四、Kafka Broker

    4.1.1 Zookeeper 存储的 Kafka 信息 4.1.2 Kafka Broker 总体工作流程 自己的理解:其实就是将kafka的分区,负载到集群中的各个节点上。 1、服役新节点 2、退役旧节点 1、副本的作用 2、Leader的选举流程 选举规则:在isr中存活为前提,按照AR中排在前面的优先。例如ar[1,0,2], isr [1,0,

    2024年02月11日
    浏览(30)
  • Kafka-Broker工作流程

     kafka集群在启动时,会将每个broker节点注册到zookeeper中,每个broker节点都有一个controller,哪个controller先在zookeeper中注册,哪个controller就负责监听brokers节点变化,当有分区的leader挂掉时,controller会监听到节点变化,然后去zookeeper中获取isr,选举新的leader,选举的规则是:在

    2024年02月14日
    浏览(38)
  • KafKa 3.x(二、Broker,消费者)

    4.1.1 Zoopkeeper存储的Kafka信息 启动Zookeeper客户端 通过ls命令查看kafka相关信息 在Zookeeper的服务端存储的Kafka相关信息 /kafka/brokers/ids [0,1,2] 记录那些服务器 /kafka/brokers/topics/first/partitions/0/state {“leader”:1,“isr”:[1,0,2]} 记录谁是leader,有哪些服务器可用 /kafka/controller {“brokerid”

    2024年02月12日
    浏览(31)
  • kafka的broker和replica和文件存储

    /brokers/ids,记录存在的服务器id /brokers/topics/test/partitions/0/state,记录leader和可用副本服务器 /comsumers,0.9版本之前存储消费者的offset信息,但是会产生zookeeper和broker的跨节点通信 /controller 辅助选举leader。每个broker上都会有一个controller模块,controller在zookeeper注册信息,先注册

    2024年02月13日
    浏览(34)
  • Kafka学习--3、Kafka Broker、节点服役和退役、Kafka 副本、Leader 选举流程、故障处理

    1.1 Kafka Broker工作流程 1.1.1 Zookeeper储存的Kafka信息 (1)启动Zookeeper集群、再启动Kafka集群,然后启动Zookeeper客户端 (2)通过ls命令可以查看kafka相关信息。 1.1.2 Kafka Broker总体工作流程 1、模拟Kafka上下线,Zookeeper中数据变化 (1)查看/kafka/brokers/ids 路径上的节点。 (2)查看

    2024年02月10日
    浏览(37)
  • Kafka3.0.0版本——Broker( 退役旧节点)示例

    三台服务器 原始服务器名称 原始服务器ip centos7虚拟机1 192.168.136.27 centos7虚拟机2 192.168.136.28 centos7虚拟机3 192.168.136.29 centos7虚拟机4 192.168.136.30 分别启动4台zookeeper 再分别启动4台kafka brokers中的ids=0、1、2、3都存在,说明4台kafka启动正常,如下图: 查看kafka中的创建过的名称为

    2024年02月13日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包