基于channel的异步事件总线

这篇具有很好参考价值的文章主要介绍了基于channel的异步事件总线。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生成者/消费者概念编程模型

通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如允许它们存储的元素数,以及达到该限制时会发生什么情况,或者通道是由多个生成者还是多个使用者同时访问

channel简介

channel提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。

channel(管道)提供了有界通道和无界通道

无界通道

该通道可以同时供任意数量的读取器和编写器使用。 或者,可以通过提供 UnboundedChannelOptions 实例在创建无限制通道时指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行

有界通道

创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道

模式行为

BoundedChannelFullMode.Wait

这是默认值。 WriteAsync调用 以等待空间可用以完成写入操作。 调用 以 TryWrite 立即返回 false 。

BoundedChannelFullMode.DropNewest

删除并忽略通道中的最新项,以便为要写入的项留出空间。

BoundedChannelFullMode.DropOldest

删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。

Channel.Writer API

生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:

ChannelWriter.Complete

将通道标记为已完成,这意味着不再向该通道写入更多项。

ChannelWriter.TryComplete

尝试将通道标记为已完成,这意味着不会向通道写入更多数据。

ChannelWriter.TryWrite

尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter.Complete 或 ChannelWriter.TryComplete 发出完成信号,否则这将始终返回 true。

ChannelWriter.WaitToWriteAsync

返回一个 ValueTask ,当有空间可以写入项时完成。
ChannelWriter.WriteAsync 以异步方式将项写入到通道

Channel.Reader API

ChannelReader.ReadAllAsync

创建允许从通道中读取所有数据的 IAsyncEnumerable

ChannelReader.ReadAsync

以异步方式从通道中读取项。

ChannelReader.TryPeek

尝试从通道中查看项。

ChannelReader.TryRead

尝试从通道中读取项。

ChannelReader.WaitToReadAsync

返回在 ValueTask 数据可供读取时完成的 。

channel的具体使用

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels

基于channel实现事件总线

EventDiscriptorAttribute 特性定义

    [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
    public class EventDiscriptorAttribute:Attribute
    {
       /// <summary>
       /// 事件2名称
       /// </summary>
       public string EventName { get; private set; }
       /// <summary>
       /// channel 容量设置
       /// </summary>
       public int Capacity { get; private set; }  
       /// <summary>
       /// 是否维持一个生产者多个消费者模型
       /// </summary>
       public bool SigleReader { get; private set; }

       public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
        {
            EventName = eventName;
            Capacity = capacity;
            SigleReader = sigleReader;
        }   
    }

定义通道容器

    //通道容器单列注入,在拓展类中初始化
    public class EventHandlerContainer : IEventHandlerContainer
    {
        public ConcurrentBag<EventDiscription> Events { get; private set; }

        private readonly IServiceCollection Services;

        public EventHandlerContainer(IServiceCollection services)
        {
            Events = new ConcurrentBag<EventDiscription>();
            Services = services;         
            services.AddSingleton<IEventHandlerContainer>(this);
        }

        private bool Check(Type type)
        {
            var discription = Events.FirstOrDefault(p=>p.EtoType == type);

            return discription is null;
        }
        
        ///订阅并且注入EventHandler
        public void Subscribe(Type eto,Type handler)
        {
            if(!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto, handler));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if(Services.Any(P=>P.ServiceType==handlertype))
            {
                return;
            }

            Services.AddTransient(handlertype, handler);
        }

        public void Subscribe<TEto, THandler>() 
            where TEto : class
            where THandler :IEventHandler<TEto>
        {
            Subscribe(typeof(TEto),typeof(THandler));  
        }

事件管理器

事件管理器通过线程安全字典管理事件通道和事件的触发

可以看到在Subscribe 方法中消费者并不是在订阅后立即执行的而是放到EventTrigger中的定义的异步事件中去

消费者执行最后又.,NET提供的托管任务去执行

    public class EventHandlerManager : IEventHandlerManager, IDisposable
    {
        private ConcurrentDictionary<string, Channel<string>> Channels;

        private bool IsDiposed = false;

        private readonly IServiceProvider ServiceProvider;

        private readonly CancellationToken _cancellation;

        private readonly IEventHandlerContainer _eventHandlerContainer;

        private readonly ILogger _logger;

        private bool IsInitConsumer = true;

        public EventHandlerManager(
            IServiceProvider serviceProvider,
            IEventHandlerContainer eventHandlerContainer,
            ILoggerFactory loggerFactory
        )
        {
            ServiceProvider = serviceProvider;
            _cancellation = CancellationToken.None;
            _eventHandlerContainer = eventHandlerContainer;
            Channels = new ConcurrentDictionary<string, Channel<string>>();
            _logger = loggerFactory.CreateLogger<IEventHandlerManager>();
        }

        public async Task CreateChannles()
        {
            var eventDiscriptions = _eventHandlerContainer.Events;

            foreach (var item in eventDiscriptions)
            {
                var attribute = item.EtoType
                    .GetCustomAttributes()
                    .OfType<EventDiscriptorAttribute>()
                    .FirstOrDefault();

                if (attribute is null)
                {
                    ThorwEventAttributeNullException.ThorwException();
                }

                var channel = Channels.GetValueOrDefault(attribute.EventName);

                if (channel is not null)
                {
                    return;
                }

                channel = Channel.CreateBounded<string>(
                    new BoundedChannelOptions(attribute.Capacity)
                    {
                        SingleWriter = true,
                        SingleReader = false,
                        AllowSynchronousContinuations = false,
                        FullMode = BoundedChannelFullMode.Wait
                    }
                );

                Channels.TryAdd(attribute.EventName, channel);

                _logger.LogInformation($"创建通信管道{item.EtoType}--{attribute.EventName}");
            }
            await Task.CompletedTask;
        }

        private Channel<string> Check(Type type)
        {
            var attribute = type.GetCustomAttributes()
                .OfType<EventDiscriptorAttribute>()
                .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            var channel = Channels.GetValueOrDefault(attribute.EventName);

            if (channel is null)
            {
                ThrowChannelNullException.ThrowException(attribute.EventName);
            }

            return channel;
        }

        public void Dispose()
        {
            IsDiposed = true;
            IsInitConsumer = true;
            _cancellation.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// 生产者
        /// </summary>
        /// <typeparam name="TEto"></typeparam>
        /// <param name="eto"></param>
        /// <returns></returns>
        public async Task WriteAsync<TEto>(TEto eto)
            where TEto : class
        {
            var channel = Check(typeof(TEto));
            //因为使用的是有限容量通道存在消息堆积情况,在消息堆积的情况下处理等待消息是否可写入
            while (await channel.Writer.WaitToWriteAsync())
            {
                var data = JsonConvert.SerializeObject(eto);

                await channel.Writer.WriteAsync(data, _cancellation);

                break;
            }
        }

        /// <summary>
        /// 消费者
        /// </summary>
        /// <returns></returns>
        public async Task StartConsumer()
        {
            if (!IsInitConsumer)
            {
                return;
            }

            foreach (var item in _eventHandlerContainer.Events)
            {
                _ = Task.Factory.StartNew(async () =>
                 {
                     var attribute = item.EtoType
                         .GetCustomAttributes()
                         .OfType<EventDiscriptorAttribute>()
                         .FirstOrDefault();

                     var scope = ServiceProvider.CreateAsyncScope();

                     var channel = Check(item.EtoType);

                     var handlerType = typeof(IEventHandler<>).MakeGenericType(item.EtoType);

                     var handler = scope.ServiceProvider.GetRequiredService(handlerType);

                     var reader = channel.Reader;

                     try
                     {
                         while (await channel.Reader.WaitToReadAsync())
                         {
                             while (reader.TryRead(out string str))
                             {
                                 var data = JsonConvert.DeserializeObject(str, item.EtoType);

                                 _logger.LogInformation(str);

                                 await (Task)
                                     handlerType
                                         .GetMethod("HandelrAsync")
                                         .Invoke(handler, new object[] { data });
                             }
                         }
                     }
                     catch (Exception e)
                     {
                         _logger.LogInformation($"本地事件总线异常{e.Source}--{e.Message}--{e.Data}");
                         throw;
                     }
                 });
            }
            IsInitConsumer = false;
            await Task.CompletedTask;
        }
    }

托管任务执行EventHandlerManager StartConsumer()方法

    public class EventBusBackgroundService : BackgroundService
    {
        private readonly IEventHandlerManager _eventHandlerManager;
        public EventBusBackgroundService(IEventHandlerManager eventHandlerManager) 
        { 
            _eventHandlerManager = eventHandlerManager; 
        }  
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _eventHandlerManager.StartConsumer();
        }
    }

拓展类定义

    public  static class EventBusExtensions
    {
        //添加事件总线并且添加channle管道
        public static IServiceCollection AddEventBusAndSubscribes(this IServiceCollection services,Action<EventHandlerContainer> action)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();

            services.AddHostedService<EventBusBackgroundService>();

            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }

        //创建通信管道
        public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
        {
            var scope = serviceProvider.CreateAsyncScope(); 

            var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();

            await eventhandlerManager.CreateChannles();

            action.Invoke(eventhandlerManager);
        }

        //添加本地事件总线
        public static IServiceCollection AddEventBus(this IServiceCollection services)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();

            services.AddHostedService<EventBusBackgroundService>();

            return services;
        }

        //添加通信管道
        public static IServiceCollection Subscribes(this IServiceCollection services, Action<EventHandlerContainer> action)
        {
            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }
    }

使用


    context.Services.AddEventBus();
    //添加通信管道
    context.Services.Subscribes(p =>
    {
       p.Subscribe<TestEto,TestEventHandler>();
    });
    //
    var scope = context.ServiceProvider.CreateScope();

    var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
    //初始化通信管道
    await eventhandlerManager.CreateChannles();
    //定义EventHandler
    public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
    {
        private ILogger _logger;
        public TestEventHandler(ILoggerFactory factory)
        {
            _logger = factory.CreateLogger<TestEventHandler>();
        }   
        public Task HandelrAsync(TestEto eto)
        {
            _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
            return Task.CompletedTask;
        }
    }
    
    //构造函数注入
    [HttpGet]
		public async Task TestLocalEventBus()
		{
			TestEto eto = null;

			for(var i = 0; i < 100; i++)
			{
				eto = new TestEto()
				{
					Name ="LocalEventBus" + i.ToString(),
					Description ="wyg"+i.ToString(),
				};
				await _localEventBus.PublichAsync(eto);
			}
		}

总结

作为一个才毕业一年的初级程序员的我来说这次的channel的事件总线的封装还存在着许多不足

1.无法对消息进行持久化的管理

2.没有对消息异常进行处理

3.没有做到像abp那样自动订阅

当然还存在着一些我不知道问题,欢迎各位大佬提出问题指正

源码链接文章来源地址https://www.toymoban.com/news/detail-712194.html

到了这里,关于基于channel的异步事件总线的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 生产者消费者模型(基于go实现)

    基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度 10,队列元素类型为 int 生产者: 每 1 秒往队列中放入一个类型为 int 的元素,消费者: 每一秒从队列中获取一个元素并打印。 基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度

    2024年02月11日
    浏览(35)
  • 基于互斥锁的生产者消费者模型

    生产者消费者模型 是一种常用的 并发编程模型 ,用于 解决多线程或多进程环境下的协作问题 。该模型包含两类角色: 生产者和消费者 。 生产者负责生成数据 ,并将数据存放到共享的缓冲区中。 消费者则从缓冲区中获取数据 并进行处理。生产者和消费者之间通过共享的

    2024年02月12日
    浏览(39)
  • 基于 BlockQueue(阻塞队列) 的 生产者消费者模型

    阻塞队列(Blocking Queue) 是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。 阻塞队列的 主要特点 是: 当 队列为空时 ,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队

    2024年02月12日
    浏览(51)
  • 基于spring mockito 编写kafka消费者的单元测试

    注: message.json为消息体内容的json文件 重点关注@MockBean和Mockito的使用 readData为从文件中读取json对象的方法

    2024年02月04日
    浏览(58)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(45)
  • 【linux】线程同步+基于BlockingQueue的生产者消费者模型

    喜欢的点赞,收藏,关注一下把! 在线程互斥写了一份抢票的代码,我们发现虽然加锁解决了抢到负数票的问题,但是一直都是一个线程在抢票,它错了吗,它没错但是不合理。那我们应该如何安全合理的抢票呢? 讲个小故事。 假设学校有一个VIP学霸自习室,这个自习室有

    2024年02月03日
    浏览(100)
  • 【Linux】POSIX信号量 | 基于环形队列的生产者消费者模型

    ​🌠 作者:@阿亮joy. 🎆 专栏: 《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于

    2023年04月08日
    浏览(64)
  • 【Java系列】多线程案例学习——基于阻塞队列实现生产者消费者模型

    个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得,欢迎大家在评论区交流讨论💌 什么是阻塞式队列(有两点): 第一点:当队列满的时候

    2024年02月04日
    浏览(57)
  • 【Linux学习】多线程——同步 | 条件变量 | 基于阻塞队列的生产者消费者模型

    🐱作者:一只大喵咪1201 🐱专栏:《Linux学习》 🔥格言: 你只管努力,剩下的交给时间! 以生活中消费者生产者为例: 生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。所以我们就是消费者,供应商

    2024年02月05日
    浏览(51)
  • 【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能 导致数据不一致问题 ,为了避免此问题的发生

    2024年01月24日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包