docker搭建kafka集群并测试完整版

这篇具有很好参考价值的文章主要介绍了docker搭建kafka集群并测试完整版。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

docker搭建kafka集群完整版(windows)

1.安装docker desktop.

打开docker官网,下载docker desktop,这里直接给出网址:Install Docker Desktop on Windows | Docker Docs

如下图,点击下载即可。
kafka集群连接测试,docker,kafka,容器

下载好后 点击运行exe文件,我们采用交互式安装程序。

kafka集群连接测试,docker,kafka,容器

安装完成后直接重启即可,默认安装在c盘,如果不想安装在c盘就采用命令行的方式安装。官网有教程。

点击接受

kafka集群连接测试,docker,kafka,容器

之后点击登录

kafka集群连接测试,docker,kafka,容器

当然不登录也没关系。

接下来我们安装一下Linux内核,打开windows powershell,运行wsl --date,即可(看情况,电脑没有或软件没有提示的情况就要安装)

配置一下环境,如下,打开右上角的设置,更改下面的数据位置

kafka集群连接测试,docker,kafka,容器

之后配置国内镜像源,可用参考网上给的代码

kafka集群连接测试,docker,kafka,容器

{
  "registry-mirrors": [
    "https://registry.docker-cn.com",
    "http://hub-mirror.c.163.com",
    "https://docker.mirrors.ustc.edu.cn",
    "https://cr.console.aliyun.com",
    "https://mirror.ccs.tencentyun.com"
  ],
  "builder": {
    "gc": {
      "defaultKeepStorage": "20GB",
      "enabled": true
    }
  },
  "experimental": false,
  "features": {
    "buildkit": true
  }
}

之后点击应用并重启即可。

过程中可能出现的问题:

1.Docker 一直starting

遇到这种情况,一般是因为没有安装wsl 2(或者没有打开),安装即可。在安装这个之前需要启用虚拟化,一般都开启了,这里不详细介绍。

安装完成后重启电脑即可。

如下图,便是安装完成了

kafka集群连接测试,docker,kafka,容器

上面有一个是我拉取的一个image,一开始没有。

这样我们便可以在windows上使用docker了。

注意:后续如果关闭后一直显示正在启动中建议重启电脑重新启动,亲测有效。

2.下载镜像

打开Windows powershell,运行下面命令:

  1. docker pull bitnami/kafka
    
  2. docker pull zookeeper
    

版本随意,但建议都采用最新的版本,老版本可能会出现版本冲突,但你不知道会不会发生冲突,容易出问题。

kafka集群连接测试,docker,kafka,容器

准备工作:

在开始新建集群之前,新建好文件夹,根据下面的yml配置文件选择的地址来建文件夹(冒号后面的可以不建),如下图(可以自己改变位置):

kafka集群连接测试,docker,kafka,容器

不然数据会默认安装到C盘。

3.创建docker网络

​ 运行下面命令:

docker network create zk-net

如下图:

kafka集群连接测试,docker,kafka,容器

4.docker compose 搭建kafka集群

之前还要搭建zookeeper集群,用了俩个compose.yml文件,这里合并一下,缩减操作。

创建一个yml文件,名字随意,这里取名为docker-compose-kafka.yml

文件配置如下:

version: "3"

networks:
  zk-net:
    external:
      name: zk-net

services:
  z1:
    image: 'zookeeper:latest'
    container_name: z1
    hostname: z1
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=z3:2888:3888;2181
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - zk-net
    ports:
      - 2181:2181
      - 8081:8080
    volumes:
      - /D/docker_desktop/z1/z1/data:/data
      - /D/docker_desktop/z1/z1/datalog:/datalog
  z2:
    image: 'zookeeper:latest'
    container_name: z2
    hostname: z2
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=z3:2888:3888;2181
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - zk-net
    ports:
      - 2182:2181
      - 8082:8080
    volumes:
      - /D/docker_desktop/z1/z2/data:/data
      - /D/docker_desktop/z1/z2/datalog:/datalog
  z3:
    image: 'zookeeper:latest'
    container_name: z3
    hostname: z3
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - zk-net
    ports:
      - 2183:2181
      - 8083:8080
    volumes:
      - /D/docker_desktop/z1/z3/data:/data
      - /D/docker_desktop/z1/z3/datalog:/datalog
  kafka1:
    image: 'bitnami/kafka:latest'
    restart: always    
    container_name: kafka1
    hostname: kafka1
    ports:
      - '9092:9092'
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ALLOW_PLAINTEXT_LISTENER=yes 
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
    volumes:
      - /D/docker_desktop/k1/kafka1:/bitnami/kafka      
    networks:
      - zk-net
  kafka2:
    image: 'bitnami/kafka:latest'
    restart: always       
    container_name: kafka2
    hostname: kafka2
    ports:
      - '9093:9093'
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ALLOW_PLAINTEXT_LISTENER=yes 
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
    volumes:
      - /D/docker_desktop/k1/kafka2:/bitnami/kafka        
    networks:
      - zk-net
  kafka3:
    image: 'bitnami/kafka:latest'
    restart: always       
    container_name: kafka3
    hostname: kafka3
    ports:
      - '9094:9094'
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ALLOW_PLAINTEXT_LISTENER=yes 
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9094    
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094
      - KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
    volumes:
      - /D/docker_desktop/k1/kafka3:/bitnami/kafka    
    networks:
      - zk-net

5.启动kafka集群

之后在windows powershell上运行下面命令:

docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml up -d

停止配置文件运行代码如下:(不要运行,一般是上面代码报错再运行停止服务的)

docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml stop

6.使用docker desktop

如下图,便是docker desktop的一个界面:

kafka集群连接测试,docker,kafka,容器

点击kafka镜像的状态或者左上角的容器,如下图:

kafka集群连接测试,docker,kafka,容器

选择一个容器进入即可:

kafka集群连接测试,docker,kafka,容器

进入后如下图:

kafka集群连接测试,docker,kafka,容器

其中,logs代表日志信息,inspect可以查看kafka的配置信息,包括网络和集群等信息,如下图:

kafka集群连接测试,docker,kafka,容器

exec就是容器内部了,可以通过写指令来操控容器,如图所示:

kafka集群连接测试,docker,kafka,容器

容器内部其实就是一个Linux系统样的东西,在Files里面可以看到容器的结构。如图所示:

kafka集群连接测试,docker,kafka,容器

更多关于docker desktop的使用请自己摸索或查阅。这里只是简单介绍一下方便下面的教学。

6.创建主题

搭建好集群后我们需要创建一个主题,来进行后面的测试。

进入kafka容器,俩种方式,一种是通过docker desktop进入,还有一种是通过命令行的方式进入,命令行的方式自己去搜,这里通过docker desktop进入,跟上面一样进入一个kafka点击exec即可,如下图:

kafka集群连接测试,docker,kafka,容器

注意有上角的灰色垃圾箱代表清空桌面,红色的代表删除容器。

刚进入容器默认在/目录下,我们需要进入到kafka的bin目录下,使用cd命令即可,如下图:

kafka集群连接测试,docker,kafka,容器

注意kafka的文件夹在/opt/bitnami/目录里面。

之后我们通过下面命令创建主题:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic topic1
  • ./kafka-topics.sh --create: 这一部分告诉Kafka命令行工具你想要创建一个新的主题。
  • --bootstrap-server localhost:9092: 这一部分指定了Kafka服务的地址和端口。在这个例子中,服务运行在本地主机的9092端口。
  • --replication-factor 3: 这一部分指定了主题的副本因子,即数据在Kafka集群中的复制次数。在这个例子中,数据将被复制3次。
  • --partitions 3: 这一部分指定了主题的分区数。在Kafka中,数据被组织成多个分区,每个分区可以独立地处理和存储。在这个例子中,主题将有3个分区。
  • --topic topic1: 这一部分指定了要创建的主题名称。在这个例子中,主题名称为"topic1"。

注意:在Kafka中,副本数不可以大于分区数。因为副本是以目录存储在各个broker节点的data目录下,如果副本数量大于broker节点数量,那么在同一个Broker节点的data目录下会有两个一样的文件夹,这是不允许的。

网上的命令都有点老了,可能会报错,建议用这个命令,或者用–help查阅怎么使用,如下:

./kafka-topics.sh --help

使用下面命令可以查看主题:

kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1

如下图:

kafka集群连接测试,docker,kafka,容器

7.命令行使用生产者和消费者程序

在容器内运行如下命令,打开消费端:

./kafka-console-consumer.sh --from-beginning --topic ysh --bootstrap-server localhost:9092

之后在打开windows powershell进入一个kafka容器打开生产者程序,命令如下:

docker exec -it kafka2 /bin/bash

之后进入kafka bin目录下 ,命令如下:

cd /opt/bitnami/kafka/bin

打开生产者程序,命令如下:

./kafka-console-producer.sh --broker-list localhost:9093 --topic topic1

注意:上面的我端口是9093,不是9092,9092是kafka1的端口,而我进入的是kafka2,不然会报错,要想在所有kafka集群里面都可以直接用9092就需要改一下上面的配置文件yml,如下图:

kafka集群连接测试,docker,kafka,容器

将他们的端口都映射到9092就可以了,跟zookeeper一样,每个kafka都要改。

之后再生产者端写入数据,可以看到消费端有数据出来,如下图:

kafka集群连接测试,docker,kafka,容器

8.kafka Java API 编写生产者程序和消费者程序(以读取股票信息为例)

在编写代码前需要先在C盘,windows/system32/drivers/hosts文件里面将kafka的网络添加进去,不然idea无法识别,idea机制问题,

kafka集群连接测试,docker,kafka,容器

kafka集群连接测试,docker,kafka,容器

这样就没问题了。

导入依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>

编写生产者程序:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;
import java.util.Properties;

public class KafkaProducerTest {
    public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException {
        Properties props = new Properties();
        //1.指定Kafaka集群的ip地址和端口号
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9093,kafka3:9094");
        //2.等待所有副本节点的应答
        props.put("acks", "all");
        //3.消息发送最大尝试次数
        props.put("retries", 0);
        //4.指定一批消息处理次数
        props.put("batch.size", 16384);
        //5.指定请求延时
        props.put("linger.ms", 1);
        //6.指定缓存区内存大小
        props.put("buffer.memory", 33554432);
        //7.设置key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //8.设置value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 9、生产数据
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 定义CSV文件路径
        String csvFile = "C:\\Users\\asus\\Desktop\\data\\股票a.csv";

        // 读取CSV文件并发送到Kafka
        try {
            //BufferedReader reader = new BufferedReader(new FileReader(csvFile),);
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(csvFile), "GBK"));
            String line;
            reader.readLine();
            while ((line = reader.readLine()) != null) {
                String[] data = line.split(","); // 假设CSV文件使用逗号分隔
                String key = data[1]; // 假设交易笔数为关键字
                String value = data[0] + "," + data[1] + "," + data[2] + "," + data[3] + "," + data[4] + "," + data[5] + "," + data[6] + "," + data[7] + "," + data[8]; // 假设交易总量为值,使用逗号分隔
                producer.send(new ProducerRecord<String, String>("ysh",value));
                System.out.printf(value+"\n");
            }
            producer.close(); // 关闭Kafka生产者
        }
        catch (IOException e) {
            System.out.printf("文件打开失败");
            // 处理IO异常
        }
    }
}

编写消费者程序:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        //1、准备配置文件
        Properties props = new Properties();
        //2、指定kafka集群主机名和端口号
        //props.put("zookeeper.connect", "localhost:2181");
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9093,kafka3:9094");
        //3、指定消费者组id,在同一时刻同一消费组中只有一个线程可以
        //去消费一个分区消息,不同的消费组可以去消费同一个分区消息
        props.put("group.id", "consumer");
        //4、自动提交偏移量
        props.put("enable.auto.commit", "true");
        //5、自动提交时间间隔,每秒提交一次
        props.put("auto.commit.interval.ms", "1000");

        props.put("auto.offset.reset","earliest");
        props.put("client.id", "zy_client_id");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        //6、订阅消息,这里的topic可以是多个
        kafkaConsumer.subscribe(Arrays.asList("ysh"));
        AtomicInteger count = new AtomicInteger(0); // 原子整数用于统计交易笔数之和
        AtomicLong totalAmount = new AtomicLong(0); // 原子长整型用于统计交易总量之和
        //System.out.printf("yse"); //7、获取消息
        long startTime = System.currentTimeMillis();

        while (true) {
           //每隔10s拉取一次
           ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
           if (records.isEmpty()) {
               // 如果 records 为空,则跳过当前循环
               continue;
           }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("value=%s%n", record.value());
                String value = record.value(); // 获取消息值(交易总量)
                String[] values = value.split(","); // 使用逗号分隔值(假设格式为交易笔数,交易总量)
                int tradeCount =1; // 解析交易笔数(关键字)为整数并加到总和中(假设第一列是交易笔数)
                long tradeAmount = Long.parseLong(values[4]); // 解析交易总量(值)为长整数并加到总和中(假设第二列是交易总量)
                totalAmount.addAndGet(tradeAmount); //
                count.addAndGet(tradeCount);

            }
            long endTime = System.currentTimeMillis();
            System.out.printf("tradeCount=%d,totalAmount=%d%n",count.get(),totalAmount.get());
            System.out.printf("total_time=%d ms %n",endTime-startTime);

        }

   }
}

先运行消费者程序,再运行生产者程序结果如下:

kafka集群连接测试,docker,kafka,容器

测试完毕,下面进行参数调优和结果比较。

这里以缓存区内存大小为例:

下面是内存大小为335544b时的运行结果,花费时间为3966ms

kafka集群连接测试,docker,kafka,容器

下图为缓存大小为33554b时的运行结果,花费时间为3952ms

kafka集群连接测试,docker,kafka,容器

需要注意的是在缓存大小一定的情况下,花费时间也不是固定的,还收网络速度等因素的影响,下图为缓存大小为33554b时花费时间为3774,较上图速度明显减小,但缓存大小未变。

kafka集群连接测试,docker,kafka,容器

9.总结

在window上搭建kafka集群并用java API 的过程中,因为对很多知识点的不了解,导致过程之中发生了很多意外,比如如何使用window desktop,如何在windows上面搭建docker,docker如何搭建kafka集群,如何配置网络连接,搭建好kafka后如何创建主题,如何查看主题,如何运行生产者程序,如何运行消费者程序,idea如何连接容器内的kafka集群,idea无法连接容器kafka集群,消息遗漏等一系列问题。在搭建kafka集群的过程中,虽然遇到了很多问题,但也让我学到了很多,包括kafka和docker的一些常用命令,kafka API,消息遗漏,网络连接通信的知识。文章来源地址https://www.toymoban.com/news/detail-785485.html

到了这里,关于docker搭建kafka集群并测试完整版的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Docker 搭建 zookeeper、kafka 集群

    首先创建一个自定义网络,后续的所有容器都放入同一个内网中,容器之间还可以通过容器名称进行直接访问,在后续的配置中只需要写明容器名称即可,会自动找到对应的IP地址,防止重启容器后IP地址发生变化时,还要去修改配置文件的操作 创建目录 启动zookeeper 进入zo

    2024年02月10日
    浏览(39)
  • 彻底搞懂Docker容器与Kraft模式kafka集群关于消息大小相关参数设置

    部署背景: 在DockerHub拉取的bitnami/kafka:3.4.1 镜像,如果要部署在Docker-Swarm集群或者单Docker部署,对于消息大小设置需要添加参数 KAFKA_CFG_MESSAGE_MAX_BYTES,如果设置为其他不符合规范的参数格式,会导致容器一直启动不了。 PS:KAFKA_CFG_MESSAGE_MAX_BYTES 是针对Broker级别消息大小限制

    2024年02月08日
    浏览(37)
  • Docker 容器搭建mysql 集群(主从数据库)

    目录 1.背景         2.设备及软件版本 3.开始搭建(这里不介绍安装docker及mysql) 3.1创建主数据库容器(master) 3.2查看容器是否创建成功 3.3进入修改容器下的/etc/my.cnf文件 3.3.1先拷贝my.cnf到容器外修改完再覆盖容器原来的my.cnf文件 3.4重启mysql容器使配置文件生效 3.5查看是否

    2024年02月08日
    浏览(101)
  • Docker安装mysql&&使用Navicat远程连接mysql容器&&mysql容器的持久化测试

    文章主人公:帅哥BUG😎  文章路人: 路人 🤨  路人 😛 目录 一.安装mysql并配置文件 1.下载相关镜像 2.在宿主机中创建相关目录,用于挂载容器的相关数据 3.conf目录 4.data目录(创建mysql5.7容器) 二.使用Navicat远程连接mysql容器 1.ip addr 获取ip 2.点击连接选择MySql 3.输入ip,密码

    2024年02月03日
    浏览(54)
  • Ubuntu18.04 docker kafka 本地测试环境搭建

    Kafka是一种分布式流处理平台,也是一个高吞吐量的分布式发布订阅消息系统。它由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 Kafka的设计目标是能够处理大规模的消息流,并提供持久性、高吞吐量和低延迟的特性。它的核心概念是发布-订阅模型,其中消息被组

    2024年02月15日
    浏览(68)
  • Docker容器搭建Python+Jenkins+Selenium自动化测试(最详细)

    使用Docker搭建Jenkins+python3+selenium项目,在Linux服务器上部署项目代码,实现自动构建。 解决方案 自动构建工具选型:jenkins 中间件选型:docker 代码运行环境:python3 脚本运行工具:chrome浏览器、chromedriver、selenium库 web自动化测试:https://www.bilibili.com/video/BV1MS4y1W79K/ docker分布式自

    2024年02月01日
    浏览(52)
  • Redis【性能 02】Redis-5.0.14伪集群和Docker集群搭建及延迟和性能测试(均无法提升性能)

    使用的是腾讯的云服务器 1核心2G内存50G存储 ,系统信息如下: 每个Redis节点必须要有一个备机,例如搭建3个节点的集群就要有6个Redis实例。 数据按照slots分布式存储在不同的Redis节点上,节点中的数据可共享,可以动态调整数据的分布。 可扩展性强,可以动态增删节点 ,最

    2024年02月05日
    浏览(47)
  • tensorflow 1.15 gpu docker环境搭建;Nvidia Docker容器基于TensorFlow1.15测试GPU;——全流程应用指南

    TensorFlow 在新款 NVIDIA Pascal GPU 上的运行速度可提升高达 50%,并且能够顺利跨 GPU 进行扩展。 如今,训练模型的时间可以从几天缩短到几小时 TensorFlow 使用优化的 C++ 和 NVIDIA® CUDA® 工具包编写,使模型能够在训练和推理时在 GPU 上运行,从而大幅提速 TensorFlow GPU 支持需要多个

    2024年02月03日
    浏览(62)
  • linux安装搭建配置docker,mysql,nacos,redis哨兵集群,kafka,elasticsearch,kibana,IK分词器,安装Rabbitmq,安装并配置maven

    目录 搭建docker 1.2安装yum工具  1.3更新阿里镜像源 1.4下载docker 1.5关闭防火墙 1.6启动docker 1.7查看docker版本 1.8配置阿里云镜像 1.8.1 创建文件夹 1.8.2在文件夹内新建一个daemon.json文件 1.8.3重载文件 1.9重启docker 2安装MySQL 3安装nacos 3.1拉取nacos镜像并启动 3.2启动nacos命令 3.3命令敲完

    2024年02月03日
    浏览(57)
  • kafka-- kafka集群环境搭建

    成功标志

    2024年02月09日
    浏览(88)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包