1 zookeeper配置启动
1.1 zookeeper添加SASL支持
为zookeeper添加SASL支持,在配置文件zoo.cfg添加
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
1.2 zk_server_jaas.conf文件
新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home
路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。
1.3 引入jar包
由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样
-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root 489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root 370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root 41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root 12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]#
1.4.修改zkEnv.sh
在zkEnv.sh添加
for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "
1.5.启动zk服务端
执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题
2 kafka配置和启动
2.1.新建kafka_server_jaas.conf,为kafka添加认证信息
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cluster"
password="cluster"
user_cluster=“clusterpasswd”
user_kafka="kafkapasswd" ;
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapasswd";
};
KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。
2. 2.在kafka的配置文件开启SASL认证
listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
2.3 .在server启动脚本JVM参数
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"
2.4 启动
./kafka-server-start.sh ../config/server.properties
2.5
kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功
Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
3 kafka的SASL认证功能认证和使用
1.使用kafka脚本认证
我们使用kafka自带的脚本进行认证。
1.新建kafka_client_jaas.conf,为客户端添加认证信息
在/home下新建kafka_client_jaas.conf,添加以下信息
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapasswd";
};
2.修改客户端配置信息
修改producer.properties和consumer.properties,添加认证机制
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.修改客户端启动脚本
修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将
export KAFKA_HEAP_OPTS=“-Xmx512M”
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"
kafka-console-consumer.sh的修改类似。
4.客户端启动并认证
启动consumer
./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties
启动producer文章来源:https://www.toymoban.com/news/detail-694566.html
./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties
producer端发送消息,consumer端成功接收到消息。文章来源地址https://www.toymoban.com/news/detail-694566.html
4.Java客户端认证
package com.zte.sdn.oscp.jms.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import java.util.Collections;
import java.util.Properties;
public class KafkaTest {
@Test
public void testProduct() throws Exception {
System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "IP:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
Producer<String, String> producer = new KafkaProducer<>(props);
while (true){
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));
}
System.out.println(System.currentTimeMillis()-startTime);
Thread.sleep(5000);
}
}
@Test
public void testConsumer() throws Exception {
System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "(IP):9092");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("group.id", "kafka_test_group");
props.put("session.timeout.ms", "6000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("kafkatest"));
while (true) {
long startTime = System.currentTimeMillis();
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println(System.currentTimeMillis() - startTime);
System.out.println("recieve message number is " + records.count());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
record.offset(),
record.key(),
record.value(),
record.partition());
}
}
}
}
到了这里,关于kafka配置SASL/PLAIN 安全认证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!