在 Kafka 3.3.1 中,可以使用 ACL(Access Control List)控制用户对 topic 的访问权限。以下是一些基本示例:
- 创建一个名为 my-topic 的 topic
在命令行中执行以下命令创建一个名为 my-topic
的 topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
- 设置 ACL
a. 在文件system-acls.properties或配置文件KafkaServer或者其他支持的外部ACL模块中设置ACLs规则,例如,我们可以通过修改/config/server.properties 配置文件来添加全局的访问控制规则:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
#定义默认ACL规则,允许所有人都可以对topic进行读写操作,这很不安全,请不要在生产环境中使用
super.users=User:admin
其中 SimpleAclAuthorizerr
是一种内置的 ACL 校验器,User:admin
是一组超级管理员。如果没有在配置文件中指定 super.users,则只有在 Zookeeper 上配置了访问控制时才会应用 ACL 规则。
b. 为特定用户或组添加 ACL 规则
假设我们要给组开发者(developers)授予对 my-topic 的读写权限,我们可以通过以下命令添加 ACL 规则:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
现在,开发人员组可以对该 topic 进行读写操作。如果你需要撤销授权(删除 ACL),则可以使用以下命令:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
- 验证 ACL 规则
验证ACL规则的最简单方法是尝试读取或写入某个受保护的 topic。例如,在 java 中可以使用 KafkaProducer
和 KafkaConsumer
API 来测试:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者对象并发送消息
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
System.out.println(result.get().toString());
}
producer.close();
// 创建 Kafka 消费者对象并订阅 Topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToBeginning(Collections.singletonList(tp));
// 读取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
参考资料
Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制_sasl_mechanism_王义凯_Rick的博客-CSDN博客
kafka权限控制 - 简书
Kafka SASL/SCRAM+ACL实现动态创建用户及权限控制
Kafka 权限管理实战(最全整理)
Kafka基于Kraft下的权限控制_kafka kraft sasl_songjxin的博客-CSDN博客
【kafka】SSL和ACL的配置 - 知乎
深入浅出 SSL/CA 证书及其相关证书文件(pem、crt、cer、key、csr)
fabric kafka配置SSL+ACL - 简书
kafka+zookeeper集群模式配置SSL认证 – 俗话曰的博客文章来源:https://www.toymoban.com/news/detail-583574.html
Kafka 如何给集群配置Scram账户认证_Smallc0de的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-583574.html
到了这里,关于【Kafka集群】Kafka针对用户做ACL权限控制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!