一文读懂kafka消息丢失问题和解决方案

这篇具有很好参考价值的文章主要介绍了一文读懂kafka消息丢失问题和解决方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

今天分享一下kafka的消息丢失问题,kafka的消息丢失是一个很值得关注的问题,根据消息的重要性,消息丢失的严重性也会进行放大,如何从最大程度上保证消息不丢失,要从生产者,消费者,broker几个端来说。

消息发送和接收流程

kafka生产者生产好消息后,会将消息发送到broker节点,broker对数据进行存储,kafka的消息是顺序存储在磁盘上,以主题(topic),分区(partition)的逻辑进行划分,消息最终存储在日志文件中,消费者会循环从broker拉取消息。

一文读懂kafka消息丢失问题和解决方案

那么从上图的图中可以看出kafka丢消息可能存在的三个地方分别为:

  • 生产者到broker
  • broker到磁盘
  • 消费者

生产者到broker消息丢失

生产者发送消息到broker是会存在消息丢失的,大多可能是由于网络原因引起的,消息中间件中一般都是通过ack来解决这个问题的,kafka中可以通过设置ack来解决这个问题。

acks有三种类型:

  • 0
  • 1
  • -1(all)

acks为0

acks设置为0,代表生产者发送消息后就不管不顾了,不用等待broker的任何响应,那么可能网络异常或者其他原因导致broker没有处理到到这条消息,那么消息就丢失了。

一文读懂kafka消息丢失问题和解决方案

acks为1

acks设置为1,代表生产者发送消息到broker后,只需要broker的leader副本确认收到后就成功响应,不需要follower副本响应,就算follower副本崩溃了,也会成功响应。

一文读懂kafka消息丢失问题和解决方案

acks为-1(all)

acks设置为-1,或者为all,那么生产者发送消息需要leader和follower都收到并写入消息才成功响应生产者,也就是ISR集合要全部写入,当ISR集合中只要有一个没有写入成功,那么就收到失败响应,所以acks=-1能够在最大程度上保证消息的不丢失,但是也是有条件的,需要ISR集合中有两个以上副本才能保证,如果只有一个副本,那么就是就只有一个leader,没有follower,如果leader挂掉,就不能选举出一个eader,消息自然也就丢失,这和acks=1是一样的

一文读懂kafka消息丢失问题和解决方案

解决消息丢失

从上面三种类型的acks中我们可以看出,acks=-1是保证消息从生产者到broker不丢失的最佳设置方式,不过我们也能想到,它需要ISR每个副本都成功应答,所以它的效率自然没有前面两个高,不过此篇我们讨论的是保证消息不丢失问题,所以一切从不丢失层面区说。

如果消息发送失败,那么生产者可以重试发送消息,可以手动在代码中编写消息重发逻辑,也可以配置重试参数。

  • retries
  • retry.backoff.ms

retries表示重试次数,retry.backoff.ms表示重试时间间隔,比如第一次重试依旧没成功,那么隔多久再进行重试,kafka重试的底层逻辑是将没发送成功的消息重新入队,因为kafka的生产者生产消息后,消息并非就直接发送到broker,而是保存在生产者端的收集器(RecordAccumulator),然后由Sender线程去获取RecordAccumulator中的消息,然后再发送给broker,当消息发送失败后,会将消息重新放入RecordAccumulator中,具体逻辑可以看kafka的生产者端Sender的源码。

一文读懂kafka消息丢失问题和解决方案

消息重发引起的消息顺序性问题

要注意,消息发送失败进行重发不能保证消息发送的顺序性,这里的顺序性是单分区顺序性,如果服务对于消息的顺序性有严格的要求,那么我们可以通过设置属性max.in.flight.requests.per.connection=1来保证消息的顺序性,这个配置对应的是kafka中InFlightRequestsmax.in.flight.requests.per.connection代表请求的个数,kafka在创建Sender的时候会判断,如果maxInflightRequests为1,那么guaranteeMessageOrder就为true,就能保证消息的顺序性。

一文读懂kafka消息丢失问题和解决方案

broker到磁盘丢消息

broker收到消息后,需要将消息写入磁盘的log文件中,但是并不是马上写,因为我们知道,生产者发送消息后,消费者那边需要马上获取,如果broker要写入磁盘,那么消费者拉取消息,broker还要从log文件中获取消息,这显然是不合理的,所以kafka引入了(page cache)页缓存。

page cache是磁盘和broker之间的消息映射关系,它是基于内存的,当broker收到消息后,会将消息写入page cache,然后由操作系统进行刷盘,将page cache中的数据写入磁盘。

如果broker发生故障,那么此时page cache的数据就会丢失,broker端可以设置刷盘的参数,比如多久刷盘一次,不过这个参数不建议去修改,最好的方案还是设置多副本,一个分区设置几个副本,当broker故障的时候,如果还有其他副本,那么数据就不会丢失。

消费者丢消息

kafka的消费模式是拉模式,需要不断地向broker拉取消息,拉取的消息消费了以后需要提交offset,也就是提交offset这里可能会出现丢消息,kafka中提供了和offset相关的几个配置项。

  • enable.auto.commit
  • auto.commit.interval.ms
  • auto.offset.reset

下面我们先了解一下kafka offset的提交和参数详解。

enable.auto.commit代表是否自动提交offset,默认为true,auto.commit.interval.ms代表多久提交一次offset,默认为5秒。

如下图,当前消费者消费到了分区中为3的消息。

一文读懂kafka消息丢失问题和解决方案

那么下次当消费者读取消息的时候是从哪里读取呢,当然从4开始读取,因为是从上次读取的offset的下一位开始读取,所以我们就说当前消费组的offset为4,,因为下次是从4开始消费,如果5秒之内又消费了两条消息然后自动提交了offset,那么此时的offset如下:

一文读懂kafka消息丢失问题和解决方案

enable.auto.commit如果为false,就代表不会自动提交offset。

auto.offset.reset=latest代表从分区中最新的offset处开始读取消息,比如某个消费者组上次提交的偏移量为5,然后后面又生产了2条消息,再次读取消息时,读取到的是6,7,8这个三个消息,如果enable.auto.commit设置为false,那么不管往分区中写入多少消息,都是从6开始读取消息。

一文读懂kafka消息丢失问题和解决方案

此时如果一个新的的消费组订阅了这个分区,因为这个消费者组没有在这个分区提交过offset,所以它获取消息并不是从6开始获取,而是从1开始获取。

一文读懂kafka消息丢失问题和解决方案

所以可知每个消费者组在分区中的offset是独立的。

auto.offset.reset还可以设置为earliestnone,使用earliest,如果此消费组从来没有提交过offset,那么就从头开始消费,如果提交过offset,那么就从最新的offset处消费,就和latest一样了,使用none,如果消费组没有提交过offset,在分区中找不到任何offset,那么就会抛出异常。

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [stock1-0]
复制代码

上面我们初步了解了offset的一些知识,对offset的提交和和读取有一些了解,因为上面我们只提及offset的自动提交,而自动提交的主动权在kafka,而不在我们,所以可能因为一些原因而导致消息丢失。

消息处理异常

当我们收到消息后对消息进行处理,如果在处理的过程中发生异常,而又设置为自动提交offset,那么消息没有处理成功,offset已经提交了,当下次获取消息的时候,由于已经提交过ofset,所以之前的消息就获取不到了,所以应该改为手动提交offset,当消息处理成功后,再进行手动提交offset。

总结

关于kafka的消息丢失问题和解决方案就说到这里,我们分别从生产者到broker,broker到磁盘以及消费者端进行说明,也引申出一些知识点,可能平时没有遇到消息丢失的情况,那是因为网络比较可靠,数据量可能不大,但是如果要真的实现高可用,高可靠,那么就需要对其进行设计。文章来源地址https://www.toymoban.com/news/detail-474714.html

到了这里,关于一文读懂kafka消息丢失问题和解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一文读懂kafka消息拉取机制|线程拉取模型

    如果客户端与待拉取消息的broker节点有待发送的网络请求 (见代码@4),则本次拉取任务将 不会再发起新的拉取请求 ,待已有的请求处理完毕后才会拉取新的消息。 拉取消息时需要指定拉取消息偏移量,来自队列负载算法时指定,主要消费组的最新消费位点。 Step2:按Node依次构

    2024年04月25日
    浏览(25)
  • 一文彻底搞懂Kafka如何保证消息不丢失

    Producer:生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。 Consumer:消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。 Consumer Group:将多个消费者组成一个消费者组,一个消费者组可以包含一个或多个消费者。

    2024年04月22日
    浏览(29)
  • RabbitMQ消息丢失的场景,MQ消息丢失解决方案

    第一种 : (生产者) 生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。 第二种 : (服务端) RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了 第三种 : (消费者) 消费端弄丢了数据。刚消费到,还没处理

    2024年02月08日
    浏览(34)
  • kafka消息丢失面试题,RocketMQ消息丢失场景及解决办法

    互联网行业更新换代非常快,行业常态便是不断学习,因此这些主流技术你一个都不能落下! ①并发编程 Java并发编程是整个Java开发体系中最难以理解,但也是最重要的知识点之一,因此学习起来比较费劲,从而导致很多人望而却步,但是无论是职场面试还是高并发高流量的

    2024年03月17日
    浏览(36)
  • RabbitMq消息丢失原因及其解决方案

    我们首先了解下一条消息从生产到消费的整个流程如下: 生产--MQ Broker -- 消费。所以这三个环节都有丢失消息的可能。 1.1、生产者丢失消息 生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。 1.使用事务(性能差) ​ RabbitMQ 客户端中与事务机

    2024年02月08日
    浏览(32)
  • MQ消息丢失的可能原因与解决方案

    当我们使用消息队列(MQ)作为分布式系统中的核心组件时,消息丢失是一个常见的问题。消息丢失可能导致数据不一致或功能故障,因此对于许多应用程序来说是不可接受的。本文将介绍几种常见的MQ消息丢失的原因,并提供相应的解决方案。 生产者在发送消息时可能会遇

    2024年02月15日
    浏览(27)
  • Kafka数据丢失原因及解决方案

    Kafka包括Producer、Broker、Consumer,因此从这三个方面分析。 丢失原因:Kafka在Producer端的消息发送采用的是异步发送的方式(还有同步发送,但是同步发送会导致消息阻塞、需要等待),丢失数据是因为消息没有到达Broker端,原因可能是网络波动导致没有回调和数据消息太大超出

    2024年02月14日
    浏览(22)
  • 带你了解RabbitMQ:消息丢失、重复、积压的原因及其解决方案

    前言 首先说一点,企业中最常用的实际上既不是RocketMQ,也不是Kafka,而是RabbitMQ。 RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。 深层次的原因在于兔宝在中小企业普及更早,经受的考验也更久,很容

    2024年02月04日
    浏览(26)
  • 一文读懂RabbitMQ消息队列

    在介绍消息队列之前,应该先了解什么是 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议,点击查看) 消息(Message) 是指在应用间 传送的数据 ,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而 消息队列(Message Queue) 是一种 应用间 的

    2024年02月02日
    浏览(27)
  • Kafka消息阻塞:拯救面试的八大终极解决方案!

    大家好,我是小米,一个对技术充满热情的90后程序员。最近在准备社招面试的过程中,遇到了一个超级有挑战性的问题:“Kafka消息阻塞怎么解决?”今天,我就来和大家一起深入剖析这个问题,分享我在解决过程中的心得和经验。 首先,我们得了解一下Kafka消息阻塞是什么

    2024年01月16日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包