RabbitMQ系列【16】AmqpTemplate接口详解

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列【16】AmqpTemplate接口详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

有道无术,术尚可求,有术无道,止于术。

前言

RabbitTemplatespring-amqp提供的一个 RabbitMQ 消息操作模板类,在之前我们使用它完成了简单的消息发送。

RabbitTemplate 主要提供了发送消息、接收消息以及其他附加功能,内部封装了RabbitMQ原生API,大大简化了使用 RabbitMQ操作。

RabbitTemplate 主要实现了AmqpTemplateRabbitOperations接口:
RabbitMQ系列【16】AmqpTemplate接口详解

AmqpTemplate

AmqpTemplate接口主要声明了三类方法:

public interface AmqpTemplate {
	// 发送消息
    void send(Message var1) throws AmqpException;

    // 接收消息
    Message receive() throws AmqpException;
	
    // 发送消息并接收回复
    Message sendAndReceive(Message var1) throws AmqpException;
}

API

首先,我们看下AmqpTemplate中声明的各种方法。

send

send 方法一共有三个,需要创建Message消息对象,将消息封装到该对象内发送,如果没有指定交换机、路由键,将使用默认值,也就是空字符串。

	// 发送消息到默认交换机、默认路由KEY
	void send(Message message) throws AmqpException;

	// 发送消息到默认交换机、使用指定路由KEY
	void send(String routingKey, Message message) throws AmqpException;

	// 发送消息到指定交换机、使用指定路由KEY
	void send(String exchange, String routingKey, Message message) throws AmqpException;

示例:

        Message message = new Message("消息".getBytes());
        rabbitTemplate.send(message);
        rabbitTemplate.send("route.key", message);
        rabbitTemplate.send("exchange_name", "route.key", message);

convertAndSend

convertAndSend 方法可以转换对象并发送,并可以添加一个消息处理器MessagePostProcessor

	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用默认路由KEY
	void convertAndSend(Object message) throws AmqpException;

	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY
	void convertAndSend(String routingKey, Object message) throws AmqpException;

	// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
	void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

	 // 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
	 // 在发送消息之前添加一个消息处理器MessagePostProcessor 
	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY
	// 在发送消息之前添加一个消息处理器MessagePostProcessor 
	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;

	// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
	// 在发送消息之前添加一个消息处理器MessagePostProcessor
	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;

MessagePostProcessor 是一个函数型接口,提供了一个postProcessMessage方法处理消息,由于直接发送的是对象,如果需要设置一些消息的属性,就需要使用该接口进行设置,例如:

        MessagePostProcessor messagePostProcessor = message1 -> {
            MessageProperties messageProperties = message1.getMessageProperties();
            messageProperties.setExpiration("1000");
            return message1;
        };
        rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);

receive

一般获取消息有两种处理模式:

  • push:由RabbitMQ主动将消息推送给订阅队列的消费者,调用channel.basicConsume方法。
  • pull:主动从指定队列中拉取消息,需要消费者调用channel.basicGet方法。

receive 方法,就是从主动队列中获取消息。

	// 如果默认队列中有消息,则接收消息。立即返回,可能有NULL值
	@Nullable
	Message receive() throws AmqpException;

	// 从指定队列中获取消息。立即返回,可能有NULL值
	@Nullable
	Message receive(String queueName) throws AmqpException;

	// 如果默认队列中有消息,则接收消息。可能有NULL值,并指定一个超时时间
	@Nullable
	Message receive(long timeoutMillis) throws AmqpException;

	// 从指定队列中获取消息。可能有NULL值,并指定一个超时时间
	@Nullable
	Message receive(String queueName, long timeoutMillis) throws AmqpException;

receiveAndConvert

receiveAndConvert可以拉取消息并进行对象转换。


	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。
	@Nullable
	Object receiveAndConvert() throws AmqpException;

	// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。
	@Nullable
	Object receiveAndConvert(String queueName) throws AmqpException;

	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能有NULL值,并指定一个超时时间
	@Nullable
	Object receiveAndConvert(long timeoutMillis) throws AmqpException;

	// 从指定队列中接收消息并将其转换为Java对象,并指定一个超时时间,可能为null 值。
	@Nullable
	Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

	//  如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。
	@Nullable
	<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;

	// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。
	@Nullable
	<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;

	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。可能有NULL值,并指定一个超时时间及消息转换器SmartMessageConverter。
	@Nullable
	<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;

	// 从指定队列中接收消息并将其转换为Java对象。可能为null 值。并指定一个超时时间及消息转换器SmartMessageConverter。
	@Nullable
	<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)
			throws AmqpException;

示例:

        // 接收消息,队列存在是,会报错;队列中没有消息,返回NULL
        Message bootQueue1= rabbitTemplate.receive("bizQueue");
        Message bootQueue2 = rabbitTemplate.receive("bizQueue",1000);
        Message bootQueue3= rabbitTemplate.receive("backupQueue");

使用消息转换器,可以直接发送、接收对象:

        // User 需要实现Serializable
        User user = new User();
        user.setName("张三");
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.convertAndSend(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,user);
        User receiveUser = rabbitTemplate.receiveAndConvert("bizQueue", new ParameterizedTypeReference<User>() {});

receiveAndReply

receiveAndReply支持在获取消息时传入一个回调函数ReceiveAndReplyCallback,处理接收到消息和回复消息的业务逻辑。

receiveAndReply应用于RPC模式Server端,Server收到消息,并回复消息给客户端:
RabbitMQ系列【16】AmqpTemplate接口详解
该模式用的比较少,实现起来也比较麻烦,这里就不演示了。

    // 收到消息并回复,R:接收到的消息 S: 返回的消息
	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException;

	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;

	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey)
			throws AmqpException;

	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange,
			String replyRoutingKey) throws AmqpException;

	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

sendAndReceive

sendAndReceive也属于RPC模式,发送消息并接收回复消息,属于Client端。

	@Nullable
	Message sendAndReceive(Message message) throws AmqpException;

	@Nullable
	Message sendAndReceive(String routingKey, Message message) throws AmqpException;

	@Nullable
	Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException;

convertSendAndReceive

convertSendAndReceive以及convertSendAndReceiveAsType是对sendAndReceive的扩展,可以直接发送对象消息,并可以设置类型转换器:文章来源地址https://www.toymoban.com/news/detail-489954.html

	@Nullable
	Object convertSendAndReceive(Object message) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String routingKey, Object message) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String exchange, String routingKey, Object message,
			MessagePostProcessor messagePostProcessor) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(Object message, ParameterizedTypeReference<T> responseType)
			throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
			MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType)
			throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
			MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType)
			throws AmqpException;

到了这里,关于RabbitMQ系列【16】AmqpTemplate接口详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(50)
  • centos7安装erlang23.3.4.11及rabbitmq3.9.16版本

    rpm包有系统版本要求,el是Red Hat Enterprise Linux(EL)的缩写。 EL7是Red Hat 7.x,Centos 7.x EL8是Red Hat 8.x, Centos 8.x 所以我们在安装erlang及rabbitmq时需要选择与自己的服务器相对应的rpm包 # rabbitmq的rpm安装包 https://github.com/rabbitmq/rabbitmq-server/releases?page=10 # erlang的rpm安装包 https://github.com/

    2024年02月07日
    浏览(42)
  • 利用type-C(16P)设计电源接口

     该原理图是立创商城编号为C2765186的Type-C接口。 外壳固定引脚 13-14(16)引脚:外壳固定引脚,英文名SHELL。有些Type-C原理图标注15、16引脚,他们也都是外壳固定引脚,其实上右图只是将15、16归到了13、14。这些引脚接GND。 拓展: 观察剩下未讲解的引脚,发现他们都是成对的

    2024年02月06日
    浏览(45)
  • 关于IDEA Translation插件中有道智云(有道翻译)应用ID,密钥申请教程

    注册有道智云 创建应用 下面是创建好的应用,将id和密钥分别复制 回到IDEA翻译引擎配置填写就完成了

    2024年02月12日
    浏览(41)
  • C# 图解教程 第5版 —— 第16章 接口

    ​ 接口是声明一组函数成员而不进行实现的引用类型,只能用类和结构来实现接口。 使用 IComparable 接口的示例 ​ Array 类的 Sort 方法依赖于 IComparable 接口,其声明在 BCL 中,只包含唯一的 CompareTo 方法。图 16.1 中灰色表示该方法目前未实现。 图16.1 IComparable 接口 ​ 调用 Co

    2024年02月04日
    浏览(32)
  • Cilium系列-16-CiliumNetworkPolicy 实战演练

    Cilium 系列文章 今天我们进入 Cilium 安全相关主题, 基于 Cilium 官方的《星球大战》 Demo 做详细的 CiliumNetworkPolicy 实战演练。 您是帝国(Empire)的平台工程团队的一员,负责开发死星(Death Star) API 并将其部署到帝国银河 Kubernetes 服务 (Imperial Galactic Kubernetes Service, IGKS)。你已经部署了

    2024年02月14日
    浏览(29)
  • 因果推断系列16-面板数据与固定效应

    加载第三方包

    2024年02月05日
    浏览(43)
  • matlab appdesigner系列-常用16-状态按钮

    状态按钮 ,有两个状态, 按下状态 ,返回值为 1或true ; 未按下状态 ,返回值为 0或false 示例:设置一状态按钮,用文本记录其状态 操作步骤: 1)将状态按钮、文本区域拖拽到画布上,并修改相应文字 2)设置 状态按钮 的回调函数  代码为: 运行效果为:也可以看到点击

    2024年01月23日
    浏览(36)
  • RabbitMQ入门系列01----RabbitMQ简介

    在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景: 商品的原始数据保存在数据库中,增删改查都在数据库中完成。 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。 商品详情做了页面静态化处理,静态页面数据也不会

    2023年04月08日
    浏览(41)
  • RabbitMQ系列(23)--RabbitMQ惰性队列

    1、概念:RabbitMQ从 3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中(持久化队列若想持久化消息还需要看消息设置了持久化没),而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多

    2024年02月16日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包