Kafka:Topic概念与API介绍

这篇具有很好参考价值的文章主要介绍了Kafka:Topic概念与API介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Topic

事件被组织并持久地存储在Topic中,Topic类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka中的Topic始终是多生产者和多订阅者:一个Topic可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。Topic中的事件可以根据需要随时读取,与传统的消息中间件不同,事件在使用后不会被删除,相反,可以通过配置来定义Kafka中每个Topic应该保留事件的时间,超过该事件后旧事件将被丢弃。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。

Partition

Topic是分区的,这意味着一个Topic可以分布在多个Kafka节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个Kafka节点读取和写入数据。将新事件发布到Topic时,它实际上会appendedTopic的一个Partition中。具有相同事件key的事件将写入同一PartitionKafka保证给定TopicPartition的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。

Replication

为了使数据具有容错性和高可用性,每个Topic都可以有多个Replication,以便始终有多个Kafka节点具有数据副本,以防出现问题。常见的生产设置是replicationFactor3,即始终有三份数据副本(包括一份原始数据)。此ReplicationTopicPartition级别执行。

Kafka在指定数量(通过replicationFactor)的服务器上复制每个TopicPartition,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。Replication的单位是TopicPartition。在非故障条件下,Kafka中的每个Partition都有一个leader和零个或多个followerreplicationFactor是复制副本(包括leader)的总数。所有读和写操作都将转到Partitionleader上。通常,有比Kafka节点多得多的Partition,并且这些PartitionleaderKafka节点之间均匀分布。follower上的数据需要与leader的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,leader的数据末尾可能有一些尚未复制的数据)。follower会像普通Kafka消费者一样使用来自leader的消息,并将其应用到自己的数据中。如下图所示,三个Kafka节点上有两个TopicTopic 0Topic 1),Topic 0有两个Partition并且replicationFactor3(红色的Partitionleader),Topic 1有三个PartitionreplicationFactor也为3(红色的Partitionleader)。
Kafka:Topic概念与API介绍

API

添加依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

这里使用的kafka-clients版本和博主之前部署的Kafka版本一致:

  • Kafka:部署Kafka

client

操作Topic的客户端通过AdminClient抽象类来创建,源码如下:

package org.apache.kafka.clients.admin;

import java.util.Map;
import java.util.Properties;

public abstract class AdminClient implements Admin {

    /**
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * 重载方法
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}

实际上会返回一个KafkaAdminClient实例(KafkaAdminClient类是AdminClient抽象类的子类),KafkaAdminClient类的方法比较多,其中private方法服务于public方法(提供给用户的服务)。
Kafka:Topic概念与API介绍
KafkaAdminClient类提供的public方法是对Admin接口的实现。
Kafka:Topic概念与API介绍

create

创建一个新的Topic

package com.kaven.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    private static final AdminClient adminClient = Admin.getAdminClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Admin admin = new Admin();
        admin.createTopic();
        Thread.sleep(100000);
    }

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }
}

创建AdminClient(简单使用,配置BOOTSTRAP_SERVERS_CONFIG就可以了):

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }

创建Topic(传入一个NewTopic实例,并且给该NewTopic实例配置namenumPartitionsreplicationFactor):

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:
Kafka:Topic概念与API介绍

list

获取Topic列表。

    public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.
                listTopics(new ListTopicsOptions().listInternal(true));
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }

get方法会等待future完成,然后返回其结果。输出如下图所示:
Kafka:Topic概念与API介绍
通过下面这个配置,可以获取到Kafka内置的Topic

new ListTopicsOptions().listInternal(true)

默认是不会获取到Kafka内置的Topic

    public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }

Kafka:Topic概念与API介绍

delete

删除Topic

    public void deleteTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
        deleteTopicsResult.topicNameValues().forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

输出如下图所示:
Kafka:Topic概念与API介绍
现在再获取Topic的列表,输出如下图所示(删除的Topic已经不在了):
Kafka:Topic概念与API介绍

describe

获取Topic的描述。

    public void describeTopic() {
        Map<String, KafkaFuture<TopicDescription>> values =
                adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
        for (String name : values.keySet()) {
            values.get(name).whenComplete((describe, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                System.out.println(describe);
            });
        }
    }

输出如下图所示:
Kafka:Topic概念与API介绍
输出符合预期,因为创建该Topic的配置为:

new NewTopic("new-topic-kaven", 1, (short) 1)

config

获取Topic的配置。

    public void describeTopicConfig() throws ExecutionException, InterruptedException {
        DescribeConfigsResult describeConfigsResult = adminClient
                .describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
        describeConfigsResult.all().get().forEach(((configResource, config) -> {
            System.out.println(configResource);
            System.out.println(config);
        }));
    }

输出如下图所示:
Kafka:Topic概念与API介绍
describeConfigs方法很显然还可以获取其他资源的配置(通过指定资源的类型)。

    public enum Type {
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        ...
    }

alter

增量更新Topic的配置。

    public void incrementalAlterConfig() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        
        Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );
        
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
        alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
            voidKafkaFuture.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(configResource);
                latch.countDown();
            });
        }));
        latch.await();
    }

输出如下图所示:
Kafka:Topic概念与API介绍
很显然incrementalAlterConfigs方法也可以增量更新其他资源的配置(通过指定资源的类型)。

ConfigResource定义需要修改配置的资源,Collection<AlterConfigOp>定义该资源具体的配置修改操作。

Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();

configEntry定义资源需要修改的配置条目,operationType定义修改操作的类型。

    public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType =  operationType;
    }

修改操作的类型。

    public enum OpType {
        /**
         * 设置配置条目的值
         */
        SET((byte) 0),
        /**
         * 将配置条目恢复为默认值(可能为空)
         */
        DELETE((byte) 1),
        /**
         * 仅适用于列表类型的配置条目
         * 将指定的值添加到配置条目的当前值
         * 如果尚未设置配置值,则添加到默认值
         */
        APPEND((byte) 2),
        /**
         * 仅适用于列表类型的配置条目
         * 从配置条目的当前值中删除指定的值
         * 删除当前不在配置条目中的值是合法的
         * 从当前配置值中删除所有条目会留下一个空列表,并且不会恢复为条目的默认值
         */
        SUBTRACT((byte) 3);
        ...
    }

资源的配置条目,包含配置名称、值等。

public class ConfigEntry {

    private final String name;
    private final String value;
    private final ConfigSource source;
    private final boolean isSensitive;
    private final boolean isReadOnly;
    private final List<ConfigSynonym> synonyms;
    private final ConfigType type;
    private final String documentation;
    ...
}

在获取Topic配置的输出中也可以发现这些配置条目。
Kafka:Topic概念与API介绍
很显然,这里修改名称为new-topic-kavenTopiccompression.type配置条目(压缩类型)。

        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );

compression.type配置条目的默认值为producer(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了gzip
Kafka:Topic概念与API介绍
再来获取该Topic的配置,如下图所示(很显然配置修改成功了):
Kafka:Topic概念与API介绍
KafkaTopic概念与API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。文章来源地址https://www.toymoban.com/news/detail-451810.html

到了这里,关于Kafka:Topic概念与API介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(32)
  • 使用Kafka客户端(kafka-clients)的Java API操作Kafka的Topic

    记录 :460 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(52)
  • 【项目实战】Kafka中Topic创建介绍

    Kafka是一种分布式流处理平台,它可以处理实时数据流,支持高吞吐量、低延迟的数据处理。 它通过Topic和Partition机制将消息存储在集群中,并支持高吞吐量的消息发布和订阅。 Topic可以看作是一个消息队列 生产者将消息发送到Topic中,消费者从Topic中消费消息。 生产者将消

    2024年02月09日
    浏览(37)
  • (五)kafka从入门到精通之topic介绍

    Kafka是一个流行的分布式消息系统,它的核心是一个由多个节点组成的分布式集群。在Kafka中,数据被分割成多个小块,并通过一些复杂的算法在节点之间传递。这些小块被称为Kafka Topic。 一个Topic是一组具有相同主题的消息。可以将Topic看作是一个数据仓库,在这个仓库中存

    2024年02月12日
    浏览(25)
  • kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    kafka官网 Broker   一台kafka服务器就是一个broker,可容纳多个topic。一个集群由多个broker组成; Producer   生产者,即向kafka的broker-list发送消息的客户端; Consumer   消费者,即向kafka的broker-list订阅消息的客户端; Consumer Group   消费者组是 逻辑上的一个订阅者 ,由多个

    2024年02月01日
    浏览(40)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(38)
  • 如何借助Kafka持久化存储K8S事件数据?

    大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。     尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka

    2024年02月05日
    浏览(38)
  • 【Kafka】Kafka介绍、架构和概念

    Kafka是是一个优秀的分布式消息中间件,关于常用的消息中间件对比可参考文章:消息中间件概述。 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多生产者、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、

    2024年01月22日
    浏览(40)
  • 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(31)
  • kafka topic和topic权限操作

    创建 查询 删除 增加 移除 查询

    2024年02月12日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包