springboot集成rabbitmq

这篇具有很好参考价值的文章主要介绍了springboot集成rabbitmq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

springboot集成rabbitmq,springboot,rabbitmq,spring boot,java,后端

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。

  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

    那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

  • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • Topic:按规则转发消息(最灵活)
  • Headers:设置 header attribute 参数类型的交换机
  • Fanout:转发消息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

springboot集成rabbitmq,springboot,rabbitmq,spring boot,java,后端

第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements…b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.
  • #表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

1、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

##rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、生产者

/**
 * 消息生产者
 */
@Component
public class Send implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法注入
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }

    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消费");
        } else {
            System.out.println("消息消费失败:" + cause);
        }
    }

}

4、消费者

/**
 * 消费消费者
 */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final String EXCHANGE = "amq.direct";

    public static final String QUEUE_A = "test";

    public static final String ROUTINGKEY_A = "test_A";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);//必须要设置,才能进行消息的回调。
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }


	 /**
     * 队列和交换机绑定
     * @return
     */
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }


    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        //加载处理消息A的队列
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        //设置接收多个队列里面的消息,这里设置接收队列A
        //假如想一个消费者处理多个队列里面的信息可以如下设置:
        //container.setQueues(queueA(),queueB(),queueC());
        container.setQueues(queueA());
        container.setExposeListenerChannel(true);
        //设置最大的并发的消费者数量
        container.setMaxConcurrentConsumers(1);
        //最小的并发消费者的数量
        container.setConcurrentConsumers(1);
        //设置确认模式手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                /**通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,
                 换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它 */
                channel.basicQos(1);
                byte[] body = message.getBody();
                logger.info("接受到的数据为:{}",new String(body));
                /**为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。
                 当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。*/
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

}

5. 发送消息

@Configuration
@EnableScheduling
public class RabbitMqJob {

    private static final Logger log = LoggerFactory.getLogger(RabbitMqJob.class);

    @Autowired
    private Send send;

    //每30s发送一次
    @Scheduled(cron = "0/30 * * * * ?")
    public void sendMessage(){
    	log.info("开始发送消息:"+ Instant.now());
        try
            send.sendMsg("hello world");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

结果如下:

springboot集成rabbitmq,springboot,rabbitmq,spring boot,java,后端文章来源地址https://www.toymoban.com/news/detail-643828.html

到了这里,关于springboot集成rabbitmq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第二十章 : Spring Boot 集成RabbitMQ(四)

    第二十章 : Spring Boot 集成RabbitMQ(四) 前言 本章知识点:死信队列的定义、场景、作用以及原理、TTL方法的使用以及演示代码示例。 死信队列 定义:什么是死信队列? 在RabbitMQ中,并没有提供真正意义上的延迟队列,但是RabbitMQ可以设置队列、消息的过期时间,当队列或者

    2024年02月04日
    浏览(37)
  • 第二十一章 : Spring Boot 集成RabbitMQ(五)

    第二十一章 : Spring Boot 集成RabbitMQ(五) 前言 本章知识点: 如何保证消息100%可靠性发送的技术解决方案。 一、 应用场景 在使用消息队列时,因为生产者和消费者不直接交互,所以面临下面几个问题: 1)要把消息添加到队列中,怎么保证消息成功添加? 2)如何保证消息

    2024年02月03日
    浏览(43)
  • Springboot 实践(13)spring boot 整合RabbitMq

    前文讲解了RabbitMQ的下载和安装,此文讲解springboot整合RabbitMq实现消息的发送和消费。 1、创建web project项目,名称为“SpringbootAction-RabbitMQ” 2、修改pom.xml文件,添加amqp使用jar包    !--  RabbitMQ --         dependency             groupIdorg.springframework.boot/groupId         

    2024年02月09日
    浏览(60)
  • spring boot集成Elasticsearch-SpringBoot(25)

      搜索引擎(search engine )通常意义上是指:根据特定策略,运用特定的爬虫程序从互联网上搜集信息,然后对信息进行处理后,为用户提供检索服务,将检索到的相关信息展示给用户的系统。   而我们讲解的是捜索的索引和检索,不涉及爬虫程序的内容爬取。大部分公司

    2023年04月09日
    浏览(113)
  • spring boot学习第六篇:SpringBoot 集成WebSocket详解

    1、WebSocket简介 WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。 2、为什么需要WebSocket HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接。 无状态:每次连

    2024年01月21日
    浏览(52)
  • 【SpringBoot3】Spring Boot 3.0 集成 Redis 缓存

    Redis缓存是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。它主要用于作为数据库、缓存和消息中间件,以快速读写和丰富的数据结构支持而著称。 在应用程序和数据库之间,Redis缓存作为一个中间层起着关键

    2024年02月21日
    浏览(56)
  • Spring Boot进阶(48):【实战教程】SpringBoot集成WebSocket轻松实现实时消息推送

            WebSocket是一种新型的通信协议,它可以在客户端与服务器端之间实现双向通信,具有低延迟、高效性等特点,适用于实时通信场景。在SpringBoot应用中,集成WebSocket可以方便地实现实时通信功能,如即时聊天、实时数据传输等。         本文将介绍如何在Sprin

    2024年02月09日
    浏览(61)
  • Spring Boot进阶(55):SpringBoot之集成MongoDB及实战使用 | 超级详细,建议收藏

            随着大数据时代的到来,数据存储和处理变得越来越重要。而MongoDB作为一种非关系型数据库,具有高效的数据存储和处理能力,被越来越多地应用于各种领域。尤其在Web应用开发中,SpringBoot框架已经成为了主流选择之一。在这篇文章中,我们将探讨如何将MongoD

    2024年02月17日
    浏览(48)
  • spring boot +springboot集成es7.9.1+canal同步到es

    未经许可,请勿转载。 其实大部分的代码是来源于 参考资料来源 的 主要代码实现 ,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看 主要代码实现 ,结合我的来使用。 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没

    2023年04月11日
    浏览(83)
  • 【Spring Boot】SpringBoot 2.6.6 集成 SpringDoc 1.6.9 生成swagger接口文档

    之前常用的SpringFox在2020年停止更新了,新项目集成SpringFox出来一堆问题,所以打算使用更活跃的SpringDoc,这里简单介绍一下我这边SpringBoot2.6.6集成SpringDoc1.6.9的demo。 官网链接 maven为例: 代码如下(示例): 默认路径: UI界面 http://localhost:9527/swagger-ui/index.html json界面 http:/

    2024年02月09日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包