SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

这篇具有很好参考价值的文章主要介绍了SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

上一篇直通车

SpringBoot整合SpringCloudStream3.1+版本Kafka

实现死信队列步骤

  1. 添加死信队列配置文件,添加对应channel
  2. 通道绑定配置对应的channel位置添加重试配置

结果

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列

配置文件

Kafka基本配置(application-mq.yml)

server:
  port: 7105
spring:
  application:
	name: betrice-message-queue
  config:
	import:
	- classpath:application-bindings.yml
  cloud:
	stream:
	  kafka:
		binder:
		  brokers: localhost:9092
		  configuration:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		  consumer-properties:
			enable.auto.commit: false
	  binders:
		betrice-kafka:
		  type: kafka
		  environment:
			spring.kafka:
		  bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

创建死信队列配置文件(application-dql.yml)

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列

spring:
  cloud:
	stream:
	  kafka:
		bindings:
		  dqlTransfer-in-0:
			consumer:
			  # When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.
			  # messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].
			  # By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
			  enableDlq: true
			  dlqName: Evad05-message-dlq
			  keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
			  valueSerde: com.devilvan.pojo.Evad05MessageSerde
			  autoCommitOnError: true
			  autoCommitOffset: true

注意:这里的valueSerde使用了对象类型,需要搭配application/json使用,consumer接收到消息后会转化为json字符串

通道绑定文件添加配置(application-bindings.yml)

channel对应上方配置文件的dqlTransfer-in-0

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列

spring:
  cloud:
	stream:
	  betrice-default-binder: betrice-kafka
	  function:
		# 声明两个channel,transfer接收生产者的消息,处理完后给sink
		definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
	  bindings:
		# 添加生产者bindiing,输出到destination对应的topic
		dqlTransfer-in-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  group: evad05DlqConsumer # 使用死信队列必须要有group
		  content-type: application/json
		  consumer:
			maxAttempts: 2 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
			backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
			backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
			backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。
		dqlTransfer-out-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain
		# 消费死信队列中的消息
		evad05DlqConsumer-in-0:
		  destination: Evad05-message-dlq
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain

Controller

发送消息并将消息引入死信队列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
	@Resource(name = "streamBridgeUtils")
	private StreamBridge streamBridge;

	@PostMapping("streamSend")
	public void streamSend(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSendDql")
	public void streamSendDql(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSendJsonDql")
	public void streamSendJsonDql(String topic) {
		try {
			Evad05MessageSerde message = new Evad05MessageSerde();
			message.setData("evad05 test dql");
			message.setCount(1);
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}
}

Channel

这里使用了transfer通道,消息从Evad10(topic)传来,经过transfer()方法后抛出异常,随后进入对应的死信队列

@Configuration
public class BetriceMqSubChannel {
	@Bean
	public Function<String, String> dqlTransfer() {
		return message -> {
			System.out.println("transfer: " + message);
			throw new RuntimeException("死信队列测试!");
		};
	}

	@Bean
	public Consumer<String> evad05DlqConsumer() {
		return message -> {
			System.out.println("Topic: evad05 Dlq Consumer: " + message);
		};
	}
}

将自定义序列化类型转换为JSON消息

步骤

1. 通道绑定文件(application-bindings.yml)的valueSerde属性添加自定义的序列化

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列

2. BetriceMqController中封装该自定义类型的对象,并作为消息发送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
	try {
		Evad05MessageSerde message = new Evad05MessageSerde();
		message.setData("evad05 test dql");
		message.setCount(1);
		streamBridge.send(topic, message);
		log.info("发送消息:" + message);
	} catch (Exception e) {
		log.error("异常消息:" + e);
	}
}

3. channel(BetriceMqSubChannel)接收到该消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {
	return message -> {
		System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));
	};
}

4. 结果

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列,SpringBoot,SpringCloud,spring boot,kafka,死信队列

参考网址

Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列_spring cloud stream kafka 死信队列_it噩梦的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-601023.html

到了这里,关于SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot:整合Kafka

    依赖 yaml配置 简单demo 下面示例创建了一个生产者,发送消息到topic1,消费者监听topic1消费消息。监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。 KafkaTemplate调用send时 默认采用异步发送 ,如果需要同步获取发送结果,调用get方法 带回调

    2024年02月08日
    浏览(25)
  • springboot整合kafka-笔记

    这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 测试发送kafka消息-控制台日志

    2024年02月12日
    浏览(33)
  • Kafka 整合 SpringBoot

    1. 添加依赖 2. 配置文件 application.properties 3. 代码实现发送消息 1. 添加依赖 2. 配置文件 applicatio.properties  3. 代码实现消费消息 首先在 kafka 节点上创建 topic: 打开kafka节点服务器的终端,输入以下命令: 一些常用命令: 编写测试代码:ApplicationTests.java

    2024年02月07日
    浏览(29)
  • 【SpringBoot系列】SpringBoot整合Kafka(含源码)

    前言 在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。 它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。 在众多的消息队列中,Kafka 是一个非常出色的选择。 它能够处理大量的实时数据,并且提供了强大的持久化能力。 在本

    2024年02月05日
    浏览(34)
  • 什么是kafka,如何学习kafka,整合SpringBoot

    目录 一、什么是Kafka,如何学习 二、如何整合SpringBoot 三、Kafka的优势   Kafka是一种分布式的消息队列系统,它可以用于处理大量实时数据流 。学习Kafka需要掌握如何安装、配置和运行Kafka集群,以及如何使用Kafka API编写生产者和消费者代码来读写数据。此外,还需要了解Ka

    2024年02月10日
    浏览(25)
  • Kafka入门(安装和SpringBoot整合)

    app-tier:网络名称 –driver:网络类型为bridge Kafka依赖zookeeper所以先安装zookeeper -p:设置映射端口(默认2181) -d:后台启动 安装并运行Kafka, –name:容器名称 -p:设置映射端口(默认9092 ) -d:后台启动 ALLOW_PLAINTEXT_LISTENER任何人可以访问 KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper K

    2024年02月11日
    浏览(70)
  • Kafka 基础整理、 Springboot 简单整合

    定义: Kafka 是一个分布式的基于发布/订阅默认的消息队列 是一个开源的分布式事件流平台,被常用用于数据管道、流分析、数据集成、关键任务应用 消费模式: 点对点模式 (少用) 消费者主动拉取数据,消息收到后清除消息 发布/订阅模式 生产者推送消息到队列,都消费者

    2024年02月03日
    浏览(64)
  • 实战:彻底搞定 SpringBoot 整合 Kafka

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。 除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。

    2024年02月10日
    浏览(29)
  • springboot整合kafka多数据源

    在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。 单机的情况 如果是单机的kafka我们直接通过springboot自

    2024年02月13日
    浏览(35)
  • springboot整合ELK+kafka采集日志

    在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些

    2024年02月15日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包