Topic
事件被组织并持久地存储在Topic
中,Topic
类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka
中的Topic
始终是多生产者和多订阅者:一个Topic
可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。Topic
中的事件可以根据需要随时读取,与传统的消息中间件不同,事件在使用后不会被删除,相反,可以通过配置来定义Kafka
中每个Topic
应该保留事件的时间,超过该事件后旧事件将被丢弃。Kafka
的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。
Partition
Topic
是分区的,这意味着一个Topic
可以分布在多个Kafka
节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个Kafka
节点读取和写入数据。将新事件发布到Topic
时,它实际上会appended
到Topic
的一个Partition
中。具有相同事件key
的事件将写入同一Partition
,Kafka
保证给定Topic
的Partition
的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。
Replication
为了使数据具有容错性和高可用性,每个Topic
都可以有多个Replication
,以便始终有多个Kafka
节点具有数据副本,以防出现问题。常见的生产设置是replicationFactor
为3
,即始终有三份数据副本(包括一份原始数据)。此Replication
在Topic
的Partition
级别执行。
Kafka
在指定数量(通过replicationFactor
)的服务器上复制每个Topic
的Partition
,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。Replication
的单位是Topic
的Partition
。在非故障条件下,Kafka
中的每个Partition
都有一个leader
和零个或多个follower
。replicationFactor
是复制副本(包括leader
)的总数。所有读和写操作都将转到Partition
的leader
上。通常,有比Kafka
节点多得多的Partition
,并且这些Partition
的leader
在Kafka
节点之间均匀分布。follower
上的数据需要与leader
的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,leader
的数据末尾可能有一些尚未复制的数据)。follower
会像普通Kafka
消费者一样使用来自leader
的消息,并将其应用到自己的数据中。如下图所示,三个Kafka
节点上有两个Topic
(Topic 0
和Topic 1
),Topic 0
有两个Partition
并且replicationFactor
为3
(红色的Partition
为leader
),Topic 1
有三个Partition
,replicationFactor
也为3
(红色的Partition
为leader
)。
API
添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
这里使用的kafka-clients
版本和博主之前部署的Kafka
版本一致:
- Kafka:部署Kafka
client
操作Topic
的客户端通过AdminClient
抽象类来创建,源码如下:
package org.apache.kafka.clients.admin;
import java.util.Map;
import java.util.Properties;
public abstract class AdminClient implements Admin {
/**
* 使用给定的配置创建一个新的Admin
* props:Admin的配置
* 返回KafkaAdminClient实例
*/
public static AdminClient create(Properties props) {
return (AdminClient) Admin.create(props);
}
/**
* 重载方法
* 使用给定的配置创建一个新的Admin
* props:Admin的配置
* 返回KafkaAdminClient实例
*/
public static AdminClient create(Map<String, Object> conf) {
return (AdminClient) Admin.create(conf);
}
}
实际上会返回一个KafkaAdminClient
实例(KafkaAdminClient
类是AdminClient
抽象类的子类),KafkaAdminClient
类的方法比较多,其中private
方法服务于public
方法(提供给用户的服务)。KafkaAdminClient
类提供的public
方法是对Admin
接口的实现。
create
创建一个新的Topic
。
package com.kaven.kafka.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class Admin {
private static final AdminClient adminClient = Admin.getAdminClient();
public static void main(String[] args) throws InterruptedException, ExecutionException {
Admin admin = new Admin();
admin.createTopic();
Thread.sleep(100000);
}
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
}
创建AdminClient
(简单使用,配置BOOTSTRAP_SERVERS_CONFIG
就可以了):
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
创建Topic
(传入一个NewTopic
实例,并且给该NewTopic
实例配置name
、numPartitions
、replicationFactor
):
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:
list
获取Topic
列表。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.
listTopics(new ListTopicsOptions().listInternal(true));
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
get
方法会等待future
完成,然后返回其结果。输出如下图所示:
通过下面这个配置,可以获取到Kafka
内置的Topic
。
new ListTopicsOptions().listInternal(true)
默认是不会获取到Kafka
内置的Topic
。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
delete
删除Topic
。
public void deleteTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
deleteTopicsResult.topicNameValues().forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
输出如下图所示:
现在再获取Topic
的列表,输出如下图所示(删除的Topic
已经不在了):
describe
获取Topic
的描述。
public void describeTopic() {
Map<String, KafkaFuture<TopicDescription>> values =
adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
for (String name : values.keySet()) {
values.get(name).whenComplete((describe, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
System.out.println(describe);
});
}
}
输出如下图所示:
输出符合预期,因为创建该Topic
的配置为:
new NewTopic("new-topic-kaven", 1, (short) 1)
config
获取Topic
的配置。
public void describeTopicConfig() throws ExecutionException, InterruptedException {
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
describeConfigsResult.all().get().forEach(((configResource, config) -> {
System.out.println(configResource);
System.out.println(config);
}));
}
输出如下图所示:describeConfigs
方法很显然还可以获取其他资源的配置(通过指定资源的类型)。
public enum Type {
BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
...
}
alter
增量更新Topic
的配置。
public void incrementalAlterConfig() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
voidKafkaFuture.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(configResource);
latch.countDown();
});
}));
latch.await();
}
输出如下图所示:
很显然incrementalAlterConfigs
方法也可以增量更新其他资源的配置(通过指定资源的类型)。
ConfigResource
定义需要修改配置的资源,Collection<AlterConfigOp>
定义该资源具体的配置修改操作。
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
configEntry
定义资源需要修改的配置条目,operationType
定义修改操作的类型。
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
this.configEntry = configEntry;
this.opType = operationType;
}
修改操作的类型。
public enum OpType {
/**
* 设置配置条目的值
*/
SET((byte) 0),
/**
* 将配置条目恢复为默认值(可能为空)
*/
DELETE((byte) 1),
/**
* 仅适用于列表类型的配置条目
* 将指定的值添加到配置条目的当前值
* 如果尚未设置配置值,则添加到默认值
*/
APPEND((byte) 2),
/**
* 仅适用于列表类型的配置条目
* 从配置条目的当前值中删除指定的值
* 删除当前不在配置条目中的值是合法的
* 从当前配置值中删除所有条目会留下一个空列表,并且不会恢复为条目的默认值
*/
SUBTRACT((byte) 3);
...
}
资源的配置条目,包含配置名称、值等。
public class ConfigEntry {
private final String name;
private final String value;
private final ConfigSource source;
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigSynonym> synonyms;
private final ConfigType type;
private final String documentation;
...
}
在获取Topic
配置的输出中也可以发现这些配置条目。
很显然,这里修改名称为new-topic-kaven
的Topic
的compression.type
配置条目(压缩类型)。文章来源:https://www.toymoban.com/news/detail-451810.html
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
compression.type
配置条目的默认值为producer
(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了gzip
。
再来获取该Topic
的配置,如下图所示(很显然配置修改成功了):Kafka
的Topic
概念与API
介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。文章来源地址https://www.toymoban.com/news/detail-451810.html
到了这里,关于Kafka:Topic概念与API介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!