目录
一、背景和描述
二、资源情况
三、技术选型
四、部署Kraft版本集群
五、配置SSL模式
六、Springboot使用SSL集成
参考资料
一、背景和描述
考虑资源安全性,需要搭建不依赖Zookeeper的kafka集群环境,并且配置SSL访问控制
Apache Kafka Raft 是一种共识协议,它的引入是为了消除 Kafka 对 ZooKeeper 的元数据管理的依赖,被社区称之为 Kafka Raft metadata mode,简称 KRaft 模式。
目前,Kafka在使用的过程当中,会出现一些问题。由于重度依赖Zookeeper集群,当Zookeeper集群性能发生抖动时,Kafka的性能也会收到很大的影响。因此,在Kafka发展的过程当中,为了解决这个问题,提供KRaft模式,来取消Kafka对Zookeeper的依赖。参考文章:百度安全验证
二、资源情况
服务器IP | CPU | 内存 | 磁盘容量 |
---|---|---|---|
127.0.0.1 | 8核 | 16G | 100G |
127.0.0.1 | 8核 | 16G | 100G |
127.0.0.1 | 8核 | 16G | 100G |
以下是查看Linux机器CPU核心数、内存、磁盘容量等信息的常用命令:
-
查看CPU核心数:
cat /proc/cpuinfo | grep "cpu cores" | uniq cat /proc/cpuinfo | grep "physical id" | sort | uniq | wc -l
-
查看内存大小以及可用空间:
free -h
-
查看磁盘容量以及可用空间:
df -h
-
查看某个目录的占用空间:
du -sh /path/to/directory
以上命令可以通过SSH连接到Linux服务器的终端或使用工具如SecureCRT、PuTTY等来执行。
在Linux系统中,cpu cores
和physical id
都是用来表示CPU核心数量的信息。但是,它们有不同的含义和用途。
cpu cores
表示每个物理CPU上的核心数量。例如,如果一台服务器有两个物理CPU,每个CPU包含4个核心,则cpu cores
的值为8(即 2 * 4 = 8) 。此信息对于诊断单个物理CPU的性能问题非常有用,可以帮助您确定每个物理CPU上的核心数。
而physical id
则表示物理处理器的ID号。在多CPU架构中,每个CPU都有一个唯一的物理ID。如果一台服务器有多个物理CPU,则每个物理CPU将会被分配一个不同的physical id
。这些信息对于了解如何将进程/线程分配到物理CPU上非常有用。
总之,cpu cores
通常用于监视和调整单个物理CPU的性能,而physical id
则用于检查多个物理CPU之间的区别以及了解如何在多CPU环境下分配资源。
三、技术选型
查阅Kafka官方说明文档:Apache Kafka,3.3.x版本Kafka可用于生产,所以Kafka版本选择3.3.1(3.3.0不推荐)
Apache 软件基金会发布了包含许多新特性和改进的 Kafka 3.3.1。这是第一个标志着可以在生产环境中使用 KRaft(Kafka Raft)共识协议的版本。在几年的开发过程中,它先是在 Kafka 2.8 早期访问版本中发布,然后又在 Kafka 3.0 预览版本中发布。
KRaft 是一种共识协议,可以直接在 Kafka 中管理元数据。元数据的管理被整合到了 Kafka 当中,而不需要使用像 ZooKeeper 这样的第三方工具,这大大简化了 Kafka 的架构。这种新的 KRaft 模式提高了分区的可伸缩性和弹性,同时简化了 Kafka 的部署,现在可以不依赖 ZooKeeper 单独部署 Kafka 了。
参考资料:Kafka 3.3使用KRaft共识协议替代ZooKeeper_控制器_仲裁_版本
下载地址:Kafka官方下载地址
四、部署Kraft版本集群
kafka笔记3--快速部署KRaft版本的kafka3.1.1_kafka 3.1 kraft 安装_昕光xg的博客-CSDN博客
五、配置SSL模式
如何配置kafka的SSL链接访问 - 简书
在 Kafka 3.3.1 中启用 Kraft 模式下的 SSL 加密传输,您可以按照以下步骤进行操作:
-
生成 SSL 证书和私钥 这可以使用 OpenSSL 工具来完成。可以根据需要自定义证书和密钥文件名称以及密码等相关参数,例如:
openssl req -new -newkey rsa:4096 -days 365 -nodes -x509 -keyout server.key -out server.crt
-
配置 Kafka Broker 在
broker.properties
文件中,将以下参数设置为所需值:listeners=SSL://kafka-ssl.example.com:9093 # 定义 SSL 监听器 advertised.listeners=SSL://kafka-ssl.example.com:9093 # 公告 SSL 监听器 ssl.keystore.location=/path/to/kafka.server.keystore.jks ssl.keystore.password=<keystore-password> ssl.key.password=<key-password>
将 SSL 监听器和广告监听器设置为
SSL://<hostname>:9093
。请相应地更改<keystore-password>
和<key-password>
以及文件路径等参数为实际的内容。 -
启用 SSL 客户端身份验证(可选) 如果需要对客户端进行身份验证,也可以在
broker.properties
中设置以下参数:ssl.client.auth=required ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=<truststore-password>
这些参数指定了 SSL 客户端身份验证策略,以及必须使用的可信证书颁发机构(CA)证书的位置和密码等信息。指定
ssl.client.auth=required
可以强制 Kafka Broker 对客户端进行身份验证,并防止未经授权的访问。
在完成上述步骤后,重新启动 Kafka Broker 。现在,您可以使用 SSL 连接到 Kafka 集群。但是,在实际生产环境中,请首先测试配置并确保其可以正常工作,然后再将其应用于生产环境。
要在 Kafka 客户端上创建一个 client-ssl.properties
文件进行测试 SSL 连接,您可以按照以下步骤操作:
-
在客户端机器上安装 JRE (Java 运行环境) 和 OpenSSL 工具 (如果没有安装的话)。
-
生成客户端证书和私钥 这个证书应该是使用与 Kafka Broker 使用的相同的 CA 签署的。也可以为您的测试客户端生成自签名证书以进行测试。可以使用 OpenSSL 工具来生成自签名证书并为其设置密码,例如:
openssl req -new -newkey rsa:4096 -nodes -keyout client.key -out client.csr openssl x509 -req -days 365 -in client.csr -CA rootCA.crt -CAkey rootCA.key -set_serial 01 -out client.crt
-
创建
client-ssl.properties
文件 在与 Kafka 客户端可执行文件相同的目录中,根据需要创建一个新的client-ssl.properties
文件,并将以下参数设置为所需值:security.protocol=SSL ssl.truststore.location=<path-to-client-truststore.jks> ssl.truststore.password=<truststore-password> ssl.keystore.location=<path-to-client-keystore.jks> ssl.keystore.password=<keystore-password> ssl.key.password=<key-password>
将
<path-to-client-truststore.jks>
和<path-to-client-keystore.jks>
替换为实际的证书和密钥存储路径,以及相应的密码。 -
向 Kafka 发送测试消息 在与 Kafka 客户端可执行文件相同的目录中,使用以下命令向 Kafka Broker 发送测试消息:
bin/kafka-console-producer.sh --broker-list <kafka-broker>:9093 --topic test --producer.config client-ssl.properties
将
<kafka-broker>
替换为您的 Kafka Broker 主机名或 IP 地址。 -
检查发送是否成功 您可以在另一个终端中使用以下命令从 Kafka Broker 消费消息:
bin/kafka-console-consumer.sh --bootstrap-server <kafka-broker>:9093 --topic test --from-beginning --consumer.config client-ssl.properties
如果一切正常,您将看到来自生产者的消息已成功传输并已由消费者接收到。
通过按照上述步骤创建 client-ssl.properties
文件并进行测试 SSL 连接,您可以确保 Kafka 集群上的 SSL 连接设置正确且正常工作。
为 IP 生成 CA 证书的流程如下:
-
生成 CA 根证书。
openssl req -x509 -days 3650 -newkey rsa:2048 -nodes -keyout ca.key -out ca.crt
这个命令将生成新的 CA 根证书,其中
-x509
表示生成自签名证书,-days 3650
表示证书的有效期为 10 年。在这个过程中,您需要输入一些信息,如国家代码、城市名、组织名称等。 -
创建一个用于 IP 的证书签名请求(CSR)文件。
openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr
同样要求输入一些信息,例如国家代码、城市名、公司名等。其中,
-new
表示创建一个新的证书请求,-keyout
指定私钥输出的文件路径,-out
指定新证书请求输出的文件路径。 -
使用 CA 根证书签署 CSR 文件并生成证书。
openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days 3650 -CAcreateserial -extfile server.ext
这个命令将使用 CA 根证书签署刚才创建的证书请求,并生成新的服务器证书。其中,
-CA
和-CAkey
分别指定 CA 根证书和私钥的位置,-in
指定服务器证书签名请求的文件路径,-out
指定生成的证书的文件路径,-days
设置证书的有效期,-CAcreateserial
表示自动创建一个序列号,并将其保存到.srl
文件中。-extfile server.ext
是一个配置文件,在其中可以指定 SAN 扩展等附加选项。 -
使用新的证书和密钥保护 IP
将生成的
server.crt
和server.key
证书和私钥文件分别应用于您需要保护的 IP、域名或其他服务上,在 Internet 上建立安全连接。
请注意,在使用 openssl 命令生成证书时,要特别注意您输入的各种数据信息因素,因为这些信息会在后续内容以及过程中得到反复使用。
六、Springboot使用SSL集成
SpringBoot整合并简单使用SSL认证的Kafka_springboot ssl kafka_justry_deng的博客-CSDN博客
在 Spring Kafka 中,可以通过配置 KafkaTemplate 和 ProducerFactory 来实现 SSL 验证。下面是一个简单的配置示例:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaProducerConfiguration {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${kafka.ssl.truststore.location}")
private String truststoreLocation;
@Value("${kafka.ssl.truststore.password}")
private String truststorePassword;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
在上面的代码中,定义了以下参数:
- bootstrap.servers:指定 Kafka Broker 的地址和端口号。
- ssl.truststore.location:指定 SSL 验证所需的证书文件路径。
- ssl.truststore.password:指定证书文件的密码。
- security.protocol:指定安全协议类型为 SSL。
- key.serializer:指定键值序列化器类。
- value.serializer:指定值序列化器类。
这个示例中,使用 @Value 注解读取了配置文件中的属性值,以便在代码中动态使用。因此,在使用之前,需要在配置文件中添加如下属性:
kafka.bootstrap.servers=kafka.example.com:9093
kafka.ssl.truststore.location=/path/to/truststore.jks
kafka.ssl.truststore.password=password
需要注意的是,在 SSL 模式下,Kafka 的安全验证非常关键。为了保护数据安全,在实际场景中可能还需要采用其他的安全措施,例如身份验证、消息加密等。具体措施可以根据企业的实际情况进行确定。
参考资料
Kafka实战:集群SSL加密认证和配置(最新版kafka-2.7.0)
78.kafka高级-kraft集群搭建_哔哩哔哩_bilibili
[教程]【海牛大数据】Kafka 教程(Kafka3.3.2 原理、安装、应用场景、源码解析、高级优化、实战案例应有尽有)[云平台课程] | 海牛部落 高品质的 大数据技术社区
Apache Kafka 官方文档
kafka笔记3--快速部署KRaft版本的kafka3.1.1_kafka 3.1 kraft 安装_昕光xg的博客-CSDN博客
再见!Kafka决定弃用Zookeeper...
Apache Kafka 下载地址
kafka3.4.0集群搭建(无zookeeper)_最新版kafka不需要zookeeper_小趴菜醉了的博客-CSDN博客
kafka3.1集群搭建(kraft模式)_controller.quorum.voters_阿豪咿呀的博客-CSDN博客文章来源:https://www.toymoban.com/news/detail-439020.html
Kafka配置SSL认证_ssl.ca.location kafka_justry_deng的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-439020.html
到了这里,关于【Kafka】Kafka3.1.1集群搭建指南KRaft版本的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!