SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费

这篇具有很好参考价值的文章主要介绍了SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ六种工作模式简单说明

1、simple简单模式

  1. 消息产生着§将消息放入队列
  2. 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

2、work工作模式(资源的竞争)

  1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  2. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

3、publish/subscribe发布订阅(共享资源)

  1. X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
  2. 相关场景:邮件群发,群聊天,广播(广告)

4、routing路由模式

  1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
  2. 根据业务功能定义路由字符串
  3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

5、topic 主题模式(路由模式的一种)

  1. 星号井号代表通配符
  2. 星号代表多个单词,井号代表一个单词
  3. 路由功能添加模糊匹配
  4. 消息产生者产生消息,把消息交给交换机
  5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

6、远程调用rpc模式

  1. 客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个’回调’的请求的队列地址。

本文使用的是Topic主题模式,完整代码地址在结尾!!

第一步,在pom.xml加入依赖,如下

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步,编写application.yml配置文件,如下

server:
  port: 8184
spring:
  application:
    name: rabbitmq-demo-server
  rabbitmq:
    host: xxx # ip地址
    port: 5672
    username: xxx # 连接账号
    password: xxx # 连接密码
    template:
      retry:
        enabled: true # 开启失败重试
        initial-interval: 10000ms # 第一次重试的间隔时长
        max-interval: 300000ms # 最长重试间隔,超过这个间隔将不再重试
        multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
      exchange: topic.exchange # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
    publisher-confirm-type: correlated # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
    publisher-returns: true
    listener:
      type: simple
      simple:
        acknowledge-mode: manual
        prefetch: 1 # 限制每次发送一条数据。
        concurrency: 3 # 同一个队列启动几个消费者
        max-concurrency: 3 # 启动消费者最大数量
        # 重试策略相关配置
        retry:
          enabled: true # 是否支持重试
          max-attempts: 5
          stateless: false
          multiplier: 1.0 # 时间策略乘数因子
          initial-interval: 1000ms
          max-interval: 10000ms
        default-requeue-rejected: true

第三步,创建RabbitMqConstants类,如下

/**
 * RabbitMqConstants
 *
 * @author luoyu
 * @date 2019/03/16 22:12
 * @description
 */
public class RabbitMqConstants {

    public final static String TEST1_QUEUE = "test1-queue";

    public final static String TEST2_QUEUE = "test2-queue";

    public final static String EXCHANGE_NAME = "test.topic.exchange";

    public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";

    public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";

    public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";

    public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";

}

第四步,创建RabbitMqConfig配置类,如下

import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * RabbitMQConfig
 *
 * @author luoyu
 * @date 2019/03/16 21:59
 * @description
 */
@Slf4j
@Configuration
public class RabbitMqConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     *  声明交换机
     */
    @Bean(RabbitMqConstants.EXCHANGE_NAME)
    public Exchange exchange(){
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
    }

    /**
     *  声明队列
     *  new Queue(QUEUE_EMAIL,true,false,false)
     *  durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *  auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *  exclusive  表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean(RabbitMqConstants.TEST1_QUEUE)
    public Queue esQueue() {
        return new Queue(RabbitMqConstants.TEST1_QUEUE);
    }

    /**
     *  声明队列
     */
    @Bean(RabbitMqConstants.TEST2_QUEUE)
    public Queue gitalkQueue() {
        return new Queue(RabbitMqConstants.TEST2_QUEUE);
    }

    /**
     *  TEST1_QUEUE队列绑定交换机,指定routingKey
     */
    @Bean
    public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue,
                             @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();
    }

    /**
     *  TEST2_QUEUE队列绑定交换机,指定routingKey
     */
    @Bean
    public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue,
                                 @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();
    }

    /**
     * 如果需要在生产者需要消息发送后的回调,
     * 需要对rabbitTemplate设置ConfirmCallback对象,
     * 由于不同的生产者需要对应不同的ConfirmCallback,
     * 如果rabbitTemplate设置为单例bean,
     * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。
     * @return
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

}

第五步,创建RabbitMqUtils工具类,如下

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * RabbitMqUtils
 *
 * @author luoyu
 * @date 2019/03/16 22:08
 * @description
 */
@Slf4j
@Component
public class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法注入
     */
    @Autowired
    public RabbitMqUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //这是是设置回调能收到发送到响应
        rabbitTemplate.setConfirmCallback(this);
        //如果设置备份队列则不起作用
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 回调确认
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
        }else{
            log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
        }
    }

    /**
     * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
    }

    /**
     * 发送到指定Queue
     * @param queueName
     * @param obj
     */
    public void send(String queueName, Object obj){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(queueName, obj, correlationId);
    }

    /**
     * 1、交换机名称
     * 2、routingKey
     * 3、消息内容
     */
    public void sendByRoutingKey(String exChange, String routingKey, Object obj){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);
    }
}

第六步,编写RabbitMqListener消费者监听器,如下

import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Slf4j
@Component
public class RabbitMqListener {

    @RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE)
    public void test1Consumer(Message message, Channel channel) {
        try {
            //手动确认消息已经被消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Test1消费消息:" + message.toString() + "。成功!");
        } catch (Exception e) {
            e.printStackTrace();
            log.info("Test1消费消息:" + message.toString() + "。失败!");
        }
    }

    @RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE)
    public void test2Consumer(Message message, Channel channel) {
        try {
            //手动确认消息已经被消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Test2消费消息:" + message.toString() + "。成功!");
        } catch (Exception e) {
            e.printStackTrace();
            log.info("Test2消费消息:" + message.toString() + "。失败!");
        }
    }

}

第七步,编写生产者服务类,TestService,TestServiceImpl,如下

TestService

public interface TestService {

    String sendTest1(String content);

    String sendTest2(String content);

}

TestServiceImpl

import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import com.luoyu.rabbitmq.service.TestService;
import com.luoyu.rabbitmq.util.RabbitMqUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TestServiceImpl implements TestService {

    @Autowired
    private RabbitMqUtils rabbitMqUtils;

    @Override
    public String sendTest1(String content) {
        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
                RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);
        return "发送成功!";
    }

    @Override
    public String sendTest2(String content) {
        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
                RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);
        return "发送成功!";
    }
}

第八步,编写TestController类,如下

import com.luoyu.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author :jhx
 * @date :2020/12/26
 * @desc :
 */
@RestController
@Slf4j
@RequestMapping(value = "/message")
public class TestController {

    @Autowired
    private TestService testService;

    /**
     * 发送消息test1
     * @param content
     * @return
     */
    @PostMapping(value = "/test1")
    public String sendTest1(@RequestBody String content) {
        return testService.sendTest1(content);
    }

    /**
     * 发送消息test2
     * @param content
     * @return
     */
    @PostMapping(value = "/test2")
    public String sendTest2(@RequestBody String content) {
        return testService.sendTest2(content);
    }

}

第九步,启动项目,使用postman调用不同Topic主题的发送消息接口,通过控制台打印日志,可知消息成功发送并成功被消费。

完整代码地址:https://github.com/Jinhx128/springboot-demo

注:此工程包含多个module,本文所用代码均在rabbitmq-demo模块下

文章来源地址https://www.toymoban.com/news/detail-783411.html

到了这里,关于SpringBoot 2.2.5 整合RabbitMQ,实现Topic主题模式的消息发送及消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(61)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(49)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

    参考官网:https://www.rabbitmq.com/getstarted.html 简单模式 Simple, 参考RabbitMQ详解(二):消息模式 Simple(简单)模式 简单模式是最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。 发布订阅模式 fanout 同时向

    2024年02月10日
    浏览(47)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(57)
  • rabbitmq topic模式设置#通配符情况下 消费者队列未接收消息问题排查解决

    生产者配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.# 消费者代码配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.user 其实以上代码看着没有问题,意思是代码生成一个队列,并把【topic.shcool.user】队列和生产者的【topic_exchange_shcool】exchange绑定,但是生产者发送消息是

    2024年02月11日
    浏览(48)
  • Springboot整合RabbitMQ消息中间件

    spring-boot-rabbitmq–消息中间件整合 前言:RabbitMQ的各种交换机说明 1、直连交换机 生产者发布消息时必须带着routing-key,队列绑定到交换机时必须指定binding-key ,且routing-key和binding-key必须完全相同,如此才能将消息路由到队列中 直连交换机通常用来循环分发任务给多个workers,

    2024年02月11日
    浏览(47)
  • 消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(53)
  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消费者 spring-rabbit.xml 当启动消费者后,便可获取到发送至队列的消息 检查队列的消息的情况: 经过检查确认,发现消息已经被消费了。 至此拉模式的消费者完成。

    2024年02月09日
    浏览(40)
  • 【SpringBoot】 整合RabbitMQ 保证消息可靠性传递

    生产者端 目录结构 导入依赖 修改yml 业务逻辑 测试结果         在publisher-confirm-type中有三个确认消息接受类型:none、correlated、simple。         publisher-confirm-type: none 表示 禁用发布确认模式 。是 默认值 。使用此模式之后,不管消息有没有发送到Broker(RabbitMQ)都不会

    2024年02月10日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包