RabbitMQ使用详解

这篇具有很好参考价值的文章主要介绍了RabbitMQ使用详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ

一. 简介

​ RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。

​ Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。

​ AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

二. rabbitmq基本原理

​ RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。

​ 就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来操作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。

​ 接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel

​ 再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程

至此,我们就把所有的名词贯通咯,接下来做个概要描述:

  • ConnectionFactory:与RabbitMQ服务器连接的管理器
  • Connection:与RabbitMQ服务器的TCP连接
  • Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。
  • Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
  • Message Queue:消息队列,用于存储还未被消费者消费的消息。
  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
  • RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受
  • BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
  • Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
  • Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。
  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
    如下图:

三. 安装

3.1安装erlang环境

rabbitmq是使用erlang语言开发的,所以必须要先安装erlang环境。erlang的软件环境下载地址为:https://www.erlang.org/downloads,安装完毕之后需要配置环境变量:

ERLANG_HOME=G:\erlang\erl-23.0

RabbitMQ使用详解

3.2 安装rabbitmq

本教程采用解压的方式进行安装。rabbitmq的下载地址为:https://www.rabbitmq.com/install-windows.html。

RabbitMQ使用详解

A. 配置环境变量

1.RABBITMQ_SERVER=G:\rabbitmq\rabbitmq_server-3.8.3
2.在path中加入:%RABBITMQ_SERVER%\sbin

B.安装可视化工具和服务

下载完毕之后进行解压,进入到家目录下的sbin目录下,执行如下命令,安装rabbitmq的web可视化工具:

rabbitmq-plugins.bat enable rabbitmq_management

安装rabbitmq的服务:rabbitmq-service.bat install

启动rabbitmq的服务:rabbitmq-server.bat start

在浏览器输入:http://localhost:15672/,访问rabbitmq的控制台,用户名和密码均为 guest

RabbitMQ使用详解

3.3 使用docker安装
docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

四. RabbitMQ程序的编写

4.1 rabbitMQ支持的消息模型

RabbitMQ使用详解

RabbitMQ使用详解

4.2 引入依赖
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
</dependency>
4.3 简单模型(直连)

RabbitMQ使用详解

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
4.3.1 生成者
public class Producer {
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 5672;
        String username = "guest";
        String password = "guest";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        Connection conn = factory.newConnection();

        Channel channel = conn.createChannel();

        channel.queueDeclare("test", true, false, false, null);
        
        channel.basicPublish("", "test", null, (i + "hello").getBytes());

        RabbitMQUtils.close(conn, channel);
    }
}
4.3.2 消费者
public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}
4.4 工作模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

RabbitMQ使用详解

4.4.1 消息生产者
for (int i = 0; i < 20; i++) {
	channel.basicPublish("", "test", null, (i + "hello").getBytes());
}
4.4.2 多个消费者

消费者一

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}

消费者二

public class Consumer1 {
    public static void main(String[] args) throws Exception{
        Connection conn = RabbitMQUtils.getConnection();

        Channel channel = conn.createChannel();

        channel.basicConsume("test", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    TimeUnit.SECOND.sleep(3);
                }catch(Exception ex){}
                System.out.println("Consumer-one: 收到的消息: " + new String(body));
            }
        });
    }
}
4.4.3 消息自动确认

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.

channel.basicQos(1); // 每次只消费一条消息
channel.queueDeclare("firstQueue", false, false, false, null);

// 消息改为手动确认
channel.basicConsume("firstQueue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(new String(body, Charset.defaultCharset()));
        // 手动确认消息已经被消费了, 第一个参数是当前消费的消息的标签(递增的整数)
        // 第二个参数是否确认多条消息,包括之前消费的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
	}
});
4.5 发布订阅模型(fanout)

RabbitMQ使用详解

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
4.5.1 消息生成者
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("multiple", BuiltinExchangeType.FANOUT);

channel.basicPublish("multiple", "", null, "fanout-message".getBytes());

RabbitMQUtils.close(channel, connection);
4.5.2 消息消费者

使用如下代码创建多个消息的消费者

Connection connection = RabbitMQUtils.getConnection();

Channel channel = connection.createChannel();

//String qName = channel.queueDeclare("consumer-2", false, false, false, null).getQueue();
String qName = channel.queueDeclare().getQueue(); //创建临时队列

channel.queueBind(qName, "multiple", "");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, 
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});
4.6 直连模型(direct)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

RabbitMQ使用详解

4.6.1 消息生产者
// 声明一个交换机, 交换机的类型为 direct
channel.exchangeDeclare("direct-module", BuiltinExchangeType.DIRECT);

channel.basicPublish("direct-module", "success", null, "成功信息".getBytes());
channel.basicPublish("direct-module", "error", null, "错误信息".getBytes());
4.6.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "success");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "direct-module", "success");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

4.7 主题模式(topic)

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

RabbitMQ使用详解

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者audit.irs 等
        audit.*   只能匹配 audit.irs

4.7.1 消息生产者

// 声明一个交换机, 交换机的类型为 fanout
channel.exchangeDeclare("topic-module", BuiltinExchangeType.TOPIC);

channel.basicPublish("topic-module", "company.java", null, "通知信息".getBytes());
//channel.basicPublish("topic-module", "error", null, "错误信息".getBytes());

RabbitMQUtils.close(channel, connection);

4.7.2 消息消费者

消费者一

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者二

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.java.#");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

消费者三

String qName = channel.queueDeclare().getQueue(); //创建临时队列
channel.queueBind(qName, "topic-module", "company.html.*");

channel.basicConsume(qName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, 
                  AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.out.println(new String(body, Charset.defaultCharset()));
	}
});

结论

company: 可以被消费者一接收到
company.java: 可以被消费者一、消费者二接收到
company.java.manager: 可以被消费者一、消费者二接收到
company.html.teacher: 可以被消费者一、消费者三接收到

五. RabbitMQ与springboot整合

5.1 依赖
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.4</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
		<exclusions>
			<exclusion>
				<groupId>org.junit.vintage</groupId>
				<artifactId>junit-vintage-engine</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
	</dependency>
</dependencies>
5.2 使用json的序列化
// 消息的消费方json数据的反序列化
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
    			ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

// 定义使用json的方式转换数据
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate amqpTemplate = new RabbitTemplate();
    amqpTemplate.setConnectionFactory(connectionFactory);
    amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return amqpTemplate;
}
5.3 简单模型

消息消费方

@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})
public void simpleModel(User user) {
	log.info("message: {}", user);
}

消息发送

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void simpleMessageSend() {
        rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
    }
}
5.4 工作模型

工作模式只需要在简单模式的基础上,添加一个消息的消费方。

5.5 发布订阅模型

消息消费方

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

消息发送方

// fanout模型
@Test
public void fanoutMessageSend() {
	for (int i = 0; i < 5; i++) {
		rabbitTemplate.convertAndSend("fanout-ex", "", new User(i, "张三"));
	}
	try {
		TimeUnit.SECONDS.sleep(5);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.6 直连模式(direct)

消息的消费方

@RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    key = {"error", "success"},
                    exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
        System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
      @QueueBinding(value = @Queue,
            key = {"error"},
            exchange = @Exchange(value = "direct-ex", type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
    System.out.println(String.format("消费者 【two】: %s", user));
}

消息生产者

@Test
public void directMessageSend() {
	//rabbitTemplate.convertAndSend("direct-ex", "success", new User(2, "张三"));
	rabbitTemplate.convertAndSend("direct-ex", "error", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.7 topic模型

消息的消费方

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
	System.out.println(String.format("消费者 【one】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.java.#"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
	System.out.println(String.format("消费者 【two】: %s", user));
}

@RabbitListener(bindings = {
	@QueueBinding(value = @Queue,
		key = {"company.html.*"},
		exchange = @Exchange(value = "topic-ex", type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format("消费者 【three】: %s", user));
}

消息生产方

@Test
public void topicMessageSend() {
	//rabbitTemplate.convertAndSend("topic-ex", "company", new User(2, "张三"));
	rabbitTemplate.convertAndSend("topic-ex", "company.java", new User(2, "张三"));
	try {
		TimeUnit.SECONDS.sleep(2);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
5.8 消息的手动确认

配置

spring:
  rabbitmq:
    listener:
      simple:
        # 提交方式为手动
        acknowledge-mode: MANUAL

提交代码

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

说明:当配置了json反序列化(见5.2节),代码中实例化了SimpleRabbitListenerContainerFactory,会默认覆盖application.yml文件中的配置,需要在代码层面手动的设置提交的方式:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

六. 事务与confirm机制

6.1RabbitMQ事务

RabbitMQ的事务是对AMQP协议的实现,通过设置Channel 的模式来完成,语句为:

channel.txSelect();  //开启事务
// ....本地事务操作
channel.txCommit();  //提交事务
channel.txRollback(); //回滚事务

特别说明:RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能。

6.2 Confirm机制

由于RabbitMQ的事务性能的问题,于是就又推出了发送方确认模式。

channel.confirmSelect(); //开启发送方确认模式
6.2.1 单条消息确认
channel.waitForConfirms(); //对于单条消息的确认,返回值为true或者false
6.2.2 批量消息确认
try {
	channel.waitForConfirmsOrDie();  //批量消息确认,如果有一条消息没有发送成功,会抛出异常
}catch (Exception ex) {
	ex.printStackTrace();
}
6.2.3 回调方式确认
channel.confirmSelect();
// mandatory 需要开启
channel.basicPublish("", "tx-queue1", true, "text".getBytes());

channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      System.out.println("成功达到交换机");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    	System.out.println("没有到达交换机");
    }
});

// 没有到达队列的时候触发
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, 
                            String routingKey, AMQP.BasicProperties properties, 
                             byte[] body) throws IOException {
    		System.out.println("没有到达队列");
    }
});
6.3 springboot对confirm实现

springboot对confirm第三种机制的实现。

6.3.1 配置
spring:
  rabbitmq:
    # 开启信息是否 回调 到交换机的确认方法,即 setConfirmCallback 方法
    publisher-confirm-type: CORRELATED
6.3.2 编码实现
@Bean("availableTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate();
		rabbitTemplate.setConnectionFactory(connectionFactory);

		// 开启 setReturnCallback 
		rabbitTemplate.setMandatory(true);

		// exchange告诉程序是否以及到达交换机,该方法无论成功都会回调。如果到达ack为true; 否则为false;
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
      	System.out.println(correlationData.getId());
				if(ack) {
             System.out.println("成功到达交换机");
        }else {
             System.out.println("没有到达交换机");
        }
		});

		rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, rk) -> {
				System.out.println("消息没有到达队列");
		});

		return rabbitTemplate;
}
6.3.3 消息的发送
@Test
public void sendMsg2() {
		// 消息属性
		MessageProperties messageProperties = MessagePropertiesBuilder
						.newInstance().setMessageId(UUID.randomUUID().toString())
						.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
						.build();
		// 消息
		Message message = new Message(JSONObject.toJSONBytes(new User(23, "张三")),
                                  messageProperties);

		CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend("","tx-queue", message, data);

		try {
				TimeUnit.SECONDS.sleep(120);
		} catch (InterruptedException e) {
				e.printStackTrace();
		}
}

七. 死信队列

我们先假设一个场景,当消息被消费方消费过很多次后,依然无法消费,那么就没有尝试的必要了,我们需要将这类信息放到一个特定的队列中,等待人工的接入。死信队列并不是一个特殊的队列,只是一个普通的队列,只是我们把他们取名叫做死信队列。

死信队列的设计是在某个队列的头信息中设定x-dead-letter-exchange (死信交换机)和x-dead-letter-routing-key(死信路由键)即可。关联到一个绑定到某个死信交换机的队列上。然后给该队列指定过期时间或者指定的消息的过期时间,那么该消息到期后会自动到达死信队列中。

7.1 RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
     RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
}
7.2 创建死信队列与交换机
@Slf4j
@Component
public class DeadQueue {

    // 死信队列名
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    // 死信交换机
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    // 死信路由键
    private static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";

    private RabbitAdmin rabbitAdmin;

    public DeadQueue(RabbitAdmin rabbitAdmin) {
        this.rabbitAdmin = rabbitAdmin;
    }

    @PostConstruct
    public void initDeadQueue() {
        Queue deadQueue = QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        rabbitAdmin.declareQueue(deadQueue);  //创建死信队列

        Exchange deadExchange = ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true).build();
        rabbitAdmin.declareExchange(deadExchange);  //创建死信交换机

        Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange)
                .with(DEAD_LETTER_ROUTING_KEY).noargs();
        rabbitAdmin.declareBinding(binding);  // 将队列绑定到交换机上
        log.info("死信队列:{}, 死信交换机: {}, 已经成功绑定.", DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE);
    }
}
7.3 死信队列的绑定
/**
 * x-dead-letter-exchange: 死信队列交换机
 * x-dead-letter-routing-key: 死信队列路由键
 * x-message-ttl: 消息在队列中最大的存活时间, 如果没有被消费,就会进入到死信队列
 */
@RabbitListener(bindings = @QueueBinding(
  value = @org.springframework.amqp.rabbit.annotation.Queue(
    value = "msg-queue",
    durable = "true",
    arguments = {@Argument(name = "x-dead-letter-exchange", value = DEAD_LETTER_EXCHANGE),
                 @Argument(name = "x-dead-letter-routing-key", value = DEAD_LETTER_ROUTING_KEY),
                 @Argument(name = "x-message-ttl", value = "20000", type = "java.lang.Long")
                }
  ),
  exchange = @org.springframework.amqp.rabbit.annotation.Exchange(name = "msg-exchange"),
  key = {"msg"}
))
public void receiveMsg(Message message, Channel channel) throws Exception{
  log.info("消息体:{}", new String(message.getBody()));
  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
7.4 消息发送
@Test
public void sendMsg2() {
  // 消息属性
  MessageProperties messageProperties = MessagePropertiesBuilder
    .newInstance().setMessageId(UUID.randomUUID().toString())
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    .setExpiration("5000")  //消息的存活时间,与队列的TTL,取最小的时间,进入死信队列
    .build();
  // 消息
  Message message = new Message(JSONObject.toJSONBytes(new User(23, "message")), messageProperties);

  CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  rabbitTemplate.convertAndSend("msg-exchange","msg", message, data);

  try {
    TimeUnit.SECONDS.sleep(120);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
7.5 应用场景

场景一:未支付订单在规定的时间取消。实现的方式为,将订单消息放入到一个队列中,并指定其过期时间。当过期时间到了之后,就进入到了死信队列,那么可以直接在死信队列的消费端取出对应的消息即可。

场景二:某条消息在消费端曾多次尝试消费,但是均未消费成功,那么就进入死信队列,让人工干预。

八. 幂等性

所有的消息中间件都会存在这样一个问题,那就是消息的重复消费问题,例如说记录用户的积分信息,消息每次消费都会生成一条记录,这会队我们的业务带来致命的问题,所以我们必须做幂等性设计,所谓幂等设计就是,一条消息无论消费多少次所产生的结果都是相同的。对应的数学公式为:

f(n) = f(f(n))

8.1 方案一

为每条消息生成全局唯一ID,每次消费消息之后都将ID在表中插入一条数据,每次消费之前先查询ID是否存在,如果不存在就执行对应的逻辑;如果存在则直接确认。

8.2 方案二

利用redis+数据库的方案来实现幂等性的设计,实现的思路与redis的缓存击穿方案类似;当插入数据的时候,将唯一ID同时插入数据库,然后放入到redis中。

九. 消息的重试机制

消息的重试是发生在消息的消费端。

十.消息的可靠性投递

RabbitMQ使用详解

RabbitMQ使用详解

文档参考声明:

CSDN(sessinsong): https://blog.csdn.net/sessionsong/article/details/86317991

简书 (jiangmo):https://www.jianshu.com/p/64357bf35808

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。文章来源地址https://www.toymoban.com/news/detail-471200.html

到了这里,关于RabbitMQ使用详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Ubuntu下安装openjdk、erlang和rabbitmq

    一、安装Open JDK 1.打开终端,在命令行模式下输入指令,添加OpenJDK的源 sudo add-apt-repository ppa:openjdk-r/ppa sudo apt-get update 2.输入指令,安装OpenJDK 8 sudo apt-get install openjdk-8-jdk 3.安装完成后,可以在命令行模式下输入“java -version”指令验证是否安装成功   二、需要Erlang环境支持

    2024年02月13日
    浏览(52)
  • 【RabbitMQ】RabbitMQ和Erlang下载与安装步骤—2023超详细最新版

    注意事项:四大注意事项 一、下载安装包 (1)下载erlang (2)下载RabbitMQ 二、安装程序 (1)安装erlang (2) 安装RabbitMQ 💟 创作不易,不妨 点赞 💚 评论 ❤️ 收藏 💙 一下 注意事项:四大注意事项 1.首先在下载RabbitMQ和Erlang的安装包时要 注意版本是否对应 ,不然不兼容无法安装启

    2024年02月16日
    浏览(73)
  • Linux快速安装Erlang和RabbitMQ单机版

    CentOS7 Xshell6 XFtp6 Erlang 21.3 RabbitMQ 3.8.4 同一个软件有很多种安装方式,在Linux系统有几种常见的软件安装方式: 源码编译安装:一般需要解压,然后使用 make 、 make install 等命令 RPM(RedHat Package Manager):是一种软件管理包,安装卸载比较简单,但无法解决软件包之间的依赖问

    2024年02月11日
    浏览(53)
  • Erlang、RabbitMQ下载与安装教程(windows超详细)

    目录 安装Erlang 1.首先安装RabbitMQ需要安装Erlang环境  2.点击下载好的.exe文件进行傻瓜式安装,一直next即可 3.配置Erlang环境变量 安装RabbitMQ  1.给出RabbitMQ官网下载址:Installing on Windows — RabbitMQ,找到  2.配置RabbitMQ环境变量,过程跟配置Erlang相似  3.安装管理工具RabbitMQ-Plugins,

    2024年02月03日
    浏览(48)
  • linux安装配置RabbitMQ和Erlang并配置环境变量

    linux 安装RabbitMQ、Erlang 一、安装Erlang erlang安装成功如下图: 二、安装RabbitMQ 成功登陆rabbitmq后台图例如下: 三、如何在阿里云开启自定义的端口号? 最后补充知识: 如何使用阿里云去开启一个端口号呢? 步骤如下: 1.登陆阿里云 2.搜索【云服务器ECS】,在左侧找到【安全

    2024年02月11日
    浏览(39)
  • Mac二进制安装RabbitMQ以及Erlang以及OpenSSL

    由于网络上关于Mac安装RabbitMQ的教程都是基于homebrew安装的,对于没有安装homebrew的同学不是特别友好,以下内容就是直接通过官网下载安装RabbitMQ 1、安装Erlang 由于RabbitMQ是基于Erlang语音开发的所以需要提前在电脑上安装Erlang运行环境 Erlang官网:https://www.erlang.org 进入官网点击DO

    2024年02月05日
    浏览(54)
  • rabbitMQ和Erlang安装后无法访问localhost:15672解决方法

    这个是我rabbitMQ安装在电脑上的位置,具体的要看你最近安装的位置,总之找到 sbin. 就在这里输入cmd即可打开 正常启动的服务应该是如下所示,E 和e 分别表示显性和隐性启动,如果没有E和e,这也就是你打不开localhost:15672的原因所在,可能性很大。那么,就再输入 rabbitmq-plu

    2024年01月16日
    浏览(44)
  • windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)

    1.rabbitmq和erlang对应关系如下(详情请查看:https://www.rabbitmq.com/which-erlang.html): 2.这里以mq-3.10.18和erlang-24.3.3为例(虽然官方说明写的3.10.18对应24.4版本的erlang,但是亲测至少需要24.3.2的才能支持mq-3.10.18) 3.erlang下载,链接:https://www.erlang.org/patches/otp-24.3.3 4.mq下载,拉到最下方下

    2024年02月09日
    浏览(40)
  • centos7安装erlang23.3.4.11及rabbitmq3.9.16版本

    rpm包有系统版本要求,el是Red Hat Enterprise Linux(EL)的缩写。 EL7是Red Hat 7.x,Centos 7.x EL8是Red Hat 8.x, Centos 8.x 所以我们在安装erlang及rabbitmq时需要选择与自己的服务器相对应的rpm包 # rabbitmq的rpm安装包 https://github.com/rabbitmq/rabbitmq-server/releases?page=10 # erlang的rpm安装包 https://github.com/

    2024年02月07日
    浏览(40)
  • rabbitmq集群搭建报错:[error] Cookie file /var/lib/rabbitmq/.erlang.cookie must be accessible by owner only

    在创建rabbitmq集群时,需要将当前节点的.erlang.cookie文件数据修改为第一个节点的.erlang.cookie文件内容,这里为了防止手动vim修改导致数据末尾的自动换行符的引入,我使用了文件的直接替换,随后在重启当前的mq节点服务时,报错如下: 结果就是rabbitmq启动失败 随后,执行

    2024年02月15日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包