.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

这篇具有很好参考价值的文章主要介绍了.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、安装mq

二、实操

1、简单模式

2、工作模式

3、fanout扇形模式(发布订阅)

4、direct路由模式也叫定向模式

5、topic主题模式也叫通配符模式(路由模式的一种)

6、header 参数匹配模式

7、延时队列(插件方式实现)

参考资料:


一、安装mq

1、我的环境是使用VMware安装的Centos7系统。MQ部署在docker上面

使用Docker部署RabbitMQ_KiriSoyer的博客-CSDN博客_docker 部署rabbitmq

2、创建公共项目Commons用于提供者和消费者引用,nuget安装 RabbitMQ.Client,添加一个帮助类:

public class RabbitMQHelper
    {

        //连接mq
        public static IConnection GetMQConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = "127.0.0.1",  //mq的ip(我自己虚拟机上的)
                Port = 5672, //端口
                UserName = "guoyingjian",  //账户
                Password = "guoyingjian",  //密码
                VirtualHost = "/" //虚拟机 
            };
            return factory.CreateConnection();  //返回连接
        }
    }

二、实操

rabbitmq消息队列有几种模式:

1、简单模式

一个提供者,一个消费者,是有序的,消费者只有一个,吞吐量低,工作基本不用,用来学习了解一下还是可以的

2、工作模式

根据队列名发消息,但有多个消费者,无序的,吞吐量高,1和2工作中基本不用,因为他们没有使用自定义交换机,练练手明白就行了。

生产者代码:

using RabbitMQ.Client;

        /// <summary>
        /// MQ 工作队列模式发消息
        /// </summary>
        /// <returns></returns>
        public void SendWorkerMQ()
        {
            //最基础的是点对点的队列模式,他的优势是有序的,

            //下面这个工作队列是无序的
            #region 工作队列模式
            string queueName = "WorkQueue";
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //创建队列
                    channel.QueueDeclare(queueName, false, false, false, null);
                    for (int i = 1; i <= 30; i++)
                    {
                        string message = "hello mq" + i;
                        var body = Encoding.UTF8.GetBytes(message);
                        //发送消息到mq,如没绑定交换机,将使用默认交换机路由
                        channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                        Console.WriteLine("send normal message" + i);
                    }
                }
            }
            #endregion
        }

消费者代码:

//工作队列接收消息(多个消费者,默认轮循)
        public static void ReceiveMessage()
        {
            string queueName = "WorkQueue";//队列名称与提供者一致
            var connection = RabbitMQHelper.GetMQConnection();

            //创建管道
            var channel = connection.CreateModel();
            channel.QueueDeclare(queueName, false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);

            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消息处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message}");
            };

            //消费消息
            channel.BasicConsume(queueName, true, consumer);

        }

下面是工作中使用交换机的4种模式:

3、fanout扇形模式(发布订阅)

该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给Exchange的消息,路由到所有与它绑定的Queue上。比如现在有一个fanout类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

然后向该Exchange中发送一条消息,消息的Routing key随便填一个值abc(不填也行),如果这个Exchange的路由与这三个Queue绑定,则三个Queue都应该会收到消息

生产者代码:

        /// <summary>
        /// MQ 扇形交换机模式发消息
        /// </summary>
        /// <returns></returns>
        [HttpGet("SendFanoutWorkerMQ")]
        public void SendFanoutWorkerMQ()
        {
            #region 使用扇形交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "fanout_exchange";//fanout只提供交换机名称即可

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性

                    for (int i = 1; i <= 10; i++)
                    {
                        var body = Encoding.UTF8.GetBytes("hello mq" + i);
                        //这里绑定了交换机,那么就会发送到这个交换机所有绑定过的队列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: "", properties, body);
                    }
                }
            }
            #endregion
        }

消费者代码:

/// <summary>
        /// 扇形模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void FanoutReceiveMessage()
        {

            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();

            //创建交换机
            channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
            //创建队列
            string queueName1 = "fanoutWorkQueue1";//队列名称
            string queueName2 = "fanoutWorkQueue2";
            string queueName3 = "fanoutWorkQueue3";
            channel.QueueDeclare(queue: queueName1,//队列名
                                               durable: false,//是否持久化
                                               exclusive: false,//排它性
                                               autoDelete: false,//一旦客户端连接断开则自动删除queue
                                               arguments: null);//如果安装了队列优先级插件则可以设置优先级
            channel.QueueDeclare(queueName2, false, false, false, null);
            channel.QueueDeclare(queueName3, false, false, false, null);

            //多个队列绑定到fanout_exchange交换机(似发布订阅)
            channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
            channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
            channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message}");
            };

            //消费消息
            channel.BasicConsume(queueName2, //队列名
                autoAck: true, //确认消费(删除)
                consumer: consumer);

        }

4、direct路由模式也叫定向模式

direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当producer发送的消息的Routing key与消费端的某个Routing key相等时,消息才会被分发到对应的Queue上。比如现在有一个direct类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

然后向该Exchange中发送一条消息,消息的Routing key是ORDER,那只有Routing key是ORDER的队列有一条消息。(与fanout区别:fanout根据已绑定的交换机的队列发送消息。direct当然也得绑定交换机,只不过再精确匹配到routingkey相等的队列发送消息)

生产者代码:

         /// <summary>
        /// MQ 直接交换机模式发消息(指定routingKey发送)
        /// </summary>
        /// <returns></returns>
        [HttpGet("SendDirectWorkerMQ")]

        public void SendDirectWorkerMQ()
        {
            #region 使用直接交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //direct只提供交换机名称和routingkey即可,消费端只消费routingkey相匹配的
                    string exchangeName = "direct_exchange";

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性

                    for (int i = 1; i <= 10; i++)
                    {
                        var body = Encoding.UTF8.GetBytes("hello mq" + i + "yellow");
                        //这里绑定了交换机,同时绑定了routekey,就会发送到routekey是yellow的队列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: "yellow", properties, body);

                    }
                }
            }
            #endregion
        }

消费者代码:

/// <summary>
        /// 直接模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void DirectReceiveMessage()
        {
            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();

            //创建交换机
            channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
            //创建队列
            string queueName1 = "directWorkQueue1";//队列名称
            string queueName2 = "directWorkQueue2";
            string queueName3 = "directWorkQueue3";
            channel.QueueDeclare(queue: queueName1,//队列名
                                               durable: false,//是否持久化
                                               exclusive: false,//排它性
                                               autoDelete: false,//一旦客户端连接断开则自动删除queue
                                               arguments: null);//如果安装了队列优先级插件则可以设置优先级
            channel.QueueDeclare(queueName2, false, false, false, null);
            channel.QueueDeclare(queueName3, false, false, false, null);

            //多个队列绑定到fanout_exchange交换机(似发布订阅)
            channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
            channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
            channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

                //消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
                //可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            //消息的签收模式
            //手动签收:保证正确消费,不会丢消息(基于客户端而已)
            //自动签收:容易丟失消息
            channel.BasicConsume(queueName1, //消费队列2的消息
                autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
                consumer: consumer);

        }

5、topic主题模式也叫通配符模式(路由模式的一种)

根据通配符模糊匹配,将消息交给符合routing pattern(路由模式)的队列。

它与direct相比,都是可以根据routingkey把消息路由到不同的队列。只不过topic类型exchange可以让队列在绑定routingkey的时候使用通配符。

routingkey一般都是有一个或多个单词组成,多个单词以“.”分割,例如:“item.insert”。通配符匹配规则“#”可以匹配一个或多个单词,“*”只能匹配1个单词,例如:“item.#”可以匹配“item.insert.asd”或者“item.insert”,“item.*”只能匹配到“item.insert”。

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

生产者代码:

public void SendTopicWorkerMQ()
        {
            #region 使用topic交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    //topic只提供交换机名称和routingkey即可,消费端只消费与routingkey通配符匹配的
                    string exchangeName = "topic_exchange";

                    

                    string routingKey1 = "user.america";
                    string routingKey2 = "user.china";
                    string routingKey3 = "user.china.beijing";
                    string routingKey4 = "user.china.beijing.changping";

                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性
                    for (int i = 1; i <= 10; i++)
                    {
                        var body = Encoding.UTF8.GetBytes("hello mq" + i + "topic");
                        //传4个不同的routingkey过去,消费者会根据通配符匹配并消费(好像不能在生产者写通配符)
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey1, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey2, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey3, properties , body);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routingKey4, properties , body);
                    }
                }
            }
            #endregion
        }

消费者代码:

/// <summary>
        /// 主题模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void TopicReceiveMessage()
        {
            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();

            //创建交换机
            channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
            string exchangeName = "topic_exchange";
            //创建队列
            string queueName1 = "topicWorkQueue1";//队列名称
            string queueName2 = "topicWorkQueue2";
            string queueName3 = "topicWorkQueue3";
            channel.QueueDeclare(queue: queueName1,//队列名
                                               durable: false,//是否持久化
                                               exclusive: false,//排它性
                                               autoDelete: false,//一旦客户端连接断开则自动删除queue
                                               arguments: null);//如果安装了队列优先级插件则可以设置优先级
            channel.QueueDeclare(queueName2, false, false, false, null);
            channel.QueueDeclare(queueName3, false, false, false, null);

            //多个队列绑定到fanout_exchange交换机
            channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "user.*.*");//匹配例如:user.a.b
            channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "user.*");  //匹配例如:user.a
            channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "user.#");  //匹配例如:user...... (user. 后面是啥都行)

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");

                //消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
                //可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            //消息的签收模式
            //手动签收:保证正确消费,不会丢消息(基于客户端而已)
            //自动签收:容易丟失消息
            channel.BasicConsume(queueName2, //消费队列2的消息(可以手动替换其他队列消费)
                autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
                consumer: consumer);

        }

6、header 参数匹配模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列,Headers 类型的交换器性能会很差,所以这种类型不常用。

以上注意:Exchange(交换机):只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息将会丢失!!

7、延时队列(插件方式实现)

实现延迟队列有两种方法

1、TTL+DLX需要创建死信交换机绑定队列,需创建多个交换机多个队列,复杂麻烦所以不推荐。

2、推荐使用rabbitmq_delayed_message_exchange 插件实现,下面来实现一下:

①插件下载网址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

注意:一定要下载与自己mq版本同一个版本号,不然下面开启插件会报错

比如我的mq版本是 3.9.11的,那我就下载rabbitmq_delayed_message_exchange-3.9.0.ez 即可

下载之后想办法上传到linux上。

或者直接在linux上面下载:

#linux下载插件的命令:(注:选择属于自己的版本号,f12指针查看按钮链接获取下载地址)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

②下载完毕后,使用命令复制到docker内rabbitmq容器的plugins文件夹下

查看mq的位置: whereis rabbitmq

查看运行中的容器命令:

docker ps

docker的复制命令:

docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 0e6e229cc6f2:/plugins(0e6e229cc6f2是容器id,plugins是mq容器内的文件夹)

③然后进入容器

docker exec -it 0e6e229cc6f2 bash

退出容器:

exit

或者按Ctrl+P+Q进行退出容器

④进入之后直接进入plugins文件夹下看看复制进去没有?

cd /plugins

ls

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

记住只能有一个 rabbitmq_delayed_message_exchange-3.9.0.ez文件,不能放多个版本否则报错

⑤启用rabbitmq_delayed_message_exchange插件

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

启用之后可以看到mq可视化页面,交换机类型多了一个 x-delayed-message

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

⑥然后就是代码实现:

生产者代码:

public void SendDelayedWorkerMQ()
        {
            #region 使用延时交换机模式
            using (var connection = RabbitMQHelper.GetMQConnection())
            {
                //创建通信管道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "delayed_exchange";//delayed需提供交换机名称
                    string queueName = "delay_WorkQueue";

                    #region 消费端做交换机和队列的创建和绑定

                    #endregion
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性
                    //properties.Priority = 9;//消息的优先级  值越大 优先级越高 0~9
                    //延时时间从header赋值
                    Dictionary<string, object> headers = new Dictionary<string, object>();
                    headers.Add("x-delay", 10000);
                    properties.Headers = headers;

                    var body = Encoding.UTF8.GetBytes("生产者发送时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
                    //发送延时消息
                    channel.BasicPublish(exchange: exchangeName, routingKey: queueName, properties, body);
                }
            }
            #endregion
        }

消费者代码:

/// <summary>
        /// 延迟交换机模式队列消费消息
        /// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
        /// </summary>
        public static void DelayedReceiveMessage()
        {

            var connection = RabbitMQHelper.GetMQConnection();
            //创建管道
            var channel = connection.CreateModel();
            string queueName = "delay_WorkQueue";//队列名称
            string exchangeName = "delayed_exchange";//队列名称

            Dictionary<string, object> args = new Dictionary<string, object>();
            args.Add("x-delayed-type", "direct"); //x-delayed-type必须加(这个创建的是交换机类型)

            //创建交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: args);
            //创建队列

            channel.QueueDeclare(queue: queueName,//队列名
                                               durable: true,//队列是否持久化
                                               exclusive: false,//是否为单消费者队列,为True时,只能由单一消费者消费
                                               autoDelete: false,//是否自动删除队列,当消费者全部断开时,队列自动删除
                                               arguments: null);//高级特性

            //多个队列绑定到delayed_exchange交换机(似发布订阅)
            channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);

            //声明消费者
            var consumer = new EventingBasicConsumer(channel);

            //对消费端进行限流:
            //首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
            //第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
            //第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
            //prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //消费者处理的事件
            consumer.Received += (model, ea) =>
            {
                //业务逻辑处理
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(message);
                Console.WriteLine($"消费者消费时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));

                //消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
                //可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            //消费消息
            channel.BasicConsume(queueName, //队列名
                autoAck: false, //确认消费
                consumer: consumer);
        }

最终启动程序可以看到消费者端输出的结果计算精准的相差10秒!

.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

参考资料:

持久化、优先级、高级特性:

NetCore RabbitMQ高级特性 持久化 及 消息优先级 - 天才卧龙 - 博客园

延迟和死信队列:https://www.jb51.net/article/221796.htm

NetCore RabbitMQ 高级特性 消息存活周期TTL、死信交换机/死信对列DLX,延迟队列,及幂等性的保障 - 天才卧龙 - 博客园文章来源地址https://www.toymoban.com/news/detail-406116.html

到了这里,关于.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(32)
  • RabbitMQ队列及交换机的使用

    目录 一、简单模型 1、首先控制台创建一个队列 2、父工程导入依赖  3、生产者配置文件  4、写测试类 5、消费者配置文件 6、消费者接收消息 二、WorkQueues模型 1、在控制台创建一个新的队列 2、生产者生产消息 3、创建两个消费者接收消息 4、能者多劳充分利用每一个消费者

    2024年02月04日
    浏览(31)
  • 【学习日记2023.6.19】 之 RabbitMQ服务异步通信_消息可靠性_死信交换机_惰性队列_MQ集群

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收

    2024年02月11日
    浏览(33)
  • 消息队列-RabbitMQ:Exchanges、绑定 bindings以及3大常用交换机(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上, 通常生产者甚至都不知道这些消息传递传递到了哪些队列中 。 相反, 生产者只能将消息发送到交换机 (exchange) , 交换机工作 的内容非常简单, 一方面它接收来自生产者的消息 , 另一

    2024年04月08日
    浏览(39)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(35)
  • RabbitMQ交换机与队列

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反, 生产者只能将消息发送到交换机(exchange) ,交换机工作的内容非常简单, 一方面它接收来自生产者的消息,另一方面

    2024年01月24日
    浏览(26)
  • RabbitMq创建交换机和队列

    1. 网页登录 IP:1572   2. 输入登录账号密码 admin admin 3. 点击Exchanges 添加交换机Platform_AlarmEngineInterface 和Rg_Platform_AlarmEngineInterface ,Type选择topic 4. 添加队列 VIDEO_Alarm_platform 、 watch_ftp、 RG_VIDEO_Alarm_platform 、 RG_VIDEO_Alarm_platform_jiance 5. 绑定交换机和队列 (1) 点击Exchanges界面,选择其

    2024年02月16日
    浏览(36)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(34)
  • SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法

    原文网址:SpringBoot整合RabbitMQ系列--绑定交换机与队列的方法_IT利刃出鞘的博客-CSDN博客         本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列。 交换机 下边两种方式等价。 队列 下边两种方式等价 绑定 下边两种方式等价 注意:第一种的参数并不是字符

    2023年04月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包