RabbitMQ:使用Java进行操作

这篇具有很好参考价值的文章主要介绍了RabbitMQ:使用Java进行操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Java操作消息队列

现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责将信息发送到消息队列:

public static void main(String[] args) {
    //使用ConnectionFactory来创建连接
    ConnectionFactory factory = new ConnectionFactory();

    //设定连接信息,基操
    factory.setHost("192.168.0.12");
    factory.setPort(5672);  //注意这里写5672,是amqp协议端口
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");
  
 		//创建连接
    try(Connection connection = factory.newConnection()){
        
    }catch (Exception e){
        e.printStackTrace();
    }
}

这里我们可以直接在程序中定义并创建消息队列(实际上是和我们在管理页面创建一样的效果)客户端需要通过连接创建一个新的通道(Channel),同一个连接下可以有很多个通道,这样就不用创建很多个连接也能支持分开发送了。

try(Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()){   //通过Connection创建新的Channel
  	//声明队列,如果此队列不存在,会自动创建
    channel.queueDeclare("yyds", false, false, false, null);
  	//将队列绑定到交换机
    channel.queueBind("yyds", "amq.direct", "my-yyds");
  	//发布新的消息,注意消息需要转换为byte[]
    channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
}catch (Exception e){
    e.printStackTrace();
}

其中queueDeclare方法的参数如下:

  • queue:队列的名称(默认创建后routingKey和队列名称一致)
  • durable:是否持久化。
  • exclusive:是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
  • autoDelete:是否自动删除。
  • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数。

其中queueBind方法参数如下:

  • queue:需要绑定的队列名称。
  • exchange:需要绑定的交换机名称。
  • routingKey:不用多说了吧。

其中basicPublish方法的参数如下:

  • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
  • routingKey:这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
  • props:其他的配置。
  • body:消息本体。

执行完成后,可以在管理页面中看到我们刚刚创建好的消息队列了:

rabbitmq java工具包,rabbitmq

并且此消息队列已经成功与amq.direct交换机进行绑定:

rabbitmq java工具包,rabbitmq

那么现在我们的消息队列中已经存在数据了,怎么将其读取出来呢?我们来看看如何创建一个消费者:

public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("10.37.129.4");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");

    //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
    //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    //创建一个基本的消费者
    channel.basicConsume("yyds", false, (s, delivery) -> {
        System.out.println(new String(delivery.getBody()));
        //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
        //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
        //为false,那么消息就会被丢弃
        //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        //跟上面一样,最后一个参数为false,只不过这里省了
        //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    }, s -> {});
}

其中basicConsume方法参数如下:

●queue  -  消息队列名称,直接指定。
●autoAck - 自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
●deliver  -  消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。
●cancel  -  当消费者取消订阅时进行的函数回调,这里暂时用不到。

现在我们启动一下消费者,可以看到立即读取到我们刚刚插入到队列中的数据:


我们现在继续在消息队列中插入新的数据,这里直接在网页上进行操作就行了,同样的我们也可以在消费者端接受并进行处理。

现在我们把刚刚创建好的消息队列删除。

官方文档:Spring AMQP

前面我们已经完成了RabbitMQ的安装和简单使用,并且通过Java连接到服务器。现在我们来尝试在SpringBoot中整合消息队列客户端,首先是依赖:
 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

接着我们需要配置RabbitMQ的地址等信息:

spring:
  rabbitmq:
    addresses: 192.168.0.4
    username: admin
    password: admin
    virtual-host: /test

这样我们就完成了最基本信息配置,现在我们来看一下,如何像之前一样去声明一个消息队列,我们只需要一个配置类就行了:

@Configuration
public class RabbitConfiguration {
    @Bean("directExchange")  //定义交换机Bean,可以很多个
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("amq.direct").build();
    }

    @Bean("yydsQueue")     //定义消息队列
    public Queue queue(){
        return QueueBuilder
          				.nonDurable("yyds")   //非持久化类型
          				.build();
    }

    @Bean("binding")
    public Binding binding(@Qualifier("directExchange") Exchange exchange,
                           @Qualifier("yydsQueue") Queue queue){
      	//将我们刚刚定义的交换机和队列进行绑定
        return BindingBuilder
                .bind(queue)   //绑定队列
                .to(exchange)  //到交换机
                .with("my-yyds")   //使用自定义的routingKey
                .noargs();
    }
}

接着我们来创建一个生产者,这里我们直接编写在测试用例中:

@SpringBootTest
class SpringCloudMqApplicationTests {

  	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
    @Resource
    RabbitTemplate template;

    @Test
    void publisher() {
      	//使用convertAndSend方法一步到位,参数基本和之前是一样的
      	//最后一个消息本体可以是Object类型,真是大大的方便
        template.convertAndSend("amq.direct", "my-yyds", "Hello World!");
    }

}

现在我们来运行一下这个测试用例:

rabbitmq java工具包,rabbitmq

可以看到后台自动声明了我们刚刚定义好的消息队列和交换机以及对应的绑定关系,并且我们的数据也是成功插入到消息队列中:

rabbitmq java工具包,rabbitmq

现在我们再来看看如何创建一个消费者,因为消费者实际上就是一直等待消息然后进行处理的角色,这里我们只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:

@Component  //注册为Bean
public class TestListener {

    @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
    public void test(Message message){
        System.out.println(new String(message.getBody()));
    }
}

接着我们启动服务器:

rabbitmq java工具包,rabbitmq

可以看到控制台成功输出了我们之前放入队列的消息,并且管理页面中也显示此消费者已经连接了:

rabbitmq java工具包,rabbitmq

接着我们再通过管理页面添加新的消息看看,也是可以正常进行接受的。

当然,如果我们需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的:

@Test
void publisher() {
  	//会等待消费者消费然后返回响应结果
    Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
    System.out.println("收到消费者响应:"+res);
}

消费者这边只需要返回一个对应的结果即可:

@RabbitListener(queues = "yyds")
public String receiver(String data){
    System.out.println("一号消息队列监听器 "+data);
    return "收到!";
}

测试没有问题:

rabbitmq java工具包,rabbitmq

那么如果我们需要直接接收一个JSON格式的消息,并且希望直接获取到实体类呢?

@Data
public class User {
    int id;
    String name;
}
@Configuration
public class RabbitConfiguration {
  	...

    @Bean("jacksonConverter")   //直接创建一个用于JSON转换的Bean
    public Jackson2JsonMessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }
}

接着我们只需要指定转换器就可以了:

@Component
public class TestListener {

  	//指定messageConverter为我们刚刚创建的Bean名称
    @RabbitListener(queues = "yyds", messageConverter = "jacksonConverter")
    public void receiver(User user){  //直接接收User类型
        System.out.println(user);
    }
}

现在我们直接在管理页面发送:

{"id":1,"name":"LB"}

rabbitmq java工具包,rabbitmq

可以看到成功完成了转换,并输出了用户信息:

rabbitmq java工具包,rabbitmq

同样的,我们也可以直接发送User,因为我们刚刚已经配置了Jackson2JsonMessageConverter为Bean,所以直接使用就可以了:

@Test
void publisher() {
    template.convertAndSend("amq.direct", "yyds", new User());
}

可以看到后台的数据类型为:

rabbitmq java工具包,rabbitmq

rabbitmq java工具包,rabbitmq

这样,我们就通过SpringBoot实现了RabbitMQ的简单使用。文章来源地址https://www.toymoban.com/news/detail-598230.html

到了这里,关于RabbitMQ:使用Java进行操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】| 狮子带你(超详细)原生Java操作兔子队列

    RabbitMQ 是一种快速、灵活、可靠的消息传递方式,可用于构建分布式应用程序、异步处理任务、实现消息队列等。下面是 Java 原生操作 RabbitMQ 的一些好处和用途: 简单易用:RabbitMQ 提供了丰富的 Java 客户端库,开发者可以轻松地使用 Java 代码进行消息发送和接收,无需学习

    2024年02月01日
    浏览(25)
  • java操作rabbitmq实现简单的消息发送(socket编程的升级)

    准备: 1.下载rabbitmq并搭建环境(和python那篇一样:http://www.cnblogs.com/g177w/p/8176797.html) 2.下载支持的jar包(http://repo1.maven.org/maven2/com/rabbitmq/amqp-client) 生产者方(Productor.java): View Code 消费者方(Consummer.java):

    2023年04月08日
    浏览(32)
  • RabbitMQ的使用(JAVA)

    目录 一、MQ的简介 二、MQ的作用 1、异步处理(同时处理多件事情) ​编辑 2、应用解耦(添加了一个中间件) ​编辑 3、流量控制(当秒杀的时候,可以限制流量) 三、Docker安装MQ  四、MQ的执行顺序 五、其他 1、Exchange 类型 ①Exchange的direct(默认) ②Exchange的fanout ③Exchange的

    2023年04月08日
    浏览(18)
  • java中使用rabbitmq

    mq常用于业务解耦、流量削峰和异步通信,rabbitmq是使用范围较广,比较稳定的一款开源产品,接下来我们使用springboot的starter来引入rabbitmq,了解mq的几种使用模式,通过几个简单的案例,让你可以快速地了解到该使用哪种模式来对应业务场景,使用rabbitmq看这一篇就够了,下方附安

    2024年03月28日
    浏览(26)
  • Java - RabbitMq的安装&使用

    目录 一、Linux(ubuntu)安装RabbitMQ (1)首先确认Linux 内核版本,确定是Ubuntu还是CentOS版本。 (2)rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang。 (3)确定relang是否安装成功。 (4)更新一下软件包,确保软件最新版本。 (5)安装最新版本的RabbitMQ (5.1)安装

    2024年02月03日
    浏览(29)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(32)
  • Java RabbitMQ消息队列简单使用

    消息队列,即MQ,Message Queue。 消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    2024年02月12日
    浏览(44)
  • Rabbitmq在java中的使用

    1.1、maven导入相关依赖 1.2、通用类及常用方法讲解 常用方法可以参考: RabbitMQ 常用方法介绍(二) 1.3、编写不同的交换机类型 Direct exchange(直连交换机) 直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下: 将一个队列绑定

    2024年02月06日
    浏览(31)
  • Java中使用RabbitMQ(持续更新中)

    前言 RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang ( https://www.erlang.org/downloads )。同时,在安装RabbitMQ 时需要注意RabbitMQ 与Erlang的对应版本,防止安装过程中出错。一般情况下,版本均选择新版本不会有太大问题,但还是需要注意下。可以

    2024年02月21日
    浏览(28)
  • Java扫描区块链的工具包|Java扫块|监听token转账

    Magician-Scanning是一个用Java开发的扫描区块链的工具包,当我们在程序中需要一些功能时,它可以派上用场,比如说。 当一个地址收到ETH时,程序中的一个方法会被自动触发,这个交易会被传入该方法。 当一个合约的某个功能被调用时(比如 ERC20 转账),它会自动触发程序中

    2024年01月17日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包