kafka 集群 ZooKeeper 模式搭建

这篇具有很好参考价值的文章主要介绍了kafka 集群 ZooKeeper 模式搭建。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:Apache Kafka

关于ZooKeeper的弃用

根据 Kafka官网信息,随着Apache Kafka 3.5版本的发布,Zookeeper现已被标记为已弃用。未来计划在Apache Kafka(4.0版)的下一个主要版本中删除ZooKeeper,该版本最快将于2024年4月发布。在弃用阶段,ZooKeeper仍然支持用于Kafka集群元数据的管理,但不建议用于新的部署。新的部署方式使用 KRaft 模式,KRaft 模式部署可以看笔者的文章《kafka 集群 KRaft 模式搭建》,考虑到一些公司仍然在使用老版本的 Kafka,故笔者写这篇文章记录 Kafka 集群Zookeeper 模式搭建

官网信息截图

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、启动 Kafka 集群

4、关闭 Kafka 集群

5、使用Kafka 可视化工具查看

6、测试Kafka集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

3.6.0 版本需要至少 java8 及以上版本,笔者使用的是 java8 版本

关于 linux 安装 java,没安装过的朋友可以参考《linux 系统安装 jdk》

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

下载完成

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

将 kafka分别上传到3台linux

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

编辑配置文件 server.properties

vi server.properties

配置 broker.id,advertised.listeners,zookeeper.connect

broker.id 每个节点的id

advertised.listeners 本机的外网访问地址

zookeeper.connect zookeeper 地址

192.168.3.232 节点配置

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

advertised.listeners 笔者配置为本机地址

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

192.168.2.90 节点

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

192.168.2.11 节点

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

笔者zookeeper 地址是 192.168.2.130:2181

zookeeper 版本是3.8.3

关于zookeeper单机安装和集群安装可以参考:《Linux环境 安装 zookeeper》《windows环境 安装 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》

3、启动 Kafka 集群

首先启动 zookeeper

然后在3台机器上依次启动 Kafka

进入 kafka 目录

cd /usr/local/kafka/kafka_2.13-3.6.0

下面2个命令皆可

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

4、关闭 Kafka 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

5、使用Kafka 可视化工具查看

下载地址:https://www.kafkatool.com/download.html

运行效果

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

6、测试Kafka集群

新建 maven 项目,添加 Kafka 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.wsjzzcbq</groupId>
    <artifactId>kafka-learn</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
/**
 * Demo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ProducerDemo {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
        //配置序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        Producer<String, String> producer = new KafkaProducer<>(properties);
 
        //topic 名称是demo_topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());
 
    }
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
 
/**
 * ConsumerDemo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ConsumerDemo {
 
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
 
        // 消费分组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        // 序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // 消费者订阅主题
        consumer.subscribe(Arrays.asList("demo_topic"));
 
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records) {
                System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

运行测试

效果图

kafka 集群 ZooKeeper 模式搭建,Kafka,kafka,zookeeper,分布式,java

至此完文章来源地址https://www.toymoban.com/news/detail-754426.html

到了这里,关于kafka 集群 ZooKeeper 模式搭建的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式应用之zookeeper集群+消息队列Kafka

           ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。为分布式框架提供协调服务的

    2024年02月06日
    浏览(60)
  • 【简单认识zookeeper+kafka分布式消息队列集群的部署】

    Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。 Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已

    2024年02月13日
    浏览(40)
  • jdk+zookeeper+kafka 搭建kafka集群

    环境准备 环境资源包: jdk-8u341-linux-x64.tar.gz kafka_2.12-2.2.0.tgz zookeeper-3.4.14.tar.gz server-id ip 状态 server1 10.206.120.10 leader server2 10.206.120.2 follower server3 10.206.120.3 follower 一、安装jdk 因为kafka需要Java环境,所以优先配置jdk环境,若已经配置了java环境,此步骤可以忽略 二、zookeeper集群

    2024年02月04日
    浏览(38)
  • kafka---- zookeeper集群搭建

    Hostname Ip Root Prac-zk-133 172.16.144.133 root Prac-zk-134 172.16.144.134 root Prac-zk-135 172.16.144.135 root 172.16.144.133 Prac-zk-133 172.16.144.134 Prac-zk-134 172.16.144.135 Prac-zk-135 zkServer.sh start-foreground 使用它启动会出现地址已在使用 Zookeeper启动失败(java.net.BindException: 地址已在使用 端口被占用了,把218

    2024年02月11日
    浏览(35)
  • zookeeper + kafka集群搭建详解

    2023年04月11日
    浏览(71)
  • Docker 搭建 zookeeper、kafka 集群

    首先创建一个自定义网络,后续的所有容器都放入同一个内网中,容器之间还可以通过容器名称进行直接访问,在后续的配置中只需要写明容器名称即可,会自动找到对应的IP地址,防止重启容器后IP地址发生变化时,还要去修改配置文件的操作 创建目录 启动zookeeper 进入zo

    2024年02月10日
    浏览(37)
  • kafka3.4.0集群搭建(无zookeeper)

    注意:低版本需要安装zookeeper,在2.8及以上可移除zookeeper 前往官网下载 !!!不要下载src文件 1.解压文件 tar xzf kafka_2.13-3.4.0.tgz 进入文件 cd kafka_2.13-3.4.0 进入 cd config/kraft 2.修改server.properties以下属性 vim server.properties 3.我们需要在启动服务器之前创建kafka集群id。执行下列命令

    2024年02月03日
    浏览(38)
  • 基于Zookeeper搭建Kafka高可用集群(实践可用)

    目录 一、Zookeeper集群搭建 1.1 下载  解压 1.2 修改配置 1.3 标识节点 1.4 启动集群 1.5 集群验证 二、Kafka集群搭建 2.1 下载解压 2.2 拷贝配置文件 2.3 修改配置 2.4 启动集群 2.5 创建测试主题 2.6 写入数据测试 为保证集群高可用,Zookeeper 集群的节点数最好是奇数,最少有

    2024年02月09日
    浏览(42)
  • Zookeeper和kafka集群搭建步骤(超详细,易理解)

    目录 1.修改文件名 2.修改zoo.cfg配置文件内容 (1)找到#maxClientCnxns=60命令在下面添加 3.进入/tmp目录 (1)创建zookeeper目录 (2)进入zookeeper目录中创建myid文件,添加  \\\'  1  \\\';

    2024年02月15日
    浏览(61)
  • CentOS7中新版本自带zookeeper搭建Kafka集群

    一、zookeeper在kafka中的具体作用     它是一个分布式协调框架。很好的将消息生产、消息存储、消息消费的过程结合在一起。在典型的Kafka集群中,Kafka通过Zookeeper管理集群控制,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Con

    2024年02月08日
    浏览(84)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包