1.概述
1.1 消息队列
目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有
- Kafka(在大数据场景主要采用 Kafka 作为消息队列。)
- ActiveMQ
- RabbitMQ
- RocketMQ
1.1.1 传统消息队列的应用场景
传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
- 缓冲/消峰:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 异步通信:
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
1.1.2 消息队列的两种模式
- 点对点模式
- 发布/订阅模式
一、点对点模式
消费者主动拉取数据,消息收到后清除消息
二、发布/订阅模式
- 可以有多个topic主题(浏览、点赞、收藏、评论等)
- 消费者消费数据之后,不删除数据
- 每个消费者相互独立,都可以消费到数据
1.2 kafka基础结构
- Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
- Consumer:消息消费者,向 Kafka broker 取消息的客户端。
- Consumer Group(CG):消费者组,由多个 consumer 组成。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 - Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
- Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
- Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。
- Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
- Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
2.kafka的快速入门
2.1 集群部署
2.1.1 安装java
一、上传并解压安装包
1.将资料中的jdk-8u361-linux-x64.tar.gz上传到/usr/local/java目录下
mkdir /usr/local/java/
tar -zxvf jdk-8u361-linux-x64.tar.gz
二、配置环境变量
export JAVA_HOME=/usr/local/java/jdk1.8.0_361
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
三、使环境变量生效
source /etc/profile
四、检查java版本
java -version
2.1.2 部署zookeeper集群
一、上传并解压安装包
1.将资料中的apache-zookeeper-3.7.1-bin.tar.gz上传到/usr/local目录下
cd /usr/local
2.解压安装包到本地文件夹
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
3.修改文件名称
mv apache-zookeeper-3.7.1-bin/ zookeeper
二、修改配置
1.进入zookeeper目录下的conf目录,将目录中的zoo_sample.cfg改成zoo.cfg
cd zookeeper/conf/
mv zoo_sample.cfg zoo.cfg
2.在zookeeper目录下新建一个zkData文件夹, 作为数据文件存储目录
cd /usr/local/zookeeper/
mkdir zkData
3.回到zoo.cfg, 将dataDir的路径换成我们刚刚新建的zkData的路径
dataDir=/usr/local/zookeeper/zkData
三、启动和关闭
进入bin目录下,启动服务端
./zkServer.sh start
进入bin目录下,关闭服务端
./zkServer.sh stop
四、集群部署_生成myid
1.在/zkData下新建myid文件, 并写入对应的server.num中的num数字
echo 1 > myid
2.依次给其他两个服务器上的zookeeper设置num,分别是2和3
五、集群部署_配置集群中的各个节点
1.回到zoo.cfg, 将其他几个节点配置进来
server.1=192.168.202.128:2888:3888
server.2=192.168.202.130:2888:3888
server.3=192.168.202.131:2888:3888
六、分别启动三台zookeeper服务
进入 /bin 路径下,执行如下命令,启动 zookeeper 服务:
./zkServer.sh start
七、查看zk集群状态
可以看到哪个zk是leader,哪些是follow
./bin/zkServer.sh status ./conf/zoo.cfg
八、验证是否搭建成功
在命令行中输入:./zkCli.sh -server 127.0.0.1:2181,即可连接到本机 ZooKeeper 服务器。其他自动实现同步,客户端只需要和一台保持连接即可。出现如下语句表示链接成功:
注意:关闭其他linux的防火墙
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]
2.1.3 部署kafka集群
一、集群规划
我们在三个服务器上分别部署zk和kafka
linux_1 | linux_2 | linux_3 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
二、上传并解压安装包
1.将资料中的kafka_2.12-3.0.0.tgz上传到/usr/local目录下
cd /usr/local
2.解压安装包到本地文件夹
tar -zxvf kafka_2.12-3.0.0.tgz
3.修改文件名称
mv kafka_2.12-3.0.0/ kafka
三、修改kafka配置文件
1.进入到kafka 目录, 修改配置文件
cd config/
vim server.properties
2.修改以下内容:
#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/usr/local/kafka/datas
# topic 在当前broker上的分片个数,与broker保持一致
num.partitions=3
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=192.168.202.128:2181,192.168.202.130:2181,192.168.202.131:2181
3.依次修改其他节点的配置, 需要保证broker.id唯一
分别修改另两个节点的broker.id=1、broker.id=2
四、启动集群
1.启动Zookeeper 集群,然后启动 Kafka。
2.依次在这三个节点上启动 Kafka。(在kafka目录下执行)
bin/kafka-server-start.sh -daemon config/server.properties
五、关闭集群
bin/kafka-server-stop.sh
注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
Kafka启动报错:The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
处理方式:
- log.dirs配置的路径要正确
- 删除这里面的meta.properties文件
参考文章:Linux 搭建Kafka集群,最新教程,细到极致
2.2 Kafka命令行操作
2.2.1 主题命令行操作
操作主题命令行参数
bin/kafka-topics.sh
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
--topic <String: topic> | 操作的 topic 名称。 |
--create | 创建主题。 |
--delete | 删除主题。 |
--alter | 修改主题。 |
--list | 查看所有主题。 |
--describe | 查看主题详细描述。 |
--partitions <Integer: # of partitions> | 设置分区数。 |
--replication-factor<Integer: replication factor> | 设置分区副本。 |
--config <String: name=value> | 更新系统默认的配置。 |
一、查看当前服务器中所有topic
# 连接多个节点(集群)
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092,192.168.202.130:9092,192.168.202.131:9092 --list
二、创建first topic
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 1 --replication-factor 3 --topic first
- --topic first:代表创建名称为first的topic
- --partitions 1:代表这个topic只有一个分区
- --replication-factor 3:代表这个topic有三个副本
三、查看first主题的详情
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic first
- Replicas: 2,0,1 代表副本存储在哪些节点中, first这个topic我们设置了3个副本,此处的2,0,1对应的就是这三个副本存储的位置(broker.id)
- Isr: 2,0,1 Isr代表同步副本, 此处2,0,1都属于同步副本(主副本也属于同步副本)
- Leader: 2 代表主副本是borker.id=2的节点
四、修改分区数
注意:分区数只能增加,不能减少
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --alter --topic first --partitions 3
无法通过命令行的方式修改副本
五、删除topic
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --delete --topic first
2.2.2 生产者命令行操作
操作生产者命令参数
bin/kafka-console-producer.sh
参数 | 描述 |
---|---|
-`-bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
--topic <String: topic> | 操作的 topic 名称。 |
一、发送消息
bin/kafka-console-producer.sh --bootstrap-server 192.168.202.128:9092 --topic first
>hello world
>atguigu atguigu
2.2.3 消费者命令行操作
操作消费者命令参数
bin/kafka-console-consumer.sh
参数 | 描述 |
---|---|
-`-bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
--topic <String: topic> | 操作的 topic 名称。 |
--from-beginning | 从头开始消费。 |
--group <String: consumer group id> | 指定消费者组名称。 |
一、消费first主题的数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --topic first
二、把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --from-beginning --topic first
3 Kafka 生产者
3.1 生产者消息发送流程
3.1.1 发送原理
- KafkaProducer将消息封装成ProducerRecord, 然后通过拦截链, 根据指定序列化方式进行序列化, 其次在分区器中根据设置的分区策略进行数据分区,封装成TopicPartition, TopicPartition中就包含了目标partition的信息
- 其后分区器将消息写入RecordAccumulator进行缓冲, RecordAccumulator是一个双端队列, RecordAccumulator中维护了一个ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 类型的集合, 其中的Key是TopicPartition,它用来标识目标partition(消息的最终存储位置), Value是Deque<ProducerBatch> 队列,用来缓冲发往目标 partition 的消息。
- 当Deque达到一定阈值后,就会唤醒sender线程将消息发送到kafka集群
3.1.2 生产者重要参数列表
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单. 例如hadoop102:9092,hadoop103:9092, 可以设置1个或者多个, 中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息 |
key.serializer和value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。
|
batch.size | 缓冲区一批数据最大值,默认16k。 适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms, 表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
|
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5, 开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true, 开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
3.2 异步发送API
3.2.1 普通异步发送
一、在broker2中开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.130:9092 --topic first
二、生产者生产消息
public class CustomProducer {
public static void main(String[] args) {
// 1.创建kafka生产者的配置对象
Properties properties = new Properties();
// 2.给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value 序列化(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 4.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.调用send方法, 发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
}
// 6.关闭资源
kafkaProducer.close();
}
}
观察broker2中控制台消息
3.2.2 带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
callback只有在收到ack时调用, 它可以帮我们返回:主题、分区等消息, 例如我们想看看消息最终存储到哪个topic的哪个分区中
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1.创建kafka生产者的配置对象
Properties properties = new Properties();
// 2.给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 4.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.调用send方法,发送消息
for (int i = 0; i < 5; i++) {
// 添加回调
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
// 没有异常, 输出消息到控制台
System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
// 6.关闭资源
kafkaProducer.close();
}
}
观察控制台
3.3 同步发送API
只需在异步发送的基础上,再调用一下 get()方法即可。
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建kafka生产者的配置对象
Properties properties = new Properties();
// 2.给kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value 序列化(必须)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 4.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.调用send方法, 发送消息
for (int i = 0; i < 10; i++) {
// 异步发送
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
// 同步发送
kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
}
// 6.关闭资源
kafkaProducer.close();
}
}
3.4 生产者分区
3.4.1 分区好处
- 便于合理使用存储资源
- 提供并行速度
3.4.2 生产者发送消息的分区策略
默认的分区器DefaultPartitioner
在在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
案例一:将数据发往指定 partition
观察kafka控制台中是否接收到消息。
在 IDEA 控制台观察回调信息。
案例二:没有指明 partition 值但有 key 的情况下
将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
1.key="a"时,在控制台查看结果。
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
2.key="b"时,在控制台查看结果。
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
主题:first->分区:2
3.key="f"时,在控制台查看结果。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
3.4.3 自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,
不包含 atguigu,就发往 1 号分区。
一、定义自定义分区器
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含atguigu
if (msgValue.contains("atguigu")) {
partition = 0;
} else {
partition = 1;
}
// 返回分区号
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
二、使用自定义分区器
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
3.5 生产经验—生产者如何提高吞吐量
Deque达到一定阈值后,就会唤醒sender线程将消息发送到kafka集群, 这个阈值受两个参数影响
- batch.size:批次大小, 默认16K
当Deque中的积压的消息达到16K后, 就会唤醒sender线程将消息打包发送到同一个分区 - linger.ms:等待时间,默认为0
如果Deque中的消息一直没有达到16K, 此时会根据linger.ms设置的时间,比如设置了1秒, 那么到了这个时间(上一个批次的消息发送完成后开始计时),即使数据没有达到16K, 也会唤醒sender线程发送消息
一、linger.ms=0产生的问题
因为linger.ms默认为0, 所以来一个消息就会唤醒sender来发送消息, 这样的效率并不高(会频繁开启线程发送消息), 为了提高拉取速度的能力, 我们希望一次能发送很多消息
所以在生产环境中, 我们一般会修改linger.ms的值, 改为5~100ms, 而batch.size使用默认值即可
注意点:不能将batch.size和linger.ms设置的很大, 这样每批次消息的发送时间间隔就会很大(延迟过大)
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
3.6 生产经验—数据可靠性
为了保证producer发送的数据, 能可靠的发送到指定的partition, kafka为producer提供了消息确认机制(ack)
3.6.1 消息确认机制(ack)
- ack = 0
- ack = 1
- ack = -1或者all
一、ack=0
生产者发送消息后, 不需要等待任何来自服务器的响应
- 优点:
- 1.不需要等待服务器的响应, 所以可以以网络能够支持的最大速度发送消息, 从而达到很高的吞吐量
- 2.producer不管发送成不成功,只发送一次就不再发送了, 至少保证消息不会被重复消费
- 缺点:
- 如果当中出现问题,导致服务器没有收到消息, 没有落盘到partition,生产者无从得知,会造成消息丢失
二、ack=1
生产者发送消息后, 等待分区的leader收到数据后应答
- leader节点收到了消息, 生产者就会收到服务器的成功响应.(代表消息发送成功)
- leader节点没有收到消息, 生产者就会收到服务器的错误响应,为了避免数据丢失, 生产者会重发消息
存在的问题:如果leader落盘成功了, 向producer也收到了成功响应, 但是还没来得及将消息同步副本(follower), 此时leader挂了, 此时服务器会从follower中推选新的leader, 新的leader并没有同步消息, 而producer也不会再发了, 此时消息就丢失了
三、ack=-1或者all
生产者发送过来的数据, leader和ISR队列里面的所有节点都落盘成功后, 进行应答
producer只有收到分区中所有副本的成功写入通知来认为推送消息成功了
3.6.2 ISR机制
思考:如果leader在同步数据时, 有一个follow挂了, 迟迟不能与leader进行同步, 这个问题怎么解决?
一、ISR的概念
leader维护了一个动态的in-sync replica set(ISR), 意指跟leader保持同步的follower + leader集合, 例如我们之前看到的(leader:0, isr:0,1,2)
二、ISR的剔除
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms
参数设定,默认30s。例如2超时后,会显示(leader:0, isr:0,1)。
这样就不用等长期联系不上或者已经故障的节点
3.6.3 数据完全可靠条件
如果分区的副本设置为1(即只有leader没有follower), 或者ISR中应答的最小副本数量(min.insync.replicas 默认为1)设置为1, 这种情况下就和ack=1时效果是一样的, 存在数据丢失问题(leader:0, isr:0)
数据完全可靠条件
- 分区副本大于等于2(除了leader以外, 存在至少一个follow副本)
- ACK级别设置为-1(保证ISR中所有节点都存入消息)
- min.insync.replicas >=2
ISR里应答的最小副本数量大于等于2(ISR中的数量至少有两个, 否则broker不处理这条消息, 并直接给生产者报错)
min.insync.replicas = n,代表的语义是,如果生产者acks=all,而在发送消息时,Broker的ISR数量没有达到n,Broker不能处理这条消息,需要直接给生产者报错。
3.6.4 可靠性总结
- acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
生产环境的使用场景:
- acks=0很少使用;
- acks=1,一般用于传输普通日志,允许丢个别数据
- acks=-1或者all,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
3.6.5 生产者重复发送消息
消息重复存在几个场景
- 生产端:服务器响应失败后, 基本的解决措施就是重发消息
- 消费端: poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。
一、生产端重复发送消息的一种情况
Leader收到数据后, 将数据落盘, 并将数据同步到follower, 此时在给Producer应答时Leader宕机了, 此时Producer就会收到服务器传来的响应失败, 重新发送消息, 服务器会重新挑选一个follower成为leader, 而这个新的leader其实已经落盘了消息
3.7 生产经验—数据去重
3.7.1 数据传递语义
-
至少一次(At Least Once)
-
什么是至少一次
: 生产者发送到kafka集群, 至少kafka集群能收到一次数据 -
如何保证至少一次
: ACK级别设置为-1或者all + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 -
至少一次会产生的问题
: kafka集群重复收到数据的问题, 即可以保证数据不丢失,但是不能保证数据不重复
-
什么是至少一次
-
最多一次(At Most Once)
-
什么是最多一次
: 生产者发送到kafka集群, 不论成功与否, 只会发送一次 -
如何保证最多一次
: ACK级别设置为0 -
最多一次会产生的问题
: 无法保证数据是否落盘, 即可以保证数据不重复,但是不能保证数据不丢失
-
什么是最多一次
-
精确一次(Exactly Once)
-
什么是精确一次
: 数据既不会丢失, 也不会重复发送 -
如何保证精确一次
: Kafka 0.11版本以后,引入了一项重大特性:幂等性
和事务
, 通过这两点来保证严格一次
-
什么是精确一次
3.7.2 幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
一、幂等性原理
精确一次(Exactly Once) = 幂等性 + 至少一次(ack=-1或者all + 分区副本数>=2 + ISR最小副本数量>=2)
二、重复数据的判断标准
一个消息会被封装成TopicPartition, TopicPartition中记录了以下几个信息:PID、Partition、SeqNumber; 重复数据的判断标准:具有PID, Partition, SeqNumber相同主键的消息提交时,Broker只会持久化一条。
- PID是每个Producer在初始化时分配的一个唯一ID, 对于一个PID来说, Sequence Number是从0开始自增
- Partition 表示分区的标识
- Sequence Number是Producer在发送消息时, 会给每一条消息标识Sequence Number, 同一条消息被重复发送时, Sequence Number是不会递增
三、幂等性的条件
- 只能保证Producer在单个会话内不丢不重, 如果producer出现意外挂掉了再重启是无法保证幂等性, 因为PID已经改变了(单会话)
- 幂等性无法跨域多个topic-partition, 只能保证单个partition内的幂等性(单分区)
所以幂等性只能保证的是在单分区单会话内不重复。
四、如何使用幂等性
enable.idempotence被设置成true后, Producer自动升级成幂等性Producer,其他所有的代码逻辑都不需要改变。(enable.idempotence默认为true, 不需要手动开启)
properties.put(“enable.idempotence”, ture)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
3.7.3 生产者事务
开启事务, 必须开启幂等性(即enable.idempotence设置为true), 不需要保证精确一次(幂等性 + 至少一次)
一、生产者事务的概念
Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是消费者提交以及生产者生产消息offset的操作可以在一个原子操作中,要么都成功,要么都失败。
二、事务操作的API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
三、事务编程流程
- 1.设置事务id
- 2.初始化事务
- 3.开启事务
- 4.运行结束 > 提交事务
- 5.运行失败 > 回滚事务
3.8 生产经验—数据有序
kafka只能保证单分区数据有序, 多分区时, 分区与分区间无序
3.9 生产经验—数据乱序
- kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1
(不需要考虑是否开启幂等性)。 - kafka在1.x及以后版本保证数据单分区有序,条件如下:
- 未开启幂等性
max.in.flight.requests.per.connection
需要设置为1。 - 开启幂等性
max.in.flight.requests.per.connection
需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。(原理是使用了幂等性的Sequence Number, 连续5个消息会自动根据Sequence Number排序)
- 未开启幂等性
4.Kafka Broker
4.1 Kafka Broker工作流程
4.1.1 Zookeeper存储的 Kafka 信息
一、启动zookeeper客户端
bin/zkCli.sh
二、通过 ls 命令可以查看 kafka 相关信息。
ls /kafka
4.1.2 Kafka Broker总体工作流程
一、controller的概念
在Kafka集群中,某个Broker将被选举出来担任一种特殊的角色,其用于管理和协调Kafka集群,即管理集群中的所有分区的状态并执行相应的管理操作。每个Kafka集群任意时刻都只能有一个Controller。当集群启动时,所有Broker都参与Controller的竞选,最终有一个胜出,一旦Controller在某个时刻崩溃,集群中的其他的Broker会收到通知,然后开启新一轮的Controller选举,新选举出来的Controller将承担起之前Controller的所有工作。
二、controller的作用
- 维护每台Broker上的分区副本信息
- 维护每个分区的Leader副本信息
三、controller为每个分区选举leader
选举规则:在isr中存活为前提, 按照AR中排在前面的优先, 例如AR[1, 0, 2], ISR[1, 2], 那么leader会按照1,2的顺序轮巡
对于topicA的partition0这个分区,它选举出broker1作为leader, 而broker0、broker2作为follower, controller会把这个信息告诉zookeeper(将节点信息上传到zookeeper),这是为了防止controller挂了后, 新的controller不知道主副本信息
4.1.2 Kafka Broker上线与下线
模拟 Kafka 上下线,Zookeeper 中数据变化
一、查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]
二、查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1637292471777"}
三、查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
显示first这个topic中id为0的partitions的情况
[zk:localhost:2181(CONNECTED)16] get/kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}
4.1.3 Broker 重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
4.2 生产经验—节点服役和退役
4.2.1 服役新节点(todo)
4.2.2 退役旧节点(todo)
4.3 Kafka副本
4.3.1 副本基本信息
一、kafka副本的作用
- kafka副本的作用:提高数据的可靠性
- kafka默认副本1个, 生产环境一般配置2个, 保证数据可靠性; 太多副本会增加磁盘存储空间, 增加网络上传数据传输, 降低效率
- kafka中副本分为:Leader和Follower, kafka生产者只会把数据发往Leader, 然后follower自己找leader进行数据同步
- kafka分区中所有的副本统称为AR(Assigned Repllicas), AR = ISR + OSR
- ISR: 表示和 Leader 保持同步的 Follower 集合。
如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。 - OSR: 表示 Follower 与 Leader 副本同步时,延迟过多的副本。
- ISR: 表示和 Leader 保持同步的 Follower 集合。
4.3.2 Leader选举流程
一、Controller Leader
Kafka 集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker 的上下线,所有topic的分区副本分配和Leader选举等工作。(Controller的信息同步工作是依赖于Zookeeper的)
1.创建一个新的topic, 设置4个分区, 4个副本
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.
2.查看这4个分区的leander分布情况
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
3.停止其中一个kafka的进程, 查看Leader分区的状况
其中我们停到了Broker.id=3的节点, 我们发现分区0的leader原本是节点3. 但是因为节点3挂了, 所以ISR中重新找到节点0作为leader
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
4.停止Broker.id=2的节点, 并查看 Leader 分区情况
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
5.重新启动关停的kafka节点, 查看leader分区的状况
我们发现leader已经选举成功后, 并不会因为节点的重启再去选举一次
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
4.3.3 Leader和Follower故障处理细节(todo)
4.3.4 分区副本分配
如果kafka服务器只有4个节点, 那么设置kafka的分区数 > 服务器台数, 在kafka底层是如何分配存储副本的呢?
1.创建16个分区, 3个副本
创建一个新的 topic,名称为 second。
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 16 --replication-factor 3 --topic second
2.查看分区和副本情况
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
4.3.5 手动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。
1.创建一个新的 topic,名称为 three。
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 4 --replication-factor 2 --topic three
2.创建副本存储计划
所有副本都指定存储在 broker0、broker1 中
vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[
{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}
]
}
3.执行副本存储计划文章来源:https://www.toymoban.com/news/detail-676707.html
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.202.128:9092 --reassignment-json-file increase-replication-factor.json --execute
4.3.6 生产经验—Leader Partition 负载平衡(todo)
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。文章来源地址https://www.toymoban.com/news/detail-676707.html
4.3.7 生产经验—增加副本因子(todo)
4.4 文件存储
4.4.1 文件存储机制(todo)
4.4.2 文件清洗策略(todo)
4.5 高效读写数据
到了这里,关于Kafka(生产者)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!