Spring整合RabbitMQ-注解方式

这篇具有很好参考价值的文章主要介绍了Spring整合RabbitMQ-注解方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

maven导入

            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.2.7.RELEASE</version>
            </dependency>
5.2.1 消息的生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

import java.nio.charset.StandardCharsets;

public class ProducterApplication {

    public static void main(String[] args) throws Exception {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
		
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
		
        //构造消息属性对象
        MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()
            	//设置消息的类型为文本
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            	//消息的编码方式为UTF-8
                .setContentEncoding(StandardCharsets.UTF_8.name())
            	//自定义消息头信息
                .setHeader("test.header", "test.value")
                .build();
		//对象消息进行编码操作
        Message msg = MessageBuilder.withBody("你好 RabbitMQ!".getBytes(StandardCharsets.UTF_8))
                .andProperties(msgBuild)
                .build();

        template.send("ex.anno.fanout", "routing.anno", msg);

        context.close();
    }

}

RabbitConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;

@Configurable
public class RabbitConfig {

    /**
     * 连接工厂
     *
     * @return
     */
    @Bean
    public ConnectionFactory getConnectionFactory() {
        URI uri = URI.create("amqp://root:123456@node1:5672/%2f");
        ConnectionFactory factory = new CachingConnectionFactory(uri);
        return factory;
    }

    /**
     * RabbitTemplate
     */
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        return rabbitTemplate;
    }


    /**
     * RabbitAdmin
     */
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin admin = new RabbitAdmin(factory);
        return admin;
    }

    /**
     * Queue
     */
    @Bean
    public Queue queue() {
        Queue queue = QueueBuilder.nonDurable("queue.anno")
                //是否排外,即是否只有当前这个连接才能看到。
                //.exclusive()
                //是否自动删除
                //.autoDelete()
                .build();

        return queue;
    }

    /**
     * Exchange
     */
    @Bean
    public Exchange exchange() {
        Exchange exchange = new FanoutExchange("ex.anno.fanout", false, false, null);
        return exchange;
    }

    /**
     * Binding
     */
    @Bean
    @Autowired
    public Binding binding(Queue queue, Exchange exchange) {
        //创建一个不指定参数的绑定
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("routing.anno").noargs();
        return binding;
    }
}

提示:

ConnectionFactory有三个实现

CachingConnectionFactory 基于channel的缓存模式 最常用是这个。

LocalizedQueueConnectionFactory 直接连接某个节点的方式。如果是集群,此种不太适合。

SimpleRoutingConnectionFactory 在当前的连接工厂中按查找的KEY获取连接工厂。

运行消息的生产者,查看消息发送信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ ex.anno.fanout     │ fanout  │
├────────────────────┼─────────┤
│ ex.busi.topic      │ topic   │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│ ex.direct          │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌────────────────┬─────────────┬──────────────────┬──────────────────┬──────────────┬───────────┐
│ source_name    │ source_kind │ destination_name │ destination_kind │ routing_key  │ arguments │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│                │ exchange    │ queue.msg        │ queue            │ queue.msg    │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│                │ exchange    │ queue.anno       │ queue            │ queue.anno   │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.anno.fanout │ exchange    │ queue.anno       │ queue            │ routing.anno │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.direct      │ exchange    │ queue.msg        │ queue            │ routing.q1   │           │
└────────────────┴─────────────┴──────────────────┴──────────────────┴──────────────┴───────────┘
[root@nullnull-os ~]# rabbitmqctl list_queues --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌────────────┬──────────┐
│ name       │ messages │
├────────────┼──────────┤
│ queue.msg  │ 0        │
├────────────┼──────────┤
│ queue.anno │ 1        │
└────────────┴──────────┘
[root@nullnull-os ~]# 

通过检查发现,消息已经成功的发送到了队列

5.2.2 使用拉模式获取消息
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class ConsumerGetApplication {

    public static void main(String[] args) throws Exception {
        //从指定类加载配制信息
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
        RabbitTemplate rabbit = context.getBean(RabbitTemplate.class);

        Message receive = rabbit.receive("queue.anno");
        String encoding = receive.getMessageProperties().getContentEncoding();
        System.out.println("消息信息:" + new String(receive.getBody(), encoding));

        context.close();
    }

}

RabbitConfig的配制

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;

@Configurable
public class RabbitConfig {

    /**
     * 连接工厂
     *
     * @return
     */
    @Bean
    public ConnectionFactory getConnectionFactory() {
        URI uri = URI.create("amqp://root:123456@node1:5672/%2f");
        ConnectionFactory factory = new CachingConnectionFactory(uri);
        return factory;
    }

    /**
     * RabbitTemplate
     */
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        return rabbitTemplate;
    }


    /**
     * RabbitAdmin
     */
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        RabbitAdmin admin = new RabbitAdmin(factory);
        return admin;
    }

    /**
     * Queue
     */
    @Bean
    public Queue queue() {
        Queue queue = QueueBuilder.nonDurable("queue.anno")
                //是否排外,即是否只有当前这个连接才能看到。
                //.exclusive()
                //是否自动删除
                //.autoDelete()
                .build();

        return queue;
    }
}

运行主程序,检查控制台的输出。

消息信息:你好 RabbitMQ!

至此使用拉模式,已经成功的获取队列中的数据。

**5.2.3 使用推模式获取数据 **

消费者处理的代码

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {


    /**
     * com.rabbitmq.client.Channel to get access to the Channel channel对象
     * org.springframework.amqp.core.Message  message对象,可以直接操作原生的AMQP消息
     * org.springframework.messaging.Message to use the messaging abstraction counterpart
     *
     * @Payload-annotated 注解方法参数,该参数的值就是消息体。   method arguments including the support of validation
     * @Header-annotated 注解方法参数,访问指定的消息头字段的值。 method arguments to extract a specific header value, including standard AMQP headers defined by AmqpHeaders
     * @Headers-annotated 该注解的参数获取该消息的消息头的所有字段,参数集合类型对应的MAP argument that must also be assignable to java.util.Map for getting access to all headers.
     * MessageHeaders 参数类型,访问所有消息头字段  arguments for getting access to all headers.
     * MessageHeaderAccessor or AmqpMessageHeaderAccessor  访问所有消息头字段。
     * <p>
     * 消息监听
     */
    @RabbitListener(queues = "queue.anno")
    public void whenMessageCome(Message msg) throws Exception {
        String encoding = msg.getMessageProperties().getContentEncoding();
        System.out.println("收到的消息:" + new String(msg.getBody(), encoding));
    }


    /**
    // * 使用payload进行消费
    // *
    // * 不可同时存在相同的队列被两个监听
    // *
    // * @param data
    // */
    //@RabbitListener(queues = "queue.anno")
    //public void whenMessageConsumer(@Payload String data) {
    //    System.out.println("收到的消息:" + data);
    //}

}

此处存在两种方式,一种是接收Message作为参数,还有一种是使用@Payload接收内容作为参数

配制处理

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.amqp.core.Queue;
import java.net.URI;

@EnableRabbit
//@ComponentScan("com.nullnull.learn")
@ComponentScan
@Configurable //xml中也可以使用<rabbit:annotation-driven/> 启用@RabbitListener注解
public class RabbitConfig {


    @Bean
    public ConnectionFactory connectionFactory() {
        URI uriInfo = URI.create("amqp://root:123456@node1:5672/%2f");
        return new CachingConnectionFactory(uriInfo);
    }


    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }


    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("queue.anno").build();
    }


    /**
     * RabbitListener的容器管理对象
     * <p>
     * 使用监听器监听推送过来的消息。在一个应用中可能会有多个监听器。这些监听器是需要一个工厂管理起来的。
     *
     * @return
     */
    @Bean("rabbitListenerContainerFactory")
    @Autowired
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();

        //要管理容器就得有连接
        containerFactory.setConnectionFactory(connectFactory);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //containerFactory.setAcknowledgeMode(AcknowledgeMode.NONE);
        //设置并发的消费者,即可以同时存在10个消费都消费消息。
        containerFactory.setConcurrentConsumers(10);
        //设置并发的最大消费者。
        containerFactory.setMaxConcurrentConsumers(15);
        //按照批次处理消息消息。
        containerFactory.setBatchSize(10);
        return containerFactory;
    }

}

启动类

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ConsumerListenerApplication {

    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(RabbitConfig.class);
    }

}

再启动生产者

对生产者作一点改造,让其发送多条

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

import java.nio.charset.StandardCharsets;

public class ProducterApplication {

    public static void main(String[] args) throws Exception {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);

        RabbitTemplate template = context.getBean(RabbitTemplate.class);

        MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding(StandardCharsets.UTF_8.name())
                .setHeader("test.header", "test.value")
                .build();

        for (int i = 0; i < 20; i++) {
            Message msg = MessageBuilder.withBody(("你好 RabbitMQ! id :" + i).getBytes(StandardCharsets.UTF_8))
                    .andProperties(msgBuild)
                    .build();

            template.send("ex.anno.fanout", "routing.anno", msg);
        }

        context.close();
    }

}

客户端接收,查看控制台

收到的消息:你好 RabbitMQ! id :4
收到的消息:你好 RabbitMQ! id :9
收到的消息:你好 RabbitMQ! id :8
收到的消息:你好 RabbitMQ! id :7
收到的消息:你好 RabbitMQ! id :6
收到的消息:你好 RabbitMQ! id :2
收到的消息:你好 RabbitMQ! id :3
收到的消息:你好 RabbitMQ! id :5
收到的消息:你好 RabbitMQ! id :14
收到的消息:你好 RabbitMQ! id :17
收到的消息:你好 RabbitMQ! id :1
收到的消息:你好 RabbitMQ! id :0
收到的消息:你好 RabbitMQ! id :13
收到的消息:你好 RabbitMQ! id :15
收到的消息:你好 RabbitMQ! id :12
收到的消息:你好 RabbitMQ! id :16
收到的消息:你好 RabbitMQ! id :18
收到的消息:你好 RabbitMQ! id :19
收到的消息:你好 RabbitMQ! id :11
收到的消息:你好 RabbitMQ! id :10

通过观察发现,此处接收的顺序与并非发送的顺序进行的接收,这是因为批量以及并发的控制在这里起的作用,如果要按顺序,去接批量及并发则就是按顺序接收。文章来源地址https://www.toymoban.com/news/detail-708678.html

到了这里,关于Spring整合RabbitMQ-注解方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring整合RabbitMQ-配制文件方式-2-推模式消费者

    推模式的消费者 在推模式中使用可以两种实现: 使用ChannelAwareMessageListener. 除消息外,还提供了Channel这个对象,通过channel可以有更大的灵活性。 使用MessageListener 基本的消息的临时。普通的场景基本够用。 此处以ChannelAwareMessageListener为样例: spring-rabbit.xml 容器启动类 首先

    2024年02月09日
    浏览(43)
  • 【RabbitMQ】Spring整合RabbitMQ、Spring实现RabbitMQ五大工作模式(万字长文)

    目录 一、准备 1、创建maven项目​编辑 2、引入依赖 3、创建配置文件 1.RabbitMQ配置文件 2.生产者项目配置文件 3.消费者项目配置文件 二、生产者xml中文件创建队列 三、生产者xml文件中创建交换机以及绑定队列 1、创建交换机 2、绑定队列  四、消费者xml文件中创建队列消息监

    2024年01月21日
    浏览(44)
  • 【RabbitMQ】4 Spring/SpringBoot整合RabbitMQ

    spring-amqp 是对AMQP的一些概念的一些抽象, spring-rabbit 是对RabbitMQ操作的封装实现。 主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等。 RabbitAdmin 类完成对Exchange,Queue,Binding的操作,在容器中管理了 RabbitAdmin 类的时候,可以对Exchange,Queue,Binding进行自

    2024年01月22日
    浏览(43)
  • flask整合rabbitMQ插件的方式

    当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。 本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,

    2024年02月03日
    浏览(34)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • Spring整合RabbitMQ

    生产者 ① 创建生产者工程 ② 添加依赖 ③ 配置整合 ④ 编写代码发送消息 消费者 ① 创建消费者工程 ② 添加依赖 ③ 配置整合 ④ 编写消息监听器 1.在 生产者工程和消费者工程中都 导入如下依赖  2.生产者和消费者 导入配置文件   rabbitmq.properties 3.生产者核心配置文件  

    2024年02月07日
    浏览(27)
  • Spring Boot 整合RabbitMQ

    第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Spring Cloud Netflix 之 Hystrix 第九章 代码管理gitlab 使用 第十章 Spr

    2024年02月05日
    浏览(51)
  • [Spring boot] Spring boot 整合RabbitMQ实现通过RabbitMQ进行项目的连接

     🍳作者:天海奈奈 💭眼过千遍不如手锤一遍:推荐一款模拟面试,斩获大厂 o f f e r ,程序员的必备刷题平台 − − 牛客网  👉🏻点击开始刷题之旅 目录 什么是RabbitMQ   消息队列:接受并转发消息,类似于快递公司 消息队列的优点 消息队列的特性 RabbitMQ特点 RabbitMQ核

    2024年01月24日
    浏览(84)
  • Spring整合RabbitMQ——生产者

    添加依赖坐标,在producer和consumer模块的pom文件中各复制一份。 配置producer的配置文件 配置producer的xml配置文件 编写测试类发送消息

    2024年02月07日
    浏览(41)
  • Spring的注解开发-注解方式整合MyBatis代码实现

    之前使用xml方式整合了MyBatis,文章导航:Spring整合第三方框架-MyBatis整合Spring实现-CSDN博客 现在使用注解的方式无非是就是将xml标签替换为注解,将xml配置文件替换为配置类而已。 非自定义配置类                 与数据库建立连接的同时,扫描指定的mapper接口,实现实现

    2024年02月07日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包