Kafka数据重复问题解决方案

这篇具有很好参考价值的文章主要介绍了Kafka数据重复问题解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

生产端:遇到异常,基本解决措施都是重试。

场景一:leader分区不可用了,抛 LeaderNotAvailableException异常,等待选出新leader分区。
场景二:Controller所在Broker挂了,抛NotControllerException异常,等待Controller重新选举。
场景三:网络异常、断网、网络分区、丢包等,抛NetworkException异常,等待网络恢复。

消费端:

poll一批数据,处理完毕还没提交offset,机子宕机重启了,又会poll上批数据,再度消费就造成了消息重复。

先来了解下消息的三种投递语义:

最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt 中 QoS = 0。
至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt 中 QoS = 1
精确一次(exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt 中 QoS = 2

再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

1.Kafka幂等性Producer:

保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)

2.Kafka事务:

保证生产端发送消息幂等。解决幂等Producer的局限性

3.消费端幂等:

保证消费端接收消息幂等。蔸底方案。

第一种方法:Kafka幂等性Producer

幂等性指:

无论执行多少次同样的运算,结果都是相同的。
即一条命令,任意多次执行所产生的影响均与一次执行的影响相同

幂等性使用示例:

在生产端添加对应配置即可
Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
设置幂等,启动幂等。
配置 acks,注意:一定要设置 acks=all,否则会抛异常。
配置 max.in.flight.requests.per.connection 需要 <= 5,否则会抛异常 OutOfOrderSequenceException
0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
Kafka >= 1.1, max.in.flight.request.per.connection <= 5

为了更好理解,需要了解下Kafka幂等机制

Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)
Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个seq num
判断是否重复:拿 <pid, seq num> 去 Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在
如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。
如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
反之,要么重复,要么丢消息,均拒绝。

这种设计针对解决了两个问题:

消息重复: 
场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。
消息乱序: 
避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:

如果已经使用 acks=all,使用幂等也可以。
如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

第二种方法:Kafka事务

事务使用示例:分为生产端 和 消费端

Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 设置幂等
props.put("acks", "all");               // 2. 当 enable.idempotence 为 true,这里默认为 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待数
props.put("transactional.id", "my-transactional-id");  // 4. 设定事务 id
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//初始化事务
producer.initTransactions();
try{
    //开始事务
    producer.beginTransaction();
    //发送数据
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
    //数据发送及 Offset 发送均成功的情况下,提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    //数据发送或者 Offset 发送出现异常时,终止事务
    producer.abortTransaction();
} finally {
    //关闭 Producer 和 Consumer
    producer.close();
    consumer.close();
}
这里消费端Consumer 需要设置下配置:isolation.level 参数
read_uncommitted: 
这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,
不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
read_committed:
表明Consumer只会读取事务型 Producer 成功提交事务写入的消息。
当然了,它也能看到非事务型 Producer 写入的所有消息

第三种方法:消费端幂等

如何解决消息重复? 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。
典型的方案是使用:消息表,来去重
消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。
如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。

案例:Kafka 幂等性 Producer 使用

Zookeeper
Kafka:版本 2.7.1
启动生产者
启动消息者
创建 topic :1副本,2 分区
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生产者代码:

public class KafkaProducerApplication {
    private final Producer<String, String> producer;
    final String outTopic;
    public KafkaProducerApplication(final Producer<String, String> producer,  final String topic) {
        this.producer = producer;
        outTopic = topic;
    }
    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}

启动消费者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

修改配置 acks

启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:
修改配置 acks = 1
修改配置 acks = 0
会直接报错:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. 
Otherwise we cannot guarantee idempotence.

修改配置 max.in.flight.requests.per.connection文章来源地址https://www.toymoban.com/news/detail-731633.html

启用幂等的情况下,调整此配置,结果是怎样的:
将max.in.flight.requests.per.connection > 5 会怎样?
当然会报错:
Caused by: org.apache.kafka.common.config.ConfigException: Must

到了这里,关于Kafka数据重复问题解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka数据倾斜到某一个分区解决方案

    我们使用Kafka时,某时需要消息消费是有序的,因此在生产者投递消息时,可能会指定分区,或者指定Key,此时可能会导致数据倾斜到某一个分区。 由于Kafka消费的特性,即一个消费组,那怕此时消费组有2个以上消费者,此时同一个主分区,只能被一个消费者消费,当生产消

    2024年02月13日
    浏览(46)
  • kafka千万级数据积压原因以及解决方案

    一、原因 kafka作为消息队列,其中数据积压也是经常遇到的问题之一。 我们都知道,数据积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的数据,才会导致数据积压。 那么我们就需要分析在使用kafka时,如何通过优化代码以及参数配置来最大

    2024年02月12日
    浏览(39)
  • ClickHouse 与 Kafka 整合: 实时数据流处理与分析解决方案

    随着数据量的不断增长,实时数据处理和分析变得越来越重要。ClickHouse 和 Kafka 都是在现代数据技术中发挥着重要作用的工具。ClickHouse 是一个高性能的列式数据库,专为 OLAP 和实时数据分析而设计。Kafka 是一个分布式流处理平台,用于构建实时数据流管道和流处理应用程序

    2024年02月22日
    浏览(36)
  • Kafka错误解决:kafka.zookeeper.ZooKeeperClientTimeoutException:连接超时等待解决方案

    在使用Kafka时,有时候会遇到\\\"kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection\\\"这样的错误。本文将介绍如何解决这个问题,并提供相应的源代码示例。 Kafka是一个分布式流处理平台,它使用ZooKeeper来管理集群的元数据和协调器。当连接到ZooKeeper服务器超时时,就

    2024年02月06日
    浏览(30)
  • kafka消息丢失解决方案

    目录 一、生产端数据丢失 二、存储端消息丢失 三、消费端数据丢失 四、小结 一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。 生产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。 存储

    2023年04月26日
    浏览(28)
  • Kafka跨集群备份解决方案MirrorMaker

    一般情况下,我们会使用一套 Kafka 集群来完成业务,但有些场景确实会需要多套 Kafka 集群同时工作,比如为了便于实现灾难恢复,你可以在两个机房分别部署单独的 Kafka 集群。如果其中一个机房出现故障,你就能很容易地把流量打到另一个正常运转的机房下。再比如,你想

    2024年02月11日
    浏览(24)
  • 一碰就头疼的 Kafka 消息重复问题,立马解决!

    一、前言 数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。 通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。 整理下消息重复的几个场景: 生产端: 遇到异常,基本解决措施都是 重试 。 场

    2024年01月23日
    浏览(22)
  • 94、Kafka消息丢失的场景及解决方案

    1、ack=0,不重试 producer发送消息完,不管结果了,如果发送失败也就丢失了。 2、ack=1,leader crash producer发送消息完,只等待 leader 写入成功就返回了,leader crash了,这时follower没来及同步,消息丢失, 3、unclean .leader .election .enable 配置true 允许选举ISR以外的副本作为leader,会导

    2024年02月16日
    浏览(32)
  • 解决Kafka新消费者组导致重复消费的问题

             问题描述 :在使用Kafka时,当我们向新的消费者组中添加消费者时,可能会遇到重复消费的问题。本文将介绍一些解决这个问题的方法,帮助开发者更好地处理Kafka中的消费者组和消费偏移量。         Kafka是一个强大的分布式消息队列系统,但在使用过程中

    2024年02月07日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包