kafka常用命令归纳

这篇具有很好参考价值的文章主要介绍了kafka常用命令归纳。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. 日常Topic操作

这里的命令以kafka2.2之后版本进行说明,社区推荐命令指定 --bootstrap-server参数,受kafka安全认证体系的约束,如果使用 --zookeeper 会绕过 Kafka 的安全体系。

1. 创建topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

2. 查看所有topic列表

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

3. 查看某个特定topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

4. 增加topic分区数

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>

5. 动态修改主题参数

以 max.message.bytes为例

5.1 增加指定broker的配置

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

eg:bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name testInfoTopic --alter --add-config max.message.bytes=128000
查看topic修改情况
bin/kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic testInfoTopic 

Topic: testInfoTopic    TopicId: KzPy24fVSsCR03ZOYRzq8g PartitionCount: 3       ReplicationFactor: 1    Configs: max.message.bytes=128000,unclean.leader.election.enable=false
        Topic: testInfoTopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: testInfoTopic    Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: testInfoTopic    Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        

zookeeper 查看修改后内容

./zookeeper-shell.sh localhost:2181

> get /config/topics/testInfoTopic
{"version":1,"config":{"max.message.bytes":"128000"}}

5.2 删除指定broker的配置

bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config max.message.bytes

6. 修改主题限速

限制某个主题副本在执行副本同步机制时,带宽消耗不要过多(不得占用超过 100MBps)

–entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令。

for i in {0..2}
do 
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name $i
done

7. 删除topic

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

二. 数据生产消费

测试数据

broker_host:port ==> localhost:9092

Topic ==> testInfoTopic

Consumer Group ==> G1

1 生产数据

./kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic

举例:

# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testInfoTopic --request-required-acks -1 --producer-property compression.type=lz4

1.1 生产者性能测试

向topic 发送10w条消息,每条消息1KB,在producer-props 后面指定要设置的生产者参数,比如本例中的压缩算法、延时时间等

bin/kafka-producer-perf-test.sh --topic testInfoTopic --num-records 100000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=localhost:9092 acks=-1 linger.ms=2000 compression.type=lz4

100000 records sent, 24764.735017 records/sec (24.18 MB/sec), 93.07 ms avg latency, 672.00 ms max latency, 56 ms 50th, 301 ms 95th, 325 ms 99th, 335 ms 99.9th.

生产吞吐量,消息发送延迟都可以看到

2 消费数据

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic

举例:

# 注意,这里消费最好指定一个消费组G1,如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。时间长久后,就会产生大量的以 console-consumer的消费者组
# --from-beginning 等同于Consumer 端参数 auto.offset.reset 设置成 earliest;如果不指定,会默认从最新位移消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testInfoTopic --group G1 --from-beginning --consumer-property enable.auto.commit=false 
2.1消费者性能测试
 bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 100000 --topic testInfoTopic

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-11-05 10:35:46:535, 2022-11-05 10:35:48:699, 97.5835, 45.0940, 100013, 46216.7283, 665, 1499, 65.0990, 66719.8132

消费吞吐量的指标

2.2 查看消费进度
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

./bin/kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --group G1 --describe

三. 内部topic操作

1. __consumer_offsets

该主题保存了消费者组的位移数据,默认有50个分区

1.1 变更主题副本数

如果该主题的副本值已经是 1 了,我们如何增加该主题的副本到3

第一步:创建一个 json 文件,显式提供 50 个分区对应的副本数,注意要将replicas 中的 3 台 Broker 排列顺序不同,使 Leader 副本均匀地分散在 Broker上


{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第二步:执行kafka-reassign-partitions.sh

bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

1.2 查看__consumer_offsets消费者组提交的位移数据

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

1.3 读取__consumer_offsets消息,查看消费者组状态信息

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

2. __transaction_state

该主题为了支持事务引入的,默认有50个分区,操作方法参考__consumer_offsets

四. 常见错误处理

1. topic删除失败

原因1:副本所在的broker宕机

解决办法:重启broker后,会自动恢复

原因2:待删除的全部或者部分分区在迁移中

解决办法:

第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下待删除topic的 znode。

第 2 步,手动删除该主题在磁盘上的分区目录。

第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。(会导致大量的leader重选举)

2. __consumer_offsets占用太多磁盘

原因:kafka-log-cleaner-thread线程挂了

​ 可以用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。

解决办法

重启对应的broker节点

五. kafka 推荐配置

#### kafka推荐配置

auto.create.topics.enable=false # 是否允许自动创建Topic
unclean.leader.election.enable=false    # 是否允许 Unclean Leader 选举
auto.leader.rebalance.enable=false  # 是否允许定期进行 Leader 选举。

六.参考资料

https://kafka.apache.org/documentation/#operations文章来源地址https://www.toymoban.com/news/detail-742892.html

到了这里,关于kafka常用命令归纳的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(40)
  • kafka2.x常用命令:创建topic,查看topic列表、分区、副本详情,删除topic,测试topic发送与消费

    原创/朱季谦 接触kafka开发已经两年多,也看过关于kafka的一些书,但一直没有怎么对它做总结,借着最近正好在看《Apache Kafka实战》一书,同时自己又搭建了三台kafka服务器,正好可以做一些总结记录。 本文主要是记录如何在kafka集群服务器上创建topic,查看topic列表、分区、

    2024年02月03日
    浏览(31)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(37)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(32)
  • Kafka消息队列实现消息的发送和接收

    消息在Kafka消息队列中发送和接收过程如下图所示: 消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅

    2024年02月11日
    浏览(37)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(28)
  • 配置Kafka发送大消息

    Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。 Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要

    2024年02月16日
    浏览(26)
  • Kafka消息发送流程

    我们通过创建 KafkaProducer 对象来发送消息,KafkaProducer有两个线程 Producer主线程:把消息发送到内存缓冲区 Sender线程:把内存缓冲区的消息发送到 broker Producer主线程 Producer 主线程的的流程如图所示 拉取元数据:每个 topic 有多个分区,需要知道对应的broker地址 序列化器:将

    2023年04月17日
    浏览(32)
  • Kafka 消息发送和消费流程

    流程如下: Producer 端直接将消息发送到 Broker 中的 Leader 分区中 Broker 对应的 Leader 分区收到消息会先写入 Page Cache,定时刷盘进行持久化(顺序写入磁盘) Follower 分区拉取 Leader 分区的消息,并保持与 Leader 分区数据一致,待消息拉取完毕后需要给 Leader 分区回复 ACK 确认消息

    2024年02月12日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包