Kafka动态认证SASL/SCRAM配置+整合springboot配置

这篇具有很好参考价值的文章主要介绍了Kafka动态认证SASL/SCRAM配置+整合springboot配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

记录:

zookeeper启动命令:

[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start
[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop

kafka启动命令:

/data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties

创建SCRAM证书

1)创建broker建通信用户:admin(在使用sasl之前必须先创建,否则启动报错)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)创建生产用户:producer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

 3)创建消费用户:consumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

SCRAM-SHA-256/SCRAM-SHA-512是对密码加密的算法,二者有其一即可

查看SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

服务端配置

1)创建JAAS文件

vi config/kafka_server_jaas.conf

 内容:

 KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)将JAAS配置文件位置作为JVM参数传递给每个Kafka Broker【bin/kafka-server-start.sh】添加

exec $base_dir/kafka-run-class.sh 
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/test/kiki/kafka/ka/config/kafka_server_jaas.conf kafka.Kafka "$@"

操作:

vi bin/kafka-server-start.sh 

 修改最后一行

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_server_jaas.conf kafka.Kafka "$@"

3)配置server.properties【config/server.properties】

#认证配置
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

当前测试kafka端口为8100

 vi config/server.properties 

#listeners=PLAINTEXT://:8100

listeners=SASL_PLAINTEXT://:8100
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

advertised.listeners=SASL_PLAINTEXT://183.56.218.28:8100

#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

个人备注:

log.dirs=/data/program/kafka2.12/data

zookeeper.connect=183.56.218.28:2181

SCRAM-SHA-512与SCRAM-SHA-216可互相更改,看需要什么类型。PLAINTEXT为不需要认证

4)重启Kafka和Zookeeper

客户端配置

 1)为我们创建的三个用户分别创建三个JAAS文件:分别命名为
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

vi config/kafka_client_scram_admin_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec"; };

vi config/kafka_client_scram_producer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="prod-sec"; };

vi config/kafka_client_scram_consumer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec"; };

2)修改启动脚本引入JAAS文件:
生产者配置:
配置bin/kafka-console-producer.sh

#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_producer_jaas.conf

消费者配置:
配置bin/kafka-console-consumer.sh

#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_consumer_jaas.conf

3)配置consumer.properties和producer.properties,都要加入以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

4)创建主题

[test@police ka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 1

5)启动生产(ps:结束也未能成功测试该命令是否能用,后面在代码方面配置就好)

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --producer.config config/producer.properties

发现会报权限相关的错

6)对生产者赋予写的权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:producer --operation Write --topic test

7)对消费者赋予读的权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:consumer --operation Read --topic test

此时启动消费者(ps:结束也未能成功测试该命令是否能用,后面在代码方面配置就好)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties

此时依旧会报错,报未对消费者组授权。给groupId配权

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer --operation Read --group test-group

此时再启动消费者,可以发现能正常消费生产者的消息

8)查看权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

springboot整合配置

权限主要配置部分格式:

      props.put("security.protocol", "SASL_PLAINTEXT");
      props.put("sasl.mechanism", "SCRAM-SHA-512");
      props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");

生产者:

//异步发送
    @Test
    fun customProducer() {
        //配置
        val properties = Properties()
        //链接kafka
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8100"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT"
        properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"
        properties[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"producer\" password=\"prod-sec\";"
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 1) {
            //黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)
            kafkaProducer.send(ProducerRecord("test", "我是成功:::${LocalDateTime.now()}"))
        }//"type":"UPDATE/ADD/DELETE"
        //关闭资源
        kafkaProducer.close()
    }

消费者

package com.umh.medicalbookingplatform.background.config

import com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory


/**
 * @Description :
 * @Author  xiaomh
 * @date  2022/8/30 14:14
 */

@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Autowired
    private lateinit var appProperties: ApplicationProperties

    @Bean
    fun consumerFactory(): ConsumerFactory<String?, String?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = appProperties.kafkaBootstrapServersConfig.toString()
        props[ConsumerConfig.GROUP_ID_CONFIG] = appProperties.kafkaGroupId.toString()
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = appProperties.kafkaSecurityProtocol.toString()
        props[SaslConfigs.SASL_MECHANISM] = appProperties.kafkaSaslMechanism.toString()
        props[SaslConfigs.SASL_JAAS_CONFIG] = appProperties.kafkaSaslJaasConfig.toString()

        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.setConsumerFactory(consumerFactory())
        return factory
    }
}

yml

kafkaBootstrapServersConfig: xxxxxx:8100
kafkaGroupId: test-group
kafkaSecurityProtocol: SASL_PLAINTEXT
kafkaSaslMechanism: SCRAM-SHA-512
kafkaSaslJaasConfig: org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec";

scram-sha-512,kafka,kotlin

 参考

Kafka动态认证SASL/SCRAM验证_慕木兮人可的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-793267.html

到了这里,关于Kafka动态认证SASL/SCRAM配置+整合springboot配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实战:kafka、zookeeper SASL+ACL实现动态权限认证、授权

    1. 不接入外部存储,基于kafka原生ACL认证 环境: kafka-2.2.1、zookeeper-3.6.3 ​ kafka给我们提供了 SASL/SCRAM 模式,将SASL、ACL规则信息存储到zookeeper中,并且通过 KafkaClientAdmin Api,新增、编辑、删除规则,其特性如下 应用发送、消费实现动态身份认证和授权 基于kafka SASL/SCRAM 模式,

    2023年04月26日
    浏览(38)
  • kafka配置SASL/PLAIN 安全认证

    为zookeeper添加SASL支持,在配置文件zoo.cfg添加 新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在 /opt/zookeeper/conf/home 路径下。zk_server_jaas.conf文件的内容如下 Server { org.apache.kafka.common.security.plain.Plai

    2024年02月10日
    浏览(39)
  • Windows下安装单机Kafka环境及配置SASL身份认证

    zookeeper和kafka都是java开发的,所以安装前先安装1.8版本以上的jdk,并设置环境变量 JAVA_HOME=d:envJavajdk1.8.0_14 1.1 Apache ZooKeeper点击下载地址 Apache ZooKeeper,下载最新版本zookeeper压缩包,解压到本地 1.2 来到 conf文件夹下,复制一份 zoo_sample.cfg ,改名为 zoo.cfg 1.3 在安装目录下新

    2024年02月01日
    浏览(40)
  • 第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者 Kafka下载地址:Apache Kafka 2.1、修改 Zookeeper 配置文件 config/zookeeper.properties 2.2、Zookeeper 配置文件修改内容 2.2、Zookeeper 配置文件增加配置说明

    2024年01月16日
    浏览(42)
  • Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

    公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证 使用Kafka版本为2.4.0版本,主要参考官方文档 官网对2.4版本安全验证介绍以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    浏览(66)
  • [Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka

    目录 Kafka 集群配置 准备 配置流程 Jaas(Java Authentication and Authorization Service )文件 zookeeper 配置文件 SSL自签名 启动zookeeper集群 启动kafka集群  spring-cloud-starter-bus-kafka 集成 下载统一版本Kafka服务包至三台不同的服务器上 文章使用版本为  kafka_2.13-3.5.0.tgz 下载地址 jdk版本 为 Ado

    2024年02月04日
    浏览(56)
  • 单机部署Kafka和开启SASL认证

    操作系统:linux apache-zookeeper-3.8.1-bin.tar.gz kafka_2.13-3.4.0.tgz 1.上传zookeeper 与kafka到 /opt 2、解压 tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz tar -zxvf  kafka_2.13-3.4.0.tgz mv apache-zookeeper-3.8.1-bin zookeeper mv kafka_2.13-3.4.0 kafka 3、修改zookeeper配置文件并启动 修改zoo.cfg,增加以下命令 创建zk_server_j

    2024年01月22日
    浏览(42)
  • Apache zookeeper kafka 开启SASL安全认证_kafka开启认证(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新网络安全全套学习资料》

    2024年04月23日
    浏览(35)
  • kafka安全认证与授权(SASL/PLAIN)

    SASL 鉴权协议,主要用来保证客户端登录服务器的时候,传输的鉴权数据的安全性,SASL是对用户名和密码加解密用的 SSL 是一种间于传输层(比如TCP/IP)和应用层(比如HTTP)的协议。对传输内容进行加密,如HTTPS 如果使用了SASL但是没有使用SSL,那么服务端可以认证客户端的身

    2023年04月25日
    浏览(43)
  • 【kafka+Kraft模式集群+SASL安全认证】

    准备3个kafka,我这里用的kafka版本为:kafka_2.13-3.6.0,下载后解压: 更改解压后的文件名称: cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3 分别得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3 copy一份config/kraft/server.properties配置文件,修改名称 server-sasl.properties 进入各个config/kraft/server

    2024年02月03日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包