RabbitMq生产者发送消息确认

这篇具有很好参考价值的文章主要介绍了RabbitMq生产者发送消息确认。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMq生产者发送消息失败现象

一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitMq服务器的问题之后,待RabbitMq服务器正常之后,需要对这些消息进行重新投递。正常来说RabbitMq做了集群之后是不会出现这种问题,整个集群挂断的概率也是非常小。

rabbitmq发送消息失败,RabbitMq,java-rabbitmq,rabbitmq,java,Powered by 金山文档

错误信息

当项目启动后,然后把交换机Exchange删除后,然后生产者发送消息时会提示交换机不存在。Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

SpringBoot代码示例

SpringBoot的application.properties需要新增spring.rabbitmq.publisher-confirm-type配置要求值是correlated。默认值是none表示无需触发交换机收到消息的回调接口。correlated表示消息发布后会触发交换机收到消息的回调接口。

# springboot整合rabbitMq的配置
spring.rabbitmq.host=192.168.15.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

队列和交换机配置类

package springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ConfirmConfig {

    // 普通交换机名称
    public static final String EXCHANGE_NAME = "confirm_exchange";
    // 队列名称
    public static final String QUEUE_NAME = "confirm_queue";

    public static final String ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }
}

生产者消息发送确认配置类

package springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback {

    // 2.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *
     * @param correlationData 消息
     * @param b 发送成功是true,失败是false
     * @param s 发送失败时的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("交换机已经收到id为{}的消息", id);
        } else {
            log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
            // 消息缓存或入库,邮件提醒运维
        }
    }

    // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
}

生产者类

package springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springbootrabbitmq.config.ConfirmConfig;
import springbootrabbitmq.config.TtlQueueConfig;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message);
        return "success";
    }
    @GetMapping("/sendMsg2/{message}")
    public String sendMsg2(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        CorrelationData data = new CorrelationData();
        data.setId("1111");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
        return "success";
    }
}

消费者类

package springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import springbootrabbitmq.config.ConfirmConfig;

import java.util.Date;

@Component
@Slf4j
public class ConfirmConsumer {
    //监听器接收消息
    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{}, 收到一条消息:{} ", new Date().toString(), msg);
    }
}

首先正常发送,然后再删除交换机然后再发送。测试结果如下

2023-01-29 21:07:12.367  INFO 79848 --- [nio-8080-exec-1] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:07:12 CST 2023, 发送一条消息:12 到队列
2023-01-29 21:07:12.399  INFO 79848 --- [nectionFactory1] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:07:12.403  INFO 79848 --- [ntContainer#0-1] s.consumer.ConfirmConsumer               : 当前时间:Sun Jan 29 21:07:12 CST 2023, 收到一条消息:12 
2023-01-29 21:08:01.282  INFO 79848 --- [nio-8080-exec-2] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:08:01 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:08:01.289 ERROR 79848 --- [168.15.200:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
2023-01-29 21:08:01.290 ERROR 79848 --- [nectionFactory2] s.config.RabbitMqCallBack                : 交换机未收到id为1111的消息, 原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)

消息回退

如果不开启消息回退,默认是消息即使无法发送到队列(如路由键错误等场景),也不会进行提醒,生产者不知道消息能否成功发送到队列。

解决方案

当消息无法到达队列的时候进行提醒

消息回退代码示例

配置,开启消息不可达目的地时的回调

spring.rabbitmq.publisher-returns=true

配置类,实现RabbitTemplate.ReturnCallback接口

package springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    // 2.注入
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *
     * @param correlationData 消息
     * @param b 发送成功是true,失败是false
     * @param s 发送失败时的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("交换机已经收到id为{}的消息", id);
        } else {
            log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
            // 消息缓存或入库,邮件提醒运维
        }
    }

    // 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    // 当消息传递过程中不可达到目的地时将消息返回给生产者,只有不可达到目的地时才会调用这个方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息无法被写入队列:{}, 退回原因:{}, 路由Key: {}", message, replyText, routingKey);
        // 邮件发送,缓存或存到数据库
    }
}

生产者

@GetMapping("/sendMsg3/{message}")
    public String sendMsg3(@PathVariable String message) {
        log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
        CorrelationData data = new CorrelationData();
        data.setId("1111");
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
        rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"222", message +"222", data);
        return "success";
    }

消费者与上一个消费者相同

测试结果如下:调用:http://127.0.0.1:8080/confirm/sendMsg3/123生产者的接口可以看到当路由键错误导致交换机无法把消息投递到队列时会回调returnedMessage方法。文章来源地址https://www.toymoban.com/news/detail-757698.html

2023-01-29 21:27:48.910  INFO 74512 --- [nio-8080-exec-1] s.controller.ConfirmController           : 当前时间:Sun Jan 29 21:27:48 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:27:48.934  INFO 74512 --- [nectionFactory1] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.941 ERROR 74512 --- [nectionFactory1] s.config.RabbitMqCallBack                : 消息无法被写入队列:(Body:'123222' MessageProperties [headers={spring_returned_message_correlation=1111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回原因:NO_ROUTE, 路由Key: key1222
2023-01-29 21:27:48.943  INFO 74512 --- [nectionFactory2] s.config.RabbitMqCallBack                : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.946  INFO 74512 --- [ntContainer#0-1] s.consumer.ConfirmConsumer               : 当前时间:Sun Jan 29 21:27:48 CST 2023, 收到一条消息:123 

到了这里,关于RabbitMq生产者发送消息确认的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(54)
  • RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储

    在RabbitMQ中,生产者(Producer) 负责发送消息 ,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则 负责处理接收到的这些消息 。在RabbitMQ中,生产者和消费者之间使用 交换器(Exchange)和队列(Queue)进行消息路由和存储 。生产者将消息发送到

    2024年02月01日
    浏览(43)
  • 消息中间件之八股面试回答篇:一、问题概览+MQ的应用场景+RabbitMQ如何保证消息不丢失(生产者确认机制、持久化、消费者确认机制)+回答模板

    目前主流的消息队列技术(MQ技术)分为RabbitMQ和Kafka,其中深蓝色为只要是MQ,一般都会问到的问题。浅蓝色是针对RabbitMQ的特性的问题。蓝紫色为针对Kafka的特性的问题。 MQ主要提供的功能为:异步 解耦 削峰 。 展开来讲就是 异步发送(验证码、短信、邮件…) MYSQL和Redi

    2024年01月24日
    浏览(61)
  • RabbitMQ 生产者-消息丢失 之 场景分析

      生产者发送消息的流程如下:首先生产者和RabbitMQ服务器建立连接,然后创建信道,通过信道发送消息给RabbitMQ服务器,RabbitMQ服务器接收到消息后交由交换机进行消息存储,交换机根据不同策略将消息路由到指定队列中。在此过程中,可能会存在以下消息丢失的场景:

    2024年02月14日
    浏览(43)
  • Spring整合RabbitMQ-配制文件方式-1-消息生产者

    Spring-amqp是对AMQP的一些概念的一些抽象,Spring-rabbit是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等 RabbitAdmin 类完成对Exchange、Queue、Binding的操作,在容器中管理 了 RabbitAdmin 类的时候,可以对Exchange、Queue、Binding进行自动声

    2024年02月09日
    浏览(42)
  • 如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

    场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议 rabbitmq实现思路: 选型:发布订阅模式(Publish/Subscribe) 一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,

    2023年04月25日
    浏览(53)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(48)
  • 多图详解 kafka 生产者消息发送过程

    生产者客户端代码 KafkaProducer 通过解析 producer.propeties 文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、 RecordAccumulator消息累加器 、 元信息更新器 、启动发送请求的后台线程 生产者元信息更新器 我们之前有讲过. 客户端都会保存集群的元信息,例如

    2023年04月09日
    浏览(40)
  • SpringBoot集成RabbitMQ(生产者)

    默认读者已经对SpringBoot和RabbitMQ比较熟悉 SpringBoot集成RabbitMQ(生产者)的步骤如下: 创建SpringBoot工程 Maven添加 spring-boot-starter-amqp 编写application.properties配置RabbitMQ的信息 编写交换机、队列、绑定配置类 在业务逻辑代码中注入RabbitTemplate 调用RabbitTemplate的方法,完成消息推送

    2024年02月15日
    浏览(43)
  • RabbitMQ-生产者可靠性

            由于网络波动导致客户端无法连接上MQ,这是可以开启MQ的失败后重连机制。         注意:                 是连接失败的重试,而不是消息发送失败后的重试。         这种超时重连的方式是 阻塞式 的,后面的代码没办法执行,如果说业务要求比较严格,则需

    2024年01月21日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包