RabbitMQ基于Java实现消息应答

这篇具有很好参考价值的文章主要介绍了RabbitMQ基于Java实现消息应答。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ

概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据、|

四大核心概念

生产者

产生数据发送消息的程序时生产者

交换机

交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

队列

队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。 许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

工作原理

RabbitMQ基于Java实现消息应答,后端,rabbitmq,java

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker 一个交换机(Exchange)对应多个队列(Queue) 每一个生产者(Producer)与RabbitMQ Server建立连接,每一个连接(Connection)有多个信道(channel),在连接中通过信道发送信息给Broker。

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection ,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效力也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级 Connection 极大减少了操作系统建立TCP connection的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct(point-to-point 点到点),topic(publish-subscribe)和 fanout(multicast)

Virrual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ提供的服务时,可以划分出多个 virtual host(简称vhost),每个用户在自己的vhost创建 exchange/queue等

多租户:一个Broker中可以有多个用户,一个用户有自己的Exchange

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中 可以包含routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

Producer 发出消息 -> Connection(Channel) -> Broker -> Exchange -> 匹配查询表中的 routing key -> 分发到对应的queue中 -> Connection(Channel) -> 给对应的Consumer

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

Java实现

1.导入依赖

RabbitMQ基于Java实现消息应答,后端,rabbitmq,java

2.生产者代码

RabbitMQ基于Java实现消息应答,后端,rabbitmq,java

3.消费者代码

RabbitMQ基于Java实现消息应答,后端,rabbitmq,java

工作队列

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。(异步处理

一个消息只能被处理一次,不可以被处理多次。

轮询分发消息

工作队列轮流接收消息

消息应答

概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理了一个长的任务并仅完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为消费者无法接收。

为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者端出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制。当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用于在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

消息在手动应答时是不丢失的,失败将会放回队列中重新消费。

方法
  • Channel.basicAck(DeliverTag deliverTag, multiple)(用于肯定确认)

    RabbitMQ 已知到该消息并且成功的处理消息,可以将其丢弃了

  • Channel.basicNack(CancelTag cancelTag , multiple)(用于否定确认)

  • Channel.basicReject(CancelTag cancelTag )(用于否定确认)

    不处理该消息了,直接拒绝,将消息丢弃

Multipart

手动应答的好处是可以批量应答并且减少网络拥堵

multiple的true和false 代表不同意思

true代表批量应答 channel 上未应答的消息

比如说 channel 上有传送 tag 的消息 5,6,7,8,当前 tag 是 8,那么此时5-8这些还未应答的消息将会被确认到消息应答

true 会应答前面未确认的消息

false同上面相比

只会应答 tag=8 的消息,5,6,7这三个消息依然不会被确认收到消息应答

false 只会应答当前成功处理的消息

消息确认

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认成功发布后,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能以及足够了。

// 单个确认
    public static void publishMessageIndividual()throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        // 队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开启时间
        long begin = System.currentTimeMillis();
​
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // 单个消息就马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("消息发送成功");
            }else {
                System.out.println("发布失败");
            }
        }
        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条单独确认的消息,耗时:" + (end - begin) + "ms");
    }
批量确认发布

单个确认发布方式非常慢,与其相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

// 批量发布确认
    public static void publicMessageBatch() throws Exception{
        // 获取信道
        Channel channel = RabbitMQUtils.getChannel();
        // 创建队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();
​
        // 批量确认消息的大小
        int basicSize = 100;
​
        // 未确认消息个数
​
        // 发布消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            if (i % 100 == 0){
                channel.waitForConfirms();
            }
        }
        // 最后确认发布
        channel.waitForConfirms();
​
        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条批量确认的消息,耗时:" + (end - begin) + "ms");
    }
异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

如何处理异步未确认消息

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如所用 ConcurrentLinkedQueue 这个队列在从firm callbacks 与发布线程之间的消息传递。

// 异步发布确认
    public static void publishMessageAsync() throws Exception {
        // 创建信道
        Channel channel = RabbitMQUtils.getChannel();
        // 创建队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
​
        /**
         * 线程安全有序的一个哈希表 适用于高并发的情况下
         * 1. 轻松的将序号与消息进行管理那
         * 2. 轻松批量删除条目 只需要序号
         * 3. 支持高并发
         */
        ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>();
​
        // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了
        // 消息确认成功 回调函数
        /**
         * 1、 deliveryTag 消息的标记 默认从1开始
         * 2、 multiple 是否为批量确认
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            // 删除已经确认的消息 剩下的就是未确认的消息
            if (multiple){
                // 批量发消息 批量删除
                ConcurrentNavigableMap<Long, Object> confirmed =
                        outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息" + deliveryTag);
        };
        // 消息确认失败 回调函数
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            // 有哪些未确认的消息
            String message = (String) outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是" + message);
            System.out.println("未确认的消息" + deliveryTag);
        };
        /**
         * 1、监听哪些消息成功了
         * 2、监听哪些消息失败了
         */
        channel.addConfirmListener(ackCallback, nackCallback);
​
        // 开始时间
        long begin = System.currentTimeMillis();
        // 批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            // 在此处记录下所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "条异步发布确认的消息,耗时:" + (end - begin) + "ms");
    }
总结

单独发布消息:

同步等待确认,简单,但吞吐量非常有限

批量发布消息:

批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难知道具体是哪条消息出现了问题

异步处理:

最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现较复杂文章来源地址https://www.toymoban.com/news/detail-846007.html

到了这里,关于RabbitMQ基于Java实现消息应答的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 消息应答

    物是人非事事休,欲语泪先流。 为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制, 消费者在接收到消息并且处理该消息后,告诉RabbitMQ它已经处理了,RabbitMQ可以把消息删除了。 消息发送后立即被认为已经传送成功, 这种模式需要在高吞吐量和数据传输安全性

    2024年02月08日
    浏览(38)
  • RabbitMQ 消息应答与发布

    目录 一、消息应答 1、自动应答(默认) 2、手动消息应答的方法  ​编辑 3、消息重新入队 4、手动应答案列与效果演示 二、RabbitMQ持久化 1、队列持久化 2、消息持久化 三、不公平分发(能者多劳,弱者少劳) 1、介绍 2、效果演示 四、预取值分发 1、介绍 五、发布确认 六

    2024年02月06日
    浏览(37)
  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(55)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(56)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)
  • RabbitMQ可靠性消息发送(java实现)

    本博客属于 《RabbitMQ基础组件封装—整体结构》的子博客 step1:消息落库,业务数据存库的同时,也要将消息记录存入数据库,二者要保证原子性; step2:Producer发送消息到MQ Broker; step3:Producer收到 broker 返回的确认消息; step4:更改消息记录库的状态(定义三种状态:0待确

    2024年02月04日
    浏览(70)
  • 【Java】微服务——RabbitMQ消息队列(SpringAMQP实现五种消息模型)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月08日
    浏览(65)
  • RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)

    在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区 第一步:导入依赖 第二步:创建生产者 第三步:创建消费者 因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了 三者

    2023年04月14日
    浏览(43)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(57)
  • java操作rabbitmq实现简单的消息发送(socket编程的升级)

    准备: 1.下载rabbitmq并搭建环境(和python那篇一样:http://www.cnblogs.com/g177w/p/8176797.html) 2.下载支持的jar包(http://repo1.maven.org/maven2/com/rabbitmq/amqp-client) 生产者方(Productor.java): View Code 消费者方(Consummer.java):

    2023年04月08日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包