RabbitMQ的6种工作模式

这篇具有很好参考价值的文章主要介绍了RabbitMQ的6种工作模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1、simple简单模式

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-java</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

</project>

工具类

package com.example;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    // 连接rabbitmq服务,共享一个工厂对象
    private static ConnectionFactory factory;

    static {
        factory=new ConnectionFactory();
        //设置rabbitmq属性
        factory.setHost("127.0.0.1");
        factory.setUsername("zsx242030");
        factory.setPassword("zsx242030");
        factory.setVirtualHost("/");
        factory.setPort(5672);
    }
    public static Connection getConnection(){
        Connection connection=null;
        try {
            //获取连接对象
            connection = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}

消息提供者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者

package com.example.simple;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息(消费的是队列,而不是交换机)
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //通过通道创建队列
            channel.queueDeclare("queue1", false, false, false, null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.work;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //监听队列中的消息
            channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            // connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("fanout_exchange", "fanout");
            //通过通道创建队列
            //channel.queueDeclare("queue1",false,false,false,null);
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue1", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue1", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.publishsubscribe;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("fanout_queue2", false, false, false, null);
            //给队列绑定交换机
            channel.queueBind("fanout_queue2", "fanout_exchange", "");
            //监听队列中的消息
            channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
            // 1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("direct_exchange", "direct");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("direct_exchange",
                        //设置路由键,符合路由键的队列,才能拿到消息
                        "insert",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue1", "direct_exchange", "select");
            channel.queueBind("direct_queue1", "direct_exchange", "insert");
            //监听队列中的消息
            channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.souting;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("direct_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)
            channel.queueBind("direct_queue2", "direct_exchange", "delete");
            channel.queueBind("direct_queue2", "direct_exchange", "select");
            //监听队列中的消息
            channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;


import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建

public class Provider {

    public static void main(String[] args) {
        try {
            //获取连接对象
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型
            channel.exchangeDeclare("topic_exchange", "topic");
            //向队列中发送消息
            for (int i = 1; i <= 10; i++) {
                channel.basicPublish("topic_exchange",
                        // #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)
                        "emp.hello world",
                        null,
                        ("Hello RabbitMQ!!!" + i).getBytes());
            }
            //断开连接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者1

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue1", false, false, false, null);
            //绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue1", "topic_exchange", "emp.#");
            //监听队列中的消息
            channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者2

package com.example.topic;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Connection connection = ConnectionUtil.getConnection();
            //获取通道对象
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("topic_queue2", false, false, false, null);
            //绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)
            channel.queueBind("topic_queue2", "topic_exchange", "emp.*");
            //监听队列中的消息
            channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
                }
            });
            //消费方不需要关闭连接,保持一直监听队列状态
            // channel.close();
            //connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

RabbitMQ的6种工作模式,rabbitmq,rabbitmq

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Client {

    public static void main(String[] argv) throws IOException, InterruptedException {
        String message = "Hello World!!!";
        // 建立一个连接和一个通道,并为回调声明一个唯一的回调队列
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 定义一个临时变量的接受队列名
        String replyQueueName = channel.queueDeclare().getQueue();
        // 生成一个唯一的字符串作为回调队列的编号
        String corrId = UUID.randomUUID().toString();
        // 发送请求消息,消息使用了两个属性:replyTo和correlationId
        // 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        // 发布一个消息,rpc_queue路由规则
        channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));
        // 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
        // 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                //检查它的correlationId是否是我们所要找的那个
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,则响应BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        System.out.println(" 客户端请求的结果:" + response.take());
    }
}

Server端

package com.example.rpc;

import com.example.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Server {

    public static void main(String[] args) {
        Connection connection = null;
        try {
            connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("rpc_queue", false, false, false, null);
            channel.basicQos(1);
            System.out.println("Awaiting RPC requests:");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        response = new String(body, "UTF-8");
                        System.out.println("response (" + response + ")");
                    } catch (RuntimeException e) {
                        System.out.println("错误信息 " + e.toString());
                    } finally {
                        // 返回处理结果队列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        // 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            // 取消自动确认
            boolean autoAck = false;
            channel.basicConsume("rpc_queue", autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)

# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。文章来源地址https://www.toymoban.com/news/detail-631845.html

到了这里,关于RabbitMQ的6种工作模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ的工作模式

    RabbitMQ 的工作模式 一 .simple 模式(即最简单的收发模式) 二 .work 工作模式 ( 资源的竞争 ) publish_subscribe 发布订阅 (../../../../../0 马士兵 / 新建文件夹 /BAT 面试突击资料 (1)/ 整理 /BAT 面试突击资料 /15- 消息中间件 MQ 面试题( 2020 最新 版) .assets/publish_subscribe 发布订阅 ( 共享资

    2024年02月06日
    浏览(61)
  • RabbitMQ 工作模式介绍

    RabbitMQ 是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定信使最终会将邮件交付给您的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个信件载体。 RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是

    2024年02月06日
    浏览(32)
  • 【RabbitMQ】Spring整合RabbitMQ、Spring实现RabbitMQ五大工作模式(万字长文)

    目录 一、准备 1、创建maven项目​编辑 2、引入依赖 3、创建配置文件 1.RabbitMQ配置文件 2.生产者项目配置文件 3.消费者项目配置文件 二、生产者xml中文件创建队列 三、生产者xml文件中创建交换机以及绑定队列 1、创建交换机 2、绑定队列  四、消费者xml文件中创建队列消息监

    2024年01月21日
    浏览(31)
  • 消息队列之RabbitMQ工作模式

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 消息队列之RabbitMQ工作模式 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在这篇博客中,我将深入探讨 RabbitMQ 的工作模式,带你

    2024年01月18日
    浏览(35)
  • RabbitMQ消息队列的工作模式

    官方文档地址:https://www.rabbitmq.com/getstarted.html 工作模式其实就是消息队列分发消息的路由方式。 RabbitMQ常用的几种工作模式: 简单模式 WorkQueues工作队列模式 PubSub生产者/PubSub消费者模式 Routing路由模式 Topics通配符模式 发布/订阅模式(Publish/Subscribe):该模式用于一对多的

    2024年02月15日
    浏览(35)
  • RabbitMQ的6种工作模式

    官方文档: http://www.rabbitmq.com/ https://www.rabbitmq.com/getstarted.html RabbitMQ 常见的 6 种工作模式: 1)、消息产生后将消息放入队列。 2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。 3)、存在的问题:消息可能没有被消费者正确处

    2024年02月14日
    浏览(27)
  • RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 & 页面分析

    RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。 本篇博客结合场景来阐述RabbitMQ的几种模

    2024年02月07日
    浏览(32)
  • 消息队列——rabbitmq的不同工作模式

    目录 Work queues 工作队列模式  Pub/Sub 订阅模式 Routing路由模式 Topics通配符模式   工作模式总结 C1和C2属于竞争关系,一个消息只有一个消费者可以取到。  代码部分只需要用两个消费者进程监听同一个队里即可。 两个消费者呈现竞争关系。 用一个生产者推送10条消息 两个监

    2024年02月16日
    浏览(37)
  • RabbitMQ常用工作模式+整合springboot

    目录 1.MQ的相关概念 1.1 什么是MQ消息中间件 1.2 为什么使用MQ (1) 应用解耦 (2) 异步提速   (3)削峰填谷 1.3 使用MQ的劣势 1.4 常见的MQ组件​​​​​​​ 2. RabbitMQ的概述 2.1 RabbitMQ的概念 2.2 RabbitMQ的原理 2.3 安装RabbitMQ 3. RabbitMQ 的工作模式 3.1 simple (简单模式) 3.2 Work queues(工作模

    2024年02月15日
    浏览(30)
  • RabbitMQ Exchange类型和工作模式介绍

    RabbitMQ常用的交换器类型有: fanout、 direct、 topic、 headers四种。 会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中 如图: direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的 队列中 ,如下图: topic类型的交换器在direc

    2024年02月11日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包