Spring Boot整合RabbitMQ之路由模式(Direct)

这篇具有很好参考价值的文章主要介绍了Spring Boot整合RabbitMQ之路由模式(Direct)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。

1. 添加配置

在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated

2. 创建配置类

    创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "direct-order-exchange";
    public static final String SMS_QUEUE = "sms-direct-queue";
    public static final String EMAIL_QUEUE = "email-direct-queue";
    public static final String WECHAT_QUEUE = "wechat-direct-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        /**
         * directExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
    }

    /**
     * 将自定义的RabbitTemplate对象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启消息推送结果回调
        rabbitTemplate.setMandatory(true);
        //设置ConfirmCallback回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==============ConfirmCallback start ===============");
                log.info("回调数据:{}", correlationData);
                log.info("确认结果:{}", ack);
                log.info("返回原因:{}", cause);
                log.info("==============ConfirmCallback end =================");
            }
        });
        //设置ReturnCallback回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==============ReturnCallback start ===============");
                log.info("发送消息:{}", JSONUtil.toJsonStr(message));
                log.info("结果状态码:{}", replyCode);
                log.info("结果状态信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由key:{}", routingKey);
                log.info("==============ReturnCallback end =================");
            }
        });
        return rabbitTemplate;
    }
}

3. 消费者配置

    在消费者项目的配置文件中开启手动确认,配置详情如下:
server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

4. 创建消费者

分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:54
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {

    //标记消费者逻辑执行方法
    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Email direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {

    @RabbitHandler
    public void smsMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("sms direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author chaoxian.wu
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {

    @RabbitHandler
    public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("wechat direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 2; i++) {
            //交换机名称
            String exchangeName = "direct-order-exchange";
            //路由key
            String routingKey = "sms";
            Order order = buildOrder(i);
            orderService.createOrder(order, routingKey, exchangeName);
        }
    }

    private Order buildOrder(long id) {
        Order order = new Order();
        order.setRequestId(id);
        order.setUserId(id);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(10L);
        order.setGoodsNum(1);
        order.setTotalAmount(10L);
        return order;
    }
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

再发一条routingKey=email的消息,消费的日志,效果图示如下

Spring Boot整合RabbitMQ之路由模式(Direct),消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。

整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息队列rabbitmq,java-rabbitmq,spring boot,rabbitmq

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。文章来源地址https://www.toymoban.com/news/detail-681763.html

到了这里,关于Spring Boot整合RabbitMQ之路由模式(Direct)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合 RabbitMQ 实现延迟消息

    消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。 随着 AMQP 草案的发布,两个月

    2024年04月08日
    浏览(30)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(31)
  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消费者 spring-rabbit.xml 当启动消费者后,便可获取到发送至队列的消息 检查队列的消息的情况: 经过检查确认,发现消息已经被消费了。 至此拉模式的消费者完成。

    2024年02月09日
    浏览(30)
  • Spring Boot整合RabbitMQ之发布与订阅模式

    RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现: 用户发布动态,其“粉丝”收到其发布动态的消息 用户下订单,库存模块、支付模块等收到消息并

    2024年02月12日
    浏览(24)
  • RabbitMQ入门 消息队列快速入门 SpringAMQP WorkQueue 队列和交换机 Fanout Direct exchange RAbbitMQ单体部署

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年04月08日
    浏览(48)
  • RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

    参考官网:https://www.rabbitmq.com/getstarted.html 简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式 简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。 发布订阅模式 fanout 同时向

    2024年02月10日
    浏览(31)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(34)
  • 消息队列-RabbitMQ:Exchanges、绑定 bindings以及3大常用交换机(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上, 通常生产者甚至都不知道这些消息传递传递到了哪些队列中 。 相反, 生产者只能将消息发送到交换机 (exchange) , 交换机工作 的内容非常简单, 一方面它接收来自生产者的消息 , 另一

    2024年04月08日
    浏览(37)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(34)
  • RabbitMQ消息队列的工作模式

    官方文档地址:https://www.rabbitmq.com/getstarted.html 工作模式其实就是消息队列分发消息的路由方式。 RabbitMQ常用的几种工作模式: 简单模式 WorkQueues工作队列模式 PubSub生产者/PubSub消费者模式 Routing路由模式 Topics通配符模式 发布/订阅模式(Publish/Subscribe):该模式用于一对多的

    2024年02月15日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包