springboot~kafka中延时消息的实现

这篇具有很好参考价值的文章主要介绍了springboot~kafka中延时消息的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

应用场景

  • 用户下单5分钟后,给他发短信
  • 用户下单30分钟后,如果用户不付款就自动取消订单

kafka无死信队列

kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信队列实现的。

kafka有生产者拦截器

通过对生产者拦截器实现一个TTL的检查,然后再通过类似netty里的延时队列组件来实现消息的延时发送,发到咱们的死信队列里
springboot~kafka中延时消息的实现文章来源地址https://www.toymoban.com/news/detail-664403.html

  • ProducerInterceptorTTL源码
public class ProducerInterceptorTTL implements ProducerInterceptor<Integer, String>, ApplicationContextAware {

	// 消息延时,单位秒
	public static String TTL = "ttl";

	// 死信队列,延时后发送到的队列,我们称为死信队列
	public static String DEAD_TOPIC = "dead_topic";

	// 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取bean
	private static ApplicationContext applicationContext;

	// 时间轮,用于延时发送消息
	private static LindTimeWheel timeWheel = new LindTimeWheel(1000, 8);

	@Override
	public ProducerRecord onSend(ProducerRecord<Integer, String> record) {
		final String topic = record.topic();
		final Integer partition = record.partition();
		final Integer key = record.key();
		final String value = record.value();
		final Long timestamp = record.timestamp();
		final Headers headers = record.headers();
		long ttl = -1;
		String deadTopic = null;
		for (Header header : headers) {
			if (header.key().equals(TTL)) {
				ttl = toLong(header.value());
			}
			if (header.key().equals(DEAD_TOPIC)) {
				deadTopic = new String(header.value());
			}
		}
		// 消息超时判定
		if (deadTopic != null && ttl > 0) {
			// 可以放在死信队列中
			String finalDeadTopic = deadTopic;
			long finalTtl = ttl * 1000;
			timeWheel.addTask(() -> {
				System.out.println("消息超时了," + finalTtl + "需要发到topic:" + record.key());
				KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
				kafkaTemplate.send(finalDeadTopic, record.value());

			}, finalTtl);
		}
		// 拦截器拦下来之后改变原来的消息内容
		ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp,
				key, value, headers);
		// 传递新的消息
		return newRecord;

	}

	@Override
	public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

	}

	@Override
	public void close() {

	}

	@Override
	public void configure(Map<String, ?> map) {

	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

}
  • 注册拦截器
spring:
  kafka:
    producer:
      properties:
        interceptor.classes: com.ruoyi.lawyer.delay.ProducerInterceptorTTL
  • 延时消息在某个时间段之后会送出
    springboot~kafka中延时消息的实现

到了这里,关于springboot~kafka中延时消息的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(36)
  • Springboot整合kafka实现高效的消息传递和处理

    Kafka是一个分布式的流处理平台,它可以处理高吞吐量的消息。Spring Boot是一个流行的Java开发框架,提供了快速构建应用程序的能力。将这两者结合起来可以实现高效的消息传递和处理,同时支持多种消息模式。 本篇博客将介绍如何使用Spring Boot整合Kafka,并支持多种消息模式

    2024年02月09日
    浏览(34)
  • Springboot集成websocket实现消息推送和在线用户统计

    在启动类上添加一个bean 核心代码 实现消息推送只要在业务代码中调用sendMessageSpecial()方法即可。 然后调用刚才的业务接口测试:http://localhost:8080/websocket/t1 调用成功后可以看到三个窗口中都收到了消息

    2023年04月08日
    浏览(53)
  • kafka延时队列内部应用简介

    kafka延时队列_悠然予夏的博客-CSDN博客         两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有

    2023年04月22日
    浏览(35)
  • 【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

    Apache Kafka是分布式发布-订阅消息系统。 它最初由LinkedIn公司开发,之后成为Apache项目的一部分。 Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 Apache Kafka与传统消息系统相比,有以下不同: 它将消息持久化到磁盘,因此可用于批量消

    2023年04月09日
    浏览(42)
  • 【Kafka】消息重复场景及解决

    根本原因 生产发送的消息没有收到正确的broke响应,导致生产者重试。 生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。 过程 过程如下: new KafkaProducer()后创建

    2024年02月21日
    浏览(37)
  • RabbitMQ 延时消息实现

    导入Spring 集成RabbitMQ MAEVN 推送消息至延迟队列 - 消息过期自动推送到死信队列 - 消费死信队列 2.1. MQ配置信息 2.1.1. 自定义队列配置 …/bootstrap.yml 2.1.2. 读取自定义MQ配置信息 2.2. 配置文件自动生成队列 2.2.1. 延迟队列 2.2.2. 死信队列 2.3. 生产者推送消息 2.4. 消费者处理消息 设

    2024年04月26日
    浏览(42)
  • kafka消息丢失面试题,RocketMQ消息丢失场景及解决办法

    互联网行业更新换代非常快,行业常态便是不断学习,因此这些主流技术你一个都不能落下! ①并发编程 Java并发编程是整个Java开发体系中最难以理解,但也是最重要的知识点之一,因此学习起来比较费劲,从而导致很多人望而却步,但是无论是职场面试还是高并发高流量的

    2024年03月17日
    浏览(42)
  • 使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

    下载地址:link 1.下载完成放到rabbitmq安装目录plugins下 2.执行命令启用插件 3.重启mq Exchanges - add a new exchange - type 出现x-delayed-message即安装成功

    2024年02月11日
    浏览(40)
  • 94、Kafka消息丢失的场景及解决方案

    1、ack=0,不重试 producer发送消息完,不管结果了,如果发送失败也就丢失了。 2、ack=1,leader crash producer发送消息完,只等待 leader 写入成功就返回了,leader crash了,这时follower没来及同步,消息丢失, 3、unclean .leader .election .enable 配置true 允许选举ISR以外的副本作为leader,会导

    2024年02月16日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包