大家好,我是三叔,很高兴这期又和大家见面了,一个奋斗在互联网的打工人。
记一次kafka生产Bug:CommitFailedException
kafka消费端消费消息出现多次同一个消息推送给后台离线处理平台,且消息消费越来越慢。于是查看生产日志,发现了有重复消费的ID,且offset都是一样的。
一开始怀疑是因为后台处理数据失败,调度对失败的消息进行重试,那么问题来了:如果是重试,那么发送到kafka消息的offset应该是不一样的,但是查看日志发现,重复消费的消息id和offset一模一样,于是查看日志后面的error信息:
CommitFailedException: offset commit cannot be complated since the sonsumer is not part of an active ...
大概就是出现了这么一个错误。
根据error日志的信息提示,大概就是offser偏移量提交失败,CommitFailedException 是Kafka中的一个异常,通常在消费者提交offset失败时抛出。消费者提交offset是为了记录它们已经处理了哪些消息,以便在发生故障或重新启动后能够从正确的位置继续消费。如果消费者提交offset失败,则可能会导致消息被重复消费或跳过未处理的消息。
另外,笔者查阅资料,当消费者重复提交相同的offset时,Kafka消费者日志可能会记录以下警告消息:
WARN Offset commit failed on partition [topic]-[partition]:
Commit cannot be completed since the group has already rebalanced
and assigned the partitions to another member. This means that the
time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
此警告消息表明消费者尝试提交相同的offset,但由于消费者组已经重新平衡,因此提交失败。这通常表示消费者在处理消息时花费的时间过长,导致Kafka认为消费者已死亡并重新平衡消费者组。如果读者看到此警告消息,可以考虑下面这些解决措施:文章来源:https://www.toymoban.com/news/detail-730809.html
- 优化消费者端代码以加快消息处理速度;
- 或增加max.poll.interval.ms配置值,以便消费者有更长的时间来处理消息,比如:
spring.kafka.consumer.max.poll.interval.ms=500000;// 根据接口处理业务逻辑的时间来处理,比如我这里处理业务逻辑最慢可以达到1分钟甚至
- 或者设置records值,控制消费拉取的数量
spring.kafka.consumer.max.poll.records=3;// 比如每次拉取3条消息,根据业务执行时间来设置
总结:CommitFailedException异常通常有以下几个原因
- 超时:如果消费者在规定的时间内没有提交offset,就会抛出CommitFailedException异常。这通常是因为消费者处理消息的时间太长,或者消费者提交offset的频率太低,消费者在规定的时间内没有发送心跳信号,则该消费者将被视为死亡并从消费者组中移除,导致提交offset超时。
- 重复提交:如果消费者在短时间内多次提交相同的offset,就会抛出CommitFailedException异常。这可能是由于消费者代码中的错误或线程同步问题导致的。
- 重启:如果消费者重启,它将从消费者组中删除并重新加入消费者组。在这种情况下,之前提交的offset将不再有效,因为消费者已经从消费者组中移除。
- 资源不足:如果Kafka集群没有足够的资源来处理消费者提交的offset,就会抛出CommitFailedException异常。例如,如果Kafka集群已经达到了最大负载,就可能无法处理消费者提交的offset。
- Kafka集群故障:如果Kafka集群出现故障,例如领导者节点崩溃或磁盘故障,就可能导致消费者提交offset失败并抛出CommitFailedException异常。
- 其他消费者重复提交:如果另一个消费者提交了相同的offset,那么自己的消费者就无法提交offset。因为提交相同的offset可能会导致消费者组中的其他消费者丢失消息,所以Kafka会拒绝重复提交相同的offset。
如果大家在开发中遇到Kafka日志中出现CommitFailedException异常,请检查消费者提交offset的频率和代码,确保消费者在规定的时间内提交offset,并避免重复提交相同的offset。如果问题仍然存在,请检查Kafka集群的健康状况,并确保集群有足够的资源来处理消费者提交的offset,可以考虑笔者上面设置的两个配置文件进行处理。文章来源地址https://www.toymoban.com/news/detail-730809.html
到了这里,关于kafka CommitFailedException异常的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!