Kafka简单入门02——ISR机制

这篇具有很好参考价值的文章主要介绍了Kafka简单入门02——ISR机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

ISR机制

ISR 关键概念

HW和LEO

Java使用Kafka通信

Kafka 生产者示例

Kafka 消费者示例


kafka isr,消息队列,kafka

ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。

ISR 关键概念
  1. 领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。

  2. ISR 集合:ISR 集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在 ISR 集合中的追随者副本可以参与数据的写入和读取操作。

  3. 数据复制:领导者将消息写入其本地日志,并定期将这些消息发送给 ISR 集合中的追随者。追随者接收消息后,将其写入本地日志,以保持数据同步。

  4. Leader Epoch 和 Log Start Offset:ISR 集合中的每个追随者都维护了领导者的日志信息,包括领导者的 Leader Epoch 和 Log Start Offset。这些信息用于确保数据的正确复制和同步。

  5. 数据一致性:只有在 ISR 集合中的所有追随者都成功复制了一条消息后,领导者才会将该消息标记为已提交,确保数据的一致性。

  6. 故障处理:如果某个追随者发生故障或者追赶进度过慢,那么该追随者可能会被从 ISR 集合中移除。这有助于保持数据的可靠性和避免影响性能。

其中,需要注意的的概念:

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。

  • 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。

  • AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。

  • 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。

HW和LEO

在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。

HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。

LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。

HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:

  • HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。

  • LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。

  • 一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障。

Kafka的消息同步流程:

  1. 初始状态,HW和LEO在同一位置。消费者可以读取的有效消息为0,1,2,3.

    kafka isr,消息队列,kafka

  2. 消息写入Leader,LEO位置改变。Follower进行同步。

    kafka isr,消息队列,kafka
  3. Follower同步进度决定HW位置,消费者可读的有效消息0,1,2,3,4。

    kafka isr,消息队列,kafka

  4. 完成同步,消费者可读的有效消息0,1,2,3,4,5,6。

    kafka isr,消息队列,kafka

可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

  • 同步复制要求所有Follower副本都复制完,太影响性能了。

  • 异步复制只要数据被写入Leader副本就认为提交成功,在这种情况下,如果Leader宕机时候Follower还是落后于Leader就会造成数据丢失。

而Kafka使用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。

Java使用Kafka通信

以下是 Kafka 生产者和消费者的简单示例,使用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息传递示例。文章来源地址https://www.toymoban.com/news/detail-764980.html

Kafka 生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
​
public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
        String topic = "my-topic"; // Kafka 主题名称
​
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
        Producer<String, String> producer = new KafkaProducer<>(properties);
​
        // 发送消息
        producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
            } else {
                System.err.println("Error sending message: " + exception.getMessage());
            }
        });
​
        producer.close();
    }
}
Kafka 消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
​
public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092"; // Kafka 服务器地址
        String groupId = "my-group"; // 消费者组 ID
        String topic = "my-topic"; // Kafka 主题名称
​
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
​
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
            }
        }
    }
}

到了这里,关于Kafka简单入门02——ISR机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka 入门到起飞 - 生产者参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR?

    上回书我们讲了,生产者发送消息流程解析传送门 那么这篇我们来看下,生产者发送消息时几个重要的参数详解 ,什么是生产者确认机制? 什么是ISR? 什么是 OSR? bootstrap.servers : Kafka 集群地址 host1:port1,host2:port2,host3:port3 不需要写Kafka集群中全部的broker地址,但是也不要写

    2024年02月15日
    浏览(35)
  • 大数据面试题:Kafka的ISR机制

    面试题来源: 《大数据面试题 V4.0》 大数据面试题V3.0,523道题,679页,46w字 可回答:1)从ISR踢出去之后呢;2)一般Leader怎么判断Follower挂掉? 参考答案: ISR (In-Sync Replicas):副本同步队列 ISR是Leader维护的一个动态副本同步队列,是和Leader保持同步的Follower集合。Kafka通过

    2024年02月12日
    浏览(40)
  • Kafka的Replication策略和ISR同步机制

    目录 replication replication复制机制 同步复制 异步复制 ISR

    2024年02月13日
    浏览(35)
  • Kafka 的 replica 同步机制(ISR与OSR列表数据相互转换)

    注:ISR与OSR列表数据是存储在Zookeeper的Partition中的(一个Topic可能会配置多个partition)。 一、Kafka副本 Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续

    2024年04月28日
    浏览(25)
  • kafka--kafka基础概念-ISR详解

    主要是讲 主 往 从同步中的问题 当绿色P1接收到写入的数据,要同步到紫色的P1S1和P1S2 如何保证一致性呢? 使用In Sync Replicas 也就是ISR概念 为什么不一致的? 因为P1S1同步数据 可能花费 50ms P1S2可能花费60ms 同步完的进入ISR集合, 同步时间是可以设置规定时间的(容忍时间)

    2024年02月12日
    浏览(37)
  • Kafka之分区副本与ISR

    Kafka的Topic分区本质是一个用于存储Topic下的消息的日志,但是只存一份日志会因为机器损坏或其他原因导致消息丢失不可恢复, 因此需要多个相同的日志作为备份,提高系统可用性,这些备份在kafka中被称为副本(replica)。 kafka将分区的所有副本均匀的分配到所有broker上,并从

    2024年02月04日
    浏览(34)
  • 【项目实战】Kafka 分区中的AR、ISR、OSR

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(30)
  • Kafka - AR 、ISR、OSR,以及HW和LEO之间的关系

    AR(Assigned Replication): 分区中的所有副本统称为AR(Assigned Replicas) ISR(In-Sync Replicas):同步副本集合 ISR是指当前与主副本保持同步的副本集合。当主副本发生故障时,Kafka会从ISR中选举一个新的主副本来接管工作。因此,ISR的大小对于分区的可用性和性能至关重要。如果

    2024年02月16日
    浏览(31)
  • 消息队列kafka及zookeeper机制

    目录 一、zookeeper 1、zookeeper简介 2、zookeeper特点 3、zookeeper工作模式及机制 4、zookeeper应用场景及选举机制 5、zookeeper集群部署 ①实验环境 ②安装zookeeper 二、消息队列kafka  1、为什么要有消息队列 2、使用消息队列的好处 3、kafka简介 4、kafka特点 5、kafka系统架构名词介绍 6、

    2023年04月14日
    浏览(37)
  • Kafka系列(一)【消息队列、Kafka的基本概念、Kafka的工作机制、Kafka可满足的需求、Kafka的特性、Kafka的应用场景】

    转自《Kafka并不难学!入门、进阶、商业实战》 1. 消息队列的来源 在高并发的应用场景中, 由于来不及同步处理请求,接收到的请求往往会发生阻塞。 例如,大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的异

    2024年02月20日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包