RabbitMQ的确认机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ的确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ的确认机制

生产者确认

public class ProductionMessageConfirm
{
    public static void Send()
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码 
        using (var connection = factory.CreateConnection())
        {
            //创建通道channel
            using (var channel = connection.CreateModel())
            {
                Console.WriteLine("生产者准备就绪....");
                channel.QueueDeclare(queue: "ConfirmSelectQueue", durable: true, exclusive: false, 
                                     autoDelete: false, arguments: null);
                //声明交换机exchang
                channel.ExchangeDeclare(exchange: "ConfirmSelectQueueExchange", type: ExchangeType.Direct, durable: true,
                                        autoDelete: false, arguments: null);
                //绑定exchange和queue
                channel.QueueBind(queue: "ConfirmSelectQueue", exchange: "ConfirmSelectQueueExchange", 
                                  routingKey: "ConfirmSelectKey");
                string message = "";

                //发送消息
                //在控制台输入消息,按enter键发送消息
                while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                {
                    message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);
                    try
                    {
                        //开启消息确认模式
                        channel.ConfirmSelect();//基于当前这个channel 开启消息确认机制 
                        //发送消息
                        channel.BasicPublish(exchange: "ConfirmSelectQueueExchange", routingKey: "ConfirmSelectKey", 
                                             basicProperties: null, body: body);
                        //事务提交 

                        //调用WaitForConfirms 方法来判断如果消息正常发送,就返回true,否则就返回false
                        if (channel.WaitForConfirms()) //如果一条消息或多消息都确认发送
                        {
                            Console.WriteLine($"【{message}】发送到Broke成功!");
                        }
                        else
                        {
                            //可以记录个日志,重试一下;
                        }

                        //通过当前信道,调用WaitForConfirmsOrDie,如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;
                        //channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;
                    }
                    catch (Exception)
                    {
                        Console.WriteLine($"【{message}】发送到Broker失败!");
                        //就应该通知管理员
                        // 重新试一下
                    }
                }
                Console.Read();
            }
        }
    }
}

生产者事务版确认

public class ProductionMessageTx
{
    public static void Send()
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码 
        using (var connection = factory.CreateConnection())
        {
            //创建通道channel
            using (var channel = connection.CreateModel())
            {
                Console.WriteLine("生产者准备就绪....");
                channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, 
                                     autoDelete: false, arguments: null);
                channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, 
                                     autoDelete: false, arguments: null);
                //声明交换机exchang
                channel.ExchangeDeclare(exchange: "MessageTxQueueExchange", type: ExchangeType.Direct, durable: true, 
                                        autoDelete: false, arguments: null);
                //绑定exchange和queue
                channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxQueueExchange", 
                                  routingKey: "MessageTxKey01");
                channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxQueueExchange", 
                                  routingKey: "MessageTxKey02");
                string message = "";
                //发送消息
                //在控制台输入消息,按enter键发送消息
                while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                {
                    message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);
                    try
                    {
                        //通过当前信道调用TxSelect方法开启事务机制---RabbitMQ  AMQP协议,强事务保证;
                        channel.TxSelect(); //事务是协议支持的
                        //发送消息
                        //同时给多个队列发送消息;要么都成功;要么都失败;

                        //执行下面这两句话,都不会吧消息发送到队列中去;
                        channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01", 
                                             basicProperties: null, body: body);
                        channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02", 
                                             basicProperties: null, body: body);
                        //事务提交
                        channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去
                        //如果失败---抛出异常;
                        Console.WriteLine($"【{message}】发送到Broke成功!");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"【{message}】发送到Broker失败!");
                        channel.TxRollback(); //事务回滚   或者重试一下;
                        throw;
                    }
                }
                Console.Read();
            }
        }
    }
}

消费者确认

自动确认

public class ConsumerMessageConfirm
{
    public static void Consumption()
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码 
        using (var connection = factory.CreateConnection())
        {
            using (IModel channel = connection.CreateModel())
            {
                #region EventingBasicConsumer
                //定义消费者                                      
                var consumer = new EventingBasicConsumer(channel);
                int i = 1;
                consumer.Received += (model, ea) =>
                {
                    if (i == 11)
                    {
                        throw new Exception("");
                    }
                    i++;
                    Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));
                };
                Console.WriteLine("消费者准备就绪....");
                //处理消息 
                //autoAck: true 自动确认:消息队列的消息直接推送给当前这个消费者;
                //照单全收,消费是否正常消费,RabbitMQ不管;
                channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);
                Console.ReadKey();
                #endregion
            }
        }
    }
}

消费后确认文章来源地址https://www.toymoban.com/news/detail-604865.html

public class ConsumptionACKConfirm
{
    public static void Consumption()
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码 
        using (var connection = factory.CreateConnection())
        {
            using (IModel channel = connection.CreateModel())
            {
                #region EventingBasicConsumer
                //定义消费者                                      
                var consumer = new EventingBasicConsumer(channel);
                int i = 0;
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    //如果在这里处理消息的手,异常了呢? 
                    //Console.WriteLine($"接收到消息:{message}"); ; 
                    if (i < 50)
                    {
                        //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        Console.WriteLine(message);
                    }
                    else
                    {
                        //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
                        channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                    }
                    i++;
                };
                Console.WriteLine("消费者准备就绪....");
                {
                    //处理消息 
                    //autoAck: true 自动确认; 
                    //channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);
                }
                {
                    //处理消息 
                    //autoAck: false  显示确认; 
                    channel.BasicConsume(queue: "PersistenceQueue", autoAck: false, consumer: consumer);
                }
                 
                Console.ReadKey();
                #endregion
            }
        }
    }
}

到了这里,关于RabbitMQ的确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ——解决分布式事务问题,RabbitMQ的重要作用之一!!!通过可靠生产和可靠消费来完美解决!

    分布式事务是指涉及多个独立的计算机系统(也称为节点或参与者)之间的事务处理。在分布式系统中,每个节点可能各自拥有自己的数据存储和事务管理机制。分布式事务的目标是保证在跨多个节点执行的一系列操作可以以一致和可靠的方式执行和提交,即使在面对故障或

    2024年04月23日
    浏览(48)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(45)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(63)
  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(59)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(56)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(51)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(46)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(48)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(65)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包