.Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅

这篇具有很好参考价值的文章主要介绍了.Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        近期项目里有需要用到RabbitMq实现一些业务,学习整理之后在此记录一下,如有问题或者不对的地方,欢迎留言指正。

一、首先创建连接工厂

 public class RabbitMQProvider
    {
        private readonly string _ipAddress;
        private readonly int? _port;
        private readonly string _username;
        private readonly string _password;

        public RabbitMQProvider()
        {
            _ipAddress = ConfigurationHelper.GetKey("RabbitMQIPAddress") ?? throw new ArgumentException("IP地址未配置!");
            _username = ConfigurationHelper.GetKey("RabbitMQUserName") ?? throw new ArgumentException("用户名不能为空");
            _password = ConfigurationHelper.GetKey("RabbitMQPassword") ?? throw new ArgumentException("密码不能为空");

            var timeApan = new TimeSpan(0, 5, 0);
            if (ConnectionFactory == null)
            {
                ConnectionFactory = new ConnectionFactory//创建连接工厂对象
                {
                    HostName = _ipAddress,//IP地址
                    UserName = _username,//用户账号
                    Password = _password,//用户密码
                    //启用自动连接恢复
                    AutomaticRecoveryEnabled = true,
                    //VirtualHost = "/mqtest",//RabbitMQ中要请求的VirtualHost名称
                    ContinuationTimeout = timeApan,
                    HandshakeContinuationTimeout = timeApan,
                    RequestedConnectionTimeout = timeApan,
                    SocketReadTimeout = timeApan,
                    SocketWriteTimeout = timeApan,
                    //启用异步消费
                    DispatchConsumersAsync = true,
                    //RequestedChannelMax = 5000
                };

            }

        }

        public ConnectionFactory ConnectionFactory { get; }


        private static IConnection connection;
        /// <summary>
        /// 获取RabbitMQ连接对象方法(创建与RabbitMQ的连接)
        /// </summary>
        /// <returns></returns>
        public IConnection GetConnection()
        {
            if (connection == null || !connection.IsOpen)
            {
                //通过工厂创建连接对象
                connection = ConnectionFactory.CreateConnection();
            }
            return connection;
        }

        int times = 0;
        private static IModel Channel;
        public IModel GetChannel()
        {
            if (Channel != null)
                return Channel;
            else
            {
                //times++;
               // Console.WriteLine($"CreateModel{times}次");
                return GetConnection().CreateModel();
            }
        }


    }

二、消息发布

1、获取连接、交换机和队列

 public class RabbitMQPublisher : IPublisher
    {
        static int x_message_ttl;
        static RabbitMQPublisher()
        {
            int.TryParse(ConfigurationHelper.GetKey("RabbitMQ_x-message-ttl"), out x_message_ttl);
            x_message_ttl = x_message_ttl * 60 * 1000;
        }
        #region
        private readonly RabbitMQProvider _provider;
        private IConnection _connection;

        public RabbitMQPublisher(RabbitMQProvider provider)
        {
            try
            {
                _provider = provider;
                //if (_connection == null || !_connection.IsOpen)
                //{
                //    _connection = _provider.ConnectionFactory.CreateConnection();
                //}
                _connection = _provider.GetConnection();
                _channel = _provider.GetChannel();

            }
            catch (Exception ex)
            {
                //记录异常日志
                Util.LogError($"RabbitMQPublisher createConnection exception. Exception message:{ex.Message}");
            }
        }

        public IConnection Connection
        {
            get
            {
                
                if (_connection != null)
                    return _connection;
                return _connection = _provider.GetConnection(); ;
            }
            //get; set;
        }

        private IModel _channel;
        public IModel Channel
        {
            get
            {
                if (_channel != null)
                    return _channel;
                else
                {
                    //if (_connection == null || !_connection.IsOpen)
                    //{
                    //    _connection = _provider.GetConnection(); ;
                    //}

                    return _channel = _provider.GetChannel();
                }

            }
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            if (Channel != null)
            {
                if (Channel.IsOpen)
                    Channel.Close();
                Channel.Abort();
                Channel.Dispose();
            }

            if (Connection != null)
            {
                if (Connection.IsOpen)
                    Connection.Close();
            }
        }
        #endregion
}

2、同步消息发送

 /// <summary>
        /// 发布(生产)消息
        /// </summary>
        /// <param name="message">消息内容</param>
        /// <param name="exchangeName">交换机名称</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="exchangeType">交换机类型</param>
        /// <param name="routingKey">路由键</param>
        /// <param name="durable">是否持久化</param>
        /// <param name="autoDelete">是否自动删除</param>
        /// <param name="arguments">用于插件和代理特定功能,如消息TTL、队列长度限制等</param>
        /// 1.x-message-ttl             发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
        /// 2.x-expires                 队列在被自动删除(毫秒)之前可以使用多长时间。
        /// 3.x-max-length              队列在开始从头部删除之前可以包含多少就绪消息。
        /// 4.x-max-length-bytes        队列在开始从头部删除之前可以包含的就绪消息的总体大小。
        /// 5.x-dead-letter-exchange    设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
        /// 6.x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
        /// 7.x-max-priority            队列支持的最大优先级数; 如果未设置,队列将不支持消息优先级。
        /// 8.x-queue-mode              将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
        /// 9.x-queue-master-locator    将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
        private Task Publish(string message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            if (x_message_ttl > 0)
            {
                arguments = new Dictionary<string, object>();
                arguments.Add("x-message-ttl", x_message_ttl);
            }
            //声明交换机
            Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
            //声明队列
            Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);
            var msgByte = Encoding.UTF8.GetBytes(message);
            //设置消息持久化
            var props = Channel.CreateBasicProperties();
            props.Persistent = true;
            try
            {
                Channel.TxSelect();
                Channel.BasicPublish
                (
                    exchange: exchangeName,
                    routingKey: routingKey == null ? string.Empty : routingKey,
                    mandatory: false,
                    basicProperties: props,
                    body: msgByte
                );
                Channel.TxCommit();
            }
            catch (Exception ex)
            {
                Channel.TxRollback();
                //记录异常日志
                Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");
            }
            return Task.FromResult(0);
        }

3、批量发布

 /// <summary>
        /// 批量发布
        /// </summary>
        /// <param name="message"></param>
        /// <param name="exchangeName"></param>
        /// <param name="queueName"></param>
        /// <param name="exchangeType"></param>
        /// <param name="routingKey"></param>
        /// <param name="durable"></param>
        /// <param name="autoDelete"></param>
        /// <param name="arguments"></param>
        /// <returns></returns>
        private async Task PublishAsyncBatch(List<string> message, string exchangeName, string queueName, string exchangeType, string routingKey = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            //using (var conn = _provider.ConnectionFactory.CreateConnection())
            //{
            using (var channel = Connection.CreateModel())
            {
                ///Console.WriteLine(1);
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                //声明交换机
                Channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
                //声明队列
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
                Channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey == null ? string.Empty : routingKey);

                //设置消息持久化
                var props = Channel.CreateBasicProperties();
                props.Persistent = true;
                try
                {
                    Channel.TxSelect();
                    var basicPublishBatch = Channel.CreateBasicPublishBatch();

                    byte[] msgByte;
                    ReadOnlyMemory<byte> memory;
                    foreach (var msg in message)
                    {
                        msgByte = Encoding.UTF8.GetBytes(msg);
                        memory = new ReadOnlyMemory<byte>(msgByte);
                        basicPublishBatch.Add
                        (
                            exchange: exchangeName,
                            routingKey: routingKey == null ? string.Empty : routingKey,
                            mandatory: false,
                            properties: props,
                            body: memory
                        );
                    }
                    basicPublishBatch.Publish();

                    Channel.TxCommit();

                    await Task.Yield();
                }
                catch (Exception ex)
                {

                    Channel.TxRollback();
                    channel.Close(); channel.Dispose();
                    //conn.Close(); conn.Dispose();
                    //记录异常日志
                    Util.LogError($"RabbitMQPublisher publish message exception. Exception message:{ex.Message}");

                    Console.WriteLine("消息订阅时错误:" + ex.Message);
                }

            }
            
        }

注意:多线程消息发布时,应避免多个线程使用同一个IModel实例,必须保证Imodel被一个线程独享,如果必须要多个线程访问呢一个实例的话,则可以通过加锁来处理,详见:.NET/C# Client API Guide — RabbitMQ

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}

三、消息订阅

1、获取连接、交换机和队列同上消息发布,不再赘述文章来源地址https://www.toymoban.com/news/detail-800487.html

 private void QueueInitialization(string queueName, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
        {
            try
            {
                if (x_message_ttl > 0)
                {
                    arguments = new Dictionary<string, object>();
                    arguments.Add("x-message-ttl", x_message_ttl);
                }
                Channel.QueueDeclare(queueName, durable, false, autoDelete, arguments);
            }
            catch (Exception )
            {

            }
        }
/// <summary>
        /// 
        /// </summary>
        /// <param name="queueName"></param>
        /// <param name="callback"></param>
        /// <param name="autoAck"></param>
        /// <param name="consumPerTimes">每次消费的消息条数</param>
        /// <returns></returns>
        private async Task SubscribeAsync(string queueName, Func<string, bool> callback, bool autoAck, ushort consumPerTimes = 1)
        {
            try
            {
                QueueInitialization(queueName);
                //声明为手动确认,每次只消费1条消息。
                Channel.BasicQos(0, consumPerTimes, false);
                //定义消费者
                //var consumer = new EventingBasicConsumer(Channel);

                var consumer = new AsyncEventingBasicConsumer(Channel);

                //接收事件
                consumer.Received += async (eventSender, args) =>
                {
                    var message = args.Body.ToArray();//接收到的消息

                    var res = callback(Encoding.UTF8.GetString(message));
                    //返回消息确认
                    Channel.BasicAck(args.DeliveryTag, res);
                    await Task.Yield();
                };

                //开启监听 -- gai2023-11-1
                Channel.BasicConsume(queueName, autoAck, consumer);
                // await Task.Delay(1000);
            }
            catch (Exception e)
            {
                Console.WriteLine("消息订阅时错误:" + e.Message);
            }
        }

四、通过workerService订阅处理消息

internal class SubscribeWorker : BackgroundService
    {
        #region override
        public override Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                //一些数据初始化
                _logger.LogInformation($"Settings 初始化完成");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
            }


            return base.StartAsync(cancellationToken);
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //这里注意,不能写在while里,否则会一直进行重复订阅,会导致连接数一直增长
            await MainSubscribe();
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(2000);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, ex.Message);
                }
                
            }
        }

        /// <summary>
        /// 服务停止
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            Task.WaitAll();
            subscriber.Dispose();
            _logger.LogInformation("Worker stop at: {time}", DateTimeOffset.Now);
            return base.StopAsync(cancellationToken);
        }
        #endregion





    }

到了这里,关于.Net 6 下WorkerService+RabbitMq实现消息的异步发布订阅的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Python】Flask + MQTT 实现消息订阅发布

    本次项目主要使用到的库: flask_mqtt 1.创建Flask项目 2创建py文件: mqtt_demo.py 3.代码实现 4.项目运行 运行项目前可在Pycharm中设置 host 和 port 设置好后直接运行项目 使用 MQTTX 进行消息测试 1、测试消息接收 创建连接 Host :为代码中定义好的 broker.emqx.io Port :为代码中定义好的

    2024年02月04日
    浏览(48)
  • 第二十九章 使用消息订阅发布实现组件通信

    PubSubJS库介绍 如果你想在 React 中使用第三方库来实现 Pub/Sub 机制, PubSubJS 是一个不错的选择。它是一个轻量级的库,可以在浏览器和 Node.js 环境中使用。 PubSubJS 提供了一个简单的 API ,可以让你在应用程序中订阅和发布消息。你可以使用 npm 来安装它: 1-引入使用 2-首先订阅

    2024年02月02日
    浏览(81)
  • vue消息订阅与发布,实现任意组件间通讯

    第一步:下载第三方消息订阅与发布库,例如常用的pubsub.js,他可以在任何框架中使用包括vue、react、anglar等等。 命令:npm i pubsub-js 注意是pubsub-js(不是点); 第二步:引入库; import pubsub from \\\'pubsub-js\\\' 第三步:订阅消息; 第四部:接收消息;  总结:

    2024年02月13日
    浏览(34)
  • C++发布订阅者模式:实现简单消息传递系统

      概述: 这个C++示例演示了发布者-订阅者模式的基本实现。通过 `Event` 类,发布者 `Publisher` 发送数据,而订阅者 `Subscriber` 订阅并处理数据。通过简单的回调机制,实现了组件间松散耦合的消息传递。 好的,我将为你提供一个简单的C++实例,演示如何使用发布者-订阅者模式

    2024年03月17日
    浏览(55)
  • Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

    业务开发中有很多延时操作的场景,比如最常见的 超时订单自动关闭 、 延时异步处理 ,我们常用的实现方式有: 定时任务轮询 (有延时)。 借助Redission的延时队列 。 Redis的key过期事件通知机制 (需开启key过期事件通知,对Redis有性能损耗)。 RocketMQ中定时消息推送 (支

    2024年02月04日
    浏览(39)
  • Redis的发布订阅模式:实现消息队列和实时数据推送的利器

    当涉及到实时数据推送和消息队列时,Redis的发布订阅模式是一种非常有用的工具。Redis是一个开源的内存数据库,被广泛用于缓存、队列和实时数据处理等方面。 在本博客中,我们将重点介绍Redis的发布订阅模式,并且提供一些示例代码来帮助读者更好地理解这个模式以及如

    2024年02月12日
    浏览(92)
  • (十九)springboot实战——springboot集成redis实现消息的订阅与发布

    本节内容主要介绍springboot项目通过集成redis,如何利用redis的订阅发布机制,完成系统消息的发布与订阅功能。Redis中的发布与订阅是一种消息通信模式,允许发送者(发布者)将消息发送给多个接收者(订阅者)。在 Redis中,发布与订阅通过PUBLISH和SUBSCRIBE命令实现。 频道(

    2024年02月21日
    浏览(40)
  • MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布

    1.1 什么是MQTT? MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。 MQTT最大优点在于用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息

    2024年02月10日
    浏览(37)
  • Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

    1. 引入RocketMQ依赖 :首先,在 pom.xml 文件中添加RocketMQ的依赖: 2. 配置RocketMQ连接信息 :在 application.properties 或 application.yml 中配置RocketMQ的连接信息,包括Name Server地址等: 3.消息发布组件 4.消息发布控制器 项目结构: 接下来是websocket模块的搭建 1. 依赖添加 2.application.yml配

    2024年02月08日
    浏览(36)
  • 【MQTT】基于阿里云物联网平台实现两设备间相互订阅及发布消息

    一、准备: 1.下载MQTT.fx - 1.7.1工具 https://www.jianshu.com/p/c9f50cf81cd2 2.进入物联网平台,并按照产品文档创建产品及设备 阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台 3.打开两个MQTT.fx工具,并分别填写相关设备信息,连接物联网平台(参考阿里云产品文档)。  二

    2024年02月02日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包