.net core使用channel消息队列

这篇具有很好参考价值的文章主要介绍了.net core使用channel消息队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

.net core使用channel消息队列

背景

最近做一个项目,连接了很多设备,需要保存设备的心跳数据,刚开始的做法是直接接收到设备的数据之后进行心跳数据的保存,但是随着设备多了起来,然后设备的使用时长不断的加大,对数据库的压力也比较大,所以想着优化一下。

方案调研

1.使用第三方中间件

常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。

2.使用channel

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 WriterReader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列

目前就介绍来看非常完美,不需要添加第三方中间件,直接添加现有的模块即可。

代码实现

选择了使用channel来做优化。拿到设备数据之后直接把消息丢入到channel,然后后台使用定时任务或者自己实现hostservice去不断的消费数据。

生产者代码

 public async Task ProduceHeartBeat(string message)
        {
            await channel.Writer.WriteAsync(message);
        }

不断的向里面写入数据即可.

消费者代码

        /// <summary>
        /// timespan时间内消费多少数据
        /// </summary>
        /// <param name="count"></param>
        /// <param name="timeSpan"></param>
        /// <returns></returns>
        public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
        {
            var result = new List<string>(count);
            CancellationTokenSource cts = new CancellationTokenSource();
            var cancellationToken = cts.Token;
            cts.CancelAfter(timeSpan);
            int rcount = 0;
            while ( !cancellationToken.IsCancellationRequested && rcount<count)
            {
                //await Task.Delay(2000);
                if (channel.Reader.TryRead(out var number))
                {
                    Console.WriteLine(number);
                    result.Add(number);
                    rcount++;
                }
                else
                {
                    break;
                }
                
            }  
            return result;
        }

里面加入了一个cancellationToken,进行消费的时长限制。在此时长内消费多少条数据,超时直接结束。

这就是基本的代码

后台定时消费数据

public class HeartBeatService : BackgroundService
    {
        private readonly HeartBeatsChannel heartBeatsChannel;

        public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
        {
            this.heartBeatsChannel = heartBeatsChannel;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {

                Task.Factory.StartNew(() =>
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        //阻塞的队列使得一直在同一个线程运行
                        Process(15,heartBeatsChannel).Wait();
                    }

                }, TaskCreationOptions.LongRunning);

                Console.WriteLine("主线程 现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);

                }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }


        /// <summary>
        /// 消费数据
        /// </summary>
        /// <param name="count">一次消费数量</param>
        /// <param name="heartBeatsChannel"></param>
        /// <returns></returns>
        private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
        {
            Console.WriteLine("子线程_现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
            //每次消费三十个
            if (heartBeatsChannel.IsHasContent)
            {
                //int count = 15;
                //进行消费
                await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
            }           
            await Task.Delay(3000);
        }

使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。

总结

以上就是主要的实现全过程,完整的代码在github

https://github.com/lackguozi/LearnChannelWebApi

实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源???总结的话学习了channel的用法,底层似乎使用了deque??只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好 =_=文章来源地址https://www.toymoban.com/news/detail-501073.html

到了这里,关于.net core使用channel消息队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • (七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

    发布者确认 是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理 异步确认 ,这意味着它们已在服务器端得到处理。 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主

    2024年02月16日
    浏览(28)
  • 重要消息丨.NET Core 3.1 将于今年12月13日结束支持

    .NET Core 3.1 将于 2022 年 12 月 13 日结束支持。此后,Microsoft 将不再为 .NET Core 3.1 提供服务更新或技术支持。 我们建议尽快迁移到 .NET 6 。如果您在支持日期结束后仍在使用 .NET Core 3.1,那么您需要将您的应用更新到 .NET 6 或 .NET 7 以继续受到相关支持并继续接收 .NET 更新。 .NE

    2024年02月06日
    浏览(29)
  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(49)
  • 【C#/.NET】使用ASP.NET Core对象池

    Microsoft.Extensions.ObjectPool   减少初始化/资源分配,提高性能。这一条与线程池同理,有些对象的初始化或资源分配耗时长,复用这些对象减少初始化和资源分配。比如:我有一个执行耗时约500毫秒,内存空间 2KB的任务为此创建一个新线程异步执行,而创建线程耗时1秒,内存空

    2024年02月06日
    浏览(57)
  • .NET CORE 之 gRPC使用

    gRPC 是一种与语言无关的高性能远程过程调用 (RPC) 框架( google 开源的 rpc 框架)。 gRPC 默认使用 protocol buffers ,这是 Google 开源的一套成熟的结构数据序列化机制(也可以使用其他数据格式如 JSON )   gRPC 的主要优点是:   HTTP2 传输 现代高性能轻量级 RPC 框架。 协定优先

    2024年02月08日
    浏览(34)
  • RabbitMQ-网页使用消息队列

    几种模式 从最简单的开始 添加完新的虚拟机可以看到,当前admin用户的主机访问权限中新增的刚添加的环境 1.1查看交换机 交换机列表中自动新增了刚创建好的虚拟主机相关的预设交换机。一共7个。前面两个 direct类型的交换机,一个是(AMQP default)还有一个是amq.direct,它们

    2024年02月07日
    浏览(33)
  • RabbitMQ 消息队列使用

    同步调用优点: 时效性强,立即得到结果 缺点: 耦合度高 新业务新需求到来时,需要修改代码 性能和吞吐能力下降 调用服务的响应时间为所有服务的时间之和 资源浪费 调用链中的服务在等待时不会释放请求占用的资源 级联失败 一个服务执行失败会导致调用链后续所有服务失

    2024年01月21日
    浏览(41)
  • 消息队列的使用

    概念: 消息队列是System V IPC对象的一种   发送端: 1 申请Key 2打开/创建消息队列   msgget 3向消息队列发送消息   msgsnd 接收端: 1打开/创建消息队列   msgget 2从消息队列接收消息   msgrcv 3 控制(删除)消息队列   msgctl 打开/ 创建消息队列  #include sys/ipc.h  #include sys/msg

    2024年02月07日
    浏览(12)
  • Net Core中使用EF Core连接Mysql数据库

    Entity Framework Core的前身是微软提供并主推的ORM框架,简称EF,其底层是对ADO.NET的封装。EF支持SQLServer、MYSQL、Oracle、Sqlite等所有主流数据库。 首先是使用时的几个模式的整理及其理解: Code First:根据代码自动创建数据库表结构甚至是数据库,可以支持多库开发,代码较少冗余

    2024年01月24日
    浏览(41)
  • Hyperf使用RabbitMQ消息队列

    Hyperf连接使用RabbitMQ消息中间件 使用Docker部署RabbitMQ,-传送门 使用Docker部署Hyperf,-传送门- 安装amqp扩展 安装command命令行扩展 配置参数 假设已经在rabbitmq设置了交换机exchange_test和队列queue_test 新建 /config/autoload/amp.php配置文件,修改地址和用户名密码 创建生产者中间件 exch

    2024年02月13日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包