Windows下安装单机Kafka环境及配置SASL身份认证

这篇具有很好参考价值的文章主要介绍了Windows下安装单机Kafka环境及配置SASL身份认证。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

0 安装JDK

zookeeper和kafka都是java开发的,所以安装前先安装1.8版本以上的jdk,并设置环境变量 JAVA_HOME=d:\env\Java\jdk1.8.0_14

1 安装Zookeeper 

1.1 Apache ZooKeeper点击下载地址 Apache ZooKeeper,下载最新版本zookeeper压缩包,解压到本地

window 配置 kafka jaas,java,kafka,zookeeper

1.2 来到 conf文件夹下,复制一份 zoo_sample.cfg ,改名为 zoo.cfg

window 配置 kafka jaas,java,kafka,zookeeper

1.3 在安装目录下新建data 和 logs 文件夹,打开 zoo.cfg ,添加 两条配置(按自己安装的地址填写)

dataDir=D:\\env\\apache-zookeeper-3.6.3-bin\\data
dataLogDir=D:\\env\\apache-zookeeper-3.6.3-bin\\logs

1.4 点击 bin 目录下的zkServer.cmd,开启zookeeper服务端

window 配置 kafka jaas,java,kafka,zookeeper

注:不要关闭此窗口

1.5 点击 bin目录下 zkCli.cmd,开启客户端,出现红色框中的提示表示zookeeper已经安装成功

window 配置 kafka jaas,java,kafka,zookeeper

2 安装Kafka

 2.1 点击 Apache Kafka 下载最新版本的kafka

window 配置 kafka jaas,java,kafka,zookeeper

 2.2 解压到本地,在安装目录下新建文件夹 kafka-logs ,进入到config目录打开 server.properties,添加两行配置 (按自己安装的地址填写)

log.dirs=D:\\env\\kafka_2.13-2.8.0\\kafka-logs

zookeeper.connect=localhost:2181

window 配置 kafka jaas,java,kafka,zookeeper

 2.3 在安装目录下打开CMD,输入

.\bin\windows\kafka-server-start.bat .\config\server.properties

window 配置 kafka jaas,java,kafka,zookeeper

 没有出现错误提示,说明kafka服务已经启动,注意不要关闭此窗口

3 测试kafka

3.1 创建主题 

在D:\env\kafka_2.13-2.8.0\bin\windows 下 打开CMD,输入 

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

--create:创建参数     --zookeeper localhost:2181:zookeeper地址    --replication-factor 1:副本数量    --partitions 1:分区数量    --topic test01 :主题名称

window 配置 kafka jaas,java,kafka,zookeeper

3.2 查看主题列表

输入

.\kafka-topics.bat --list --zookeeper localhost:2181

 window 配置 kafka jaas,java,kafka,zookeeper

 3.3 生产消息

输入 

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test1

window 配置 kafka jaas,java,kafka,zookeeper

3.4 消费消息 

输入 

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

 window 配置 kafka jaas,java,kafka,zookeeper

--from-beginning :表示每次从头开始消费,不填写从当前偏移量开始消费

4 安装kafka可视化管理工具 kafka-manager

4.1 编译好的资源,百度云盘下载链接:https://pan.baidu.com/s/1twhwfILRo9ReCAYczd44Fw 密码:kjqh,解压到本地打开application.conf文件

添加 kafka-manager.zkhosts="localhost:2181"

4.2 在zookeeper 和 kafka都开启的情况下 ,来到 kafka-manager安装目录下打开CMD,输入 ./bin/kafka-manager

4.3 在浏览器内开打  localhost:9000

window 配置 kafka jaas,java,kafka,zookeeper

4.4 创建实例,在zook hosts中输入地址 localhost:2181,其他选项默认

window 配置 kafka jaas,java,kafka,zookeeper

 5 配置kafka身份认证 SASL

5.1 ZooKeeper 配置 SASL

5.1.1 在zookeeper目录的conf文件夹下新建 zoo_jaas.conf 文件

Server {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin";
};

Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名

5.1.2 配置 zoo.conf 文件,加入以下配置

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true

5.1.3 在kafka安装目录下找到 zookeeper-server-start.bat,新增一个配置KAFKA_OPTS

SetLocal

set KAFKA_OPTS=-Djava.security.auth.login.config=D:/env/apache-zookeeper-3.6.3-bin/conf/zoo_jaas.conf
......

EndLocal

 5.2 kafka 配置 SASL

5.2.1 在kafka目录下config下新建  kafka_server_jaas.conf 文件,加入以下配置

# 定义kafka客户端与broker的认知信息
KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin"
 user_admin="admin"
 user_producer="producer@123"
 user_consumer="consumer@123";
};
# 用于broker和zookeeper之间的认证,对应zk_server_jass.conf中的【user_admin="admin"】配置
KafkaClient {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin";
};

KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致
KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据

5.2.2 修改 server.properties 文件

listeners=SASL_PLAINTEXT://localhost:9092
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制 
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN   
# 完成身份验证的类 
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
allow.everyone.if.no.acl.found=false
#超级管理员权限用户
super.users=User:admin
advertised.listeners=SASL_PLAINTEXT://localhost:9092

5.2.3 修改 kafka-server-start.bat文件,使之加载到 kafka_server_jaas.conf 文件

SetLocal

set KAFKA_OPTS= -Djava.security.auth.login.config=D:/env/kafka_2.13-2.8.0/config/kafka_server_jaas.conf
......

EndLocal

5.2.4 producer的sasl配置 producer.properties新增配置

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

5.2.5 producer的sasl配置 consumer.properties新增配置

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

5.2.6 新建 sasl.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

5.2.6 启动命令【示例】 zk:

#启动kafka服务
.\bin\windows\kafka-server-start.bat .\config\server.properties
#生产主题
.\kafka-console-producer.bat --broker-list localhost:9092 --topic test1  --producer.config  ..\..\config\producer.properties
#消费主题
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning --consumer.config ..\..\config\consumer.properties
#查看消费情况
.\kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --group test-consumer-group --command-config ..\..\config\sasl.properties

5.3 java api 验证

5.3.1 在application.yml中加入如下配置

kafka:
    bootstrap-servers: localhost:9092
    properties:
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
      security:
        protocol: SASL_PLAINTEXT
   

5.3.2 编写 producer和 consumer 文章来源地址https://www.toymoban.com/news/detail-789109.html


    public static final String COLLECTOR_DATA_TOPIC = "test01";
    private static final String COLLECTOR_DATA_CONSUMER_DEV_GROUP = "collector_data_dev_consumer";


    @Autowired
    CollectorService collectorService;

    @KafkaListener(topics = COLLECTOR_DATA_TOPIC, groupId = COLLECTOR_DATA_CONSUMER_DEV_GROUP)
    public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer) {
        collectorService.collectorConsumer(record, consumer);
    }
package com.xxx.service.impl;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;

@Service
public class CollectorServiceImpl extends ServiceImpl<CollectorMapper, CollectorDO> implements CollectorService {

    public static final String CHINT_DATA_TOPIC = "test01";

    @Autowired
    private CollectorMapper collectorMapper;

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public ResultUtil collectorProducer(String body) {

        //推送数据为空返回
        if (!StringUtils.hasText(String.valueOf(body)) || String.valueOf(body).equals("[{}]") || String.valueOf(body).equals("[]") || String.valueOf(body).equals("{}")) {
            return ResultUtil.success(MessageEnum.COLLECTOR_API_NODATA);
        }

        //保存当前批次的推送数据
        try {
            JSONArray arrayObjects = JSON.parseArray(body);

            for(int i = 0; i < arrayObjects.size(); i++){
                JSONObject arrayObject = (JSONObject) arrayObjects.get(i);

                JSONObject jsonObject = JSON.parseObject(String.valueOf(arrayObject));

                //数据推向kafka生产
                kafkaTemplate.send(CHINT_DATA_TOPIC, String.valueOf(jsonObject));


            }
            // TODO: 2023/4/11 添加其他数据导入流向

        } catch (Exception e) {
            // TODO: 2023/4/11 存入日志
            e.printStackTrace();
            //推送数据格式错误导致数据存储失败返回
            return ResultUtil.error(MessageEnum.COLLECTOR_API_INVALID_MESSAGE);
        }
        return ResultUtil.success(MessageEnum.COLLECTOR_API_ACCEPTED);

    }

    public void collectorConsumer(ConsumerRecord<String, String> record, Consumer<String, String> consumer){
        // System.out.println(record.value());

        CollectorDO collector = new CollectorDO();
        //将系统时间戳作为接收到推送的批次
        Long tag = System.currentTimeMillis();

        //记录推送批次便于查询
        collectorMapper.insertCollectorTag(tag);

        //持久化数据
        try {
            JSONObject jsonObject = JSON.parseObject(record.value());

            Set<String> keys = jsonObject.keySet();
            for (String key : keys) {
                collector.setCollectorField(key);
                collector.setCollectorValue((String)jsonObject.get(key));
                collector.setTag(tag);
                collectorMapper.insert(collector);
            }
        } catch (JSONException e) {
            consumer.commitAsync();
        } catch (Exception e) {
            e.printStackTrace();
        }


    }


}

到了这里,关于Windows下安装单机Kafka环境及配置SASL身份认证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(33)
  • kafka3.4.0单机版安装配置教程(kraft模式舍弃ZK)

    下载地址:https://archive.apache.org/dist/kafka/3.4.0/

    2024年04月17日
    浏览(40)
  • kafka配置SASL/PLAIN 安全认证

    为zookeeper添加SASL支持,在配置文件zoo.cfg添加 新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在 /opt/zookeeper/conf/home 路径下。zk_server_jaas.conf文件的内容如下 Server { org.apache.kafka.common.security.plain.Plai

    2024年02月10日
    浏览(31)
  • Kafka动态认证SASL/SCRAM配置+整合springboot配置

    zookeeper启动命令: [root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start [root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop kafka启动命令: /data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties 1)创建broker建通信用户:admin(在使用sasl之

    2024年01月16日
    浏览(31)
  • Kafka ACL(SASL/SCRAM-SHA-256)动态权限管理【windows】

    以下所有命令行命令都使用 Shift+鼠标右键 打开 Powershell 窗口执行 Version Scala 2.13 - kafka_2.13-3.4.0.tgz 验证方式 Kafka版本 特点 SASL/PLAIN 0.10.0.0 不能动态增加用户 SASL/SCRAM-SHA-256 0.10.2.0 可以动态增加用户 SASL/Kerberos 0.9.0.0 需要独立部署验证服务 SASL/OAUTHBEARER 2.0.0 需自己实现接口实现

    2024年02月06日
    浏览(27)
  • 第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者 Kafka下载地址:Apache Kafka 2.1、修改 Zookeeper 配置文件 config/zookeeper.properties 2.2、Zookeeper 配置文件修改内容 2.2、Zookeeper 配置文件增加配置说明

    2024年01月16日
    浏览(33)
  • 基于 kRaft 搭建单机 kafka 测试环境

    使用 docker-compose 在单机搭建有三个节点的 kafka 集群。

    2024年02月14日
    浏览(31)
  • windows下安装配置kafka详解

    在使用Kafka之前,通常需要先安装和配置ZooKeeper。ZooKeeper是Kafka的依赖项之一,它用于协调和管理Kafka集群的状态。 ZooKeeper是一个开源的分布式协调服务,它提供了可靠的数据存储和协调机制,用于协调分布式系统中的各个节点。Kafka使用ZooKeeper来存储和管理集群的元数据、配

    2024年01月16日
    浏览(20)
  • Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

    公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证 使用Kafka版本为2.4.0版本,主要参考官方文档 官网对2.4版本安全验证介绍以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    浏览(44)
  • 分布式消息流处理平台kafka(一)-kafka单机、集群环境搭建流程及使用入门

    kafka最初是LinkedIn的一个内部基础设施系统。最初开发的起因是,LinkedIn虽然有了数据库和其他系统可以用来存储数据,但是缺乏一个可以帮助处理持续数据流的组件。 所以在设计理念上,开发者不想只是开发一个能够存储数据的系统,如关系数据库、Nosql数据库、搜索引擎等

    2024年02月16日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包