Kafka底层原理探秘

这篇具有很好参考价值的文章主要介绍了Kafka底层原理探秘。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、简介

  1. Kafka 是一个分布式流处理平台由 LinkedIn 公司开发的,遵循 Apache 开源协议。
  2. Kafka 主要是用来处理实时数据流,可以发布、订阅、存储和处理数据。

应用场景:

  1. 日志收集:用于分布式日志系统,例如 ELK。
  2. 消息系统:可以将 Kafka 作为消息队列使用。
  3. 流处理:将 Kafka 与 Flink 或 Spark 等流处理引擎配合使用。

二、架构介绍

1. 组件

  • Producer:发送数据到 Kafka 集群。
  • Consumer:从 Kafka 集群消费数据。
  • Broker:Kafka 集群中的每个服务器就叫做 Broker。
  • Topic:物理上不同的消息类别;逻辑上一个 Topic 包含多个 Partition。
  • Partition:物理上的概念,每个 Partition 对应一个文件夹,该文件夹下存储着该 Partition 的所有消息。
  • Offset:Kafka 采用了分布式的提交日志机制,消费者消费数据时会记录已经消费的位置,即 Offset。
  • ZooKeeper:Kafka 使用 ZooKeeper 来存储集群的配置信息,及 Broker、Producer、Consumer 等各种节点的状态信息。

2. 集群

  • Kafka 集群由多个 Broker 组成,每个 Broker 在集群中都有一个唯一的编号。
  • 一个 Broker 可以容纳多个 Partition,同一个 Topic 的不同 Partition 分散到不同的 Broker 上组成分布式集群。
  • Kafka 集群会自动调整 Partition 的数量,并将 Partition 均匀分配给各个 Broker。

3. 数据存储结构

  • Kafka 消息被保存在 Partition 中,每个 Partition 对应一个目录,里面是多个 Segment,Segment 文件的大小和发送速率有关系。一个 Partition 有多个 Segments 是因为 Kafka 采取了文件系统批量读写机制

代码示例:

public class KafkaDemo {

    public static void main(String[] args) {

        //1.创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProperties());

        //2.创建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key", "value");

        try {
            //3.发送消息
            producer.send(record).get();
            System.out.println("Sent message successfully");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            //4.关闭连接
            producer.close();
        }

    }

    /**
     * 获取Kafka配置信息
     *
     * @return 配置信息
     */
    private static Properties getProperties() {
        Properties props = new Properties();

        //设置Kafka地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //设置消息Key和Value的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        return props;
    }
}

三、Kafka消息传递原理

1. 消息生产者

Kafka生产者将数据以消息的形式发送到Kafka集群。生产者可以将消息发送到一个指定的主题(topic),也可以选择在发送时指定分区(partition)。当生产者需要发送消息时,它先与Kafka集群上的一个Broker建立TCP连接,然后将消息发送到该Broker。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) throws InterruptedException {

        // 配置Kafka生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息到指定主题
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Hello World-" + i));
            Thread.sleep(1000); // 每秒发送一条消息
        }

        producer.close(); // 关闭Kafka生产者实例
    }
}

2. 消息消费者

Kafka消费者从Kafka集群中的一个或多个分区中消费消息。消费者可以随时订阅一个或多个主题,并在每个主题中定位到特定分区。

import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group"); // 与生产者所在的组相同
        props.put("enable.auto.commit", "true"); // 自动提交偏移量
        props.put("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // 订阅指定主题

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }

        // consumer.close();
    }
}

3. 主题与分区

Kafka的主题(topic)是Kafka用于区分消息类型和类别的单位。每个主题都由一个或多个分区(partition)组成,分区是存储在Kafka集群中不同节点上的数据容器。每个主题的消息可以分布在不同的分区中。

4. 副本机制

Kafka的副本机制是为了保证消息的高可用性和数据的持久性。当一个分区的消息被发送到Kafka集群后,它会被复制到多个副本(replica)中。每个分区都有一个或多个副本,其中有且仅有一个被标记为“首领副本”(leader replica),负责读写该分区的数据。其他副本被称为“追随者副本”(follower replica),它们只能复制首领副本的数据,并借此提高系统的可靠性和容错性。

四、消息传递过程

1. 消息发送流程

消息发送者将消息发送到Kafka主题(topic),然后由Kafka Producer将消息分区并写入到Broker中的指定分区中。在发送消息之前,Producer需要从Zookeeper中获取集群元数据信息,包括Broker列表和主题分区的信息。具体流程如下:

  1. 消息发送者通过Producer API将消息发送到指定topic中。

    String topic = "test_topic";
    String message = "Hello, Kafka!";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    producer.send(record);
    
  2. Producer根据消息的key值使用Partitioner算法将同一个key的消息发送到同一个分区里,保证消息的有序性。

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            Random random = new Random();
            return random.nextInt(numPartitions);
        }
        int hash = Utils.murmur2(keyBytes);
        return hash % numPartitions;
    }
    
  3. Producer将记录存储在缓冲区中,如果缓冲区已满,则会调用send方法将缓冲区中的内容批量发送到Broker中。

2. 消息存储流程

消息被存储在Broker的一个或多个分区中,分区中的每条消息都有一个唯一的偏移量(offset),并按照其他参数(如消息的时间戳)进行排序存储。当Consumer消费分区中的消息时,可以根据偏移量来读取消息,保证消息的顺序性。

在Broker上保存的消息是以一种高效而紧凑的格式进行编码的,称为RecordBatch,它可以将多个Producer的相关记录分组在一起,以便有效地压缩提交到Broker的数据传输量。

3. 消息消费流程

Consumer订阅并读取特定主题(topic)的消息。消费者(Consumer)从Broker中拉取特定分区的消息,并对其进行处理。具体流程如下:

  1. 消费者向Kafka集群发送Fetch请求,获取数据。

  2. Broker收到Fetch请求后,从指定的分区和偏移量(offset)开始读取消息,然后将数据返回给Consumer。

  3. Consumer获取到数据后,进行处理和消费,同时记录每个Partition的下一个可拉取的偏移量,并定期将其提交到Zookeeper中,用于在Consumer发生故障或重启后重新读取未被处理的消息。

String topicName = "test_topic";
String groupId = "test_group";
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", groupId);
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
          record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

五、性能优化

1.硬件方面的优化

磁盘

  • 在使用Kafka时,建议使用SSD磁盘,因为SSD的I/O性能比HDD磁盘更好。
  • 另外,可以使用多块磁盘,将Kafka数据分散到不同的磁盘上,以减少单个磁盘的负担。

内存

  • 将足够的内存分配给Kafka Broker进程,以便它可以缓存更多的消息。
  • 在Kafka Producer写入消息时,可以开启压缩功能,减少传输的数据量,节省内存空间。

CPU

  • Kafka Broker对CPU的要求通常不高,但在高负载下还是需要注意CPU的使用率。
  • 在多核CPU的机器上,可以通过增加Broker实例数或者增加分区数来充分利用CPU资源。

2.Kafka配置优化

Producer端

  • acks参数:设置消息确认的级别。acks=0表示不等待服务器确认;acks=1表示只需得到Kafka集群中一台服务器的确认;acks=all表示需要得到Kafka集群中全部服务器的确认。确认级别越高,消息的耗时就会增加,但是可以提供更好的数据安全性。
  • batch.size参数:设置批处理大小。较小的批处理大小可以降低延迟,但也会增加CPU开销。建议根据实际情况调整该参数。
  • compression.type参数:设置压缩方式。可选的压缩方式包括none(默认)、gzip、snappy和lz4。生产者使用压缩功能可以减少传输的数据量,提高传输效率。

Broker端

  • message.max.bytes参数:设置单个消息的最大大小。如果消息大小超过了该限制,Kafka将拒绝该消息。
  • num.io.threads参数:设置Broker处理I/O请求的线程数。增加该参数的值可以提高Broker的并发能力,但是也会增加CPU的使用率。

3. 消费者优化

  • Group ID:消费者组是Kafka中消费者的逻辑分组,对于同一组内的消费者来说,每个分区只会被其中一台消费者消费。因此,合理设置Group ID可以提高消费者的有效性。
  • Fetch Size:每次从Kafka Broker读取的消息数量。过大的Fetch Size会导致消费端的延迟增加,过小的Fetch Size则会增加网络开销。可以通过调节该参数来达到最佳的消费效率。
  • 处理策略:消费者有两种处理消息的方式,即poll()和push()。其中,poll()需要由应用主动调用,而push()则是在后台由Kafka的Consumer线程自动触发。通常情况下,使用poll()比push()更为灵活,适用于大部分场景。

4. 代码示例

以下是Kafka Producer端实现批量写入消息的Java代码示例:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerSample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384);

        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "myTopic";
        for (int i = 0; i < 10000; i++) {
            String msg = "My message No." + i;
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
            producer.send(record);
        }
        producer.close();
    }
}

六、Kafka的优缺点

1. Kafka的优点

  • 高吞吐量、低延迟: Kafka通过partition和consumer group的概念来实现负载均衡,支持分布式部署,能够实现高吞吐量和低延迟。
  • 高扩展性: Kafka集群内所有节点都是对等的,新的节点可以很容易地加入到集群中,扩展集群的容量,并且不会中断已经运行的服务。
  • 持久化存储: Kafka数据以文件的形式保留在磁盘中,可靠性较高,即使一些节点失效,数据依然不会丢失,非常适合大规模数据的持续存储和离线分析处理。
  • 可靠性高: Kafka支持数据备份和副本机制,通过数据的复制和备份来提高其稳定性,保障数据不会丢失。
  • 消息传输具有多样性: Kafka支持多种协议的传输,可以与不同类型的应用程序集成,例如支持HTTP RESTful API,各种编程语言的客户端和其他一些补充工具等。

2. Kafka的缺点

  • 部署和配置较为复杂: Kafka的集群需要进行配置和部署,需要一定的技术力量,对于较小的企业来说,可能需要投入大量的精力和时间才能完成部署和配置。
  • 需要对数据进行处理: Kafka只是一个消息传递平台,不直接对数据进行处理,需要用户自己编写代码进行数据处理,因此需求较高的技术人员才能使用。
  • 没有自动管理: Kafka集群需要手动做一些管理,例如,当某个节点失效后,需要重新平衡partition的负载等。

七、Kafka的应用案例

Kafka是一个开源的分布式消息系统,在大数据领域有着广泛的应用。下面介绍Kafka的三个应用案例。

1. 网络爬虫

网络爬虫的核心功能是从互联网上抓取数据并进行分析或保存。Kafka可以作为网络爬虫的消息队列,负责将被爬取数据传输给爬虫程序。当网络爬虫处理完数据后,将数据发送到Kafka中,供后续处理程序使用。

具体实现时,需要先创建一个名为spider的Kafka主题,然后在爬虫程序中编写生产者代码,将爬取到的数据发送至该主题。以下是Java示例代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SpiderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "spider", Integer.toString(i), "data-" + Integer.toString(i)));

        producer.close();
    }
}

2. 数据统计

Kafka除了可以作为消息队列,还可以作为数据缓存,可以处理大量的数据流。在数据统计过程中,Kafka既可以作为生产者将收集到的数据发送到主题中,也可以作为消费者从主题中获取数据并进行分析、统计等操作。具体实现时,需要先创建一个名为data的Kafka主题,然后在收集数据的程序中编写生产者代码,将数据发送至该主题。再在处理数据的程序中编写消费者代码,从该主题中获取数据以完成数据统计。

以下是Java示例代码:文章来源地址https://www.toymoban.com/news/detail-563522.html

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class DataConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("data"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
        }
    }
}

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class DataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "data", Integer.toString(i), "data-" + Integer.toString(i)));

        producer.close();
    }
}

3. 实时监控

Kafka可以在实时监控中作为传输媒介,将源数据流发送到消费者,以满足分布式的需求。具体实现时,需要先创建一个名为metrics的Kafka主题,然后在生产者程序中将监控数据发送至该主题。再在监控中心中编写消费者代码,从该主题中获取数据并进行分析、监控等操作。

以下是Java示例代码:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MetricsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("metrics"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
        }
    }
}

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MetricsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "metrics", Integer.toString(i), "metrics-" + Integer.toString(i)));

        producer.close();
    }
}

到了这里,关于Kafka底层原理探秘的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据工具】Kafka伪分布式、分布式安装和Kafka-manager工具安装与使用

    Kafka 安装包下载地址:https://archive.apache.org/dist/kafka/ 1. Kafka 伪分布式安装 1. 上传并解压 Kafka 安装包 使用 FileZilla 或其他文件传输工具上传 Kafka 安装包: kafka_2.11-0.10.0.0.tgz 解压安装包 2. 编辑配置文件 3. 拷贝并修改配置文件 分别修改 server2.properties、server3.properties 4. 创建日志

    2024年02月14日
    浏览(48)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(48)
  • 【分布式应用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。 我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队

    2024年02月16日
    浏览(53)
  • 【分布式技术】消息队列Kafka

    目录 一、Kafka概述 二、消息队列Kafka的好处 三、消息队列Kafka的两种模式 四、Kafka 1、Kafka 定义 2、Kafka 简介 3、Kafka 的特性 五、Kafka的系统架构 六、实操部署Kafka集群  步骤一:在每一个zookeeper节点上完成kafka部署 ​编辑 步骤二:传给其他节点 步骤三:启动3个节点 kafka管理

    2024年01月23日
    浏览(55)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(50)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(47)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(48)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包