rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

这篇具有很好参考价值的文章主要介绍了rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、回顾RabbitMQ基础概念

rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

二、RabbitMQ基础编程模型

使用RabbitMQ提供的原生客户端API进行交互。这是使用RabbitMQ的基础。

1.1、maven依赖

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.9.0</version>
</dependency>

1.2、基础编程模型

1.首先创建连接,获取Channel

	private static final String HOST_NAME="192.168.56.10";
	private static final int HOST_PORT=5672;
	ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			factory.setPort(HOST_PORT);
			factory.setUsername("guest");
			factory.setPassword("guest");
			factory.setVirtualHost("/");
			connection = factory.newConnection();

2.声明Exchange-可选

channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException

3、声明queue

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);

4、声明Exchange与Queue的绑定关系-可选

channel.queueBind(String queue, String exchange, String routingKey) throws IOException;

总结:说白了,就是声明一个交换机和队列,然后进行绑定,至于Channel和连接connection实际是一种物理概念,连接rabbitmq使用的,
当然Channel也可以声明多个,但是需要注意命名!

  • 在创建channel时,可以在createChannel方法中传入一个分配的int参数channelNumber。这个ChannelNumber就会作为Channel的唯一标识。而RabbitMQ防止ChannelNumber重复的方式是:如果对应的Channel没有创建过,就会创建一个新的Channel。但是如果ChannelNumber已经创建过一个Channel了,这时就会返回一个null。

5、Producer根据应用场景发送消息到queue

channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;

6、Consumer消费消息

  • 1、被动消费模式

    Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。

channel.basicConsume(String queue, boolean autoAck, Consumer callback);
  • 另一种是主动消费模式
    Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)

其中需要注意点的是autoAck。autoAck为true则表示消息被Consumer消费成功后,后续就无法再消费了。而如果autoAck设置为false,就需要在处理过程中手动去调用channel的basicAck方法进行应答。如果
不应答的话,这个消息同样会继续被Consumer重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。

7、完成以后关闭连接,释放资源

channel.close();
conection.clouse();

三、RabbitMQ常用的消息场景

rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

  1. hello world体验
    rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
    最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费
//生产者
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明队列会在服务端自动创建。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String message = "Hello World!333";

		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
		builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
		builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
		//携带消息ID
		builder.messageId(""+channel.getNextPublishSeqNo());
		Map<String, Object> headers = new HashMap<>();
		//携带订单号
		headers.put("order", "123");
		builder.headers(headers);

		channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8"));
		System.out.println(" [x] Sent '" + message + "'");
		
		channel.close();
		connection.close();
	}
//消费者
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//Consumer接口还一个实现QueueConsuemr 但是代码注释过期了。
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				System.out.println("messageId:"+properties.getMessageId());
				properties.getHeaders().forEach((key,value)-> System.out.println("key: "+key +"; value: "+value));
				// (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
				 channel.basicAck(deliveryTag, false);
			}
		};
		
		channel.basicConsume(QUEUE_NAME, false, myconsumer);
	}
  1. Work queues 工作序列
    rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

工作任务模式,领导部署一个任务,由下面的一个员工来处理,Producer消息发送给queue,多个Consumer同时往队列上消费消息。

/**
	 * 发布一个task,交由多个Worker去处理。 每个task只要由一个Worker完成就行。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		for (int i = 0;i<5;i++){
			String message = "task 1";
			channel.basicPublish("", RabbitMQUtil.QUEUE_WORK,
					MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

		}

		channel.close();
		connection.close();
	}
//消费者1
public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
		//这样,即使rabbitmq服务重启,任务不会丢失
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		//每个worker同时最多只处理一个消息
		channel.basicQos(1);
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 channel.basicAck(deliveryTag, false);
				}
			};
			channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
	}
//消费者2
public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
		//这样,即使rabbitmq服务重启,任务不会丢失
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		//每个worker同时最多只处理一个消息
		channel.basicQos(1);
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 channel.basicAck(deliveryTag, false);
				}
			};
			channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
	}

rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
这个模式应该是最常用的模式,也是官网讨论比较详细的一种模式,所以官网上也对这种模式做了重点讲述。

  1. 首先。Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后,再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
    但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息)
  2. 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
  3. 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。

3.Publish/Subscribe 订阅 发布 机制

rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

这个机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * fanout类型的exchange会往其上绑定的所有queue转发消息。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		String message = "LOG INFO 222";
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		
		channel.close();
		connection.close();
		
		
	}
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    String queueName = channel.queueDeclare().getQueue();
		System.out.println(queueName);
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
			}
		};
		
		channel.basicConsume(queueName,true, myconsumer);
		
//		channel.close();
//		connection.close();
	}

关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。

  1. Routing 基于内容的路由
    type为”direct” 的exchange
    rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
    这种模式一看图就清晰了。 在上一章 exchange 往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。
/**
	 * exchange有四种类型, fanout topic headers direct
	 * direct类型的exchange会根据routingkey,将消息转发到该exchange上绑定了该routingkey的所有queue
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String message = "LOG INFO 44444";
		channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "debug", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "warn", null, message.getBytes());

		channel.close();
		connection.close();
		
		
	}
 public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//        String queueName = channel.queueDeclare().getQueue();
        String queueName="direct_queue";
        channel.queueDeclare(queueName,false,false,false,null);

//        channel.queueBind(queueName, EXCHANGE_NAME, "info");
//        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
        channel.queueBind(queueName, EXCHANGE_NAME, "warn");
        Consumer myconsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("========================");
                String routingKey = envelope.getRoutingKey();
                System.out.println("routingKey >" + routingKey);
                String contentType = properties.getContentType();
                System.out.println("contentType >" + contentType);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("deliveryTag >" + deliveryTag);
                System.out.println("content:" + new String(body, "UTF-8"));
                // (process the message components here ...)
                //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
                //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
            }
        };
        channel.basicConsume(queueName, false, myconsumer);
    }

5:Topics 基于话题的路由
type为"topic" 的exchange
rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解
这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配
单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * topic类型的exchange在根据routingkey转发消息时,可以对rouytingkey做一定的规则,比如anonymous.info可以被*.info匹配到。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		String message = "LOG INFO";
		channel.basicPublish(EXCHANGE_NAME, "anonymous.info", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "tuling.loulan.debug", null, message.getBytes());

		channel.close();
		connection.close();
	}
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
	    String queueName = channel.queueDeclare().getQueue();
	    //topic的routingkey,*代表一个具体的单词,#代表0个或多个单词。
	    channel.queueBind(queueName, EXCHANGE_NAME, "*.info");
	    channel.queueBind(queueName, EXCHANGE_NAME, "#.debug");
	    
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
			}
		};
		channel.basicConsume(queueName,true, myconsumer);
	}

6:Publisher Confirms 发送者消息确认
RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的
Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。

发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。

channel.confirmSelect();
  1. 发布单条消息
    即发布一条消息就确认一条消息。核心代码:
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", queue, null, body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}

channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。

  1. 发送批量消息
    之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
		String body = String.valueOf(i);
		ch.basicPublish("", queue, null, body.getBytes());
		outstandingMessageCount++;
	if (outstandingMessageCount == batchSize) {
		ch.waitForConfirmsOrDie(5_000);
		outstandingMessageCount = 0;
	}
}
if (outstandingMessageCount > 0) {
	ch.waitForConfirmsOrDie(5_000);
}

这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增
加一个机制能够具体对每一条发送出错的消息进行处理

  1. 异步确认消息
    实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)

按说监听只要注册一个就可以了,那为什么这里要注册两个呢?成功一个,失败一个然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数,

  • sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,Producer怎么知道是哪一条消息成功或者失败呢?RabbitMQ提供了一个方法
    int sequenceNumber =channel.getNextPublishSeqNo();
    来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。
  • multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
    这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。

7、Headers 头部路由机制
在官网的体验示例中,还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少,但是在某些比较特殊的业务场景,还是挺好用的。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * headers用得比较少,他是根据头信息来判断转发路由规则。头信息可以理解为一个Map
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		// header模式不需要routingKey来转发,他是根据header里的信息来转发的。比如消费者可以只订阅logLevel=info的消息。
		// 然而,消息发送的API还是需要一个routingKey。 
		// 如果使用header模式来转发消息,routingKey可以用来存放其他的业务消息,客户端接收时依然能接收到这个routingKey消息。
		String routingKey = "ourTestRoutingKey";
		// The map for the headers.
		Map<String, Object> headers = new HashMap<>();
		headers.put("loglevel", "info");
		headers.put("buslevel", "product");
		headers.put("syslevel", "admin");
		
		String message = "LOG INFO asdfasdf";
		
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
		
		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
		builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
		builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
		builder.headers(headers);

		channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));

//		channel.txSelect();
//		channel.txCommit();
//		channel.txRollback();

		channel.close();
		connection.close();
	}
public static void main(String[] args) throws Exception {
		
		String routingKey= "ourTestRoutingKey";
		x-match:特定的参数。all表示必须全部匹配才算成功。
		//any表示只要匹配一个就算成功。
		Map<String, Object> headers = new HashMap<String, Object>();
		headers.put("x-match","any");
//		headers.put("loglevel", "info");
		headers.put("buslevel", "product");
//		headers.put("syslevel", "admin");
		
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
	    String queueName = channel.queueDeclare("ReceiverHeader",true,false,false,null).getQueue();
	    
	    channel.queueBind(queueName, EXCHANGE_NAME,routingKey,headers);
	    
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				Map<String, Object> headerInfo = properties.getHeaders();
				headerInfo.forEach((key,value)-> System.out.println("header key: "+key+"; value: "+value));
				System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
				 channel.basicAck(deliveryTag, false);
			}
		};
		
		String consumerTag = channel.basicConsume(queueName,true, myconsumer);
		System.out.println("consumerTag > "+consumerTag);
	}

四、SpringBoot集成RabbitMQ

SpringBoot官方就集成了RabbitMQ,所以RabbitMQ与SpringBoot的集成是非常简单的。不过,SpringBoot集成RabbitMQ的方式是按照Spring的一套统一的MQ模型创建的,因此SpringBoot集成插件中对于生产者、消息、消费者等重要的对象模型,与RabbitMQ原生的各个组件有对应关系,但是并不完全相同。

  1. 引入依赖
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置关键参数
server.port=8080
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 单词推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10

spring.rabbitmq.listener.simple.acknowledge-mode=none
# 2.6以上版本SpringBoot集成Swagger2要修改配置。坑爹
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

3: 声明Exchange,Queue和Binding
所有的exchange, queue, binding的配置,都需要以对象的方式声明。默认情况下,这些业务对象一经声明,应用就会自动到RabbitMQ上常见对应的业务对象。但是也是可以配置成绑定已有业务对象的。文章来源地址https://www.toymoban.com/news/detail-499376.html

/**
 * 直连模式只需要声明队列,所有消息都通过队列转发。
 * @author roykingw
 */
@Configuration
public class DirectConfig {

	@Bean
	public Queue directQueue() {
		return new Queue(MyConstants.QUEUE_DIRECT);
	}
}


  1. 使用RabbitmqTemplate对象发送消息
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")
	@GetMapping(value="/directSend")
	public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {
		//设置部分请求参数
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setPriority(2);
		//设置消息转换器,如json
		rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
		//将对象转换成json再发送。
//		rabbitTemplate.convertandsend("",Object);
		//发消息
		rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
		return "message sended : "+message;
	}
  1. 使用@RabbitListener注解声明消费者
    消费者都是通过@RabbitListener注解来声明。在@RabbitMQListener注解中包含了非常多对Queue进行定制的属性,大部分的属性都是有默认值的。例如ackMode默认是null,就表示自动应答。在日常开发过程中,通常都会简化业务模型,让消费者只要绑定队列消费即可。
@RabbitListener(queues=MyConstants.QUEUE_DIRECT)
	public void directReceive2(String message) {
		System.out.println("consumer2 received message : " +message);
	}

到了这里,关于rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例

    【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例

                                                                       💧 【 R a b b i t M Q 教程】第二章—— R a b b i t M Q − 简单案例 color{#FF1493}{【RabbitMQ教程】第二章 —— RabbitMQ - 简单案例} 【 R abbi tMQ 教程】第二章 —— R abbi tMQ − 简单案例

    2024年02月08日
    浏览(8)
  • 【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程

    【图解RabbitMQ-3】消息队列RabbitMQ介绍及核心流程

    🧑‍💻作者名称:DaenCode 🎤作者简介:CSDN实力新星,后端开发两年经验,曾担任甲方技术代表,业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开发。技术尚浅,闭关学习中······ 😎人生感悟:尝尽人生百味,方知世间冷暖。

    2024年02月09日
    浏览(9)
  • RabbitMQ基础核心概念

    RabbitMQ基础核心概念

    了解RabbitMQ的核心概念 1、模型概念 2、Producer (生产者) 、 Consumer(消费者) 3、Exchange (交换机) 4、Queue (消息队列) 5、Broker(服务节点) 6、RabbitMQ的五种工作模式 Producer(生产者) :顾名思义是生产消息的一方 Consumer(消费者) :顾名思义是消费消息的一方 而消息一般由 俩部分组成

    2023年04月09日
    浏览(6)
  • RabbitMQ核心概念记录

    RabbitMQ核心概念记录

    本文来记录下RabbitMQ核心概念 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发

    2024年02月03日
    浏览(5)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(8)
  • 第二十章 : Spring Boot 集成RabbitMQ(四)

    第二十章 : Spring Boot 集成RabbitMQ(四) 前言 本章知识点:死信队列的定义、场景、作用以及原理、TTL方法的使用以及演示代码示例。 死信队列 定义:什么是死信队列? 在RabbitMQ中,并没有提供真正意义上的延迟队列,但是RabbitMQ可以设置队列、消息的过期时间,当队列或者

    2024年02月04日
    浏览(9)
  • 第二十一章 : Spring Boot 集成RabbitMQ(五)

    第二十一章 : Spring Boot 集成RabbitMQ(五) 前言 本章知识点: 如何保证消息100%可靠性发送的技术解决方案。 一、 应用场景 在使用消息队列时,因为生产者和消费者不直接交互,所以面临下面几个问题: 1)要把消息添加到队列中,怎么保证消息成功添加? 2)如何保证消息

    2024年02月03日
    浏览(12)
  • 【MQ 系列】RabbitMq 核心知识点小结

    【MQ 系列】RabbitMq 核心知识点小结

    RabbitMQ 是一个基于 AMQP 协议实现的企业级消息系统,想要顺畅的玩耍的前提是得先了解它,本文将主要介绍 rabbitmq 的一些基本知识点 特点 基本概念 消息投递消费的几种姿势 事务 集群 它是采用 Erlang 语言实现的 AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系

    2024年01月23日
    浏览(9)
  • 基于RabbitMQ的模拟消息队列之二---创建项目及核心类

    基于RabbitMQ的模拟消息队列之二---创建项目及核心类

    创建一个SpringBoot项目,环境:JDK8,添加依赖:Spring Web、MyBatis FrameWork(最主要) 2.核心类 在mqserver包中添加一个包,名字为core,表示核心类。 Exchange ExchangeType MSGQueue (为了区分Queue) Binding Message BasicProperties

    2024年02月11日
    浏览(9)
  • 根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    目录 一、需求分析 1.1、对 Message Queue 的认识 1.2、消息队列核心概念 1.3、Broker Server 内部关键概念 1.4、Broker Server 核心 API (重点实现) 1.5、交换机类型 Direct 直接交换机 Fanout 扇出交换机 Topic 主题交换机 1.6、持久化 1.7、网络通信 通信流程 远程调用设计思想 1.8、模块设计图

    2024年02月12日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包