SpringAMQP中AmqpTemplate发送接收消息

这篇具有很好参考价值的文章主要介绍了SpringAMQP中AmqpTemplate发送接收消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringAMQP中AmqpTemplate发送接收消息


前言: 最近没事浏览Spring官网,简单写一些相关的笔记,这篇文章整理Spring AMQP相关内容。文章并不包含所有技术点,只是记录有收获


 目录

1.AmqpTemplate 介绍

2.发送消息(Sending Message)

2.1发送Message消息

2.2发送POJO对象

2.3默认交换器与默认路由

2.5构建消息方法

3.接收消息(Receiving Message)

3.1接收Message消息

3.2接收Java对象

3.3接收消息并回复



1.AmqpTemplate 介绍

Spring AMQP提供了一个扮演核心角色的“模板”,定义操作的接口是AmqpTemplate ,  接口操作涵盖了发送和接收消息的一般行为 . 因此他包含了发送和接收消息的所有基本操作. 模板接口的每一个实现都依赖特定的客户端类库, 目前只有RabbitTemplate 

SpringAMQP中AmqpTemplate发送接收消息

2.发送消息(Sending Message)  

2.1发送Message消息

SpringAMQP中AmqpTemplate发送接收消息

参考源码: 

package org.springframework.amqp.core;

public interface AmqpTemplate {

    /**
    * 使用默认路由密钥向默认交换机发送消息
    */
	void send(Message message) throws AmqpException;

	/**
	 * 使用特定路由密钥向默认交换机发送消息
	 */
	void send(String routingKey, Message message) throws AmqpException;

	/**
	 * 使用特定路由密钥向特定交换机发送消息
	 */
	void send(String exchange, String routingKey, Message message) throws AmqpException;
    
    //代码略
}

AmqpTemplate指定AMQP操作的提供了发送消息的方法,其中最后一个方法有三个参数,它是显示发送消息的方法,他允许在运行时提供AMQP交换器 名称和路由关键字来发送消息,而最后的参数是实际常见的Message 。使用方法如下

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果是使用同给一个交换器(exchange)可以通过设置(set)的方法设置交换器,然后再发送消息,例如

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果amqpTemplate 设置了交换器(exchange)和路由键(routingKey)属性,那么只需要接收消息参数即可

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

2.2发送POJO对象

AmqpTemplate提供了将Java对象转换为消息并发送的方法

SpringAMQP中AmqpTemplate发送接收消息

 参考源码如下:

package org.springframework.amqp.core;

public interface AmqpTemplate {

    //将Java对象转换为消息并发送给默认交换机
    void convertAndSend(Object message) throws AmqpException;

    //将Java对象转换为Amqp消息,并将其发送到具有特定路由的默认交换机
	void convertAndSend(String routingKey, Object message) throws AmqpException;

	//将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
	void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

    //将Java对象转换为Amqp消息,并使用默认路由密钥将其发送到默认交换机
	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

	//将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到默认交换机
	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;

    //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;
    //代码略
}

从接口AmqpTemplate 的实现类RabbitTemplate可以看到Object对象通过MessageConverter对象转换为成Message对象

package org.springframework.amqp.rabbit.core;

public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
		implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
		ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

    //默认转换对象为SimpleMessageConverter
    private MessageConverter messageConverter = new SimpleMessageConverter();
    
	@Override
	public void convertAndSend(String exchange, String routingKey, final Object message,
			final MessagePostProcessor messagePostProcessor,
			@Nullable CorrelationData correlationData) throws AmqpException {
		Message messageToSend = convertMessageIfNecessary(message);
		messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData,
				nullSafeExchange(exchange), nullSafeRoutingKey(routingKey));
		send(exchange, routingKey, messageToSend, correlationData);
	}

	protected Message convertMessageIfNecessary(final Object object) {
		if (object instanceof Message) {
			return (Message) object;
		}
		return getRequiredMessageConverter().toMessage(object, new MessageProperties());
	}eturn converter;
	}

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
		MessageConverter converter = getMessageConverter();
		if (converter == null) {
			throw new AmqpIllegalStateException(
					"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
		}
		return converter;
	}

    //代码略
}

2.3默认交换器与默认路由

显示设置交换器(exchange)和路由键(routingKey)属性是比较推荐的使用方法,应为代码更清晰易读。 当不显示设置两个属性的时候也会有默认值。

AmqpTemplate默认交换器和默认路由键都是空String类型。因为AMQP规范将默认交换器定义为没有名称。 所有队列都自动绑定到默认交换器(direct exchange). 

//参考RabbitTemplate 源码
package org.springframework.amqp.rabbit.core

public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
		implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
		ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    private static final String DEFAULT_EXCHANGE = "";

	private static final String DEFAULT_ROUTING_KEY = "";
    
    @Override
	public void send(Message message) throws AmqpException {
		send(this.exchange, this.routingKey, message);
	}

    @Override
	public void send(String routingKey, Message message) throws AmqpException {
		send(this.exchange, routingKey, message);
	}

    //省略其他代码
}

例如可以创建一个模板,用于将消息发送到某个队列

// 没有设置交换器就是默认交换器
RabbitTemplate template = new RabbitTemplate(); 
// 消息将会发送到名称为queue.helloWorld队列中
template.setRoutingKey("queue.helloWorld"); 
// 执行发送消息
template.send(new Message("Hello World".getBytes(), someProperties));

2.5构建消息方法

AmqpTemplate接口使用的Message参数,可以使用MessageBuilder和MessagePropertiesBuilder对象快速构建

MessageBuilder

package org.springframework.amqp.core;

public final class MessageBuilder extends MessageBuilderSupport<Message> {
    
    //由构建器创建的消息正文将直接引用“body”
    public static MessageBuilder withBody(byte[] body) {}
    
    //由构建器创建的消息正文将是新数组中“body”的副本,通过Array.copyOf复制
    public static MessageBuilder withClonedBody(byte[] body) {}
    
    //由构建器创建的消息正文将是一个新数组,包含“body”的字节范围,通过Array.copyOf复制
    public static MessageBuilder withBody(byte[] body, int from, int to) {}
    
    //构建器创建的消息将具有一个包含参数的正文副本的新数组的主体。
    //参数的属性被复制到一个新的MessageProperties对象
    public static MessageBuilder fromClonedMessage(Message message) {}

    //建设者创建的消息将具有一个直接引用参数体的主体。
    //参数的属性被复制到一个新的MessageProperties对象
    public static MessageBuilder fromMessage(Message message) {}
    
    //构建返回Message 对象
    @Override
	public Message build() {
		return new Message(this.body, this.buildProperties());
	}
}

通过MessageBuilder构建message

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();

MessagePropertiesBuilder

package org.springframework.amqp.core;

public final class MessagePropertiesBuilder extends MessageBuilderSupport<MessageProperties> {

	public static MessagePropertiesBuilder newInstance() {}

	public static MessagePropertiesBuilder fromProperties(MessageProperties properties) {}

	public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) {}
    
    //构建返回MessageProperties对象
    @Override
	public MessageProperties build() {
		return this.buildProperties();
	}

}

通过MessagePropertiesBuilder构建MessageProperties 然后在通过MessageBuilder构建message

MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

3.接收消息(Receiving Message)

接收消息有两种方式,一种是通过轮询的方法来轮询单个消息,另一种是是通过监听器来异步接收消息。异步消息使用的专用组件而不是AmqpTemplate, AmqpTemplate可以用于轮询接收消息,在使用AmqpTemplate接收消息时,默认情况下如果没有消息的时候不会阻塞,直接返回空, 也可设置接收的阻塞时间,

3.1接收Message消息

SpringAMQP中AmqpTemplate发送接收消息

参考源码如下:

package org.springframework.amqp.core;

public interface AmqpTemplate {
    
    //如果存在来自默认队列的消息,则接收消息。立即返回
    Message receive() throws AmqpException;
    
    //如果有来自特定队列的消息,则接收消息 ,立刻返回
	Message receive(String queueName) throws AmqpException;
    
    //从默认队列接收消息,如果消息可用,则等待指定的等待时间
	Message receive(long timeoutMillis) throws AmqpException;
    
    //从特定队列接收消息,如果消息可用,则等待指定的等待时间
	Message receive(String queueName, long timeoutMillis) throws AmqpException;

    //代码略
}

3.2接收Java对象

 AmqpTemplate提供了四个重载方法来接收POJO,而不是上面的Message对象, 它们返回对象是Object,用以替换返回值 Message。

SpringAMQP中AmqpTemplate发送接收消息

如下源码所示

package org.springframework.amqp.core;

public interface AmqpTemplate {
    
    //接收默认队列的消息,如果消息可用,并转换为JAVA对象。立即返回
    Object receiveAndConvert() throws AmqpException;
    
    //接收特定队列的消息,如果消息可用,将其转换为Java对象。立即返回,可能返回空值
	Object receiveAndConvert(String queueName) throws AmqpException;
    
    //接收默认队列的消息,如果消息可用,则等待指定的等待时间
	Object receiveAndConvert(long timeoutMillis) throws AmqpException;
    
    //接收特定队列的消息,如果消息可用,则等待指定的等待时间
	Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

    //代码略
}


从接口AmqpTemplate 的实现类RabbitTemplate可以看到Message是通过MessageConverter对象转换为成Java对象

package org.springframework.amqp.rabbit.core;

public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
		implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
		ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

    //默认转换对象为SimpleMessageConverter
    private MessageConverter messageConverter = new SimpleMessageConverter();
    
    //接收特定队列的消息,如果消息可用,则等待指定的等待时间
	Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException{
        Message response = timeoutMillis == 0 ? doReceiveNoWait(queueName) : receive(queueName, timeoutMillis);
		if (response != null) {
			return getRequiredMessageConverter().fromMessage(response);
		}
    }

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
		MessageConverter converter = getMessageConverter();
		if (converter == null) {
			throw new AmqpIllegalStateException(
					"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
		}
		return converter;
	}

    //代码略
}

3.3接收消息并回复

除此之外AmqpTemplate还有几个receiveAndReply方法,这些方法用来实现同步接收,处理并回复消息 

SpringAMQP中AmqpTemplate发送接收消息

AmqpTemplate实现负责接收和回复阶段。在大多数情况下,提供ReceiveAndReplyCallback的实现来为接收到的消息执行一些业务逻辑,如果需要,可以构建回复对象或消息。需要注意的是ReceiveAndReplyCallback可能返回null。在这种情况下,没有发送回复消息

package org.springframework.amqp.core;

public interface AmqpTemplate {
    
    //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
    //或者发送到默认交换和默认routingKe
    <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException;

    //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
    //或者发送到默认交换和默认routingKe
	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;

    //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey)
			throws AmqpException;

    //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange,
			String replyRoutingKey) throws AmqpException;

    //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
    
    //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

    //代码略
}

例如

  boolean b=template.receiveAndReply("myqueue",new ReceiveAndReplyCallback<String,String>(){
            public String handle(String str) {
                System.out.println("getMessage is :" + str);
                //doSomething
                return "OK";
            }
        });

上一篇:SpringAMQP和RabbitMQ入门文章来源地址https://www.toymoban.com/news/detail-402181.html

到了这里,关于SpringAMQP中AmqpTemplate发送接收消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka消息队列实现消息的发送和接收

    消息在Kafka消息队列中发送和接收过程如下图所示: 消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅

    2024年02月11日
    浏览(37)
  • 如何使用RabbitMQ发送和接收消息

    本文介绍了如何使用RabbitMQ的Python客户端库pika来发送和接收消息,并提供了示例代码。读者可以根据自己的需求修改代码,例如修改队列名称、发送不同的消息等。 RabbitMQ 是一个开源的消息队列软件,可以用于在应用程序之间传递消息。下面是一个使用 RabbitMQ 的流程和代码

    2024年02月15日
    浏览(37)
  • 微信小程序消息推送、接收消息事件、发送客服消息

    文档地址消息推送 | 微信开放文档 接收消息和事件 | 微信开放文档 发送客服消息 | 微信开放文档 代码参考

    2024年02月12日
    浏览(30)
  • RabbitMQ如何保证消息的发送和接收

    一、RabbitMQ如何保证消息的发送和接收 1.ConfirmCallback方法 ConfirmCallback是一个回调接口,消息发送到broker后触发回调,确认消息是否到达broker服务器,也就是只确认消息是否正确到达Exchange交换机中。 2.ReturnCallback方法 通过实现ReturnCallback接口,启动消息失败返回,此接口是在交

    2024年02月15日
    浏览(35)
  • qt websocket 通讯实现消息发送接收

    websocket 是基于 TCP socket 之上的应用层, 解决 HTML 轮询连接的问题,实现客户端与服务端长连接, 实现消息互相发送,全双工。 服务端, 使用 QT 教程demo chatserver.h chatserver.cpp main.cpp 客户端 clientwidget.h clientwidget.cpp websocketclient.h websocketclient.cpp

    2024年02月15日
    浏览(34)
  • 使用C#和RabbitMQ发送和接收消息

    通过NuGet安装 RabbitMQ.Client 以下是一个简单的示例代码,演示如何使用 C# 和 RabbitMQ 客户端库来发送和接收消息: durable持久化 durable 参数用于指定队列是否是持久化的。 当 durable 参数设置为 true 时,表示队列是持久化的。持久化的队列会在RabbitMQ服务器重启后仍然存在,确保

    2024年02月11日
    浏览(35)
  • 如何使用 RabbitMQ 进行消息的发送和接收

    1、创建连接工厂: 2、创建交换器和队列: 3、发送消息: 4、接收消息: 在上述示例中,我们创建了一个连接工厂,并通过它建立与 RabbitMQ 服务器的连接和通道。然后,我们声明了一个直连型交换器和一个队列,并将它们绑定在一起。接下来,我们使用basicPublish方法发送消

    2024年04月22日
    浏览(38)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • C#使用RabbitMQ发送和接收消息工具类

    下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码: 通过NuGet安装 RabbitMQ.Client

    2024年02月11日
    浏览(46)
  • rabbitMQ:绑定Exchange发送和接收消息(direct)

    AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息 发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,

    2024年02月15日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包