【初始RabbitMQ】工作队列的实现

这篇具有很好参考价值的文章主要介绍了【初始RabbitMQ】工作队列的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

工作队列

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的

抽取工具类

我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用

/**
 * 连接工厂创建信道工具类
 */
public class RabbitMqUtils {
    public static Channel getChannel(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("118.31.6.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = null;
        try {
            try {
                connection = factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        Channel channel = null;
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return channel;
    }
}

这里使用try...catch防止之后每次调用都需要抛出异常

生产者代码

/**
 * 生产者 发送大量消息
 */
public class Task01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发送大量的消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台接受消息
        Scanner scanner = new Scanner(System.in);
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的KEY值是哪个 本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送"+message+"完成");
        }
    }
}

消费者代码

/**
 * 这是一个工作线程(消费者)
 */
public class Worker01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //消息接受
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //消息接受被取消时
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消息取消消费接口回调");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        System.out.println("C2等待接受消息......................");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动两个工作线程

在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

启动一个发送线程 

启动程序,用生产者发送四条消息,

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

结果分析

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

 消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

  1. Channel.basicAck(用于肯定确认)
    1. RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)
    1. 与 Channel.basicNack 相比少一个参数(批量应答参数) 不处理该消息了直接拒绝,可以将其丢弃了

Multiple(批量应答)的解释

手动应答的一个好处就是可以批量应答并且减少网络拥堵

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

multiple 的 true 和 false 代表不同意思:

true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码

【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言

睡眠工具类:

public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

生产者:

/*
* 消息再手动应答不丢失、放回消息队列重新消费
 */
public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者01:

public class Work01 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息短");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者02:

public class Work02 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息长");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

手动应答效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了【初始RabbitMQ】工作队列的实现,消息队列,java,开发语言文章来源地址https://www.toymoban.com/news/detail-833783.html

到了这里,关于【初始RabbitMQ】工作队列的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(53)
  • 【Java】微服务——RabbitMQ消息队列(SpringAMQP实现五种消息模型)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月08日
    浏览(60)
  • Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理: 添加 ActiveMQ 依赖 在 pom.xml 文件中添加以下依赖: 创建消息队列 创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息: 创建消息消费者

    2024年02月16日
    浏览(55)
  • 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序

    目录 基本概念  MQ 的优势  1.应用解耦  2.异步提速  3.削峰填谷  MQ 的劣势 使用mq的条件  常见MQ产品  RabbitMQ简介 RabbitMQ的六种工作模式   JMS RabbitMQ安装和配置。 RabbitMQ控制台使用。 RabbitMQ快速入门——生产者 需求: RabbitMQ快速入门——消费者 小结  多个系统之间的通信方

    2024年02月16日
    浏览(44)
  • 【初始RabbitMQ】死信队列的实现

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的 原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有 后续的处理,就变成了死

    2024年02月21日
    浏览(33)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(74)
  • Java RabbitMQ消息队列简单使用

    消息队列,即MQ,Message Queue。 消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    2024年02月12日
    浏览(58)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(52)
  • 参考RabbitMQ实现一个消息队列

    消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现 解耦合 以及 削峰填谷 。 在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大

    2024年02月14日
    浏览(84)
  • 《消息队列MyMQ》——参考RabbitMQ实现

    目录 一、什么是消息队列? 二、需求分析 1)核心概念 2)核心API 3)交换机类型 4)持久化 5)网络通信 ​编辑 6)消息应答 三、 模块划分 四、创建核心类 1.ExChange 2.MSGQueue  3.Binding 4. Message 五. 数据库设计  1.配置 sqlite 引⼊ pom.xml 依赖  配置数据源 application.yml 2.实现创建

    2024年02月04日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包