【基础】Kafka -- 主题与分区

这篇具有很好参考价值的文章主要介绍了【基础】Kafka -- 主题与分区。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

主题的管理

主题管理包括创建主题、查看主题消息、修改主题以及删除主题等操作,Kafka 提供的 kafka-topics.sh 脚本来执行这些操作,脚本位于$KAFKA_HOME/bin/目录下,该脚本实际上是调用了 kafka.admin.TopicCommand 类来执行主题管理的操作。

创建主题

简单创建与查看

若 broker 端的配置参数auto.create.topics.enable设置为 true,那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为 1)、副本因子为default.replication.factor(默认值为 1)的主题。同理,当一个消费者开始从未知主题中读取消息或任意客户端向未知主题发送元数据时也会按照上述参数创建相应的主题。

更加通用的方式是使用 kafka-topics.sh 来创建主题,下列代码创建了一个分区数为 4、副本因子为 2 的主题 topic-create:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create --partitions 4 --replication-factor 2
Created topic topic-create-diff.

命令中参数的含义如下:

  • --bootstrap-server:zookeeper 连接地址;

  • --create:表示创建主题的指令;

    • --topic:主题名称;

    • --partitions:主题分区数;

    • --replication-factor:分区副本数;

脚本执行成功护,Kafka 会在 log.dir 或 log.dirs 配置参数所指定的目录下创建相应的主题分区,该目录默认为 /tmp/kafka-logs/。在不同的节点分别执行下列命令来查看当前节点创建的主题分区:

# master节点
[root@master kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 417 17:00 topic-create-0
drwxr-xr-x   2 root root 167 417 17:00 topic-create-2
# node01节点
[root@node01 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-1
drwxr-xr-x   2 root root 167 417 16:44 topic-create-2
drwxr-xr-x   2 root root 167 417 16:44 topic-create-3
# node02节点
[root@node02 kafka_2.12-3.4.0]# ls -al /tmp/kafka-logs/ | grep topic-create
drwxr-xr-x   2 root root 167 418 08:40 topic-create-0
drwxr-xr-x   2 root root 167 417 17:01 topic-create-1
drwxr-xr-x   2 root root 167 417 17:01 topic-create-3

可以看到,topic-create 主题存在 0、1、2、3 四个分区,每个分区存在两个副本。主题、分区、副本和 log 日志的关系如下图所示,其中,主题与分区都是提供给用户的抽象,而副本层和 log 层才是实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 当中才能提供有效的数据冗余。

【基础】Kafka -- 主题与分区

除上述方法外,还可以使用 kafka-topics.sh 的--describe指令来查看分区副本的分配细节:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

指定分区副本分配创建

上述的方法在进行主题的创建时,其分区副本是按照内部既定的逻辑来进行分配的。kafka-topics.sh 脚本还提供了--replica-assignment参数来手动指定分区副本的分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-create-diff --replica-assignment 0:1,1:2,2:0,2:1
Created topic topic-create-diff.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用英文逗号“,”隔开,分区内多个副本用英文冒号“:”隔开。

使用这种方法需要注意以下几点:

  • 同一个分区内的副本不能有重复,如0:0,1:1这种将会提示 AdminCommandFailedException 异常;

  • 各分区所指定的副本数应相同,如0:1,1,2,1:2这种将会提示 AdminOperationException 异常;

  • 不允许跳过分区进行分配,如0:1,,1:2,2:0这种将会提示 NumberFormatException 异常;

指定参数创建

下列命令使用--config参数来创建一个主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
Created topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

示例中设置了 cleanup.policy 参数为 compact,以及 max.message.bytes 参数为 10000,这两个参数都是主题端的配置。

查看主题

主题的简单查看

kafka-topics.sh 脚本提供了--list--describe指令来方便的查看主题信息。

使用--list可以查看当前所有的可用主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --list
topic-config
topic-create
topic-create-diff

使用--describe可以查看单个或者多个主题的详细信息,若不适用--topic指定主题则显示所有主题的详细信息:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create,topic-create-diff
Topic: topic-create-diff        TopicId: B-ngArJuQSyktp38UjKkzA PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create-diff        Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create-diff        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic-create-diff        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-create-diff        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: topic-create     TopicId: aW0hKB9-RJ6NaxZK-Ws-ZQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-create     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-create     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 2,1
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

带附加功能的查看

在使用--describe指令时可以额外指定--topics-with-overridesunder-replicated-partitions以及--unavailable-partitions参数来实现一些附加功能。

topics-with-overrides

使用该参数可以查看所有包含覆盖配置的主题,它只会列出包含了与集群不同配置的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topics-with-overrides
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000

under-replicated-partitions

使用该参数可以找出所有包含失效副本的分区,手动停掉 id=1 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
        Topic: topic-create     Partition: 1    Leader: 2       Replicas: 1,2   Isr: 2
        Topic: topic-create     Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0
        Topic: topic-create     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2

重新启动 id=1 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --under-replicated-partitions
[root@master kafka_2.12-3.4.0]#

unavailable-partitions

使用该参数可以查询主题中没有 leader 副本的分区,这些分区已处于离线状态,对于外界的生产者和消费者来说处于不可用的状态。手动停掉 id=1 和 id=2 的 kafka broker,进行查询:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
        Topic: topic-create     Partition: 1    Leader: none    Replicas: 1,2   Isr: 1
        Topic: topic-create     Partition: 3    Leader: none    Replicas: 2,1   Isr: 1

重新启动 id=1 和 id=2 的 broker,再次查询,不显示任何结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-create --unavailable-partitions
[root@master kafka_2.12-3.4.0]#

修改主题

kafka-topics.sh 提供了--alert指令用于修改主题的分区数、修改配置等。

修改分区

下列指令可以修改主题的分区数:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-config
Topic: topic-config     TopicId: IWB1Chh_SEyi6GsHVyJWwg PartitionCount: 3       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=10000
        Topic: topic-config     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-config     Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-config     Partition: 2    Leader: 2       Replicas: 2     Isr: 2

Kafka 只支持增加分区数而不支持减少分区数,如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --alter --topic topic-config --partitions 1
Error while executing topic command : Topic currently has 3 partitions, which is higher than the requested 1.
[2023-04-18 16:06:33,274] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 3 partitions, which is higher than the requested 1.
 (kafka.admin.TopicCommand$)

修改配置

在当前最新的 kafka 3.4.0 版本中已经不允许使用 kafka-topics.sh 脚本来变更主题的配置了,新版本推荐使用 kafka-configs.sh 脚本实现相关的功能。

删除主题

如果一个主题确定不会再使用,那么最好将其删除释放一些资源。kafka-topics.sh 脚本提供了--delete指令用于主题的删除:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-delete --partitions 1 --replication-factor 1
Created topic topic-delete.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --delete --topic topic-delete

主题的删除与 broker 端配置参数delete.topic.enable有关,该参数配置为 true 时才能够删除主题,默认值为 true。

在删除主题时,若删除的是 Kafka 内部的主题或删除的主题不存在,则会报错。

主题配置管理

kafka-configs.sh 脚本用于对配置进行管理和操作,其可以实现相关配置在运行状态下的动态变更。该脚本包含变更配置指令--alter以及查看配置指令--describe两种类型,增删改操作都可以看作是对配置的变更。该脚本不仅支持操作主题的配置,其同样支持操作 broker、用户、客户端的配置。

配置查看与变更

配置查看

使用 kafka-configs.sh 脚本查看主题配置的命令如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  max.message.bytes=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=10000, DEFAULT_CONFIG:message.max.bytes=1048588}

其中:

  • --entity-type指定要查看配置的实体类型;

  • --entity-config指定要查看配置的实体名称;

上述两个参数的对应关系如下:

entity-type entity-name
主题类型,取值为 topics 指定主题名称
broker 类型,取值为 brokers 指定 brokerId
客户端类型,取值为 clients 指定 clientId
用户类型,取值为 users 指定用户名

配置变更

使用 kafka-configs.sh 脚本实现配置的变更时,需要将--alter指令与add-config以及delete-config参数一起使用,前者用于实现配置的增、改操作,后者用于实现配置的删除。

add-config

下列命令对主题原有配置进行了覆盖,若需要修改多个参数,则将多个参数使用英文逗号","隔开即可:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --add-config max.message.bytes=20000
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  max.message.bytes=20000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=20000, DEFAULT_CONFIG:message.max.bytes=1048588}

delete-config

下列命令对主题新增的配置进行了删除,还原为默认配置:

[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --alter --entity-type topics --entity-name topic-config --delete-config max.message.bytes,cleanup.policy
Completed updating config for topic topic-config.
[root@master kafka_2.12-3.4.0]# bin/kafka-configs.sh --bootstrap-server master:9092 --describe --entity-type topics --entity-name topic-config
Dynamic configs for topic topic-config are:

主题端参数

主题相关的所有配置参数在 broker 层面都有对应的参数,若没有指定或修改主题的任何配置参数,那么就使用 broker 端对应的参数作为默认值。

与主题相关的参数有很多,在进行配置时自行搜索查看即可。所有配置的查看与修改方法都与上述介绍的方法相同。

KafkaAdminClient 主题管理

一般情况下,我们使用 kafka-topics.sh 脚本来管理主题。但是在某些场景下,我们需要将主题管理类的功能集成到公司内部的系统当中进行统一管理,那么就需要使用程序调用 API 的方式去实现。KafkaAdminClient 类便提供了这些基本功能。

基本使用

创建主题

下列代码展示了如何使用 KafkaAdminClient 类创建一个分区数为 4、副本因子为 1 的主题 topic-admin:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        String topic = "topic-admin";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 定义主题创建的信息
        NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
        // 执行主题创建并获取执行结果
        CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
        try {
            result.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 4       ReplicationFactor: 1    Configs:
        Topic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1

其中,AdminClient 对象的获取调用的是create()方法,其定义如下:

public static AdminClient create(Properties props) {
    return KafkaAdminClient.createInternal(new AdminClientConfig(props), (TimeoutProcessorFactory)null);
}

在进行主题信息的定义时,需要创建一个 NewTopic 对象,其包含下述属性:

public class NewTopic {
    // 主题名称
    private final String name;
    // 分区数
    private final int numPartitions;
    // 副本因子
    private final short replicationFactor;
    // 分配方案
    private final Map<Integer, List<Integer>> replicasAssignments;
    // 配置
    private Map<String, String> configs = null;
    ...
}

指定分区副本分配

若想指定分区副本的分配方案来创建一个主题,可以将代码中“定义主题创建的信息”处的代码替换为下述代码:

// 指定分区副本分配方案
Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
replicasAssignments.put(0, List.of(1));
replicasAssignments.put(1, List.of(0));
replicasAssignments.put(2, List.of(1));
replicasAssignments.put(3, List.of(0));
// 定义主题创建的信息
NewTopic newTopic = new NewTopic(topic, replicasAssignments); // topic:"topic-admin-test1"

查询新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test1
Topic: topic-admin-test1        TopicId: pptWSwHbT8m4W_dJBEoWkg PartitionCount: 4       ReplicationFactor: 1    Configs:
        Topic: topic-admin-test1        Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test1        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin-test1        Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test1        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

指定主题配置参数

若想在创建主题时指定需要覆盖的配置,则需要在 NewTopic 对象中传入 configs 配置集,如下所示:

// 指定主题配置
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("max.message.bytes", "25000");
NewTopic newTopic = new NewTopic(topic, 4, (short) 1); // topic:"topic-admin-test2"
newTopic.configs(configs);

查看新创建的主题:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin-test2
Topic: topic-admin-test2        TopicId: IFegwoC6Tp-UwlnfwkrlmA PartitionCount: 4       ReplicationFactor: 1    Configs: cleanup.policy=compact,max.message.bytes=25000
        Topic: topic-admin-test2        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin-test2        Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin-test2        Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin-test2        Partition: 3    Leader: 0       Replicas: 0     Isr: 0

在使用 AdminClient 之后要调用close()方法来释放资源。

查看主题

查看主题可以调用listTopics()或者describeTopics()方法实现。

下列代码使用listTopics()方法查看主题:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 查询主题并获取查询结果
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        try {
            listTopicsResult.names().get().forEach(System.out::println);

        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询结果如下:

topic-admin-test1
topic-admin-test2
topic-create
topic-config
topic-create-diff
topic-admin

下列代码使用describeTopics()方法查看主题:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 查询主题并获取查询结果
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
        try {
            describeTopicsResult.all().get().forEach((key, value) -> {
                System.out.println(key + "||" + value);
            });
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

查询结果如下:

topic-admin-test1||(name=topic-admin-test1, internal=false, partitions=(partition=0, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=1, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))
topic-admin-test2||(name=topic-admin-test2, internal=false, partitions=(partition=0, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)),(partition=1, leader=node02:9092 (id: 2 rack: null), replicas=node02:9092 (id: 2 rack: null), isr=node02:9092 (id: 2 rack: null)),(partition=2, leader=node01:9092 (id: 1 rack: null), replicas=node01:9092 (id: 1 rack: null), isr=node01:9092 (id: 1 rack: null)),(partition=3, leader=master:9092 (id: 0 rack: null), replicas=master:9092 (id: 0 rack: null), isr=master:9092 (id: 0 rack: null)))

删除主题

调用deleteTopics()方法即可实现主题的删除:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 删除主题
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic-admin-test1", "topic-admin-test2"));
        try {
            deleteTopicsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

删除后执行主题查询,结果如下:

topic-create
topic-config
topic-create-diff
topic-admin

修改主题

使用describeConfigs()方法可以查看主题的具体配置信息,具体使用方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 执行查询并获取结果
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(List.of(configResource));
        try {
            Config config = describeConfigsResult.all().get().get(configResource);
            System.out.println("==========" + configResource.name() + "==========");
            config.entries().forEach(System.out::println);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

返回结果如下,该方法会列出主题当中所有的配置信息:

==========topic-admin==========
ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.format.version, value=3.0-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])

使用alterConfigs()方法对主题的配置进行修改的方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 配置要更改的配置参数
        ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
        Config configs = new Config(List.of(entry));
        Map<ConfigResource, Config> configMap = new HashMap<>();
        configMap.put(configResource, configs);
        // 执行配置更改并获取结果
        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
        try {
            alterConfigsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

修改完成后再次执行查询,可以看到配置已经被成功修改:

==========topic-admin==========
...
ConfigEntry(name=cleanup.policy, value=compact, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
...

KafkaAdminClient 还提供了createPartitions()方法用于增加某个主题的分区,其基本使用方法如下:

public class TestDemo {
    public static void main(String[] args) {
        // 定义服务地址以及主题名称
        String brokerList = "192.168.86.133:9092";
        // 配置参数
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // 获取 AdminClient 对象
        AdminClient adminClient = AdminClient.create(properties);
        // 配置要查询的实体类型和实体名称
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-admin");
        // 配置要更改的分区
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
        newPartitionsMap.put("topic-admin", newPartitions);
        // 执行增加分区数的操作并获取结果
        CreatePartitionsResult partitionsResult = adminClient.createPartitions(newPartitionsMap);
        try {
            partitionsResult.all().get();
        } catch (Exception exception) {
            exception.printStackTrace();
        }      
        // 关闭 AdminClient 对象实例
        adminClient.close();
    }
}

直接在服务器上调用 kafka-topic.sh 脚本查看主题分区的增加结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-admin
Topic: topic-admin      TopicId: 8mKJR8KBQKuUOXXJCFS1SQ PartitionCount: 5       ReplicationFactor: 1    Configs: cleanup.policy=compact
        Topic: topic-admin      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic-admin      Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: topic-admin      Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-admin      Partition: 4    Leader: 2       Replicas: 2     Isr: 2

主题合法性校验

一般在生产环境下,Kafka 的auto.create.topics.enable参数将被设置为 false,集不允许自动创建主题。主题的创建一般由运维人员通过 kafka-topics.sh 脚本创建,若普通用户想要通过 KafkaAdminClient 提供的方法创建主题,建议在代码中对主题的命名、分区数、分区副本数进行校验,对不符合标准的申请进行过滤。

分区的管理

优先副本选举

分区采用多副本的机制以提升可靠性,但是只有 leader 副本对外提供读写服务,follower 副本只负责在内部进行消息的同步。若一个分区的 leader 副本不可用,则意味着整个分区不可用,此时就需要从 follower 副本中挑选一个作为新的 leader 继续对外提供服务。

在进行主题的创建时,主题的分区以及副本会尽可能均匀的分不到 Kafka 集群的各个 broker 节点上,leader 副本的分配也比较均匀。如下所示,创建一个分区数、分区副本数均为 3 的主题 topic-partition 并观察其分区副本的分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-partition --partitions 3 --replication-factor 3
Created topic topic-partition.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到,leader 副本均匀的分布在 id 为 0、2、1 的 broker 节点当中。此时,若某个分区 leader 副本所在的 broker 节点宕机,该分区的一个 follower 节点就会成为新的 leader 节点。当之前的 leader 节点恢复并重新加入集群后,其只能作为一个新的 follower 节点,不在对外提供服务。

我们重启 brokerId=2 的节点,观察分区副本的分布如下:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: topic-partition  Partition: 1    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点从 2 变为了 1,而分区 2 的 leader 节点也为 1,这就导致了原本均衡的负载状态被打破,节点 1 的负载最高。

为处理上述负载失衡的情况,Kafka 引入了优先副本 preferred replica 的概念。优先副本即 AR 集合中的第一个副本,即查询主题信息时其中的 Replicas 集合。如上述查询结果所示,分区 1 的 AR 集合为【2,1,0】,其中第一个副本为 2,即分区 1 的优先副本为 2。

基于优先副本的概念,Kafka 提供了分区自动平衡的功能,该功能对应的 broker 端参数为auto.leader.rebalance.enable(默认值为 true)。自动平衡功能开启时,Kafka 会启动一个定时任务,该任务会轮询所有的 broker 节点,计算每个 broker 节点的分区不平衡率(=非优先副本的 leader 数量/分区总数)。若该值超过了leader.imbalance.per.broker.percentage(默认值为 10%)参数所配置的比率,就会自动执行优先副本选举的动作对分区进行平衡。自动平衡分区动作的执行周期由参数leader.imbalance.check.interval.seconds(默认值 300s)控制。

经过一段时间后再次查询分区副本分布:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-partition
Topic: topic-partition  TopicId: P3dZG_1kRCuXa6dcTHeu4Q PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: topic-partition  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: topic-partition  Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2
        Topic: topic-partition  Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

可以看到分区 1 的 leader 节点自动平衡为 2。

分区重新分配

在日常的生产工作中,可能会碰到以下的场景:

  • 某节点宕机后,该节点上的分区副本将处于失效的状态,Kafka 并不会将失效的分区副本自动的迁移到集群中可用的 broker 节点上;

  • 当需要对集群中的某个节点进行有计划的下线时,需要通过某种方式将该节点上的副本迁移到其他可用节点上;

  • 当集群中增加了新的节点,只有新创建的主题分区才有可能被分配到这个节点上,已经存在的主题需要某种重分配的方式使分区分配更加合理;

为解决上述问题,Kafka 提供了 kafka-reassign-partitions.sh 脚本来执行分区的重分配。创建一个分区数为 4 分区副本因子为 2 的主题 topic-assign 用于测试:

[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --create --topic topic-assign --partitions 4 --replication-factor 2
Created topic topic-assign.
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-assign     Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: topic-assign     Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

此时,如果我们想将 brokerId=1 的节点下线,首先需要将该节点上的分区副本迁移出去,整个迁移的操作如下所示:

首先创建一个 json 文件,文件内容包含要进行分区重分配的主题清单:

{   
    "topics":[
        {
            "topic":"topic-assign"    
        }
    ],
    "version":1
}

然后根据该文件内容,指定索要分配的 broker 的节点列表来生成一份候选的重分配方案:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --generate --topics-to-move-json-file /root/reassignment.json --broker-list 0,2
Current partition replica assignment
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}

返回结果中包含两部分内容:Current partition replica assignment 后的内容展示的是当前分区副本的分配情况;Proposed partition reassignment configuration 后的内容展示的是重分配的候选方案。

接下来将提供的候选方案保存在一个 json 文件当中,执行下列命令进行分区重分配,并查看结果:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[1,2],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign                                                            Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 2    Configs:
        Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 2,0

可以看到分区的分配已经按照 json 文件中定义的方式进行了重分配。重分配的本质是首先增加新的副本,然后进行数据的同步,最后将就的副本删除。数据的复制会占用额外的资源。

修改副本因子

kafka-reassign-partition.sh 脚本还能够实现分区重分配的功能,将上一小节中的 json 文件进行修改,进行副本因子的增加,如下:

{
	"version": 1,
	"partitions": [
		{
			"topic": "topic-assign",
			"partition": 0,
			"replicas": [
				2,
                1,
				0
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 1,
			"replicas": [
				0,
                1,
				2
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 2,
			"replicas": [
				2,
                1,
				0
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		},
		{
			"topic": "topic-assign",
			"partition": 3,
			"replicas": [
				0,
                1,
				2
			],
			"log_dirs": [
				"any",
                "any",
				"any"
			]
		}
	]
}

执行脚本命令,并再次查看分区副本:

[root@master kafka_2.12-3.4.0]# bin/kafka-reassign-partitions.sh --bootstrap-server master:9092 --execute --reassignment-json-file /root/assignment.json
Current partition replica assignment

{"version":1,"partitions":[{"topic":"topic-assign","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":1,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":2,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"topic-assign","partition":3,"replicas":[0,2],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic-assign-0,topic-assign-1,topic-assign-2,topic-assign-3
[root@master kafka_2.12-3.4.0]# bin/kafka-topics.sh --bootstrap-server master:9092 --describe --topic topic-assign
Topic: topic-assign     TopicId: b4TASor1RHeVKBVTAlpbPQ PartitionCount: 4       ReplicationFactor: 3    Configs:
        Topic: topic-assign     Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic-assign     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1
        Topic: topic-assign     Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic-assign     Partition: 3    Leader: 0       Replicas: 0,1,2 Isr: 0,2,1

可以看到所有分区的副本中都增加了在 brokerId=1 中的副本。该方法同样适用与减少分区副本数。文章来源地址https://www.toymoban.com/news/detail-421227.html

到了这里,关于【基础】Kafka -- 主题与分区的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [AIGC_coze] Kafka 的主题分区之间的关系

    在 Kafka 中,主题(Topics)和分区(Partitions)是两个重要的概念,它们之间存在着密切的关系。 主题是 Kafka 中用于数据发布和订阅的逻辑单元。每个主题可以包含多个分区,每个分区都是一个独立的有序数据集。生产者将数据发送到特定的主题,而消费者通过订阅主题来接收

    2024年02月19日
    浏览(33)
  • Kafka消费者订阅指定主题(subscribe)或分区(assign)详解

    在连接Kafka服务器消费数据前,需要创建Kafka消费者进行拉取数据,需要配置相应的参数,比如设置消费者所属的消费者组名称、连接的broker服务器地址、序列号和反序列化的方式等配置。 更多消费者配置可参考官网: https://kafka.apache.org/documentation/#consumerconfigs 订阅主题(s

    2023年04月24日
    浏览(45)
  • Kafka中的主题(Topic)和分区(Partition)是什么?它们之间有什么关系?

    在Kafka中,主题(Topic)和分区(Partition)都是用于组织和存储消息的概念,它们有密切的关系。 主题(Topic):主题是消息的逻辑分类。可以将主题理解为一个逻辑上的消息容器,类似于一个消息类别或者话题。在Kafka中,生产者(Producer)将消息发布到特定的主题,而消费

    2024年02月15日
    浏览(46)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(45)
  • kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

    Kakfa集群有主题,每一个主题下又有很多分区,为了保证防止丢失数据,在分区下分Leader副本和Follower副本,而kafka的某个分区的Leader和Follower数据如何同步呢?下面就是讲解的这个 首先要知道,Follower的数据是通过Fetch线程异步从Leader拉取的数据,不懂的可以看一下Kafka——副

    2024年02月09日
    浏览(36)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(45)
  • kafka 3.5 主题分区的高水位线HW,低水位线LW,logStartOffset,LogEndOffset什么情况下会更新源码

    下面的例子只是各拿一个做举例,不是全部场景,不要以为logStartOffset,LogEndOffset,HW,LW只有三个场景可以修改 这里需要针对 logStartOffset 和 LogEndOffset 做特殊说明,要不会让大家脑袋混乱,并且前言后的章节讲的都是 主题分区级别 的 (1)主题分区级别 对于每个分区中每一个

    2024年02月09日
    浏览(49)
  • kafka操作命令-主题管理

    目录 1.创建主题 2.查看主题 3.修改主题 4.删除主题 1.1 创建名为:test-topic的主题,命令如下: 执行结果如下:  登录ZooKeeper客户端查看所创建的主题元数据信息,“test-topic”元数据信息如下: 可以看到,该主题有5个分区、1个副本  ⭐️⭐️⭐️⭐️ 1.2 在创建主题时,我

    2024年02月08日
    浏览(39)
  • Linux基础笔记18 | 磁盘分区管理

    fdisk 这个古老的软件并不认识 GPT ,所以 fdisk 只支持 MBR 的分区模式,且磁盘小于2T,大于了就不能使用 fdisk 进行分区了 新磁盘的分区 一块新加的磁盘的分区方式 检查系统中需要分区的磁盘 fdisk -l 对新磁盘进行磁盘分区 fdisk /dev/sdb 进入交互式分区界面 键入 m 获取帮助信息

    2024年02月03日
    浏览(46)
  • Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

    kafaf集群地址列表:理论上写一个节点地址,就相当于绑定了整个kafka集群了,但是建议多写几个,如果只写一个,万一宕机就麻烦了 kafka消息的key和value要指定序列化方法 kafka对应的生产者id 使用java代码表示则为以下代码:  可使用 retries 参数 进行设置,同时要注意记住两

    2024年02月05日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包