使用Java进行操作RabbitMQ

这篇具有很好参考价值的文章主要介绍了使用Java进行操作RabbitMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Java操作消息队列

现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:


 
 
  1. <dependency>
  2. <groupId>com.rabbitmq </groupId>
  3. <artifactId>amqp-client </artifactId>
  4. <version>5.14.2 </version>
  5. </dependency>

依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:


 
 
  1. public static void main (String[] args) {
  2. //使用ConnectionFactory来创建连接
  3. ConnectionFactory factory = new ConnectionFactory();
  4. //设定连接信息,基操
  5. factory.setHost( "192.168.0.12");
  6. factory.setPort( 5672); //注意这里写5672,是amqp协议端口
  7. factory.setUsername( "admin");
  8. factory.setPassword( "admin");
  9. factory.setVirtualHost( "/test");
  10. //创建连接
  11. try( Connection connection = factory.newConnection()){
  12. } catch (Exception e){
  13. e.printStackTrace();
  14. }
  15. }

这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。


 
 
  1. try( Connection connection = factory.newConnection();
  2. Channel channel = connection.createChannel()){ //通过Connection创建新的Channel
  3. //声明队列,如果此队列不存在,会自动创建
  4. channel.queueDeclare( "yyds", false, false, false, null);
  5. //将队列绑定到交换机
  6. channel.queueBind( "yyds", "amq.direct", "my-yyds");
  7. //发布新的消息,注意消息需要转换为byte[]
  8. channel.basicPublish( "amq.direct", "my-yyds", null, "Hello World!".getBytes());
  9. } catch (Exception e){
  10. e.printStackTrace();
  11. }

其中queueDeclare方法的参数如下:

  • queue:队列的名称(默认创建后routingKey和队列名称一致)
  • durable:是否持久化。
  • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
  • autoDelete:是否自动删除。
  • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

其中queueBind方法参数如下:

  • queue:需要绑定的队列名称。
  • exchange:需要绑定的交换机名称。
  • routingKey:不用多说了吧。

其中basicPublish方法的参数如下:

  • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
  • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
  • props:其他的配置。
  • body:消息本体。

执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

并且此消息队列已经成功与amq.direct交换机进行绑定:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:


 
 
  1. public static void main (String[] args) throws IOException, TimeoutException {
  2. ConnectionFactory factory = new ConnectionFactory();
  3. factory.setHost( "10.37.129.4");
  4. factory.setPort( 5672);
  5. factory.setUsername( "admin");
  6. factory.setPassword( "admin");
  7. factory.setVirtualHost( "/test");
  8. //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
  9. //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. //创建一个基本的消费者
  13. channel.basicConsume( "yyds", false, (s, delivery) -> {
  14. System.out.println( new String(delivery.getBody()));
  15. //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
  16. //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
  17. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  18. //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
  19. //为false,那么消息就会被丢弃
  20. //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
  21. //跟上面一样,最后一个参数为false,只不过这里省了
  22. //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
  23. }, s -> {});
  24. }

其中basicConsume方法参数如下:

●queue  -  消息队列名称,直接指定。
●autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
●deliver  -  消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
●cancel  -  当消费者取消订阅时进行的函数回调,这里暂时用不到。

现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:


我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

现在我们把刚刚创建好的消息队列删除。

官方文档:Spring AMQP

前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:
 


 
 
  1. <dependency>
  2. <groupId>org.springframework.boot </groupId>
  3. <artifactId>spring-boot-starter-amqp </artifactId>
  4. </dependency>

接着我们需要配置RabbitMQ的地址等信息:


 
 
  1. spring:
  2. rabbitmq:
  3. addresses: 192.168.0.4
  4. username: admin
  5. password: admin
  6. virtual-host: / test

这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:


 
 
  1. @Configuration
  2. public class RabbitConfiguration {
  3. @Bean("directExchange") //定义交换机Bean,可以很多个
  4. public Exchange exchange (){
  5. return ExchangeBuilder.directExchange( "amq.direct").build();
  6. }
  7. @Bean("yydsQueue") //定义消息队列
  8. public Queue queue (){
  9. return QueueBuilder
  10. .nonDurable( "yyds") //非持久化类型
  11. .build();
  12. }
  13. @Bean("binding")
  14. public Binding binding (@Qualifier("directExchange") Exchange exchange,
  15. @Qualifier("yydsQueue") Queue queue){
  16. //将我们刚刚定义的交换机和队列进行绑定
  17. return BindingBuilder
  18. .bind(queue) //绑定队列
  19. .to(exchange) //到交换机
  20. .with( "my-yyds") //使用自定义的routingKey
  21. .noargs();
  22. }
  23. }

接着我们来创建一个生产者,这里我们直接编写在测试用例中:


 
 
  1. @SpringBootTest
  2. class SpringCloudMqApplicationTests {
  3. //RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
  4. @Resource
  5. RabbitTemplate template;
  6. @Test
  7. void publisher () {
  8. //使用convertAndSend方法一步到位,参数基本和之前是一样的
  9. //最后一个消息本体可以是Object类型,真是大大的方便
  10. template.convertAndSend( "amq.direct", "my-yyds", "Hello World!");
  11. }
  12. }

现在我们来运行一下这个测试用例:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:


 
 
  1. @Component //注册为Bean
  2. public class TestListener {
  3. @RabbitListener(queues = "yyds") //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
  4. public void test (Message message){
  5. System.out.println( new String(message.getBody()));
  6. }
  7. }

接着我们启动服务器:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:


 
 
  1. @Test
  2. void publisher () {
  3. //会等待消费者消费然后返回响应结果
  4. Object res = template.convertSendAndReceive( "amq.direct", "my-yyds", "Hello World!");
  5. System.out.println( "收到消费者响应:"+res);
  6. }

消费者这边只需要返回一个对应的结果即可:


 
 
  1. @RabbitListener(queues = "yyds")
  2. public String receiver (String data){
  3. System.out.println( "一号消息队列监听器 "+data);
  4. return "收到!";
  5. }

测试没有问题:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?


 
 
  1. @Data
  2. public class User {
  3. int id;
  4. String name;
  5. }

 
 
  1. @Configuration
  2. public class RabbitConfiguration {
  3. ...
  4. @Bean("jacksonConverter") //直接创建一个用于JSON转换的Bean
  5. public Jackson2JsonMessageConverter converter (){
  6. return new Jackson2JsonMessageConverter();
  7. }
  8. }

接着我们只需要指定转换器就可以了:


 
 
  1. @Component
  2. public class TestListener {
  3. //指定messageConverter为我们刚刚创建的Bean名称
  4. @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
  5. public void receiver (User user){ //直接接收User类型
  6. System.out.println(user);
  7. }
  8. }

现在我们直接在管理页面发送:

{"id":1,"name":"LB"}
 
 

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

可以看到成功完成了转换,并输出了用户信息:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:


 
 
  1. @Test
  2. void publisher () {
  3. template.convertAndSend( "amq.direct", "yyds", new User());
  4. }

可以看到后台的数据类型为:

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

java rabbitmq发送消息,java,java-rabbitmq,rabbitmq

这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。文章来源地址https://www.toymoban.com/news/detail-686612.html

到了这里,关于使用Java进行操作RabbitMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Java进行操作RabbitMQ

    使用Java操作消息队列 现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式: 依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责

    2024年02月10日
    浏览(31)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(68)
  • 如何使用RabbitMQ发送和接收消息

    本文介绍了如何使用RabbitMQ的Python客户端库pika来发送和接收消息,并提供了示例代码。读者可以根据自己的需求修改代码,例如修改队列名称、发送不同的消息等。 RabbitMQ 是一个开源的消息队列软件,可以用于在应用程序之间传递消息。下面是一个使用 RabbitMQ 的流程和代码

    2024年02月15日
    浏览(44)
  • 使用C#和RabbitMQ发送和接收消息

    通过NuGet安装 RabbitMQ.Client 以下是一个简单的示例代码,演示如何使用 C# 和 RabbitMQ 客户端库来发送和接收消息: durable持久化 durable 参数用于指定队列是否是持久化的。 当 durable 参数设置为 true 时,表示队列是持久化的。持久化的队列会在RabbitMQ服务器重启后仍然存在,确保

    2024年02月11日
    浏览(42)
  • 使用RabbitMQ控制台查看和发送消息

    控制台发消息 点击queues -- Publish message 发送消息 Headers 设置contentType application/json Payload里填上发送内容,点击Publish message 就可以 控制台查看消息 点击queues -- Get Message(s) -- Payload里查看内容 get message只是查看消息,不会消费该条消息

    2024年02月10日
    浏览(57)
  • C#使用RabbitMQ发送和接收消息工具类

    下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码: 通过NuGet安装 RabbitMQ.Client

    2024年02月11日
    浏览(52)
  • 使用rabbitmq进行支付之后的消息通知

    订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务,订单服务将消息发给交换机,由交换机广播消息,每个订阅消息的微服务都可以接收到支付结果. 微服务收到支付结果根据订单的类型去更新自己的业务数据。 使用消息队列进行异步通知需要保证消息的可

    2024年02月10日
    浏览(45)
  • RabbitMQ的基本使用,进行实例案例的消息队列

    目录 一、介绍 1. 概述 2. 作用 3. 工作原理 二、RabbitMQ安装部署 1. 安装 2. 部署 3. 增加用户 三、实现案例 1. 项目创建 2. 项目配置 3. 生产者代码 4. 消费者代码 四、测试 每篇一获 RabbitMQ  是一种开源的消息代理和队列服务器,用于通过简单和可扩展的方式在分布式系统中传递

    2024年01月20日
    浏览(34)
  • 90、RabbitMQ如何确保消息发送?消息接收?

    信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。 一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID) 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack (未确认) 消息给生产者。

    2024年02月16日
    浏览(44)
  • JAVA 使用WebSocket发送通知消息

    注: 1、jdk必须要1.8及以上 2、项目若有接口拦截过滤,WebSocket接口必须要配置拦截,使其可以验证通过 WebSocket 业务类 发送消息的方法 前端代码

    2024年02月11日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包