原文网址:Kafka--延迟队列--使用/实现/原理_IT利刃出鞘的博客-CSDN博客
简介
本文介绍Kafka如何使用延迟队列的功能。
Kafka是很常用的消息队列,但Kafka本身是没有延迟队列功能的(RabbitMQ、RocketMQ有延迟队列功能)。本文介绍如何手动给Kafka添加延迟消息的功能。
虽然Kafka内部有时间轮,支持延时操作,例如:延迟生产、延迟拉取以及延迟删除,但这是Kafka自己内部使用的,用户无法将其作为延迟队列来使用。
本内容也是Java后端面试常问的问题 。
方案描述
方案概述
kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻被收到的,因此需要想一个办法,让消息延迟发送出去。
方案如下:
- 第1步:发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如delay-minutes-1
- 第2步:写一段代码拉取delay-minutes-1中的消息,将满足条件的消息发送到真正的目标主题里。
如何延迟发送
延迟消息发出去之后,代码程序就会立刻收到延迟消息,要如何处理才能让延迟消息等待一段时间才发送到真正的topic里面?
不能用sleep
有同学会说很简单嘛,在程序收到消息后判断若条件不满足,就调用sleep方法,过一段时间再进行下一个循环拉取消息。但这样是不行的,原因如下:
在轮询kafka拉取消息的时候,kafka会返回由max.poll.records配置指定的一批消息,当程序不能在max.poll.interval.ms配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance,同时你这个消费者就无法再拉取到任何消息了。
举个例子:当你需要一个24小时的延迟消息队列,在代码里面写下了Thread.sleep(1000*60*60*24);,为了不发生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,这个时候你或许会感觉到一丝丝的怪异,我是谁?我在哪?我为什么要写出来这样的代码?文章来源:https://www.toymoban.com/news/detail-401111.html
上边只是部分内容,为便于维护,本文已迁移到此地址:Kafka延迟队列的实现方式 - 自学精灵文章来源地址https://www.toymoban.com/news/detail-401111.html
到了这里,关于Kafka--延迟队列--使用/实现/原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!