1. 文章引言
最近在学习kafka相关的知识,特将学习成功记录成文章,以供大家共同学习。
首先要注意的是,Kafka
中的Topic
和ActiveMQ
中的Topic
是不一样的。
在Kafka
中,Topic
是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到Kafka
集群的消息都有一个类别。
物理上来说,不同的Topic
的消息是分开存储的,每个Topic
可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
每个Topic
可以划分多个分区**(每个Topic
至少有一个分区)**,同一Topic
下的不同分区包含的消息是不同的。
每个消息在被添加到分区时,都会被分配一个offset
,它是消息在此分区中的唯一编号,Kafka
通过offset
保证消息在分区内的顺序。
offset
的顺序不跨分区,即Kafka
只保证在同一个分区内的消息是有序的。
消息是每次追加到对应的Partition
的后面:
2. Topic & Partition的存储
Topic
是一个逻辑上的概念,具体的存储还是基于Partition
来的。
创建一个test2 Topic
(注意这里的 partitions 参数为 3):
可以进入/tmp/kafka-logs
目录下进行查看(当前机器IP 192.168.220.135
),如下图所示:
在另外一台136
机器上:
可以发现:
-
在
135
机器上有test2-0
和test2-2
-
在
136
机器上有test2-1
。
接下来,再结合Kafka
的消息分发策略来看。
3. Kafka的消息分发
Kafka
中最基本的数据单元就是消息,而一条消息其实是由Key + Value
组成:
-
Key
是可选项,可传空值 -
Value
也可以传空值
这也是与ActiveMQ
不同的一个地方。
在发送一条消息时,我们可以指定这 Key
,那 Producer
会根据Key
和partition
机制来判断当前这条消息应该发送并存储到哪个partition
中(这个就跟分片机制类似)。
我们可以根据需要进行扩展Producer
的partition
机制 (默认算法是hash
取%
)。
如下扩展自己的partition
代码所示:
/**
* 消息发送后会调用自定义的策略
*
* @author super先生
* @date 2023/2/10 14:20
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取当前 topic 有多少个分区(分区列表)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if (key == null) { //之前介绍过 Key 是可以传空值的
partitionNum = new Random().nextInt(partitions.size()); //随机
} else {
//取 %
partitionNum = Math.abs((key.hashCode()) % partitions.size());
}
System.out.println("key:" + key + ",value:" + value + ",partitionNum:" + partitionNum);
//发送到指定分区
return partitionNum;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
生产者和消费者代码可参考我的博文:实现kafka的生产者(Producer)和消费者(Consumer)的代码
如下创建kafka
生产者(Producer
)的代码:
/**
* @author super先生
* @date 2023/2/10 14:40
*/
public class KafkaProducerDemo extends Thread {
/**
* 消息发送者
*/
private final KafkaProducer<Integer, String> producer;
/**
* topic
*/
private final String topic;
private final Boolean isAsync;
public KafkaProducerDemo(String topic, Boolean isAsync) {
this.isAsync = isAsync;
//构建相关属性
//@see ProducerConfig
Properties properties = new Properties();
//Kafka 地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
//kafka 客户端 Demo
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
//The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
/**发送端消息确认模式:
* 0:消息发送给broker后,不需要确认(性能较高,但是会出现数据丢失,而且风险最大,因为当 server 宕机时,数据将会丢失)
* 1:只需要获得集群中的 leader节点的确认即可返回
* -1/all:需要 ISR 中的所有的 Replica进行确认(集群中的所有节点确认),最安全的,也有可能出现数据丢失(因为 ISR 可能会缩小到仅包含一个 Replica)
*/
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
/**【调优】
* batch.size 参数(默认 16kb)
* public static final String BATCH_SIZE_CONFIG = "batch.size";
*
* producer对于同一个 分区 来说,会按照 batch.size 的大小进行统一收集进行批量发送,相当于消息并不会立即发送,而是会收集整理大小至 16kb.若将该值设为0,则不会进行批处理
*/
/**【调优】
* linger.ms 参数
* public static final String LINGER_MS_CONFIG = "linger.ms";
* 一个毫秒值。Kafka 默认会把两次请求的时间间隔之内的消息进行搜集。相当于会有一个 delay 操作。比如定义的是1000(1s),消息一秒钟发送5条,那么这 5条消息不会立马发送,而是会有一个 delay操作进行聚合,
* delay以后再次批量发送到 broker。默认是 0,就是不延迟(同 TCP Nagle算法),那么 batch.size 也就不生效了
*/
//linger.ms 参数和batch.size 参数只要满足其中一个都会发送
/**【调优】
* max.request.size 参数(默认是1M) 设置请求最大字节数
* public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
* 如果设置的过大,发送的性能会受到影响,同时写入接收的性能也会受到影响。
*/
//设置 key的序列化,key 是 Integer类型,使用 IntegerSerializer
//org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
//设置 value 的序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//指定分区策略
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"superJson.kafka.partition.MyPartitioner");
//构建 kafka Producer,这里 key 是 Integer 类型,Value 是 String 类型
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
}
public static void main(String[] args) {
new KafkaProducerDemo("test2",true).start();
}
@Override
public void run() {
int num = 0;
while (num < 100) {
String message = "message--->" + num;
System.out.println("start to send message 【 " + message + " 】");
if (isAsync) { //如果是异步发送
producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata!=null){
System.out.println("async-offset:"+metadata.offset()+"-> partition"+metadata.partition());
}
}
});
} else { //同步发送
try {
RecordMetadata metadata = producer.send(new ProducerRecord<Integer, String>(topic, message)).get();
System.out.println("sync-offset:"+metadata.offset()+"-> partition"+metadata.partition());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
实现kafka
的消费者(Consumer
)中在接收消息的时候输出分区,如下代码所示:
/**
* @author super先生
* @date 2023/2/10 15:10
*/
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer<Integer, String> kafkaConsumer;
public KafkaConsumerDemo(String topic) {
//构建相关属性
//@see ConsumerConfig
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
//消费组
/**
* consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是
一个组,那么组内必然可以有多个消费者或消费者实例((consumer instance),
它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订
阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一
个消费组内的一个consumer来消费.后面会进一步介绍。
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
/** auto.offset.reset 参数 从什么时候开始消费
* public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
*
* 这个参数是针对新的groupid中的消费者而言的,当有新groupid的消费者来消费指定的topic时,对于该参数的配置,会有不同的语义
* auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费topic下的消息
* auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费
auto.offset.reset=none情况下,新的消费组加入以后,由于之前不存在 offset,则会直接抛出异常。说白了,新的消费组不要设置这个值
*/
//enable.auto.commit
//消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到(如果没有 commit,消息可以重复消费,也没有 offset),还可以配合auto.commit.interval.ms控制自动提交的频率。
//当然,我们也可以通过consumer.commitSync()的方式实现手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
/**max.poll.records
*此参数设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔
要处理的最大值。通过调整此值,可以减少poll间隔
*/
//间隔时间
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//反序列化 key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//反序列化 value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//构建 KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties);
//设置 topic
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
/**
* 接收消息
*/
@Override
public void run() {
while (true) {
//拉取消息
ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(100000000);
for (ConsumerRecord<Integer, String> record : consumerRecord) {
//record.partition() 获取当前分区
System.out.println(record.partition()+"】】 message receive 【" + record.value() + "】");
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test2").start();
}
}
首先启动Consumer
,再启动Producer
:
可以看到是能够对的上的。
默认情况下,Kafka
采用的是hash 取 % 的分区算法
。
如果Key
为null
,则会随机分配一个分区。
这个随机是在这个参数metadata.max.age.ms
的时间范围内随机选择一个。
对于这个时间段内,如果Key
为 null,则只会发送到唯一的分区。这个值默认情况下是 10
分钟更新一次 (因为 partition
状态可能会发生变化)。
4. 关于 Metadata
Metadata
包含 Topic
和 Partition
和 broker
的映射关系,每一个 Topic
的每一个 partition
,需要知道对应的 broker
列表是什么,Leader
是谁,Follower
是谁。
这些信息都是存储在Metadata
这个类中,如下图所示:
5. 消费端如何消费指定分区
Consumer
可以指定具体消费的分区,如下图所示:
再重新启动Consumer
和Producer
:
可以看到Consumer
只消费了分区为1
的消息。
以上是单个Consumer
消费(指定)分区的情况。
一般每个Topic
都会有多个partition
(主要是用于数据分片,减少消息的容量,从而提升 I/O 性能)。
当然也可以使用多个Consumer
从而提高消费能力,有一个消费组的概念(具体可参看:一文了解kafka消息队列)
如果Consumer1、Consumer2 和 Consumer3
都属于group.id 为1
的消费组,那么消费情况如下:
-
Consumer1
就会消费p0
-
Consumer2
就会消费p1
-
Consumer3
就会消费p2
不使用指定分区的方式创建三个Consumer
:
而且,它们都是同一个消费组:
同时启动三个Consumer
和Producer
,而且它们都是同一个消费组:
可以看到三个Consumer
分别消费三个Partition
,很均匀。
对同一个Group
来说,其中的Consumer
可以消费指定分区也可以消费自动分配的分区(这里是 Consumer
数量和partition
数量一致,均匀分配)。
如果下述情况如何处理:
- 如果
Consumer
数量大于partition
数量呢? - 如果
Consumer
数量小于partition
数量呢?
这两种情况,读者可自行测试。
但要注意如下情况:
-
如果
Consumer
数量比partition
数量多,会有的Consumer
闲置无法消费,这样是一个浪费。 -
如果
Consumer
数量小于partition
数量会有一个Consumer
消费多个partition
。
Kafka
在partition
上是不允许并发的。Consuemr
数量建议最好是partition
的整数倍。
还有一点,如果Consumer
从多个partiton
上读取数据,是不保证顺序性的。Kafka
只保证一个partition
的顺序性,跨partition
是不保证顺序性的。增减 Consumer、broker、partition 会导致 Rebalance。
6. Kafka 分区分配策略
在Kafka
中,同一个Group
中的消费者对于一个Topic
中的多个partition
存在一定的分区分配策略——分区分配策略有如下两种:
-
Range(默认)
策略 -
RoundRobin(轮询)
策略
通过partition.assignment.strategy
这个参数来设置。
6.1 Range strategy(范围分区)
Range
策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假设我们有10
个分区,3
个消费者,排完序的分区将会是0,1,2,3,4,5,6,7,8,9
;消费者线程排完序将会是C1-0, C2-0, C3-0
。
然后partitions
的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
假如在 Topic1
中有10
个分区,3
个消费者线程,10/3 = 3
,而且除不尽,那么消费者线程C1-0
将会多消费一个分区,所以最后分区分配的结果是这样的:
-
C1-0
将消费0,1,2,3
分区 -
C2-0
将消费4,5,6
分区 -
C3-0
将消费7,8,9
分区
假如在Topic1
中有11
个分区,那么最后分区分配的结果看起来是这样的:
-
C1-0
将消费0,1,2,3
分区 -
C2-0
将消费4, 5, 6, 7
分区 -
C3-0
将消费8,9,10
分区
假如有两个Topic:Topic1 和 Topic2
,都有10
个分区。那么,最后分区分配的结果看起来是这样的:
-
C1-0
将消费Topic1
的0,1,2,3
分区和Topic1
的0,1,2,3
分区 -
C2-0
将消费Topic1
的4,5,6
分区和Topic2
的4,5,6
分区 -
C3-0
将消费Topic1
的7,8,9
分区和Topic2
的7,8,9
分区
其实这样就会有一个问题,C1-0
就会多消费两个分区,这就是一个很明显的弊端。
6.2 RoundRobin strategy(轮询分区)
轮询分区策略是把所有partition
和所有Consumer
线程都列出来,然后按照hashcode
进行排序。
最后通过轮询算法分配partition
给消费线程。如果所有Consumer
实例的订阅是相同的,那么partition
会均匀分布。
假如按照hashCode
排序完的Topic / partitions
组依次为:
T1一5
T1一3
T1-0
T1-8
T1-2
T1-1
T1-4
T1-7
T1-6
T1-9
消费者线程排序为:
C1-0
C1-1
C2-0
C2-1
最后的分区分配的结果为:
-
C1-0
将消费T1-5, T1-2, T1-6
分区 -
C1-1
将消费T1-3, T1-1, T1-9
分区 -
C2-0
将消费T1-0, T1-4
分区 -
C2-1
将消费T1-8, T1-7
分区
使用轮询分区策略必须满足两个条件:
-
每个主题的消费者实例具有相同数量的流
-
每个消费者订阅的主题必须是相同的
1. 什么时候会触发这个策略呢?
当出现以下几种情况时,Kafka
会进行一次分区分配操作,也就是Kafka Consumer
的Rebalance
-
同一个
Consumer group
内新增了消费者 -
消费者离开当前所属的
Consumer group
,比如主动停机或者宕机 -
Topic
新增了分区(也就是分区数量发生了变化)
Kafka Consuemr
的Rebalance
机制规定了一个Consumer group
下的所有Consumer
如何达成一致来分配订阅Topic
的每个分区。
而具体如何执行分区策略,就是前面提到过的两种内置的分区策略。而Kafka
对于分配策略这块,提供了可插拔的实现方式,也就是说,除了这两种之外,我们还可以创建自己的分配机制。
2. 谁来执行Rebalance
以及管理Consumer
的group
呢?
Consumer group
如何确定自己的coordinator
是谁呢,消费者向Kafka
集群中的任意一个broker
发送一个 GroupCoord inatorRequest
请求,服务端会返回一个负载最小的broker
节点的id
,并将该broker
设置为 coordinator
。
3. JoinGroup
的过程
在Rebalance
之前,需要保证coordinator
是已经确定好了的,整个Rebalance
的过程分为两个步骤,Join和Syncjoin
:表示加入到Consumer group
中,在这一步中,所有的成员都会向coordinator
发送 joinGroup
的请求。
一旦所有成员都发了joinGroup
请求,那么coordinator
会选择一个Consumer
担任leader
角色,并把组成员信息和订阅信息发送给消费者:
-
protocol-metadata
:序列化后的消费者的订阅信息 -
leader id
:消费组中的消费者,coordinator
会选择一个作为leader
,对应的就是member id
-
member metadata
:对应消费者的订阅信息 -
members
:consumer group
中全部的消费者的订阅信息 -
generation_id
:年代信息,类似于ZooKeepe
中的epoch
,对于每一轮Rebalance
,generation_id
都会递增。主要用来保护consumer group
,隔离无效的offset
提交。也就是上一轮的consumer
成员无法提交offset
到新的Consumer group
中。
4. Synchronizing Group State
阶段
完成分区分配之后,就进入了Synchronizing Group Stat
阶段,主要逻辑是向GroupCoordinator
发送SyncGroupRequest
请求,并且处理SyncGroupResponse
响应,简单来说,就是leader
将消费者对应的 partition
分配方案同步给Consumer group
中的所有Consumer
:
每个消费者都会向coordinator
发送syncgroup
请求,不过只有leader
节点会发送分配方案,其他消费者只是打打酱油而已。
当leader
把方案发给coordinator
以后,coordinator
会把结果设置到SyncGroupResponse
中。这样所有成员都知道自己应该消费哪个分区。文章来源:https://www.toymoban.com/news/detail-823649.html
Consumer group
的分区分配方案是在客户端执行的!Kafka
将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。文章来源地址https://www.toymoban.com/news/detail-823649.html
7.参考文献
- https://dongguabai.blog.csdn.net/article/details/86536894
到了这里,关于全网最详细地理解Kafka中的Topic和Partition以及关于kafka的消息分发、服务端如何消费指定分区、kafka的分区分配策略(range策略和RoundRobin策略)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!