主题的管理
主题管理包括创建主题、查看主题消息、修改主题以及删除主题等操作,Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,脚本位于$KAFKA_HOME/bin/
目录下,该脚本实际上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作。
创建主题
简单创建与查看
若 broker 端的配置参数auto.create.topics.enable
设置为 true,那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions
(默认值为 1)、副本因子为default.replication.factor
(默认值为 1)的主题。同理,当一个消费者开始从未知主题中读取消息或任意客户端向未知主题发送元数据时也会按照上述参数创建相应的主题。
更加通用的方式是使用 kafka-topics.sh 来创建主题,下列代码创建了一个分区数为 4、副本因子为 2 的主题 topic-create:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create --partitions 4 --replication-factor 2
Created topic topic-create-diff.
命令中参数的含义如下:
-
--bootstrap-server
:zookeeper 连接地址; -
--create
:表示创建主题的指令;-
--topic
:主题名称; -
--partitions
:主题分区数; -
--replication-factor
:分区副本数;
-
脚本执行成功护,Kafka 会在 log.dir 或 log.dirs 配置参数所指定的目录下创建相应的主题分区,该目录默认为 /tmp/kafka-logs/。在不同的节点分别执行下列命令来查看当前节点创建的主题分区:
# master节点
[root@master kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x 2 root root 167 4月 17 17:00 topic-create-0
drwxr-xr-x 2 root root 167 4月 17 17:00 topic-create-2
# node01节点
[root@node01 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x 2 root root 167 4月 18 08:40 topic-create-1
drwxr-xr-x 2 root root 167 4月 17 16:44 topic-create-2
drwxr-xr-x 2 root root 167 4月 17 16:44 topic-create-3
# node02节点
[root@node02 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x 2 root root 167 4月 18 08:40 topic-create-0
drwxr-xr-x 2 root root 167 4月 17 17:01 topic-create-1
drwxr-xr-x 2 root root 167 4月 17 17:01 topic-create-3
可以看到,topic-create 主题存在 0、1、2、3 四个分区,每个分区存在两个副本。主题、分区、副本和 log 日志的关系如下图所示,其中,主题与分区都是提供给用户的抽象,而副本层和 log 层才是实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 当中才能提供有效的数据冗余。
除上述方法外,还可以使用 kafka-topics.sh 的--describe
指令来查看分区副本的分配细节:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create
Topic: topic-create TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: topic-create Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: topic-create Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
指定分区副本分配创建
上述的方法在进行主题的创建时,其分区副本是按照内部既定的逻辑来进行分配的。kafka-topics.sh 脚本还提供了--replica-assignment
参数来手动指定分区副本的分配方案:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create-diff --replica-assignment 0:1,1:2,2:0,2:1
Created topic topic-create-diff.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create-diff
Topic: topic-create-diff TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create-diff Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create-diff Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic-create-diff Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: topic-create-diff Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用英文逗号“,”隔开,分区内多个副本用英文冒号“:”隔开。
使用这种方法需要注意以下几点:
-
同一个分区内的副本不能有重复,如
0:0,1:1
这种将会提示 AdminCommandFailedException 异常; -
各分区所指定的副本数应相同,如
0:1,1,2,1:2
这种将会提示 AdminOperationException 异常; -
不允许跳过分区进行分配,如
0:1,,1:2,2:0
这种将会提示 NumberFormatException 异常;
指定参数创建
下列命令使用--config
参数来创建一个主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
Created topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=10000
Topic: topic-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为 10000,这两个参数都是主题端的配置。
查看主题
主题的简单查看
kafka-topics.sh 脚本提供了--list
和--describe
指令来方便的查看主题信息。
使用--list
可以查看当前所有的可用主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --list
topic-config
topic-create
topic-create-diff
使用--describe
可以查看单个或者多个主题的详细信息,若不适用--topic
指定主题则显示所有主题的详细信息:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe
Topic: topic-create-diff TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create-diff Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create-diff Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic-create-diff Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: topic-create-diff Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-config TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=10000
Topic: topic-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-create TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: topic-create Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: topic-create Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=10000
Topic: topic-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create,topic-create-diff
Topic: topic-create-diff TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create-diff Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create-diff Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic-create-diff Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: topic-create-diff Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-create TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-create Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: topic-create Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: topic-create Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-create Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
带附加功能的查看
在使用--describe
指令时可以额外指定--topics-with-overrides
、under-replicated-partitions
以及--unavailable-partitions
参数来实现一些附加功能。
topics-with-overrides
使用该参数可以查看所有包含覆盖配置的主题,它只会列出包含了与集群不同配置的主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topics-with-overrides
Topic: topic-config TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=10000
under-replicated-partitions
使用该参数可以找出所有包含失效副本的分区,手动停掉 id=1 的 kafka broker,进行查询:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
Topic: topic-create Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2
Topic: topic-create Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0
Topic: topic-create Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2
重新启动 id=1 的 broker,再次查询,不显示任何结果:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
[root@master kafka_2.12-3.4.0]#
unavailable-partitions
使用该参数可以查询主题中没有 leader 副本的分区,这些分区已处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。手动停掉 id=1 和 id=2 的 kafka broker,进行查询:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
Topic: topic-create Partition: 1 Leader: none Replicas: 1,2 Isr: 1
Topic: topic-create Partition: 3 Leader: none Replicas: 2,1 Isr: 1
重新启动 id=1 和 id=2 的 broker,再次查询,不显示任何结果:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
[root@master kafka_2.12-3.4.0]#
修改主题
kafka-topics.sh 提供了--alert
指令用于修改主题的分区数、修改配置等。
修改分区
下列指令可以修改主题的分区数:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 3 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=10000
Topic: topic-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Kafka 只支持增加分区数而不支持减少分区数,如下:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 1
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 1.
[2023-04-18 16:06:33,274] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 1.
(kafka.admin.TopicCommand$)
修改配置
在当前最新的 kafka 3.4.0 版本中已经不允许使用 kafka-topics.sh 脚本来变更主题的配置了,新版本推荐使用 kafka-configs.sh 脚本实现相关的功能。
删除主题
如果一个主题确定不会再使用,那么最好将其删除释放一些资源。kafka-topics.sh 脚本提供了--delete
指令用于主题的删除:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-delete --partitions 1 --replication-factor 1
Created topic topic-delete.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --delete --topic topic-delete
主题的删除与 broker 端配置参数delete.topic.enable
有关,该参数配置为 true 时才能够删除主题,默认值为 true。
在删除主题时,若删除的是 Kafka 内部的主题或删除的主题不存在,则会报错。
主题配置管理
kafka-configs.sh 脚本用于对配置进行管理和操作,其可以实现相关配置在运行状态下的动态变更。该脚本包含变更配置指令--alter
以及查看配置指令--describe
两种类型,增删改操作都可以看作是对配置的变更。该脚本不仅支持操作主题的配置,其同样支持操作 broker、用户、客户端的配置。
配置查看与变更
配置查看
使用 kafka-configs.sh 脚本查看主题配置的命令如下:
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
max.message.bytes=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10000, DEFAULT_CONFIG:message.max.bytes=1048588}
其中:
-
--entity-type
指定要查看配置的实体类型; -
--entity-config
指定要查看配置的实体名称;
上述两个参数的对应关系如下:
entity-type | entity-name |
---|---|
主题类型,取值为 topics | 指定主题名称 |
broker 类型,取值为 brokers | 指定 brokerId |
客户端类型,取值为 clients | 指定 clientId |
用户类型,取值为 users | 指定用户名 |
配置变更
使用 kafka-configs.sh 脚本实现配置的变更时,需要将--alter
指令与add-config
以及delete-config
参数一起使用,前者用于实现配置的增、改操作,后者用于实现配置的删除。
add-config
下列命令对主题原有配置进行了覆盖,若需要修改多个参数,则将多个参数使用英文逗号","隔开即可:
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --add-config max.message.bytes=20000
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
max.message.bytes=20000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=20000, DEFAULT_CONFIG:message.max.bytes=1048588}
delete-config
下列命令对主题新增的配置进行了删除,还原为默认配置:
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --delete-config max.message.bytes,cleanup.policy
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
主题端参数
主题相关的所有配置参数在 broker 层面都有对应的参数,若没有指定或修改主题的任何配置参数,那么就使用 broker 端对应的参数作为默认值。
与主题相关的参数有很多,在进行配置时自行搜索查看即可。所有配置的查看与修改方法都与上述介绍的方法相同。
KafkaAdminClient 主题管理
一般情况下,我们使用 kafka-topics.sh 脚本来管理主题。但是在某些场景下,我们需要将主题管理类的功能集成到公司内部的系统当中进行统一管理,那么就需要使用程序调用 API 的方式去实现。KafkaAdminClient 类便提供了这些基本功能。
基本使用
创建主题
下列代码展示了如何使用 KafkaAdminClient 类创建一个分区数为 4、副本因子为 1 的主题 topic-admin:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
String topic = "topic-admin";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 定义主题创建的信息
NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
// 执行主题创建并获取执行结果
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
try {
result.all().get();
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
查询新创建的主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 4 ReplicationFactor: 1 Configs:
Topic: topic-admin Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-admin Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-admin Partition: 3 Leader: 1 Replicas: 1 Isr: 1
其中,AdminClient 对象的获取调用的是create()
方法,其定义如下:
public static AdminClient create(Properties props) {
return KafkaAdminClient.createInternal(new AdminClientConfig(props), (TimeoutProcessorFactory)null);
}
在进行主题信息的定义时,需要创建一个 NewTopic 对象,其包含下述属性:
public class NewTopic {
// 主题名称
private final String name;
// 分区数
private final int numPartitions;
// 副本因子
private final short replicationFactor;
// 分配方案
private final Map<Integer, List<Integer>> replicasAssignments;
// 配置
private Map<String, String> configs = null;
...
}
指定分区副本分配
若想指定分区副本的分配方案来创建一个主题,可以将代码中“定义主题创建的信息”处的代码替换为下述代码:
// 指定分区副本分配方案
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, List.of(1));
replicasAssignments.put(1, List.of(0));
replicasAssignments.put(2, List.of(1));
replicasAssignments.put(3, List.of(0));
// 定义主题创建的信息
NewTopic newTopic = new NewTopic(topic, replicasAssignments); // topic:"topic-admin-test1"
查询新创建的主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test1
Topic: topic-admin-test1 TopicId: pptWSwHbT8m4W_dJBEoWkg PartitionCount: 4 ReplicationFactor: 1 Configs:
Topic: topic-admin-test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin-test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-admin-test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin-test1 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
指定主题配置参数
若想在创建主题时指定需要覆盖的配置,则需要在 NewTopic 对象中传入 configs 配置集,如下所示:
// 指定主题配置
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("max.message.bytes", "25000");
NewTopic newTopic = new NewTopic(topic, 4, (short) 1); // topic:"topic-admin-test2"
newTopic.configs(configs);
查看新创建的主题:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test2
Topic: topic-admin-test2 TopicId: IFegwoC6Tp-UwlnfwkrlmA PartitionCount: 4 ReplicationFactor: 1 Configs: cleanup.policy=compact,max.message.bytes=25000
Topic: topic-admin-test2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-admin-test2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-admin-test2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin-test2 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
在使用 AdminClient 之后要调用close()
方法来释放资源。
查看主题
查看主题可以调用listTopics()
或者describeTopics()
方法实现。
下列代码使用listTopics()
方法查看主题:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 查询主题并获取查询结果
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
listTopicsResult.names().get().forEach(System.out::println);
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
查询结果如下:
topic-admin-test1
topic-admin-test2
topic-create
topic-config
topic-create-diff
topic-admin
下列代码使用describeTopics()
方法查看主题:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 查询主题并获取查询结果
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
try {
describeTopicsResult.all().get().forEach((key, value) -> {
System.out.println(key + "||" + value);
});
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
查询结果如下:
topic-admin-test1||(name=topic-admin-test1, internal=false, partitions=(partition=0, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=1, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))
topic-admin-test2||(name=topic-admin-test2, internal=false, partitions=(partition=0, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=1, leader=node02:9092 (id: 2 rack: null), replicas=node02:9092 (id: 2 rack: null), isr=node02:9092 (id: 2 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))
删除主题
调用deleteTopics()
方法即可实现主题的删除:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 删除主题
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
try {
deleteTopicsResult.all().get();
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
删除后执行主题查询,结果如下:
topic-create
topic-config
topic-create-diff
topic-admin
修改主题
使用describeConfigs()
方法可以查看主题的具体配置信息,具体使用方法如下:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 配置要查询的实体类型和实体名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
// 执行查询并获取结果
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(List.of(configResource));
try {
Config config = describeConfigsResult.all().get().get(configResource);
System.out.println("==========" + configResource.name() + "==========");
config.entries().forEach(System.out::println);
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
返回结果如下,该方法会列出主题当中所有的配置信息:
==========topic-admin==========
ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.format.version, value=3.0-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
使用alterConfigs()
方法对主题的配置进行修改的方法如下:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 配置要查询的实体类型和实体名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
// 配置要更改的配置参数
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config configs = new Config(List.of(entry));
Map<ConfigResource, Config> configMap = new HashMap<>();
configMap.put(configResource, configs);
// 执行配置更改并获取结果
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
try {
alterConfigsResult.all().get();
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
修改完成后再次执行查询,可以看到配置已经被成功修改:
==========topic-admin==========
...
ConfigEntry(name=cleanup.policy, value=compact, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
...
KafkaAdminClient 还提供了createPartitions()
方法用于增加某个主题的分区,其基本使用方法如下:
public class TestDemo {
public static void main(String[] args) {
// 定义服务地址以及主题名称
String brokerList = "192.168.86.133:9092";
// 配置参数
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// 获取 AdminClient 对象
AdminClient adminClient = AdminClient.create(properties);
// 配置要查询的实体类型和实体名称
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
// 配置要更改的分区
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put("topic-admin", newPartitions);
// 执行增加分区数的操作并获取结果
CreatePartitionsResult partitionsResult = adminClient.createPartitions(newPartitionsMap);
try {
partitionsResult.all().get();
} catch (Exception exception) {
exception.printStackTrace();
}
// 关闭 AdminClient 对象实例
adminClient.close();
}
}
直接在服务器上调用 kafka-topic.sh 脚本查看主题分区的增加结果:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 5 ReplicationFactor: 1 Configs: cleanup.policy=compact
Topic: topic-admin Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-admin Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-admin Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-admin Partition: 4 Leader: 2 Replicas: 2 Isr: 2
主题合法性校验
一般在生产环境下,Kafka 的auto.create.topics.enable
参数将被设置为 false,集不允许自动创建主题。主题的创建一般由运维人员通过 kafka-topics.sh 脚本创建,若普通用户想要通过 KafkaAdminClient 提供的方法创建主题,建议在代码中对主题的命名、分区数、分区副本数进行校验,对不符合标准的申请进行过滤。
分区的管理
优先副本选举
分区采用多副本的机制以提升可靠性,但是只有 leader 副本对外提供读写服务,follower 副本只负责在内部进行消息的同步。若一个分区的 leader 副本不可用,则意味着整个分区不可用,此时就需要从 follower 副本中挑选一个作为新的 leader 继续对外提供服务。
在进行主题的创建时,主题的分区以及副本会尽可能均匀的分不到 Kafka 集群的各个 broker 节点上,leader 副本的分配也比较均匀。如下所示,创建一个分区数、分区副本数均为 3 的主题 topic-partition 并观察其分区副本的分布:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-partition --partitions 3 --replication-factor 3
Created topic topic-partition.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: topic-partition Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: topic-partition Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: topic-partition Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
可以看到,leader 副本均匀的分布在 id 为 0、2、1 的 broker 节点当中。此时,若某个分区 leader 副本所在的 broker 节点宕机,该分区的一个 follower 节点就会成为新的 leader 节点。当之前的 leader 节点恢复并重新加入集群后,其只能作为一个新的 follower 节点,不在对外提供服务。
我们重启 brokerId=2 的节点,观察分区副本的分布如下:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: topic-partition Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: topic-partition Partition: 1 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2
Topic: topic-partition Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
可以看到分区 1 的 leader 节点从 2 变为了 1,而分区 2 的 leader 节点也为 1,这就导致了原本均衡的负载状态被打破,节点 1 的负载最高。
为处理上述负载失衡的情况,Kafka 引入了优先副本 preferred replica 的概念。优先副本即 AR 集合中的第一个副本,即查询主题信息时其中的 Replicas 集合。如上述查询结果所示,分区 1 的 AR 集合为【2,1,0】,其中第一个副本为 2,即分区 1 的优先副本为 2。
基于优先副本的概念,Kafka 提供了分区自动平衡的功能,该功能对应的 broker 端参数为auto.leader.rebalance.enable
(默认值为 true)。自动平衡功能开启时,Kafka 会启动一个定时任务,该任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(=非优先副本的 leader 数量/分区总数
)。若该值超过了leader.imbalance.per.broker.percentage
(默认值为 10%)参数所配置的比率,就会自动执行优先副本选举的动作对分区进行平衡。自动平衡分区动作的执行周期由参数leader.imbalance.check.interval.seconds
(默认值 300s)控制。
经过一段时间后再次查询分区副本分布:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: topic-partition Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: topic-partition Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
Topic: topic-partition Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
可以看到分区 1 的 leader 节点自动平衡为 2。
分区重新分配
在日常的生产工作中,可能会碰到以下的场景:
-
某节点宕机后,该节点上的分区副本将处于失效的状态,Kafka 并不会将失效的分区副本自动的迁移到集群中可用的 broker 节点上;
-
当需要对集群中的某个节点进行有计划的下线时,需要通过某种方式将该节点上的副本迁移到其他可用节点上;
-
当集群中增加了新的节点,只有新创建的主题分区才有可能被分配到这个节点上,已经存在的主题需要某种重分配的方式使分区分配更加合理;
为解决上述问题,Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区的重分配。创建一个分区数为 4 分区副本因子为 2 的主题 topic-assign 用于测试:
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-assign --partitions 4 --replication-factor 2
Created topic topic-assign.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-assign Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: topic-assign Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: topic-assign Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-assign Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
此时,如果我们想将 brokerId=1 的节点下线,首先需要将该节点上的分区副本迁移出去,整个迁移的操作如下所示:
首先创建一个 json 文件,文件内容包含要进行分区重分配的主题清单:
{
"topics":[
{
"topic":"topic-assign"
}
],
"version":1
}
然后根据该文件内容,指定索要分配的 broker 的节点列表来生成一份候选的重分配方案:
[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --generate --topics-to-move-json-file /root/reassignment.json --broker-list 0,2
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}
返回结果中包含两部分内容:Current partition replica assignment 后的内容展示的是当前分区副本的分配情况;Proposed partition reassignment configuration 后的内容展示的是重分配的候选方案。
接下来将提供的候选方案保存在一个 json 文件当中,执行下列命令进行分区重分配,并查看结果:
[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign Topic: topic-assign TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: topic-assign Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: topic-assign Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: topic-assign Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: topic-assign Partition: 3 Leader: 0 Replicas: 0,2 Isr: 2,0
可以看到分区的分配已经按照 json 文件中定义的方式进行了重分配。重分配的本质是首先增加新的副本,然后进行数据的同步,最后将就的副本删除。数据的复制会占用额外的资源。
修改副本因子
kafka-reassign-partition.sh 脚本还能够实现分区重分配的功能,将上一小节中的 json 文件进行修改,进行副本因子的增加,如下:
{
"version": 1,
"partitions": [
{
"topic": "topic-assign",
"partition": 0,
"replicas": [
2,
1,
0
],
"log_dirs": [
"any",
"any",
"any"
]
},
{
"topic": "topic-assign",
"partition": 1,
"replicas": [
0,
1,
2
],
"log_dirs": [
"any",
"any",
"any"
]
},
{
"topic": "topic-assign",
"partition": 2,
"replicas": [
2,
1,
0
],
"log_dirs": [
"any",
"any",
"any"
]
},
{
"topic": "topic-assign",
"partition": 3,
"replicas": [
0,
1,
2
],
"log_dirs": [
"any",
"any",
"any"
]
}
]
}
执行脚本命令,并再次查看分区副本:文章来源:https://www.toymoban.com/news/detail-421227.html
[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4 ReplicationFactor: 3 Configs:
Topic: topic-assign Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 0,2,1
Topic: topic-assign Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
Topic: topic-assign Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,2,1
Topic: topic-assign Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1
可以看到所有分区的副本中都增加了在 brokerId=1 中的副本。该方法同样适用与减少分区副本数。文章来源地址https://www.toymoban.com/news/detail-421227.html
到了这里,关于【基础】Kafka -- 主题与分区的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!