Kafka概述
定义
Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。
消息队列
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。
目录结构分析
- bin:Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
- config:Kafka的所有配置文件
- libs: 运行Kafka所需要的所有JAR包
- logs: Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
- site-docs: Kafka的网站帮助文件
传统消息队列的应用场景
传统的消息队列的主要应用场景包括**:缓存/消峰、解耦和异步通信**。
缓冲/消峰: 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理它们。
消息队列的两种模式
点对点模式
消费者主动拉去数据,消息收到后清除消息
发布/订阅模式
- 可以有多个topic主题(浏览,点赞,收藏,评论等)
- 消费者消费数据之后,不删除数据
- 每个消费者互相独立,都可以消费到数据
Kafka基础架构
1、为方便扩展,并提高吞吐量,一个topic分为多个partition
2、配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
3、为提高可用性,为每个partition增加若干副本,类似NameNode HA
4、ZK中记录谁是leader,Kafka2.8.0 以后也可以配置不采用ZK.
-
Producer:消息生产者,就是向Kafka broker 发消息的客户端。
-
Consumer:消息消费者,向Kafka broker 取消息的客户端。
-
Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
-
Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
-
Topic: 可以理解为一个队列,生产者和消费者面向的都是一个topic。
-
Partition: 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
-
Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
-
Leader:每个分区多个副本的 “主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
-
Follower:每个分区多个副本中的 “从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个Follower会成为新的 Leader。
Kafka快速入门
安装部署
集群规划
Hadoop102 | Hadoop103 | Hadoop104 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
集群部署
-
docker部署zk集群:参考《zk全解》
-
进入到/usr/local/kafka目录,修改配置文件
vim server.properties #broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以 # 配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas # 监听所有网卡地址,允许外部端口连接 listeners=PLAINTEXT://0.0.0.0:9092 #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理) zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
可以提前在hosts文件中配置master,slave1,slave2的ip,之前在学习k8s的时候我已经配置过了,可以直接拿来用。
listeners=PLAINTEXT://0.0.0.0:9092
,默认情况下,advertised.listeners不设置的话,则默认使用listeners的属性,然而advertised.listeners是不支持0.0.0.0的,所以需要指定暴露的监听器,如下listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://虚拟机ip:9092
-
将安装包拷贝到其他服务器
-
分别在hadoop103和hadoop104 上修改配置文件
/opt/module/kafka/config/server.properties
中的broker.id=1
、broker.id=2
-
配置环境变量
- 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置
sudo vim /etc/profile.d/my_env.sh 增加如下内容: #KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
这里我将kafka直接放在了根目录下的一个文件夹,更加方便:
- 刷新一下环境变量。
source /etc/profile
- 分发环境变量文件到其他节点,并 source。
sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh source /etc/profile source /etc/profile
-
分别启动kafka:
bin/kafka-server-start.sh -daemon config/server.properties
- 如果遇到cluser_id不符合的问题,直接将日志文件删除重新启动即可。
集群启停脚本
- 脚本如下,
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
- 添加执行权限
chmod +x kf.sh
- 启动集群命令
kf.sh start
- 停止集群命令
kf.sh stop
Kafka命令行操作
Kafka基础架构
主题命令行操作
-
查看操作主题命令参数
./bin/kafka-topics.sh
-
查看当前服务器中的所有topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
-
创建
first topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
选项说明:
- –topic 定义 topic 名
- –replication-factor 定义副本数
- –partitions 定义分区数
-
查看
first
主题的详情./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
-
修改分区数(注意:分区数只能增加,不能减少)
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3
-
查看结果:
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe Topic: first TopicId: _Pjhmn1NTr6ufGufcnsw5A PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: first Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: first Partition: 2 Leader: 0 Replicas: 0 Isr: 0
-
删除
topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first
生产者命令行操作
-
查看操作者命令参数
./bin/kafka-console-producer.sh
-
发送消息
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first >hello world >yooome yooome
消费者命令行操作
-
查看操作消费者命令参数
./bin/kafka-console-consumer.sh
-
消费消息
- 消费
first
主题中的数据:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
- 把主题中所有的数据都读取出来(包括历史数据)。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
- 消费
kafka可视化工具
官网:https://www.kafkatool.com/download.html
Kafka重要概念
broker
- 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
- broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
- 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
zookeeper
-
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
-
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
-
Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
producer(生产者)
生产者负责将数据推送给broker的topic
consumer(消费者)
消费者负责从broker的topic中拉取数据,并自己进行处理
consumer group(消费者组)
- consumer group是kafka提供的可扩展且具有容错性的消费者机制
- 一个消费者组可以包含多个消费者
- 一个消费者组有一个唯一的ID(group Id)
- 组内的消费者一起消费主题的所有分区数据
分区(Partitions)
在Kafka集群中,主题被分为多个分区
副本(Replicas)
副本可以确保某个服务器出现故障时,确保数据依然可用,在Kafka中,一般都会设计副本的个数>1,
主题(Topic)
- 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
- Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
- 在主题中的消息是有结构的,一般一个主题包含某一类消息
- 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
偏移量(offset)
- offset记录着下一条将要发送给Consumer的消息的序号
- 默认Kafka将offset存储在ZooKeeper中
- 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
- 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的
消费者组
-
Kafka支持有多个消费者同时消费一个主题中的数据。
-
同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。
-
设置 test topic为2个分区
bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test
Kafka生产者
生产者消息发送流程
发送原理
在消息发送的过程中,涉及到了两个线程 — main 线程和Sender线程。在main线程中创建了一个双端队列 RecordAccumulator。main线程将消息发送给ResordAccumlator,Sender线程不断从 RecordAccumulator 中拉去消息发送到 Kafka Broker。
生产者重要参数列表
异步发送API
普通异步发送
- 需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker。
2、代码编程go get github.com/Shopify/sarama
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "first"
msg.Value = sarama.StringEncoder("this is a test log")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{
"192.168.71.128:9092", "192.168.71.129:9092", "192.168.71.130:9092",
}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
} else {
fmt.Println(client)
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
带回调函数的异步发送
【注意:】消息发送失败会自动重试,不需要我们在回调函数中手动重试。
同步发送API
生产者分区
分区和副本机制
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
- 轮询分区策略
- 随机分区策略
- 按key分区分配策略
- 自定义分区策略
分区好处
-
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
-
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行 消费数据。
轮询策略
- 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
- 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区
随机策略(不用)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
按key分配策略
按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
乱序问题
轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
副本机制
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
producer的ACKs参数
对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
acks配置为0
acks配置为1
当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。
acks配置为-1或者all
Kafka生产者幂等性与事务
幂等性
拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。
如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。
Kafka生产者幂等性
在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。
幂等性原理
为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number的概念。
- PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
- Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。
- 幂等性只能保证的是在单分区单会话内不重复
Kafka事务
-
Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)
-
开启事务,必须开启幂等性
事务操作API
Producer接口中定义了以下5个事务相关方法:
- initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
- beginTransaction(开始事务):启动一个Kafka事务
- sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
- commitTransaction(提交事务):提交事务
- abortTransaction(放弃事务):取消事务
数据有序和数据乱序
Kafka Broker
Zookeeper存储的Kafka信息
[zk: localhost:2181(CONNECTING) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
Kafka Broker总体工作流程
Broker重要参数
Kafka副本
副本基本信息
- Kafka副本作用:提高数据可靠性。
- Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka中副本为:Leader和Follower。Kafka生产者只会把数据发往 Leader,然后Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
AR = ISR + OSR
ISR:表示 Leader 保持同步的 Follower 集合。如果 Follower 长时间未 向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s 。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本。
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader ,负责管理集群 broker 的上下线,所有 topic 的分区副本分配 和 Leader 选举等工作。
- 创建一个新的 topic,4 个分区,4 个副本
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.
- 查看 Leader 分布情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
- 停止掉 hadoop105 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
- 停止掉 hadoop104 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
- 启动 hadoop105 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
- 启动 hadoop104 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
- 停止掉 hadoop103 的 kafka 进程,并查看 Leader 分区情况
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
Leader 和 Follower 故障处理细节
LEO(Log End Offset): 每个副本的最后一个offset,LEO其实就是最新的 offset + 1。
HW(High Watermark):所有副本中最小的LEO。
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
HW(High Watermark):所有副本中最小的LEO
活动调整分区副本存储
在生产环境中,每台服务器的配置和性能不一致,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的 topic ,4个分区,两个副本,名称为three 。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。
手动调整分区副本存储的步骤如下:
- 创建一个新的 topic,名称为 three。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --create --partitions 4 --replication-factor 2 --
topic three
- 查看分区副本存储情况
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic three
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
输入如下内容:
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
- 执行副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --execute
- 验证副本存储计划。
[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --verify
- 查看分区副本存储情况。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic three
Leader Partition 负载平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某 些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
参数名称 | 描述 |
---|---|
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。生产环 境中,leader 重选举的代价比较大,可能会带来 性能影响,建议设置为 false 关闭。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔 时间。 |
文件存储
Topic 数据的存储机制
查看 hadoop102(或者 hadoop103、hadoop104)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件
[atguigu@hadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata
直接查看 log 日志,发现是乱码。
通过工具查看 index 和 log 信息。
[atguigu@hadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments
--files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 3 position: 152
日志存储参数配置
参数 | 描述 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log), 然后就往 index 文件里面记录一个索引。 稀疏索引。 |
文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- Log.retention.hours,最低优先级小时,默认7天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
- delete 日志阐述:将过期数据删除
- log.cleanup.policy = delete 所有数据启用阐述策略
(1) 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2) 基于大小:默认关闭。超过设置的所有日志总大小,阐述最早的 segment 。
log.retention.bytes,默认等于-1,表示无穷大。
- compact 日志压缩
compact日志压缩:对于相同 key 的不同 value 值,值保留最后一个版本。
- log.cleanup.policy = compact所有数据启动压缩策略
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个 offset 大的 offset 对应的消息,实际上会拿到 offset 为 7 的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的 key 是用户 ID,value 是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
Kafka 消费者
Kafka 消费方式
- pull(拉)模式:consumer 采用从 broker 中主动拉去数据。Kafka 采用这种方式。
- push(推)模式:Kafka没有采用这种方式,因为由 broker 决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是 50m/s,Consumer1,Consumer2就来不及处理消息。
pull 模式不足之处是,如果Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。
Kafka 消费者工作流程
消费者组原理
Consumer Group (CG):消费者组,由多个consumer组成。形成一个消费者组的条件是所有消费者的 groupid 相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了 消费者偏移量向 Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理? earliest:自动重置偏 移量到最早的偏移量。 latest:默认,自动重置偏移量为最 新的偏移量。 none:如果消费组原来的(previous)偏移量 不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字 节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
offset 位移
offset 的默认维护位置
自动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
-
enable.auto.commit:是否开启自动提交offset功能,默认是true
-
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
参数名称 | 描述 |
---|---|
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
手动提交offset
虽然自动提交offset十分简单比那里,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。一次 Kafka 还提供了手动提交 offset 的API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
指定Offset消费
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
Kafka-Kraft模式
Kafka-Kraft架构
左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:
-
Kafka 不再依赖外部框架,而是能够独立运行;
-
controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
-
由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
-
controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。
Go kafka
Kafka简介
-
Kafka是分布式的:其所有的构件borker(服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
-
可以进行分区:每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
-
高吞吐量。
Kafka的结构
Producer
Producer即生产者,消息的产生者,是消息的⼊口。
kafka cluster
kafka集群,一台或多台服务器组成
Broker
Broker是指部署了Kafka实例的服务器节点。
Topic
消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上 都可以创建多个topic。实际应用中通常是一个业务线建一个topic。
Partition
Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的⽂件夹!
Replication
每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。
在kafka中默认副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Consumer
消费者,即消息的消费方,是消息的出口。
Consumer Group
我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Kafka⼯作流程
- ⽣产者从Kafka集群获取分区leader信息
- ⽣产者将消息发送给leader
- leader将消息写入本地磁盘
- follower从leader拉取消息数据
- follower将消息写入本地磁盘后向leader发送ACK
- leader收到所有的follower的ACK之后向生产者发送ACK
选择partition的原则(面试重点)
某个topic有多个partition,producer⼜怎么知道该将数据发往哪个partition?
- 直接指定:写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
- hash:如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
- 轮询:如果既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时间的数据写入某个partition,下一小段的时间写入下一个partition。
ACK应答机制(面试重点)
producer在向kafka写入消息的时候,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all
- 0:代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效 率最高。
- 1:代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
- all:代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保 leader发送成功和所有的副本都完成备份。安全性最⾼高,但是效率最低。
如果往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数量 默认配置都是1。
Topic和数据⽇志
topic 是同⼀类别的消息记录(record)的集合。在Kafka中,⼀个主题通常有多个订阅者。对于每个
主题,Kafka集群维护了⼀个分区数据⽇志⽂件结构如下:
- 每个partition都是⼀个有序并且不可变的消息记录集合。
- 当新的数据写⼊时,就被追加到partition的末尾。
- 在每个partition中,每条消息都会被分配⼀个顺序的唯⼀标识,这个标识被称为offset,即偏移 量。Kafka只保证在同⼀个partition内部消息是有序的,在不同partition之间,并不能保证消息有序。
Kafka可以配置⼀个保留期限,⽤来标识⽇志会在Kafka集群内保留多⻓时间。Kafka集群会保留在保留期限内所有被发布的消息,不管这些消息是否被消费过。
⽐如保留期限设置为两天,那么数据被发布到 Kafka集群的两天以内,所有的这些数据都可以被消费。当超过两天,这些数据将会被清空,以便为后 续的数据腾出空间。
由于Kafka会将数据进⾏持久化存储(即写⼊到硬盘上),所以保留的数据⼤⼩可 以设置为⼀个⽐较⼤的值。
Partition结构
Partition在服务器上的表现形式就是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,其中 .log ⽂件就是实际存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索消息。
消费数据
-
多个消费者实例可以组成⼀个消费者组,并⽤⼀个标签来标识这个消费者组。⼀个消费者组中的不同消费者实例可以运⾏在不同的进程甚⾄不同的服务器上。
-
如果所有的消费者实例都在不同的消费者组,那么每⼀条消息记录会被⼴播到每⼀个消费者实例。
-
在同⼀个消费者组中,每个消费者实例可以消费多个分区,但是每个分区最多只能被消费者组中的⼀个实例消费。
kafka环境搭建
kafka环境基于zookeeper,zookeeper环境基于JAVA-JDK。
!!!新版本的kafka自带zookeeper,可以不手动安装。
java环境变量
https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html
安装kafka
http://kafka.apache.org/downloads
1.打开config目录下的server.properties文件
2.修改log.dirs=F:/tmp/kafka-logs //日志存放
3.打开config目录下的zookeeper.properties文件
4.修改dataDir=F:/tmp/zookeeper //数据存放
启动:
先执行:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
再执行:bin\windows\kafka-server-start.bat config\server.properties
zookeeper:
kafka:
GO操作Kafka
sarama操作kafka
依赖安装
go get github.com/Shopify/sarama
windows: mod文件中手动加 require github.com/shopify/sarama v1.19.0
Go语言中连接kafka使用第三方库:github.com/IBM/sarama。
go get github.com/IBM/sarama
这个库已经由Shopify转给了IBM。
sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:
# github.com/DataDog/zstd
exec: "gcc":executable file not found in %PATH%
所以在Windows平台请使用v1.19版本的sarama。
连接kafka发送消息
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
连接kafka消费消息
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍历所有的分区
// 针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 异步从每个分区消费信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
}
kafka-go操作kafka
-
相较于sarama, kafka-go 更简单、更易用。
-
segmentio/kafka-go 是纯Go实现,提供了与kafka交互的低级别和高级别两套API,同时也支持Context。
-
此外社区中另一个比较常用的confluentinc/confluent-kafka-go,它是一个基于cgo的librdkafka包装,在项目中使用它会引入对C库的依赖。
准备Kafka环境
以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境,并且在8080端口提供kafka-ui管理界面。
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka1
environment:
DYNAMIC_CONFIG_ENABLED: "TRUE"
将上述docker-compose.yml文件在本地保存,在同一目录下执行以下命令启动容器。
docker-compose up -d
容器启动后,使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。
安装kafka-go
执行以下命令下载 kafka-go依赖。
go get github.com/segmentio/kafka-go
注意:kafka-go 需要 Go 1.15或更高版本。
kafka-go 提供了两套与Kafka交互的API。
低级别( low-level):基于与 Kafka 服务器的原始网络连接实现。
高级别(high-level):对于常用读写操作封装了一套更易用的API。
通常建议直接使用高级别的交互API。
Connection
Conn 类型是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。
发送消息
下面是连接至Kafka之后,使用Conn发送消息的代码示例。
// writeByConn 基于Conn发送消息
func writeByConn() {
topic := "my-topic"
partition := 0
// 连接至Kafka集群的Leader节点
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 设置发送消息的超时时间
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 发送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
消费消息
// readByConn 连接至kafka后接收消息
func readByConn() {
// 指定要连接的topic和partition
topic := "my-topic"
partition := 0
// 连接至Kafka的leader节点
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 设置读取超时时间
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
// 读取一批消息,得到的batch是一系列消息的迭代器
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
// 遍历读取消息
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
// 关闭batch
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer(上述代码中的b),如果传入的buffer太小(消息装不下)就会返回io.ErrShortBuffer错误。
如果不考虑内存分配的效率问题,也可以按以下代码使用batch.ReadMessage读取消息。
for {
msg, err := batch.ReadMessage()
if err != nil {
break
}
fmt.Println(string(msg.Value))
}
创建topic
当Kafka关闭自动创建topic的设置时,可按如下方式创建topic。
// createTopicByConn 创建topic
func createTopicByConn() {
// 指定要创建的topic名称
topic := "my-topic"
// 连接至任意kafka节点
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
// 连接至leader节点
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
// 创建topic
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
通过非leader节点连接leader节点
下面的示例代码演示了如何通过已有的非leader节点的Conn,连接至 leader节点。
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer connLeader.Close()
获取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
// 遍历所有分区取topic
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
Reader
Reader是由 kafka-go 包提供的另一个概念,对于从单个主题-分区(topic-partition)消费消息这种典型场景,使用它能够简化代码。Reader 还实现了自动重连和偏移量管理,并支持使用 Context 支持异步取消和超时的 API。
注意: 当进程退出时,必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继续尝试向已连接的客户端发送消息。如果进程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止,那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时,可能导致延迟(例如,新进程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在进程关闭时关闭Reader。
消费消息
下面的代码演示了如何使用Reader连接至Kafka消费消息。
// readByReader 通过Reader接收消息
func readByReader() {
// 创建Reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42) // 设置Offset
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前关闭Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
消费者组
kafka-go支持消费者组,包括broker管理的offset。要启用消费者组,只需在 ReaderConfig 中指定 GroupID。
使用消费者组时,ReadMessage 会自动提交偏移量。
// 创建一个reader,指定GroupID,从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id", // 指定消费者组id
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前关闭Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
在使用消费者组时会有以下限制:
- (*Reader).SetOffset 当设置了GroupID时会返回错误
- (*Reader).Offset 当设置了GroupID时会永远返回 -1
- (*Reader).Lag 当设置了GroupID时会永远返回 -1
- (*Reader).ReadLag 当设置了GroupID时会返回错误
- (*Reader).Stats 当设置了GroupID时会返回一个-1的分区
显式提交
kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage,而是调用 FetchMessage获取消息,然后调用 CommitMessages 显式提交。
ctx := context.Background()
for {
// 获取消息
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
// 处理消息
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 显式提交
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
在消费者组中提交消息时,具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如,如果通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息,则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。
管理提交间隔
默认情况下,调用CommitMessages将同步向Kafka提交偏移量。为了提高性能,可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。
// 创建一个reader从 topic-A 消费消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
})
Writer
向Kafka发送消息,除了使用基于Conn的低级API,kafka-go包还提供了更高级别的 Writer 类型。大多数情况下使用Writer即可满足条件,它支持以下特性。
- 对错误进行自动重试和重新连接。
- 在可用分区之间可配置的消息分布。
- 向Kafka同步或异步写入消息。
- 使用Context的异步取消。
- 关闭时清除挂起的消息以支持正常关闭。
- 在发布消息之前自动创建不存在的topic。
发送消息
// 创建一个writer 向topic-A发送消息
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
RequiredAcks: kafka.RequireAll, // ack模式
Async: true, // 异步
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
创建不存在的topic
如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。
w := &Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
AllowAutoTopicCreation: true, // 自动创建topic
}
messages := []kafka.Message{
{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
}
var err error
const retries = 3
// 重试3次
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = w.WriteMessages(ctx, messages...)
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}
if err != nil {
log.Fatalf("unexpected error %v", err)
}
break
}
// 关闭Writer
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
写入多个topic
通常,WriterConfig.Topic用于初始化单个topic的Writer。通过去掉WriterConfig中的Topic配置,分别设置每条消息的message.topic,可以实现将消息发送至多个topic。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// 注意: 当此处不设置Topic时,后续的每条消息都需要指定Topic
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
// 注意: 每条消息都需要指定一个 Topic, 否则就会报错
kafka.Message{
Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Topic: "topic-B",
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Topic: "topic-C",
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
注意:Writer中的Topic和Message中的Topic是互斥的,同一时刻有且只能设置一处。
其他配置
TLS
对于基本的 Conn 类型或在 Reader/Writer 配置中,可以在Dialer中设置TLS选项。如果 TLS 字段为空,则它将不启用TLS 连接。
注意:不在Conn/Reder/Writer上配置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。
Connection
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
创建Writer时可以按如下方式指定TLS配置。
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{}, // 指定TLS配置
},
}
SASL
可以在Dialer上指定一个选项以使用SASL身份验证。Dialer可以直接用来打开一个 Conn,也可以通过它们各自的配置传递给一个 Reader 或 Writer。如果 SASLMechanism字段为 nil,则不会使用 SASL 进行身份验证。
SASL 身份验证类型
明文
mechanism := plain.Mechanism{
Username: "username",
Password: "password",
}
SCRAM
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
Connection
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: sharedTransport,
}
Client
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
client := &kafka.Client{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}
Balancer
kafka-go实现了多种负载均衡策略。特别是当你从其他Kafka库迁移过来时,你可以按如下说明选择合适的Balancer实现。
Sarama
如果从 sarama 切换过来,并且需要/希望使用相同的算法进行消息分区,则可以使用kafka.Hash或kafka.ReferenceHash。
kafka.Hash = sarama.NewHashPartitioner
kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
}
librdkafka和confluent-kafka-go:kafka.CRC32Balancer与librdkafka默认的consistent_random策略表现一致。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
}
Java:使用kafka.Murmur2Balancer可以获得与默认Java客户端相同的策略。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
}
Compression
可以通过设置Compression字段在Writer上启用压缩:
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Compression: kafka.Snappy,
}
Reader 将通过检查消息属性来确定消费的消息是否被压缩。
Logging
想要记录Reader/Writer类型的操作,可以在创建时配置日志记录器。
kafka-go中的Logger是一个接口类型。
type Logger interface {
Printf(string, ...interface{})
}
并且提供了一个LoggerFunc类型,帮我们实现了Logger接口。
type LoggerFunc func(string, ...interface{})
func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
Reader:借助kafka.LoggerFunc我们可以自定义一个Logger。文章来源:https://www.toymoban.com/news/detail-403855.html
// 自定义一个Logger
func logf(msg string, a ...interface{}) {
fmt.Printf(msg, a...)
fmt.Println()
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "q1mi-topic",
Partition: 0,
Logger: kafka.LoggerFunc(logf),
ErrorLogger: kafka.LoggerFunc(logf),
})
Writer:也可以直接使用第三方日志库,例如下面示例代码中使用了zap日志库。文章来源地址https://www.toymoban.com/news/detail-403855.html
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "q1mi-topic",
Logger: kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
}
到了这里,关于kafka全解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!