.NET中使用RabbitMQ延时队列和死信队列

这篇具有很好参考价值的文章主要介绍了.NET中使用RabbitMQ延时队列和死信队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ延时队列和死信队列

延时队列和死信队列

延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。

延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果消息在过期时间内没有被消费者消费,则会被自动发送到一个预先指定的死信队列中。

在RabbitMQ中,延时队列的实现可以通过以下步骤来完成:

  1. 创建一个普通的队列作为延时队列,设置x-message-ttl参数为消息的过期时间。
  2. 创建一个死信队列,用于接收延时队列中过期的消息。
  3. 将延时队列设置为普通队列的死信交换机,并指定死信路由键。
  4. 将消费者绑定到死信队列,以消费延时队列中过期的消息。

使用场景

  1. 订单在十分钟内未支付则自动取消。
  2. 新创建的店铺,如果十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 还有很多场景就不一一例举了。

TTL设置

方式一:

创建队列时设置x-message-ttl的属性,所有被投递到该队列的消息最多都不会超过60s

var args = new Dictionary<string,object>();
args.Add("x-message-ttl",60000); //单位为毫秒
model.QueueDeclare("myqueue",false,false,false,args);

方式二:

为每条消息设置TTL,为每条消息设置过期时间。

IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "60000"
model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes);

代码实践

模拟支付业务

整个项目由三部分组成

  • Web API项目:用于发送订单请求,生产者。
  • 控制台项目一:用于处理订单支付,延时队列。
  • 控制台项目二:用于处理超时未支付的订单,死信队列。

Web API项目

订单类,就简单的写一个用于演示,真实业务肯定不是这样~

public class OrderDto
{
    /// <summary>
    /// 订单名称
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// 订单状态
    /// 0 未支付
    /// 1 已支付
    /// 2 超时
    /// </summary>
    public int Status { get; set; }
}

控制器

[ApiController]
[Route("api/orders")]
public class OrdersController : ControllerBase
{
    private readonly IOrderService _orderService;

    public OrdersController(IOrderService orderService)
    {
        _orderService = orderService;
    }

    [HttpPost]
    public IActionResult CreateOrder([FromBody] OrderDto orderDto)
    {
        // 处理订单逻辑
        _orderService.ProcessOrder(orderDto);
        return Ok();
    }
}

订单服务

public interface IOrderService
{
    void ProcessOrder(OrderDto orderDto);
}

public class OrderService : IOrderService
{
    private readonly RabbitMQConnectionFactory _connectionFactory;
    
    public OrderService(RabbitMQConnectionFactory connectionFactory)
    {
        _connectionFactory = connectionFactory;
    }

    public void ProcessOrder(OrderDto orderDto)
    {
        using (var channel = _connectionFactory.CreateChannel())
        {
            var properties = channel.CreateBasicProperties();
            properties.Headers = new Dictionary<string, object>
            {
                { "x-delay", 1000 * 20  } // 设置20秒延时
            };

            var message = JsonConvert.SerializeObject(orderDto);
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish("delayed_exchange", "routing_key", properties, body);
        }
    }
}

支付处理项目

ProcessPay类,用于接收订单消息

public class ProcessPay : IHostedService
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private IModel _channel;

    public ProcessPay()
    {
        _factory = new ConnectionFactory()
        {
            HostName = "ip",
            Port = 5672,
            UserName = "用户名",
            Password = "密码"
        };
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine(" Press [enter] to exit.");
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);
        //关键代码,绑定死信交换机
        var arguments = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "dead_letter_exchange" },
            { "x-dead-letter-routing-key", "dead_letter_routing_key" }
        };
        _channel.QueueDeclare("delayed_queue", true, false, false, arguments);
        _channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            // 处理支付逻辑
            var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
            Console.WriteLine($"订单信息:{orderDto.Name}");
            Console.WriteLine("请输入价格(模拟支付):");

            // 超时未支付
            string? many = "";
            // 支付处理
            Console.WriteLine("请输入:");
            // 超时未支付进行处理
            Task.Factory.StartNew(() =>
            {
                many = Console.ReadLine();
                Console.WriteLine($"many:{many}");
            }).Wait(20 * 1000);
            if (string.Equals(many, "100"))
            {
                orderDto.Status = 1;
                Console.WriteLine("支付完成");
                _channel.BasicAck(ea.DeliveryTag, true);
            }
            else
            {
                //重试几次依然失败
                Console.WriteLine("等待一定时间内失效超时未支付的订单");
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        };

        _channel.BasicConsume("delayed_queue", false, consumer);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _channel?.Close();
        _connection?.Close();
        _channel?.Dispose();
        _connection?.Dispose();

        return Task.CompletedTask;
    }
}

在Main方法中使用单例模式注册该服务,当然直接将代码写在Main方法也是没有问题的,只不过这种方式方便管理。

static void Main(string[] args)
{
    var host = new HostBuilder()
        .ConfigureServices((hostContext, services) =>
                           {
                               services.AddSingleton<IHostedService,ProcessPay>();
                           })
        .Build();

    host.Run();
}

支付超时项目

创建一个死信队列服务,用于订阅死信队列中的订单消息,这里我就直接把代码写在Main方法中了

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);

        channel.QueueDeclare("dead_letter_queue", true, false, false, null);

        channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            // 处理超时未支付的订单逻辑
            var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
            orderDto.Status = 2;
            Console.WriteLine($"订单信息:{orderDto.Name},{orderDto.Status}");
            Console.WriteLine("超时未支付");

            channel.BasicAck(ea.DeliveryTag, true);
        };

        channel.BasicConsume("dead_letter_queue", false, consumer);
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

效果展示

代码看不出效果,直接上图。

首先是3个项目各自运行效果图

.NET中使用RabbitMQ延时队列和死信队列

然后演示正常消费效果

.NET中使用RabbitMQ延时队列和死信队列

接下来是超时未支付效果

.NET中使用RabbitMQ延时队列和死信队列

结尾

这就是一个简单的延时队列和死信队列的代码,模拟了支付超时的场景,这里的数据都写死了的,真实运用的时候肯定是中数据库中获取,修改数据库实体的值。然后死信队列是用于处理在一定时间内未被处理的消息,死信交换机也只是一个普通的交换机,只不过他是用于处理超时的消息的交换机。

对于RabbitMQ的文章基本就结束了,可能还会有一篇RabbitMQ集群搭建的文章,但不是很想去写,最近太懒了~

有问题欢迎指出,活到老学到老~文章来源地址https://www.toymoban.com/news/detail-615465.html

RabbitMQ系列文章

  • ZY知识库 · ZY - 在.NET Core中使用RabbitMQ (pljzy.top)

参考资料

  • 【【2021最新.NET/C#】RabbitMQ从零到高可用集群】 https://www.bilibili.com/video/BV1GU4y1w7Yq/?p=10&share_source=copy_web&vd_source=fce337a51d11a67781404c67ec0b5084

到了这里,关于.NET中使用RabbitMQ延时队列和死信队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用RabbitMQ死信队列关闭未支付的订单

    RabbitMQ死信队列(Dead-Letter Exchange,简称DLX)是一种特殊类型的交换机,用于处理在队列中无法被消费的消息。当消息无法被消费时,它会被转发到死信队列中,以便进一步处理。 在RabbitMQ中,死信队列通常用于处理以下情况: 消息无法被消费者处理:例如,如果消费者崩溃

    2024年02月11日
    浏览(38)
  • .NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)

    目录 一、安装mq 二、实操 1、简单模式 2、工作模式 3、fanout扇形模式(发布订阅) 4、direct路由模式也叫定向模式 5、topic主题模式也叫通配符模式(路由模式的一种) 6、header 参数匹配模式 7、延时队列(插件方式实现) 参考资料: 1、我的环境是使用VMware安装的Centos7系统。MQ部署

    2023年04月09日
    浏览(99)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(134)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(56)
  • RabbitMQ-死信交换机和死信队列

    DLX: Dead-Letter-Exchange 死信交换器,死信邮箱 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 如下图所示: 其实死信队列就是一个普通的交换机,有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理

    2024年02月08日
    浏览(44)
  • RabbitMQ延迟队列,死信队列配置

    延迟和死信队列的配置 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列 异常队列配置类

    2024年02月14日
    浏览(49)
  • 【RabbitMQ笔记10】消息队列RabbitMQ之死信队列的介绍

    这篇文章,主要介绍消息队列RabbitMQ之死信队列。 目录 一、RabbitMQ死信队列 1.1、什么是死信队列 1.2、设置过期时间TTL 1.3、配置死信交换机和死信队列(代码配置) (1)设置队列过期时间 (2)设置单条消息过期时间 (3)队列设置死信交换机 (4)配置的基本思路 1.4、配置

    2024年02月16日
    浏览(77)
  • 【RabbitMQ学习日记】——死信队列与延迟队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候 由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死

    2024年02月06日
    浏览(54)
  • 【RabbitMQ教程】第五章 —— RabbitMQ - 死信队列

                                                                       💧 【 R a b b i t M Q 教 程 】 第 五 章 — — R a b b i t M Q − 死 信 队 列 color{#FF1493}{【RabbitMQ教程】第五章 —— RabbitMQ - 死信队列} 【 R a b b i t M Q 教 程 】 第 五 章 — — R a

    2024年02月09日
    浏览(36)
  • RabbitMQ——死信队列

    死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。 死信的来源 在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于: 消息被拒绝( basic.reject 或 bas

    2024年03月14日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包