[Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka

这篇具有很好参考价值的文章主要介绍了[Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

Kafka 集群配置

准备

配置流程

Jaas(Java Authentication and Authorization Service )文件

zookeeper 配置文件

SSL自签名

启动zookeeper集群

启动kafka集群 

spring-cloud-starter-bus-kafka 集成


Kafka 集群配置

准备

下载统一版本Kafka服务包至三台不同的服务器上

文章使用版本为 kafka_2.13-3.5.0.tgz 下载地址

jdk版本Adopt JDK-17 OpenJDK17U-jdk_x64_linux_hotspot_17.0.7_7.tar.gz 下载地址

配置流程

Jaas(Java Authentication and Authorization Service )文件

        在kafka包解压目录下的 config 目录下新建zookeeper认证所需jaas文件,文件名随意,以 .conf 结尾即可

        文件内容如下

        user_{username}为固定写法 {username} 为用户名 密码为双引号内容

        注意,这里zookeeper的jaas有三个用户名和密码分别对应着三台kafka broker去认证时使用的用户名和密码,每一台上的zookeeper的jaas文件内容建议完全相同

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_super="super-sec"
       user_kafkabroker1="kafkabroker1-sec"
       user_kafkabroker2="kafkabroker2-sec"
       user_kafkabroker3="kafkabroker3-sec";
};

        在相同目录下建立kafka所需认证jaas文件

        以下是三台服务器中其中一台的kafka jaas认证文件内容,Client内容为本台机器上的broker认证本台机器上的zookeeper的用户名和密码 ( 注意最后一行和倒数第二行需要有分号!! ) KafkaServer端有一对 username="kbroker1"  password="kbroker1-sec"内部brokers之间进行认证所用账号密码但是本文内部broker配置为ssl链接,去掉应该也没事若不同则加一下

        当然每个broker的KafkaServer段也需要有定义这个用户名和密码( 对应  user_kbroker1="kbroker1-sec" )   user_client="client-sec" 外部客户端认证时所需用户名密码

这里为了方便,全部brokers共享一个账号,客户端user_client(也就是连接Kakfa时的producer、consumer或者编程语言SDK读取或配置客户端jaas文件时)也为统一用户名密码

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker3"
   password="kafkabroker3-sec";
};

另外两台用户名密码如下

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker1"
   password="kafkabroker1-sec";
};
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kbroker1"
   password="kbroker1-sec"
   user_kbroker1="kbroker1-sec"
   user_client="client-sec";
};

Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="kafkabroker2"
   password="kafkabroker2-sec";
};

zookeeper 配置文件

同样是在config目录下编辑zookeeper.properties文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/kafka/zookeeper-dir
dataLogDir=/opt/kafka/zookeeper-log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
# 一个ip最多可以对这个zookeeper服务进行连接的数量
maxClientCnxns=5
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=1.1.1.1:2182:1999
server.2=2.2.2.2:2182:1999
server.3=3.3.3.3:2182:1999

# security
# 开启zookeeper sasl认证必须配置
authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
maxClientCnxns=5
# 这里可以设置为false 同样设置zookeeper jaas认证也无效了
sessionRequireClientSASLAuth=true
jaasLoginRenew=360000000

注意 clientPort与 <外网ip>:<内部互联连端口>:<选举专用端口> 这些端口要区分开来 不然zookeeper服务启动会报错,三台配置基本一直

注意 server.<int>=<外网ip> 若是连接本机有问题,可以将<外网ip>换成0.0.0.0

zookeeper启动脚本如下

#!/bin/bash

export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &

使用export 导出KAFKA_OPTS中的变量,让zookeeper启动时加载jaas认证文件,参数key为

-Djava.security.auth.login.config

nohup 可以让zookeeper在后台运行,不占用终端,滚动日志可以在 kafka-zookeeper-start.log 文件查看,若想滚动查看日志可以用

tail -f kafka-zookeeper-start.log

SSL自签名

    这个地方卡住很久,遇到了些bug都是关于自签证书的问题,文章用下图说明SSL证书在kafka中的使用流程 每个keystore包含数字签名和证书

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

        每台机器上都应该有自己的keystore与truststore文件,证书可以每台上都使用openssl生成一张,但是,要把每台机器上的证书必须互相导入到其他borkers的truststore与keystore中,而且每台机器上的keystore还需要多次导入所有证书签名之后生成的证书与数字签名。不然在brokers互相创建SSL隧道时会有各种问题,例如下图,将broker1机器上生成的kafka.client.truststore.jks直接 scp 传输到到broker2 后使用,broker1 与 broker2建立SSL隧道时,kafka config 目录下log4j.properties修改TRACE级别日志记录如下

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 参考

原文连接

参考连接2

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 很多网站上面只是做单台机器或者单个证书的全部生成过程,这里记录下自己的创建流程

 注意,全局只生成了一次CA ,仅包含一个 ca-cert 与 ca-key

参考如下

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 部署SSL 创建密钥与证书,创建自签名的颁发机构,证书签名

 文件含义

 keystore 可以存储私钥、证书和对称密钥的存储库。
 引用stackoverflow的回答

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 ca-cert 具体证书

 ca-key 证书私钥

 ca-password 颁发机构密钥

 cert-file 导出未签名的证书文件

 cert-signed 带有数字签名的证书

 首先在每个机器上面都要创建keystore密钥库

keytool命令无效可以去JAVA_HOME/bin目录下找

SSL hostname校验可通过两种方式配置

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

在kafka的配置文件中添加以下配置取消校验

ssl.endpoint.identification.algorithm=

或配置CN与SAN分别为hostname与FQDN什么是FQDN

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 文章采用前者,忽略SSL对hostname的认证并按照SAN格式创建keystore

keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}

       -alias 后面用 hostname,localhost与hostname都可以

        {validity} 为过期时间 自签可以长一点 例 9999

        {keystore-pass} 与 {key-pass} 为密码,建议设为同一个值

        -ext SAN=DNS:{hostname} 注意,必须为hostname (终端 键入hostname查看)

 这里my-host-name可用localhost代替  或者用VPS云服务器专属的hostnamekafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

         kafka.ser.keystore.jks生成结束

       创建CA证书

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

CN也要写hostname或直接用localhost

此时一共生成三个文件

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

注意!copy ca-cert 与 ca-key 文件到所有kafka broker机器上,(若是想在其他机器上连接也要把这两个文件拷贝过去,例如本地开发集成spring boot时),并放在固定位置 

ca-cert 与 ca-key 代表一张CA

导入 ca-cert 到所有brokers的kafka.server.truststore.jks中,终端交互输入 yes

keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert

原文中多一部将kafka.server.keystore.jks密钥库的证书导出才签名

keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

证书签名 

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456

此时一共生成六个文件

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

向密钥库 kafka.server.keystore.jks 导入证书与数字签名

keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

查看kafka.server.keystore.jks包含的内容

keytool --list -v -keystore  kafka.server.keystore.jks

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 全部命令如下,思路就是全局生成一张CA (包含ca-cert ca-key)

        

  • 每台机器生成
    • kafka.server.keystore.jks
  • ca-cert导入到每一个 kafka.server.truststore.jks(ca-cert导完了就生成) 的CAroot中
  • 每一个kafka.server.keystore.jks导出一个cert-file
  • 用ca-cert ca-key 给 cert-file 签出一个 cert-signed,
  • ca-cert cert-signed都导入kafka.server.keystore.jks
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed

原文链接

现在全部文件如下(忽略 auto-create-kafka-ssl-keys.sh 这个是自动生成证书的脚本github连接 与 备份压缩包demo.tar)

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 编辑kafka服务配置文件 server.properties

advertised.listeners要和listeners对应,

advertised.listeners概述

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 advertised.listeners不要绑定0.0.0.0端口,这里配置SSL为内部访问所以使用云服务器的hostname, brokers之间如何根据彼此的hostname来寻找呢?Linux可编辑/etc/hosts文件

末尾加上 <ip>:<port>添加dns映射 windows在C:\Windows\System32\drivers\etc目录下找hosts文件


broker.id=1

############################# Socket Server Settings #############################

listeners=SSL://:9093,SASL_SSL://:9094
# 注意 这里SSL是做内部brokers通信用的,外部暴露方式为SASL_SSL
advertised.listeners=SSL://Your-Host-name:9093,SASL_SSL://:9094


log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=1.1.1.1:2182,2.2.2.2:2182,3.3.3.3:2182
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Kafka Security ###########################
ssl.endpoint.identification.algorithm=
security.inter.broker.protocol=SSL
ssl.client.auth=required
# ssl加密协议选择
ssl.enabled.protocols=TLSv1.3,TLSv1.1,TLSv1
# Broker security settings
sasl.enabled.mechanisms=PLAIN
#ssl.truststore.password=123456
ssl.truststore.password=123456
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

启动zookeeper集群

编辑一个脚本分别启动每一个broker上的zookeeper  

kafka-zookeeper-quick-start.sh*

#!/bin/bash
# 让jaas文件被zookeeper加载到运行时环境
# KAFKA_OPTS为固定用法
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/zk_jaas.conf"
nohup /opt/kafka/kafka-server/bin/zookeeper-server-start.sh /opt/kafka/kafka-server/config/zookeeper.properties > kafka-zookeeper-start.log 2>&1 &

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

zookeeper集群启动结束

启动kafka集群 

编写一个脚本启动每一个broker 优先启动的broker会作为主节点

kafka-server-quick-start.sh

#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
export KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/kafka-server/config/kafka_server_jaas.conf
nohup /opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties > kafka-server-start.log 2>&1 &
#/opt/kafka/kafka-server/bin/kafka-server-start.sh /opt/kafka/kafka-server/config/server.properties

 kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 任意报错可以修改config目录下的log4j.properties 将所有logger设置成trace查看

 注意 设为trace之后 非kafka主节点会疯狂滚动一个controller就绪日志

 以下为主节点

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

  什么是kafka controllerkafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 全局只有一个broker节点的controller会生效,暂不深究

 ssl 生效测试

本文Kafka配置为 TLSv1.3,TLSv1.1,TLSv1 可加入 TLSv1.2

具体协议版本会与jdk版本有关

openssl s_client --debug -connect <ip>:<port> -tls1 次处 Verify return code: 0 代表最低版本tls协议生效

openssl s_client --debug -connect <ip>:<port> -tls1_1

openssl s_client --debug -connect <ip>:<port> -tls1_2

openssl s_client --debug -connect <ip>:<port> -tls1_3

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

随便登上一台机器或者在开发本地在一个固定目录下创建公用配置文件

client_security.properties

ssl.endpoint.identification.algorithm=
#security.protocol=SSL
security.protocol=SASL_SSL
ssl.truststore.location=/opt/kafka/crkeys/kafka.server.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka/crkeys/kafka.server.keystore.jks
ssl.keystore.password=123456
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="client" \
    password="client-sec";

编写一个脚本创建topic

脚本内容

./bin/kafka-topics.sh --create --topic demo-topic-1 --command-config /opt/kafka/kafka-server/config/client_security.properties --partitions 3 --replication-factor 3 --bootstreap-server your-hostname-1:9094,your-hostname-2:9094,your-hostname-3:9094

编写一个脚本测试producer和consumer

脚本内容

/opt/kafka/kafka-server/bin/kafka-console-producer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --producer.config /opt/kafka/kafka-server/config/client_security.properties

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 启动consumer

 脚本内容

#!/bin/bash
/opt/kafka/kafka-server/bin/kafka-console-consumer.sh --bootstrap-server hostname-1:9094,hostname-2:9094,hostname-3:9094 --topic demo-topic-1 --consumer.config /opt/kafka/kafka-server/config/client_security.properties --from-beginning

这里consumer 添加了 --from-beginning 选项,会从头读取producer写入的数据

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

spring-cloud-starter-bus-kafka 集成

使用spring-cloud-dependencies-2021.0.8 版本 spring-boot-dependencies-2.7.13

spring-cloud-starter-bus-kafka 包含两个依赖

注意,本地生成keystore与truststore步骤与上面的生成步骤一致,需要把全局唯一的ca-cert ca-key拷贝到本地来生成keystore与truststore

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      <version>3.2.4</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-bus</artifactId>
      <version>3.1.2</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

application配置

server.port=13001
server.servlet.context-path=/liquid/configs-dev

# Kafka
spring.kafka.bootstrap-servers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.security.protocol=SASL_SSL
spring.kafka.ssl.key-store-location=kafka.server.keystore.jks
spring.kafka.ssl.key-store-password=123456
spring.kafka.ssl.key-store-type=jks
spring.kafka.ssl.trust-store-location=kafka.server.truststore.jks
spring.kafka.ssl.trust-store-password=123456
spring.kafka.ssl.trust-store-type=jks
spring.kafka.retry.topic.attempts=3


# Kafka stream
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=classpath:kafka.server.keystore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=123456
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=classpath:kafka.server.keystore.jks
spring.cloud.stream.kafka.binder.configuration.ssl.ssl.truststore.password=123456
spring.cloud.stream.kafka.binder.brokers=1.1.1.1:9094,2.2.2.2:9094,3.3.3.3:9094
spring.kafka.streams.replication-factor=1
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.auto-create-topics=false
# spring cloud config
spring.cloud.config.server.git.uri=https://github.com/spring-cloud-samples/config-repo

创建一个Java Base Configuration

package com.liquid.config.center.configs;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.*;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration
public class LiquidKafkaConfiguration {


    @Value("${spring.kafka.bootstrap-servers}")
    public String bootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "PLAIN");
        configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=client" +
                "password=client-sec;");
        log.info(">>> Loading Kafka Admin With Jaas String end");
        return new KafkaAdmin(configs);
    }

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
        ));

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec"
        ));

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean("JaasLoginModuleConfiguration")
    public JaasLoginModuleConfiguration creatStreamJaasLoginModule() {
        Map<String, String> configs = new HashMap<>();
        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "PLAIN");
        configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=client" +
                "password=client-sec;");
        log.info(">>> Loading Kafka Admin with jaas string end");
        JaasLoginModuleConfiguration jaasLoginModuleConfiguration = new JaasLoginModuleConfiguration();
        jaasLoginModuleConfiguration.setOptions(configs);
        return jaasLoginModuleConfiguration;
    }

    @Bean
    @Primary
    public KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties(KafkaBinderConfigurationProperties properties) {
        String saslJaasConfigString = String.format("%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "client", "client-sec");
        Map<String, String> configMap = properties.getConfiguration();
        configMap.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfigString);
        return properties;
    }

}

最终启动后 日志

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 SSL handshake completed successfully with peerHost

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

kafka_2.13-3.5.0.tgz下载,kafka,分布式,spring cloud,zookeeper

 至此 kafka内部ssl 客户端SASL_SSL认证成功文章来源地址https://www.toymoban.com/news/detail-763173.html

到了这里,关于[Kafka集群] 配置支持Brokers内部SSL认证\外部客户端支持SASL_SSL认证并集成spring-cloud-starter-bus-kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka开启SSL认证(包括内置zookeeper开启SSL)

    zookeeper和kafka的SSL开启都可单独进行 使用jre自带的keytool工具生成,linux和windows下生成的证书可以通用 生成含有一个私钥的keystore文件,有效期10年(本文证书密码统一使用test123) keytool -genkeypair -alias certificatekey -dname “CN=127.0.0.1, OU=127.0.0.1, O=127.0.0.1, L=SH, ST=SH, C=CN” -keyalg RS

    2024年01月17日
    浏览(43)
  • JAVA连接Kafka及SSL认证

    1、Maven驱动(注意一定要对应自己的Kafka版本) 2、生产者生产数据 2.1 普通方式创建Producer 2.2 ssl加密和认证创建Producer(Plain) 2.3 ssl加密和认证创建Producer(Plain使用配置文件方式) kafka_client_jaas_plain配置文件信息: 具体代码实现: 2.4 ssl加密和认证创建Producer(Scram) 2.5 ssl加密和认证创

    2024年02月12日
    浏览(44)
  • kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息队列有两种消费模式,分别是 点对点模式 和 订阅/发布模式 。具体比较可以参考Kafka基础–消息队列与消费模式。 下图是一个 点对点 的Kafka结构示意图,其中有以下几个部分: producer:消息生产者 consumer:消息消费者 Topic:消息主题 partition:主题内分区 Brokers:消

    2024年02月04日
    浏览(58)
  • 【kafka+Kraft模式集群+SASL安全认证】

    准备3个kafka,我这里用的kafka版本为:kafka_2.13-3.6.0,下载后解压: 更改解压后的文件名称: cp kafka_2.13-3.6.0 kafka_2.13-3.6.0-1/2/3 分别得到kafka_2.13-3.6.0-1、kafka_2.13-3.6.0-2、kafka_2.13-3.6.0-3 copy一份config/kraft/server.properties配置文件,修改名称 server-sasl.properties 进入各个config/kraft/server

    2024年02月03日
    浏览(45)
  • nginx配置ssl的坑(TLSv1.3\ngx_http_ssl_module)

    查看openssl版本openssl version,一般腾讯云为1.0.2k版本。 到官网 www.openssl.org 查看最新版本openssl,现在最新为1.1.1h版。 下载nginx 重新make下nginx,最后openssl配置上面升级后的 完成后执行 make 命令,make失败执行 make clean后重新make 配置后的nginx.config 检查配置 nginx -t 启动nginx ./ngin

    2024年02月16日
    浏览(42)
  • 配置https---Nginx认证ssl证书

    nginx作为前端的负载均衡服务器已经很熟悉了,项目需要使用https安全的时候就需要认证证书了 dockerweb管理工具 Portainer 如果对docker不那么熟悉可以使用docker 第三方管理端 然后访问本地9000端口,登录后可以管理容器镜像 有了该工具可以直接进入容器查看日志等操作 nginx环境安装

    2024年01月19日
    浏览(53)
  • 《Kafka系列》Offset Explorer连接Kafka问题集合,Timeout expired while.. topic metadata,Uable to find any brokers

    1.创建语句如下所示,按照习惯在添加zookeeper参数的时候,指定了 zxy:2181/kafka ,但是却创建失败, Error while executing topic command : Replication factor: 1 larger than available brokers: 0. 2.检查各个broker的server.properties文件 发现在配置参数的时候, zookeeper.connect 指定的是 zxy:2181,zxy:2182,zxy:21

    2024年02月06日
    浏览(65)
  • 达梦数据库配置SSL认证加密

    OS Version:Kylin Linux Advanced Server release V10 (SP1) /(Tercel)-x86_64-Build19/20210319 DB Version:DM V8 1-2-18-21.06.24-142387-10013-ENT Pack4 OpenSSL:OpenSSL 1.1.1f JAVA:openjdk version “1.8.0_242” 64bit 参考手册:《DM8_DISQL使用手册》《DM8安全管理》《DM8程序员手册》 DM8 产品手册 | 达梦技术社区 1、配置ope

    2023年04月08日
    浏览(50)
  • emqx 配置ssl/tls 双向认证(亲自测试有效)

    bash脚本,生成自签名ca、服务端、客户端的key和证书 openssl.cnf配置文件 验证证书是否有效 将证书文件拷贝到emqxetccerts目录下(默认目录),并修改配置文件emqx.conf。SSL/TLS 双向连接的启用及验证 mqttx连接验证 出现连接成功,代表测试无问题  

    2024年03月11日
    浏览(41)
  • 【kafka-ui】支持kafka with kraft的可视化集群管理工具

    本文在kafka3.3.1版本基础上进行测试 在早期使用kafka的时候一般使用Kafka Tool或者kafka eagle,前者为桌面软件,后者为浏览器软件。总体来说体验一般,但是还比较够用。 但是从kafka3.3.1开始,已经正式抛弃zookeeper使用自己的仲裁器了,但是上述两种kafka可视化工具的更新好像并

    2024年02月02日
    浏览(94)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包