RabbitMQ:work结构

这篇具有很好参考价值的文章主要介绍了RabbitMQ:work结构。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ:work结构,RabbitMQ,rabbitmq,分布式

> 只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange

>  消费者指定Qoa和手动ack文章来源地址https://www.toymoban.com/news/detail-699545.html

生产者

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
    public static final String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {

        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        //3.声明了一个队列
        /**
         * queue – the name of the queue
         * durable – true代表创建的队列是持久化的(当mq重启后,该对立依然存在)
         * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
         * autoDelete – 该队列是否可以被mq服务器自动删除
         * arguments – 队列的其他参数,可以为null
         */
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello doubleasdasda!";

        //生产者如何发送消息,使用下面的方法即可
        /**
         * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
         * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
         * other properties - 消息的其他属性,可以为null
         * body – 消息的内容,注意,要是有 字节数组
         */
        for (int i = 0; i < 21; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println(" [x] Sent '" + message + "'");

        //关闭资源
        channel.close();
        conn.close();
    }
}

消费者一

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(400);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者01:"+msg);


                //手动ack
                //从message对象中取
                long deliveryTag = message.getEnvelope().getDeliveryTag();
                /**
                 * 第一个参数:消息编号
                 * 第二个参数: false,代表只确认这一个消息
                 */
                channel.basicAck(deliveryTag,false);
            }
        };

        //设置该消费者,每次只能从mq中获取一条消息
        channel.basicQos(1);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
          *把消费者的确认模式,设置为 手动 ack
         *
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

消费者二

package com.qf.mq2302.work;

import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class Recv02 {
   private  final  static  String QUEUE_NAME="work";

    public static void main(String[] args) throws Exception {
        //1.获取连接对象
        Connection conn = MQUtils.getConnection();

        //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
        Channel channel = conn.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
        DeliverCallback deliverCallback =new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

            //从Delivery对象中可以获取到生产者,发送的消息的字节数组
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //在这里写消费者的业务逻辑,例如,发送邮件
                System.out.println("消费者02:"+msg);


                long deliveryTag = message.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag,false);
            }
        };
        //注意:这个是可以存三个,而不是一次发三个
        channel.basicQos(3);
        //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
        /**
         * queue – the name of the queue
         * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
         * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
         * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
         */
      channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {});





    }





}

到了这里,关于RabbitMQ:work结构的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(55)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(60)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(40)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(53)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(48)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(45)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(64)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(46)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(44)
  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包