尚硅谷kafka3.0.0

这篇具有很好参考价值的文章主要介绍了尚硅谷kafka3.0.0。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

目录

💃概述

⛹定义

​编辑⛹消息队列

🤸‍♂️消息队列应用场景

​编辑🤸‍♂️两种模式:点对点、发布订阅

​编辑⛹基本概念

💃Kafka安装

⛹ zookeeper安装

⛹集群规划

​编辑⛹流程

⛹原神启动

🤸‍♂️批量脚本

⛹topics常规操作

⛹生产者命令行操作 

⛹消费者命令行操作 

💃生产者

⛹生产者消息发送

⛹异步发送api

🤸‍♂️普通异步发送 

🤸‍♂️回调异步发送

⛹同步发送

⛹分区 

🤸‍♂️分区策略

🤸‍♂️自定义分区

⛹提高吞吐量

⛹数据可靠性Ack

🤸‍♂️0 1 -1三个应答毛病

🤸‍♂️去重

🤸‍♂️事务

⛹有序

💃Broker

👩‍🚀Broker 工作流程 

🤸‍♂️Zookeeper 存储的 Kafka 信息 

🤸‍♂️总体工作流程

👩‍🚀Kafka 副本

副本基本信息 

Leader 选举流程

🧠整理思路:

follower挂了

leader寄了

分区副本分配

Leader Partition 负载平衡 

👩‍🚀文件存储

🤸‍♂️存储机制 

Topic 数据的存储机制

文件清理策略 

👩‍🚀高效读写数据 

💃Kafka 消费者 

👩‍🚀消费方式 pull

👩‍🚀Kafka 消费者工作流程 

消费者组原理 

消费流程

👩‍🚀消费案例

消费一个主题

消费一个分区

消费者组

👩‍🚀分区的分配策略、再平衡

👩‍🚀offset 位移

👩‍🚀消费者事务

👩‍🚀数据积压(消费者如何提高吞吐量)

💃Kafka-Eagle 监控

安装 改配置

💃Kafka-Kraft 模式

优点

安装配置

💃概述

⛹定义

        Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。(发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。)

        Kafka 最新定义:Kafka 是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

⛹消息队列

🤸‍♂️消息队列应用场景

缓存/消峰

尚硅谷kafka3.0.0,kafka,zookeeper,大数据解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

尚硅谷kafka3.0.0,kafka,zookeeper,大数据异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

尚硅谷kafka3.0.0,kafka,zookeeper,大数据🤸‍♂️两种模式:点对点、发布订阅

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

1.为方便扩展,并提高吞吐量,一个topic分为多个partition

2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费

3.为提高可用性,为每个partition增加若干副本,类似NameNode HA

4. ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK

⛹基本概念

(1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

(2)Consumer:消息消费者,向Kafka broker 取消息的客户端。

(3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

(4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个topic。

(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。

(6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个topic 可以分为多个 partition,每个partition 是一个有序的队列。

(7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。

(9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个Follower 会成为新的Leader。

💃Kafka安装

⛹ zookeeper安装

zookeeper(目前只有安装)-CSDN博客

⛹集群规划

不是每个都要zk

⛹流程

下载

官网安装地址:Apache Kafka

本文使用kafka_2.12-3.0.0:https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz

解压

tar -zxvf kafka_2.12-3.0.0.tgz -C /export/server/

改配置

cd config
vim server.properties

# The id of the broker. This must be set to a unique integer for each broker.
# 这里三个节点都要有自己的id,不能重复,hadoop1是0,hadoop2是1,hadoop3是2
broker.id=1

# A comma separated list of directories under which to store log files
# 日志数据,不能放到tmp临时目录,自己指定一个
log.dirs=/export/server/kafka/log


############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

:wq

fenfa

fenfa到另外两个集群,修改id为2和3!!!

环境变量/etc/profile,也分发

# kafka
export KAFKA_HOME=/export/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin

⛹原神启动

启动kafka之前要先启动zk

zk start
bin/kafka-server-start.sh -daemon config/server.properties

关闭的时候要先关kafka,先关zoo的话会关不掉kafka!!!! 

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死Kafka 进程了

🤸‍♂️批量脚本

case $1 in
start)
    for i in hadoop1 hadoop2 hadoop3
    do
        echo ================  Kafka-3.0.0 $i start ================
        ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
    done
    ;;
stop)
    for i in hadoop1 hadoop2 hadoop3
    do
        echo ================  Kafka-3.0.0 $i stop ================
        ssh $i "/export/server/kafka/bin/kafka-server-stop.sh"
    done
    ;;
*)
    echo "Usage: $0 {start|stop}"
    ;;
esac

⛹topics常规操作

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

查看操作主题命令参数

bin/kafka-topics.sh 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

查看当前服务器中的所有 topic

bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list

创建first topic

bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --create --partitions 1 --replication-factor 3 --topic first

查看first 主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --alter --topic first --partitions 3

再次查看first 主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --describe --topic first

删除topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first

⛹生产者命令行操作 

查看操作生产者命令参数 
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic first 
>hello world 
>atguigu  atguigu 

⛹消费者命令行操作 

1)查看操作消费者命令参数 
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

费消息 
(1)消费first 主题中的数据。 

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic first 

(2)把主题中所有的数据都读取出来(包括历史数据)。 

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

💃生产者

⛹生产者消息发送

        在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

参数:

尚硅谷kafka3.0.0,kafka,zookeeper,大数据尚硅谷kafka3.0.0,kafka,zookeeper,大数据尚硅谷kafka3.0.0,kafka,zookeeper,大数据

⛹异步发送api

🤸‍♂️普通异步发送 

1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker 

(一边汇报完成了,一边在队列执行)

    public static void main(String[] args) {
        // TODO 创建生产者对象
        Properties conf = new Properties();

//        指定集群
        // public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");

//        指定序列化器
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(conf);


        // TODO 发送数据
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", "yuange"+i));

        }



        // TODO 关闭资源
        producer.close();

    }

在 hadoop1 上开启 Kafka 消费者。 

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic first 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

🤸‍♂️回调异步发送

        回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

        // TODO 发送数据
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", "yuange" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null){
                        System.out.println("topic:"+metadata.topic()+" part:"+metadata.partition());
                    }
                }
            });

        }

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

⛹同步发送

只需在异步发送的基础上,再调用一下 get()方法即可。 

        // TODO 发送数据
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("first", "yuange" + i)).get();

        }

⛹分区 

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。 

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

🤸‍♂️分区策略

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

        // TODO 发送数据
        for (int i = 0; i < 20; i++) {
            producer.send(new ProducerRecord<>("first", "yuange" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("topic:"+metadata.topic()+" part:"+metadata.partition());

                }
            });
            Thread.sleep(2);
        }

 指定2毫秒 超时重新选择分区了????

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

🤸‍♂️自定义分区

🤸‍♂️实现类

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String s = value.toString();
        int part = 0;
        if(s.contains("yuange")){
            part = 0;
        }else{
            part = 1;
        }


        return part;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

实现

    public static void main(String[] args) throws InterruptedException {
        // TODO 创建生产者对象
        Properties conf = new Properties();
//        指定集群
        // public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
        conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
//        指定序列化器
        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//        conf.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        conf.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        conf.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.example.kafka.implement.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(conf);



        // TODO 发送数据
        for (int i = 0; i < 20; i++) {

            if (i%2 == 0){
                String value = "yuange" + i;
                producer.send(new ProducerRecord<>("first", value), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.println("topic:"+metadata.topic()+" part:"+metadata.partition()+value);
                    }
                });
            }else {
                String value = "cxk-" + i;

                producer.send(new ProducerRecord<>("first", value), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.println("topic:"+metadata.topic()+" part:"+metadata.partition()+value);
                    }
                });
            }
        }



        // TODO 关闭资源
        producer.close();

    }

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

⛹提高吞吐量

batch.size:批次大小,默认16k 
linger.ms:等待时间,修改为5-100ms (这两个同时生效,满足一个)

compression.type:压缩snappy (弹幕说解压影响性能)
RecordAccumulator:缓冲区大小,修改为64m 

        // 缓冲区大小
        conf.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);
        // 批次大小
        conf.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        // linger.ms
        conf.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 压缩?
        conf.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(conf);

⛹数据可靠性Ack

🤸‍♂️0 1 -1三个应答毛病

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

        思考:Leader收到数据,所有Follower都开始同步数据,但有一 个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问 题怎么解决呢?

        Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高; 
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; 
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 

acks=0很少使用;

acks=1,一般用于传输普通日志,允许丢个别数据;

acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

https://kafka.apache.org/documentation/#producerconfigs

        conf.put(ProducerConfig.ACKS_CONFIG, "1");
        conf.put(ProducerConfig.RETRIES_CONFIG, 3);

数据会重复,之后解决

🤸‍♂️去重

• 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 可以保证数据不丢失,但是不能保证数据不重复

• 最多一次(At Most Once)= ACK级别设置为0 可以保证数据不重复,但是不能保证数据不丢失。 

• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。 

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

        幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。 

        重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复。 

开启参数 enable.idempotence 默认为 true,false 关闭

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

🤸‍♂️事务

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

        conf.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "shiwuid-01");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(conf);
        producer.initTransactions();
        producer.beginTransaction();
        try {
            // TODO 发送数据
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("first", "value"+i));
                Thread.sleep(300);
            }
            producer.send(new ProducerRecord<>("first", "----------"));

            producer.commitTransaction();  // 提交事务
        }catch (Exception e){
            producer.abortTransaction();  // 终止事务
        }

⛹有序

1 )kafka在1.x版本之前保证数据单分区有序,条件如下: 
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)

2 )kafka在1.x及以后版本保证数据单分区有序,条件如下: 
1)未开启幂等性 
max.in.flight.requests.per.connection需要设置为1。 
2)开启幂等性 
max.in.flight.requests.per.connection需要设置 ≤ 5。 
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的。(seqence)

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

💃Broker

👩‍🚀Broker 工作流程 

🤸‍♂️Zookeeper 存储的 Kafka 信息 

bin/zkCli.sh 

通过 ls 命令可以查看 kafka 相关信息。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

🤸‍♂️总体工作流程

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

👩‍🚀Kafka 副本

副本基本信息 

1)Kafka 副本作用:提高数据可靠性。 
2)Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。 
3)Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。 

4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

  • AR = ISR + OSR 
  • ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。 
  • OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本

Leader 选举流程

        Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

        Controller 的信息同步工作是依赖于 Zookeeper 的。

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

1. kafka每启动一个集群,就会在zk里面注册,每个broker都有一个controller

2. 谁先注册好谁就是controller

3. leader监听所有的broker

4. 选举出来的controller选举leader

5. leader自己的信息在zk也有一个空间

6. 其他follower从这个空间拉取leader信息

7. 1号leader寄

8. 重新选举leader,下一个就是0

🧠整理思路:

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

一个分区分好几个块,leader、follower都是拿分区说事

leader是负责在生产者和消费者中间交互的

follower挂了

两个概念:

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。 
HW(High Watermark):所有副本中最小的LEO 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据尚硅谷kafka3.0.0,kafka,zookeeper,大数据

讲到这了突然特么来了一句:消费者只能拉取到HW

leader寄了

人话:

        leader寄了之后,选出来一个follower作为新leader,以这个新的leader目前有的数据量为基准,多了给扔掉,不能比新王多!但是数据丢掉了

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

分区副本分配

就是一种负载均衡

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

Leader Partition 负载平衡 

有平衡的机制

👩‍🚀文件存储

🤸‍♂️存储机制 

Topic 数据的存储机制

        Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名+分区序号,例如:first-0。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

存在KAFKA_HOME/datas下 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

1.index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。 
参数log.index.interval.bytes默认4kb

2.Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大
因此能将offset的值控制在固定大小 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

文件清理策略 

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。 

log.retention.hours,最低优先级小时,默认 7 天。 
log.retention.minutes,分钟。 
log.retention.ms,最高优先级毫秒。 
log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。 

log.retention.bytes,默认等于-1,表示无穷大。 

清理策略有 delete 和 compact 两种。

1)delete 日志删除:将过期数据删除 
⚫ log.cleanup.policy = delete 所有数据启用删除策略 

        1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。 
        2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 

2)compact 日志压缩 

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。 
⚫ log.cleanup.policy = compact 所有数据启用压缩策略 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

        压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。 

        这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。 

👩‍🚀高效读写数据 

  1. Kafka 本身是分布式集群,可以采用分区技术,并行度高 
  2. 读数据采用稀疏索引,可以快速定位要消费的数据 
  3. 顺序写磁盘(同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。 )
  4. 页缓存 + 零拷贝技术 (零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。 
    PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。 )

💃Kafka 消费者 

👩‍🚀消费方式 pull

pull(拉)模式: 
consumer采用从broker中主动拉取数据。 
Kafka采用这种方式。不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一直回空数据。 

push(推)模式:

Kafka没有采用这种方式,因为由broker 决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是50m/s, Consumer1、Consumer2就来不及处理消息。 

👩‍🚀Kafka 消费者工作流程 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

消费者组原理 

        Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 

        消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

1、coordinator:辅助实现消费者组的初始化和分区的分配。 
        coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 
例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

消费流程

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

👩‍🚀消费案例

消费一个主题

    public static void main(String[] args) throws InterruptedException {
        Properties conf = new Properties();

        conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092");
        conf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        conf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        conf.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(conf);

        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);  // 这意味着消费者将开始从指定的主题接收消息并进行处理

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> rec : consumerRecords){
                System.out.println(rec);
            }
        }



    }

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

消费一个分区

上面分配分区改为:

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

消费者组

nb,就是把上面那个指定分配分区的复制三份,只要groupid相同,就是一个逻辑组

👩‍🚀分区的分配策略、再平衡

Q:一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据?


A:Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。

默认策略是Range + CooperativeSticky。

Kafka可以同时使用多个分区分配策略。 

Range 以及再平衡

Range 是对每个 topic 而言的。 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。 

假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区 !!!

再分配:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。 

RoundRobin 以及再平衡 

RoundRobin 针对集群中所有Topic而言。 
RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。 

Sticky 以及再平衡 

粘性分区定义:在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 

        首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。 

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。 

1号消费者:消费到 2、5、3 号分区数据。 
2号消费者:消费到 4、6 号分区数据。 
0号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。 

to

1号消费者:消费到 2、3、5 号分区数据。 
2号消费者:消费到 0、1、4、6 号分区数据。 

👩‍🚀offset 位移

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据

👩‍🚀消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

👩‍🚀数据积压(消费者如何提高吞吐量)

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

💃Kafka-Eagle 监控

Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。 

安装 改配置

修改KAFKA_HOME/bin/kafka-server-start.sh

其中一段改为,分发

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5  -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9999"
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

Kafka-Eagle 安装 

https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz

解压两重tar文件到自己的目录

修改配置文件 HOME/conf/system-config.properties 

######################################
# multi zookeeper & kafka cluster list 
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
#######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop1:2181,hadoop:2181,hadoop3:2181/kafka


##################################### 
# kafka offset storage 
##################################### 
# offset保存在 kafka 
cluster1.efak.offset.storage=kafka

# 配置 mysql连接 
efak.driver=com.mysql.jdbc.Driver 
efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull 
efak.username=root 
efak.password=123456

幻景变量

# kafkaEFAK 
export KE_HOME=/opt/module/efak 
export PATH=$PATH:$KE_HOME/bin 

原神启动

启动之前需要先启动 ZK 以及 KAFKA。

[2023-10-23 11:18:02] INFO: Port Progress: [##################################################] | 100%
[2023-10-23 11:18:06] INFO: Config Progress: [##################################################] | 100%
[2023-10-23 11:18:09] INFO: Startup Progress: [##################################################] | 100%
[2023-10-23 11:17:59] INFO: Status Code[0]
[2023-10-23 11:17:59] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version 2.0.8 -- Copyright 2016-2021
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.88.128:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
[root@hadoop1 efak]# 

登录页面查看监控数据 http://192.168.88.128:8048

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

💃Kafka-Kraft 模式

优点

干掉了zk

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群, 而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。 

尚硅谷kafka3.0.0,kafka,zookeeper,大数据

  • Kafka 不再依赖外部框架,而是能够独立运行; 
  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升; 
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制; 
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强 
  • controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。 

安装配置

重新解压一个新的kafka安装包

cd /export/server/kafka2/config/kraft/

vim server.properties

# The node id associated with this instance's roles
node.id=1、2、3(三个集群分别分别分别设置不同的id)

# The connect string for the controller quorum(每个都这么设)
controller.quorum.voters=1@hadoop1:9093,2@hadoop2:9093,3@hadoop3:9093


advertised.listeners=PLAINTEXT://hadoop1、2、3:9092(三个分别设)

# A comma separated list of directories under which to store log files 日志目录改了
log.dirs=/export/server/kafka2/datas

分发

初始化集群数据目录 

bin/kafka-storage.sh random-uuid

omVzwxZORTqQuFNIFX8Rnw

用该 ID 格式化 kafka 存储目录(三台节点)

bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties

缘神启动 

bin/kafka-server-start.sh -daemon config/kraft/server.properties文章来源地址https://www.toymoban.com/news/detail-740984.html

到了这里,关于尚硅谷kafka3.0.0的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示: 1.2、案例代码 生产者往firstTopic主题 0 号分区发送数据代码 消费者消费firstTopic主题 0 分区数据代码 1.3、测试 在 IDEA 中执行消费者程序,如下图: 在 IDEA 中执行生产者程序 ,在控制台观察

    2024年02月09日
    浏览(17)
  • Kafka3.0.0版本——增加副本因子

    Kafka3.0.0版本——增加副本因子

    四台服务器 原始服务器名称 原始服务器ip 节点 centos7虚拟机1 192.168.136.27 broker0 centos7虚拟机2 192.168.136.28 broker1 centos7虚拟机3 192.168.136.29 broker2 centos7虚拟机4 192.168.136.30 broker3 2.1、先启动zookeeper集群 启动zookeeper集群 2.2、再启动kafka集群 启动kafka集群 3.1、增加副本因子的概述 在

    2024年02月09日
    浏览(12)
  • Kafka3.0.0版本——文件清理策略

    Kafka3.0.0版本——文件清理策略

    1.1、文件清理策略的概述 Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。 参数 解释 log.retention.hours 小时,最低优先级(默认 7天) log.retention.minutes 分钟 log.retention.ms 毫秒,最高优先级 log.retention.check.interval.ms 负责设置检查周期(默认 5 分钟) 1

    2024年02月09日
    浏览(13)
  • Kafka3.0.0版本——文件存储机制

    Kafka3.0.0版本——文件存储机制

    1.1、Topic 数据的存储机制的概述 Topic是逻辑上的概念,而partition是物理上的概念。 每个partition对应于一个log文件 ,该log文件中存储的就是Producer生产的数据。 Producer生产的数据会被不断追加到该log文件末端 ,为防止log文件过大导致数据定位效率低下, Kafka采取了 分片和 索引

    2024年02月09日
    浏览(6)
  • Kafka3.0.0版本——手动调整分区副本示例

    Kafka3.0.0版本——手动调整分区副本示例

    四台服务器 原始服务器名称 原始服务器ip 节点 centos7虚拟机1 192.168.136.27 broker0 centos7虚拟机2 192.168.136.28 broker1 centos7虚拟机3 192.168.136.29 broker2 centos7虚拟机4 192.168.136.30 broker3 2.1、先启动zookeeper集群 启动zookeeper集群 2.2、再启动kafka集群 启动kafka集群 3.1、手动调整分区副本的前提

    2024年02月11日
    浏览(15)
  • Kafka3.0.0版本——Leader Partition自动平衡

    Kafka3.0.0版本——Leader Partition自动平衡

    正常情况下,Kafka 本身会自动把Leader Partition均匀分散在各个机器上 ,来保证每台机器的读写吞吐量都是均匀的。但是如果 某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上 ,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是foll

    2024年02月11日
    浏览(12)
  • Kafka3.0.0版本——Broker( 退役旧节点)示例

    Kafka3.0.0版本——Broker( 退役旧节点)示例

    三台服务器 原始服务器名称 原始服务器ip centos7虚拟机1 192.168.136.27 centos7虚拟机2 192.168.136.28 centos7虚拟机3 192.168.136.29 centos7虚拟机4 192.168.136.30 分别启动4台zookeeper 再分别启动4台kafka brokers中的ids=0、1、2、3都存在,说明4台kafka启动正常,如下图: 查看kafka中的创建过的名称为

    2024年02月13日
    浏览(6)
  • Kafka3.0.0版本——Follower故障处理细节原理

    Kafka3.0.0版本——Follower故障处理细节原理

    三台服务器 原始服务器名称 原始服务器ip 节点 centos7虚拟机1 192.168.136.27 broker0 centos7虚拟机2 192.168.136.28 broker1 centos7虚拟机3 192.168.136.29 broker2 2.1、服务器基本信息 首先,分别有3台服务器分别为broker0、broker1、broker2,其中一台为leader,2台follower服务器。每台服务器已经接收到

    2024年02月10日
    浏览(16)
  • Kafka3.0.0版本——Leader故障处理细节原理

    Kafka3.0.0版本——Leader故障处理细节原理

    三台服务器 原始服务器名称 原始服务器ip 节点 centos7虚拟机1 192.168.136.27 broker0 centos7虚拟机2 192.168.136.28 broker1 centos7虚拟机3 192.168.136.29 broker2 2.1、服务器基本信息 首先,分别有3台服务器分别为broker0、broker1、broker2,其中一台为leader,2台follower服务器。每台服务器已经接收到

    2024年02月11日
    浏览(11)
  • Kafka3.0.0版本——消费者(自动提交 offset)

    Kafka3.0.0版本——消费者(自动提交 offset)

    官网文档 参数解释 参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 图解分析 消费者自动提交 offset代码 消费者自动提交

    2024年02月09日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包