kafka配置SASL/PLAIN 安全认证

这篇具有很好参考价值的文章主要介绍了kafka配置SASL/PLAIN 安全认证。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 zookeeper配置启动

1.1 zookeeper添加SASL支持

为zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.2 zk_server_jaas.conf文件

新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。

1.3 引入jar包

由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root  489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root  370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root   41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root   12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]# 

1.4.修改zkEnv.sh

在zkEnv.sh添加


for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
do
   CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "

1.5.启动zk服务端

执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题

2 kafka配置和启动

2.1.新建kafka_server_jaas.conf,为kafka添加认证信息

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="cluster"
 password="cluster"
 user_cluster=“clusterpasswd”
 user_kafka="kafkapasswd" ;
};
Client{
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="kafka"  
 password="kafkapasswd";  
};

KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。

2. 2.在kafka的配置文件开启SASL认证

listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true

2.3 .在server启动脚本JVM参数

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

2.4 启动

./kafka-server-start.sh ../config/server.properties

2.5

kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)

3 kafka的SASL认证功能认证和使用

1.使用kafka脚本认证

我们使用kafka自带的脚本进行认证。

1.新建kafka_client_jaas.conf,为客户端添加认证信息

在/home下新建kafka_client_jaas.conf,添加以下信息

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="kafkapasswd";
};

2.修改客户端配置信息

修改producer.properties和consumer.properties,添加认证机制

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN 

3.修改客户端启动脚本

修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

export KAFKA_HEAP_OPTS=“-Xmx512M”

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

kafka-console-consumer.sh的修改类似。

4.客户端启动并认证

启动consumer

./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

启动producer

./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

producer端发送消息,consumer端成功接收到消息。文章来源地址https://www.toymoban.com/news/detail-694566.html

4.Java客户端认证

package com.zte.sdn.oscp.jms.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import java.util.Collections;
import java.util.Properties;

public class KafkaTest {

    @Test
    public void testProduct() throws Exception {
        System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "IP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);
        while (true){
			long startTime = System.currentTimeMillis();
			for (int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));
			}
			System.out.println(System.currentTimeMillis()-startTime);
			Thread.sleep(5000);
		}
    }

    @Test
    public void testConsumer() throws Exception {
        System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "(IP):9092");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("group.id", "kafka_test_group");
        props.put("session.timeout.ms", "6000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("kafkatest"));
        while (true) {
            long startTime = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println("recieve message number is " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                        record.offset(),
                        record.key(),
                        record.value(),
                        record.partition());
            }
        }
    }
}

到了这里,关于kafka配置SASL/PLAIN 安全认证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache zookeeper kafka 开启SASL安全认证_kafka开启认证(1)

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新网络安全全套学习资料》

    2024年04月23日
    浏览(35)
  • 【kafka+Kraft模式集群+SASL安全认证】

    准备3个kafka,我这里用的kafka版本为:kafka_2.13-3.6.0,下载后解压: 更改解压后的文件名称: cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3 分别得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3 copy一份config/kraft/server.properties配置文件,修改名称 server-sasl.properties 进入各个config/kraft/server

    2024年02月03日
    浏览(45)
  • Apache zookeeper kafka 开启SASL安全认证

    背景:我之前安装的kafka没有开启安全鉴权,在没有任何凭证的情况下都可以访问kafka。搜了一圈资料,发现有关于sasl、acl相关的,准备试试。 Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可

    2024年03月14日
    浏览(45)
  • Apache zookeeper kafka 开启SASL安全认证 —— 筑梦之路

      Kafka是一个高吞吐量、分布式的发布-订阅消息系统。Kafka核心模块使用Scala语言开发,支持多语言(如Java、Python、Go等)客户端,它可以水平扩展和具有高吞吐量特性而被广泛使用,并与多类开源分布式处理系统进行集成使用。   Kafka作为一款开源的、轻量级的、分布式、可

    2024年02月11日
    浏览(32)
  • Kafka三种认证模式,Kafka 安全认证及权限控制详细配置与搭建

    Kafka三种认证模式,Kafka 安全认证及权限控制详细配置与搭建。 Kafka三种认证模式 使用kerberos认证

    2024年02月04日
    浏览(52)
  • kafka 安装 以及 Kraft 模式、安全认证配置

    常见安装:zookeeper + kafka zookeeper 单节点安装 : apach官网下载对应包:apache-zookeeper-3.7.1-bin.tar.gz 修改对应配置文件 /conf/zoo_sample.cfg ,配置端口以及数据目录 sh zkServer.sh start 启动 、 sh zkServer.sh stop 停止、 sh zkServer.sh status 状态 sh zkCli.sh -server 客户端 zookeeper 集群安装: 在每个

    2024年02月10日
    浏览(50)
  • Kerberos安全认证-连载12-Kafka Kerberos安全配置及访问

    目录 1. Kafka配置Kerberos 2. 客户端操作Kafka ​​​​​​​3. Java API操作Kafka 4. StructuredStreaming操作Kafka 5. Flink 操作Kafka 技术连载系列,前面内容请参考前面连载11内容:​​​​​​​​​​​​​​Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_IT贫道的博客-CSDN博客 Kafk

    2024年02月12日
    浏览(54)
  • Kafka配置Kerberos安全认证及与Java程序集成

    本文主要介绍在 Kafka 中如何配置 Kerberos 认证,以及 java 使用 JAAS 来进行 Kerberos 认证连接。 本文演示为单机版。 查看 Kerberos 版本命令: klist -V 软件名称 版本 jdk 1.8.0_202 kafka 2.12-2.2.1 kerberos 1.15.1 Kerberos 是一种由 MIT(麻省理工大学)提出的网络身份验证协议,它旨在通过使用密

    2024年01月22日
    浏览(61)
  • Kafka-配置Kerberos安全认证(JDK8、JDK11)

    一、相关配置 1、JAAS 配置文件 2、keytab 文件(kafka.service.keytab) 从 Kerberos 服务器上拷贝到目标机器 或 找运维人员要一份 3、Kerberos 配置文件(krb5.conf) krb5文件参数说明:krb5.conf(5) 从 Kerberos 服务器上拷贝到目标机器 或 找运维人员要一份 Tip: JDK11版本 sun.security.krb5.Config 类

    2024年02月15日
    浏览(54)
  • Kafka安全模式之身份认证

    Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份

    2024年04月13日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包