Integrating a Kafka Dead Letter Queue in Spring Boot with Spring Cloud Stream 3.1+
Previous article: Spring Boot with Spring Cloud Stream 3.1+ and Kafka
Steps to implement the dead letter queue
- Add a dead letter queue configuration file and the corresponding channel
- Add retry configuration to the corresponding channel in the channel binding configuration
Result (screenshot in the original post)
Configuration files
Basic Kafka configuration (application-mq.yml)
server:
  port: 7105
spring:
  application:
    name: betrice-message-queue
  config:
    import:
      - classpath:application-bindings.yml
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            enable.auto.commit: false
      binders:
        betrice-kafka:
          type: kafka
          environment:
            spring.kafka:
              bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}
Create the dead letter queue configuration file (application-dql.yml)
spring:
  cloud:
    stream:
      kafka:
        bindings:
          dqlTransfer-in-0:
            consumer:
              # When set to true, enables DLQ behavior for the consumer. By default, messages
              # that result in errors are forwarded to a topic named error.<destination>.<group>.
              # Messages sent to the DLQ topic are enhanced with the following headers:
              # x-original-topic, x-exception-message, and x-exception-stacktrace, as byte[].
              # By default, a failed record is sent to the same partition number in the DLQ
              # topic as the original record.
              enableDlq: true
              dlqName: Evad05-message-dlq
              keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              # valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              valueSerde: com.devilvan.pojo.Evad05MessageSerde
              autoCommitOnError: true
              autoCommitOffset: true
Note: valueSerde here is set to a custom object type, so the binding must use the application/json content type; after the consumer receives the message it is converted to a JSON string. Also note that without dlqName, the dead letter topic would default to error.Evad10.evad05DlqConsumer (error.<destination>.<group>); here it is overridden to Evad05-message-dlq.
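The post never shows the Evad05MessageSerde class itself. Below is a minimal reconstruction, assuming fastjson for the JSON work and assuming the class doubles as both the payload POJO and its own Kafka Serde (which is how the post uses the same class name in valueSerde and in the controller); the field and method names are guesses:

    package com.devilvan.pojo;

    import java.nio.charset.StandardCharsets;

    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical reconstruction: a payload POJO that also implements its own Serde,
    // so the class can be referenced directly from the valueSerde property above.
    public class Evad05MessageSerde implements Serde<Evad05MessageSerde> {

        private String data;
        private int count;

        public String getData() { return data; }
        public void setData(String data) { this.data = data; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }

        @Override
        public Serializer<Evad05MessageSerde> serializer() {
            // Serialize the POJO to UTF-8 JSON bytes
            return (topic, value) -> JSON.toJSONString(value).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Deserializer<Evad05MessageSerde> deserializer() {
            // Deserialize UTF-8 JSON bytes back into the POJO
            return (topic, bytes) ->
                    JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Evad05MessageSerde.class);
        }

        @Override
        public String toString() {
            return "Evad05MessageSerde{data='" + data + "', count=" + count + "}";
        }
    }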
Add configuration to the channel binding file (application-bindings.yml)
The channel here matches dqlTransfer-in-0 from the DLQ file above. With maxAttempts: 2 a failing message is attempted twice in total (the retry runs after a 1s back-off); after the final failure it is published to Evad05-message-dlq.
spring:
  cloud:
    stream:
      betrice-default-binder: betrice-kafka
      function:
        # Declare the function channels; transfer receives the producer's messages and
        # passes them to sink after processing
        definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
      bindings:
        # Consumer binding: reads from the topic named by destination
        dqlTransfer-in-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          group: evad05DlqConsumer # a consumer group is required for the dead letter queue
          content-type: application/json
          consumer:
            maxAttempts: 2 # maximum number of attempts to consume a failing message (it is redelivered after a failure); default 3
            backOffInitialInterval: 1000 # initial retry interval after a failure; default 1s, i.e. the first retry runs 1s later
            backOffMultiplier: 2 # multiplier between consecutive retry intervals; default 2, i.e. each interval is twice the previous one
            backOffMaxInterval: 10000 # maximum interval between retries; default 10000 ms (10 s)
        dqlTransfer-out-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
        # Consume messages from the dead letter queue
        evad05DlqConsumer-in-0:
          destination: Evad05-message-dlq
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
Controller
Send messages and route them into the dead letter queue. Sending to the Evad10 topic lets dqlTransfer-in-0 pick the message up; the channel then fails on purpose, which triggers the retry and DLQ flow.
import javax.annotation.Resource;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.devilvan.pojo.Evad05MessageSerde;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {

    @Resource(name = "streamBridgeUtils")
    private StreamBridge streamBridge;

    @PostMapping("streamSend")
    public void streamSend(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendDql")
    public void streamSendDql(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendJsonDql")
    public void streamSendJsonDql(String topic) {
        try {
            Evad05MessageSerde message = new Evad05MessageSerde();
            message.setData("evad05 test dql");
            message.setCount(1);
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }
}
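As an alternative to fixing the content type on the binding, it can also be set per message. A sketch; the endpoint name streamSendExplicitJson and the use of MessageBuilder are illustrative, not from the original post:

    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.util.MimeTypeUtils;

    // Illustrative variant of streamSendJsonDql: sends the POJO straight to the
    // Evad10 destination with an explicit application/json content-type header.
    @PostMapping("streamSendExplicitJson")
    public void streamSendExplicitJson() {
        Evad05MessageSerde payload = new Evad05MessageSerde();
        payload.setData("evad05 test dql");
        payload.setCount(1);
        streamBridge.send("Evad10", MessageBuilder.withPayload(payload)
                .setHeader("contentType", MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

StreamBridge resolves "Evad10" as a dynamic destination here, so no extra output binding is required for this variant.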
Channel
Here the dqlTransfer channel is used: the message arrives from the Evad10 topic, the dqlTransfer() method prints it and then throws an exception, and once the retries are exhausted the message lands in the corresponding dead letter queue.
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BetriceMqSubChannel {

    @Bean
    public Function<String, String> dqlTransfer() {
        return message -> {
            System.out.println("transfer: " + message);
            // Always fail so the message is retried and finally routed to the DLQ
            throw new RuntimeException("Dead letter queue test!");
        };
    }

    @Bean
    public Consumer<String> evad05DlqConsumer() {
        return message -> System.out.println("Topic: evad05 Dlq Consumer: " + message);
    }
}
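Because the binder adds x-original-topic, x-exception-message, and x-exception-stacktrace to every DLQ record (see the comments in application-dql.yml), a DLQ consumer can also report why a message failed. A sketch under that assumption; the bean name evad05DlqInspector is illustrative and would additionally need to be listed in function.definition and given its own binding, like evad05DlqConsumer-in-0 above:

    import java.nio.charset.StandardCharsets;
    import java.util.function.Consumer;

    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;

    @Bean
    public Consumer<Message<String>> evad05DlqInspector() {
        return message -> {
            // The DLQ headers are byte[] on the wire; depending on the header mapper
            // they may surface as byte[] or String, so handle both defensively.
            Object raw = message.getHeaders().get("x-exception-message");
            String reason = raw instanceof byte[]
                    ? new String((byte[]) raw, StandardCharsets.UTF_8)
                    : String.valueOf(raw);
            System.out.println("DLQ payload: " + message.getPayload()
                    + ", failure reason: " + reason);
        };
    }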
Converting a custom serialized type into a JSON message
Steps
1. Add the custom serde class to the valueSerde property in the dead letter queue configuration file (application-dql.yml)
2. In BetriceMqController, build an object of the custom type and send it as the message:
@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
    try {
        Evad05MessageSerde message = new Evad05MessageSerde();
        message.setData("evad05 test dql");
        message.setCount(1);
        streamBridge.send(topic, message);
        log.info("Sent message: " + message);
    } catch (Exception e) {
        log.error("Exception: " + e);
    }
}
3. The channel (BetriceMqSubChannel) receives the message and deserializes it (a standalone round-trip sketch follows these steps):
@Bean
public Consumer<String> evad05DlqConsumer() {
    return message -> {
        // JSON here is com.alibaba.fastjson.JSON; the payload string is mapped back to the POJO
        System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));
    };
}
4. Result (screenshot in the original post)
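To sanity-check the fastjson round trip used in step 3 outside the binder, here is a small standalone demo, assuming the Evad05MessageSerde POJO sketched earlier:

    import com.alibaba.fastjson.JSON;
    import com.devilvan.pojo.Evad05MessageSerde;

    public class SerdeRoundTripDemo {
        public static void main(String[] args) {
            Evad05MessageSerde msg = new Evad05MessageSerde();
            msg.setData("evad05 test dql");
            msg.setCount(1);
            String json = JSON.toJSONString(msg);   // e.g. {"count":1,"data":"evad05 test dql"}
            Evad05MessageSerde back = JSON.parseObject(json, Evad05MessageSerde.class);
            // Prints: evad05 test dql / 1
            System.out.println(back.getData() + " / " + back.getCount());
        }
    }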