Kafka-Topic&Partition

这篇具有很好参考价值的文章主要介绍了Kafka-Topic&Partition。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka主题与分区

主题与分区

topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

主题的管理

主题的管理

  • 创建主题

  • 查看主题信息

  • 修改主题

  • 删除主题

上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 topic 日志名称,kafka,linq,数据库

创建主题

创建主题的命令格式如下:

kafka-topics.sh --bootstrap-server <server:port> \
    --create --topic <topic> \
    --partitions <numPartitions> \
    --replication-factor <replicationFactor>

创建一个分区数为4、副本因子为2的主题

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create \
    --partitions 4 \
    --replication-factor 2

创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create \
    --partitions 4 \
    --replication-factor 2 \
    --config max.message.bytes=128000

通过describe指令来查看分区副本的分配细节

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

使用replica-assignment参数手动指定分区副本的分配方案

使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列

例如:0:1:2,0:1:2,0:1:2,0:1:2

  • 分区与分区之间用逗号分隔

  • 分区与副本之间用冒号分隔

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create-same \
    --replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2

注意:

  • 同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常

  • 分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常

主题命名规范

  • 主题名称只能包含ASCII字母、数字、点、减号和下划线

  • 主题名称长度不能超过249个字符

  • 主题名称不能以点开头

  • 不能以__开头,这是Kafka内部使用的主题前缀

  • 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符

  • 主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的

  • 主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets

  • 主题名称不能与已经存在的消费者组名称冲突

  • 主题名称不能与已经存在的主题名称冲突

查看主题信息

通过list指令来查看当前Kafka集群中所有可用的主题

kafka-topics.sh --bootstrap-server localhost:9092 --list

topic 日志名称,kafka,linq,数据库

通过describe指令来查看主题的详细信息

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

topic 日志名称,kafka,linq,数据库

修改主题

当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息

# 修改主题的最大消息字节数,配置值从10000修改为20000

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-config \
    --config max.message.bytes=20000

通过alter指令来修改主题的分区数

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-create \
    --partitions 6

删除主题

通过delete指令来删除主题

kafka-topics.sh --bootstrap-server localhost:9092 \
    --delete --topic topic-delete

通过delete-config参数来删除之前设置的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-config \
    --delete-config max.message.bytes

手动删除主题

  • 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下

  • 主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。

配置管理

kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令

# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --entity-type topics --entity-name topic-config \
    --add-config max.message.bytes=128000

# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --entity-type topics --entity-name topic-config \
    --add-config max.message.bytes=128000

# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type topics --entity-name topic-config    

KafkaAdminClient

KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。

TopicCommand基本使用

使用KafkaAdminClient来完成TopicCommand的基本操作

查看主题信息

public class demo{
    public static void describeTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--describe",
                "--topic", "topic-create"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

创建主题

public class demo{
    public static void createTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--create",
                "--replication-factor", "1",
                "--partitions", "1",
                "--topic", "topic-create-api"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

查看所有可用主题

public class demo{
    public static void listTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--list"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

KafkaAdminClient基本使用

KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。

AdminClient常见的方法

  • createTopics:创建主题

    • CreateTopicsResult createTopics(Collection newTopics)
  • deleteTopics:删除主题

    • DeleteTopicsResult deleteTopics(Collection topics)
  • listTopics:列出所有可用的主题

    • ListTopicsResult listTopics()
  • describeTopics:查看主题的详细信息

    • DescribeTopicsResult describeTopics(Collection topicNames)
  • describeCluster:查看集群的详细信息

    • DescribeClusterResult describeCluster()
  • describeConfigs:查看配置的详细信息

    • DescribeConfigsResult describeConfigs(Collection resources)
  • alterConfigs:修改配置信息

    • AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
  • describeConsumerGroups:查看消费者组的详细信息

    • DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
  • listConsumerGroups:列出所有可用的消费者组

    • ListConsumerGroupsResult listConsumerGroups()
  • createPartitions:创建分区文章来源地址https://www.toymoban.com/news/detail-817723.html

    • CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {
    /**
     * 使用AdminClient创建Topic
     *
     * 创建完成之后使用如下脚本进行检查
     * 进入KAFKA_HOME/bin
     * 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list
     */
    public static void createTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);
        // 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的
        CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        createTopic();
    }
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {
    /**
     * 使用AdminClient查看Topic信息
     */
    public static void describeTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));
        try {
            Map<String, TopicDescription> map = result.all().get();
            for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        describeTopic();
    }
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {
    /**
     * 使用AdminClient查看所有可用的Topic
     */
    public static void listTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ListTopicsResult result = adminClient.listTopics();
        try {
            Set<String> set = result.names().get();
            for (String s : set) {
                System.out.println(s);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        listTopic();
    }
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {
    /**
     * 使用AdminClient创建分区
     */
    public static void createPartition(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        Map<String, NewPartitions> map = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(2);
        map.put("topic-create-api", newPartitions);
        CreatePartitionsResult result = adminClient.createPartitions(map);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        createPartition();
    }
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {
    /**
     * 使用AdminClient删除Topic
     */
    public static void deleteTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        deleteTopic();
    }
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {
    /**
     * 使用AdminClient修改Topic配置
     */
    public static void alterTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");
        Config config = new Config(Arrays.asList(configEntry));
        Map<ConfigResource, Config> map = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
        map.put(configResource, config);
        AlterConfigsResult result = adminClient.alterConfigs(map);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        alterTopic();
    }
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {
    /**
     * 使用AdminClient查看Topic配置
     */
    public static void describeTopicConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
        DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));
        try {
            Map<ConfigResource, Config> map = result.all().get();
            for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        describeTopicConfig();
    }
}

到了这里,关于Kafka-Topic&Partition的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka--kafka的基本概念-topic和partition

    topic是逻辑概念 以Topic机制来对消息进行分类的,同一类消息属于同一个Topic,你可以将每个topic看成是一个消息队列。 生产者(producer)将消息发送到相应的Topic,而消费者(consumer)通过从Topic拉取消息来消费 kafka中是要求消费者主动拉取消息消费的,它并不会主动推送消息

    2024年02月12日
    浏览(45)
  • Kafka 清空Topic

    测试环境某topic的数据格式发生了更改,需要将原有数据清空重新生产数据。 还需检查当前kafka topic对应的分区副本(假定单分区单副本)大小,用于验证数据是否已删除。 将过期时间设置为1秒 直到该文件夹显示大小是20K即证明数据已清理 同时需注意,数据清理时机受ser

    2024年01月16日
    浏览(44)
  • kafka删除topic

    1. 首先需要在config/server.properties中添加 delete.topic.enable=true 属性 2. 用topic list找到想要删除的topic名称 3. 执行删除命令

    2024年02月11日
    浏览(43)
  • kafka topic分区数设定

    然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc) 例如:producer吞吐量 = 70m/s;consumer吞吐量 =100m/s,期望吞吐量 300m/s; 分区数 = 300 / 70 = 4或者5个分区

    2024年02月02日
    浏览(55)
  • Kafka/Spark-01消费topic到写出到topic

    消费者代码 注意点 consumerConfigs是定义的可变的map的类型的,具体如下 consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下 Subscribe传参需要指定泛型,这边指定string,

    2024年02月09日
    浏览(35)
  • kafka单独设置topic过期时间

    kafka 默认存放7天的临时数据,如果遇到磁盘空间小,存放数据量大,可以设置缩短这个时间。 一、全局设置 修改 server.properties,如下的值: 二、单独对某一个topic设置过期时间 但如果只有某一个topic数据量过大,想单独对这个topic的过期时间设置短点: 三、查看设置: 四、

    2024年02月12日
    浏览(41)
  • 监控kafka topic,钉钉报警

    前几天公司我们部门需要演示一个应用,应用依赖kafka的数据,但是kafka的数据来自其他部门的投递。 一些原因导致数据无法给到,导致我们部门的演示也很有问题,所以想做一个简单的kafka topic的监控,在没有数据的时候及时发现并找兄弟部门沟通 这里记录下原因,因为机

    2024年02月12日
    浏览(29)
  • kafka @KafkaListener 动态接收topic

    @KafkaListener 里边的 topics 必须是常量,不可以是变量 但是某些业务场景 kafka定义的topic会不同这时候就需要传入变量才可以实现 具体实现方式如下: KafkaListener 监听方法 #{}  这里边是方法名称  这里是获取topic 其实可以在对应的@Bean里边写逻辑方法去处理 这里用到了获取配

    2024年02月13日
    浏览(39)
  • Kafka - Topic命令 & 命令行操作

    目录 零、前置 一、Topic命令 查看当前服务器中的所有 topic 创建 first topic 查看 first 主题的详情 修改分区数量 删除 topic 二、生产者命令行操作 发送消息 三、消费者命令行操作 消费 first 主题中的数据 Kafka集群的搭建:Kafka + Zookeeper + Hadoop 集群配置 参数 功能 --bootstrap-server

    2024年02月16日
    浏览(36)
  • Kafka某个Topic无法消费问题

    12月28日,公司测试环境Kafka的task.build.metadata.flow这个topic突然无法消费。 其他topic都正常使用,这个topic只有一个分区,并且只有一个消费者 首先登录服务器,运行kafka的cli命令,查看消费者组的详情。 由上图可以发现,task.build.metadata.flow这个topic,最新offset是2,但是当前o

    2024年02月03日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包