如何兼顾性能+实时性处理缓冲数据?

这篇具有很好参考价值的文章主要介绍了如何兼顾性能+实时性处理缓冲数据?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

我们经常会遇到这样的数据处理应用场景:我们利用一个组件实时收集外部交付给它的数据,并由它转发给一个外部处理程序进行处理。考虑到性能,它会将数据存储在本地缓冲区,等累积到指定的数量后打包发送;考虑到实时性,数据不能在缓冲区存太长的时间,必须设置一个延时时间,一旦超过这个时间,缓冲的数据必须立即发出去。看似简单的需求,如果需要综合考虑性能、线程安全、内存分配,要实现起来还真有点麻烦。这个问题有不同的解法,本文提供一种实现方案。(在网友Broadm的提醒下,发现原来定义的Batcher<T>由并发问题,特进行了修正)

一、实例演示
二、待处理的批量数据:Batch<T>
三、感知数据处理的时机:BatchChangeToken
四、接收、缓冲、打包和处理数据:Batcher<T>

一、实例演示

我们先来看看最终达成的效果。在如下这段代码中,我们使用一个Batcher<string>对象来接收应用分发给它的数据,该对象最终会在适当的时机处理它们。 调用Batcher<string>构造函数的三个参数分别表示:

  • processor:批量处理数据的委托对象,它指向的Process方法会将当前时间和处理的数据量输出到控制台上;
  • batchSize:单次处理的数据量,当缓冲的数据累积到这个阈值时会触发数据的自动处理。我们将这个阈值设置为10
  • interval:两次处理处理的最长间隔,我们设置为5秒
var batcher = new Batcher<string>(
    processor:Process,
    batchSize:10,
    interval: TimeSpan.FromSeconds(5));
var random = new Random();
while (true)
{
    var count = random.Next(1, 4);
    for (var i = 0; i < count; i++)
    {
        batcher.Add(Guid.NewGuid().ToString());
    }
    await Task.Delay(1000);
}

static void Process(Batch<string> batch)
{
    using (batch)
    {
        Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");
    }
}

如上面的代码片段所示,在一个循环中,我们每隔1秒钟随机添加1-3个数据项。从下图中可以看出,Process方法的调用具有两种触发条件,一是累积的数据量达到设置的阈值10,另一个则是当前时间与上一次处理时间间隔超过5秒。

如何兼顾性能+实时性处理缓冲数据?

二、待处理的批量数据:Batch<T>

除了上面实例涉及的Batcher<T>,该解决方案还涉及两个额外的类型,如下这个Batch<T>类型表示最终发送的批量数据。为了避免缓冲数据带来的内存分配,我们使用了一个单独的ArrayPool<T>对象来创建池化的数组,这个功能体现在静态方法CreatePooledArray方法上。由于构建Batch<T>对象提供的数组来源于对象池,在处理完毕后必须回归对象池,所以我们让这个类型实现了IDisposable接口,并将这一操作实现在Dispose方法种。在调用ArrayPool<T>对象的Return方法时,我们特意将数组清空。由于提供的数组来源于对象池,所以并不能保证每个数据元素都承载了有效的数据,实现的迭代器和返回数量的Count属性对此作了相应的处理。

public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class
{
    private bool _isDisposed;
    private int? _count;
    private readonly T[] _data;
    private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create();

    public int Count
    {
        get
        {
            if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
            if(_count.HasValue) return _count.Value;
            var count = 0;
            for (int index = 0; index < _data.Length; index++)
            {
                if (_data[index] is  null)
                {
                    break;
                }
                count++;
            }
            return (_count = count).Value;
        }
    }
    public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data));
    public void Dispose()
    {
        _pool.Return(_data, clearArray: true);
        _isDisposed = true;
    }
    public IEnumerator<T> GetEnumerator() => new Enumerator(this);
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize);
    private void EnsureNotDisposed()
    {
        if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>));
    }

    private sealed class Enumerator : IEnumerator<T>
    {
        private readonly Batch<T> _batch;
        private readonly T[] _data;
        private int _index = -1;
        public Enumerator(Batch<T> batch)
        {
            _batch = batch;
            _data = batch._data;
        }
        public T Current
        {
            get { _batch.EnsureNotDisposed(); return _data[_index]; }
        }
        object IEnumerator.Current => Current;
        public void Dispose() { }
        public bool MoveNext()
        {
            _batch.EnsureNotDisposed();
            return ++_index < _data.Length && _data[_index] is not null;
        }
        public void Reset()
        {
            _batch.EnsureNotDisposed();
            _index = -1;
        }
    }
}

三、感知数据处理的时机:BatchChangeToken

Batcher具有两个触发数据处理的设置:缓冲的数据量和两次数据处理之间的最长间隔。当累积的数据量或者当前时间与上一次处理的间隔达到阈值,缓冲的数据将自动被处理。.NET Core经常利用一个IChangeToken作为通知的令牌,为此我们定义了如下这个实现了该接口的BatchChangeToken类型。如下面的代码片段所示,上述两个触发条件体现在两个CancellationToken对象上,我们利用它们创建了对应的CancellationChangeToken对象,最后利用这两个CancellationChangeToken创建了一个CompositeChangeToken对象。这个CompositeChangeToken对象最终被用来实现了IChangeToken接口的三个成员。

internal sealed class BatchChangeToken : IChangeToken
{
    private readonly IChangeToken _innerToken;
    private readonly int _countThreshold;
    private readonly CancellationTokenSource _expirationTokenSource;
    private readonly CancellationTokenSource _countTokenSource;
    private int _counter;

    public BatchChangeToken(int countThreshold, TimeSpan timeThreshold)
    {
        _countThreshold = countThreshold;
        _countTokenSource = new CancellationTokenSource();
        _expirationTokenSource = new CancellationTokenSource(timeThreshold);
        var countToken = new CancellationChangeToken(_countTokenSource.Token);
        var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token);
        _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken });
    }

    public bool HasChanged => _innerToken.HasChanged;
    public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks;
    public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s =>
    {
        callback(s);
        _countTokenSource.Dispose();
        _expirationTokenSource.Dispose();
    }, state);
    public void Increase()
    {
        Interlocked.Increment(ref _counter);
        if (_counter >= _countThreshold)
        {
            _countTokenSource.Cancel();
        }
    }
}

上述两个CancellationToken来源于对应的CancellationTokenSource,对应的字段为_countTokenSource和_expirationTokenSource。_expirationTokenSource根据设置的数据处理时间间隔创建而成。为了确定缓冲的数据量,我们提供了一个计数器,并利用Increase方法进行计数。在超过设置的数据量时,该方法会调用_expirationTokenSource的Cancel方法。在实现的ActiveChangeCallbacks方法种,我们将针对这两个CancellationTokenSource的释放放在注册的回调中。

四、接收、缓冲、打包和处理数据:Batcher<T>

最终用于打包的Batcher类型定义如下。在构造函数中,我们除了提供上述两个阈值外,还提供了一个Action<Batch<T>>委托完成针对打包数据的处理。我们创建了一个内部类型Container作为存放数据的容器,具体数据存放在一个数组中,字段_index代表下一个添加数组存放的索引。TryAdd方法将指定的数据添加到数组中,我们使用InterLocked.Increase方法解决并发问题。如果越界返回False,表示添加失败,否则返回True,表示成功添加。Container的数组通过Batch<T>的静态方法CreatePooledArray提供的。Container类型还提供了一个AsBatch方法将数据封装成Batch<T>对象。

public sealed class Batcher<T> : IDisposable where T : class
{
    private volatile Container _container;
    private volatile BatchChangeToken _changeToken = default!;
    private readonly IDisposable _scheduler;

    public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval)
    {
        if(processor is null) throw new ArgumentNullException(nameof(processor));
        _container = new Container(batchSize);
        _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(batchSize, interval), OnChange);

        void OnChange()
        {
            var container = Interlocked.Exchange(ref _container, new Container(batchSize));
            if (!container.IsEmpty)
            {
                _ = Task.Run(() => processor.Invoke(container.AsBatch()));
            }
        }
    }

    public void Add(T item)
    {
        if (item is null) throw new ArgumentNullException(nameof(item));
        if (_container.TryAdd(item))
        {
            _changeToken.Increase();
        }
        else
        {
            new SpinWait().SpinOnce();
            Add(item);
        }
    }

    public void Dispose() => _scheduler.Dispose();

    private sealed class Container
    {
        private volatile int _index = -1;
        private readonly T[] _data;
        public bool IsEmpty => _index == -1;
        public Container(int batchSize) => _data = Batch<T>.CreatePooledArray(batchSize);
        public bool TryAdd(T item)
        {
            var index = Interlocked.Increment(ref _index);
            if (index > _data.Length - 1) return false;
            _data[index] = item;
            return true;
        }
        public Batch<T> AsBatch() => new(_data);
    }
}

在构造函数中,我们调用了ChangeToken的静态方法OnChange将数据处理操作绑定到创建的BatchChangeToken对象上,并确保每次发送“数据处理”后将重新创建的BatchChangeToken对象赋值到_changeToken字段上。当接收到数据处理通知后,我们会调用Batch<T>的静态方法CreatePooledArray构建一个数组,并利用创建的Container对象将字段 ­_container引用的容器替换下来。如果替换下来的容器不为空,我们将其封装成Batch<T>对象进行处理。Add方法直接调用容器的TryAdd方法,如果返回False,需要以自旋的方式等待片刻后,递归调用本方法直至将数据添加到容器中。

=====================================================================

测试证明上面的代码依然有并发问题,本想实现一种完全无锁的解决方案,目前并没有完美的解决这个问题。权衡再三提供了如下这个读写锁的解决方案,如下是Batcher<T>的实现。文章来源地址https://www.toymoban.com/news/detail-464230.html

public sealed class Batcher<T> where T : class
{
    private readonly int _interval;
    private readonly int _batchSize;
    private readonly Action<Batch<T>> _processor;
    private volatile Container _container;
    private readonly Timer _timer;
    private readonly ReaderWriterLockSlim _lock = new();

    public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval)
    {
        _interval = (int)interval.TotalMilliseconds;
        _batchSize = batchSize;
        _processor = processor;
        _container = new Container(batchSize);
        _timer = new Timer(_ => Process(), null, _interval, Timeout.Infinite);
    }

    private void Process()
    {
        if (_container.IsEmpty) return;
        _lock.EnterWriteLock();
        try
        {
            if (_container.IsEmpty) return;
            var container = Interlocked.Exchange(ref _container, new Container(_batchSize));
            _ = Task.Run(() => _processor(container.AsBatch()));
            _timer.Change(_interval, Timeout.Infinite);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Add(T item)
    {
        _lock.EnterReadLock();
        bool success = false;
        try
        {
            success = _container.TryAdd(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }

        if (!success)
        {
            Process();
            new SpinWait().SpinOnce();
            Add(item);
        }
    }

    private sealed class Container
    {
        private volatile int _next = -1;
        private readonly T[] _data;
        public bool IsEmpty => _next == -1;
        public Container(int batchSize) => _data = Batch<T>.CreatePooledArray(batchSize);
        public bool TryAdd(T item)
        {
            var index = Interlocked.Increment(ref _next);
            if (index > _data.Length - 1) return false;
            _data[index] = item;
            return true;
        }
        public Batch<T> AsBatch() => new(_data);
    }
}

到了这里,关于如何兼顾性能+实时性处理缓冲数据?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 5.0 无状态实时性消费详解

    作者:绍舒 RocketMQ 5.0 版本引入了 Proxy 模块、无状态 pop 消费机制和 gRPC 协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。 SimpleConsumer 客户端采用了无状态的 pop 机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也

    2024年02月15日
    浏览(16)
  • ES的近实时性是什么意思?原理是什么?

    ES 的\\\"近实时\\\"(near real-time)是指当你往 ES 索引(Index)中插入、更新或删除文档时,这些变更几乎立刻就对用户可见和可查询。然而,它并不是真正的实时,因为 ES 采用了一些优化和缓冲机制,以提高性能和吞吐量。 🟥 具体来说,ES的近实时性质包括以下几个方面: 索引

    2024年02月07日
    浏览(18)
  • 边缘计算的挑战与机遇:实时性、安全性与创新驱动

    摘要: 随着物联网、人工智能和大数据的快速发展,边缘计算作为一种新兴的计算模型,已经成为满足实时性和本地处理需求的解决方案。然而,边缘计算也面临着数据安全与隐私保护、网络稳定性等挑战。本文将探讨边缘计算所面临的挑战,并阐述其所带来的机遇,以及我

    2024年01月18日
    浏览(21)
  • 边缘计算技术的双面刃:深度剖析安全、稳定挑战及实时性、成本优势

    在数字化时代的前沿,边缘计算作为一项颠覆性技术,正以其独特的分布式架构和强大的本地处理能力深刻改变着数据处理与分析的方式。然而,这一技术革新也带来了复杂的安全防护需求、网络稳定性问题,同时也为各行业带来了前所未有的实时响应能力和经济效率提升。

    2024年01月22日
    浏览(26)
  • Qt音视频开发44-本地摄像头推流(支持分辨率/帧率等设置/实时性极高)

    本地摄像头推流和本地桌面推流类似,无非就是采集的设备源头换成了本地摄像头设备而不是桌面,其他代码完全一样。采集本地摄像头实时视频要注意的是如果设置分辨率和帧率,一定要是设备本身就支持的,如果不支持那就歇菜,比如设备本身最大分辨率到1280x720,你主

    2024年02月05日
    浏览(18)
  • Qt音视频开发43-采集屏幕桌面并推流(支持分辨率/矩形区域/帧率等设置/实时性极高)

    采集电脑屏幕桌面并推流一般是用来做共享桌面、远程协助、投屏之类的应用,最简单入门的做法可能会采用开个定时器或者线程抓图,将整个屏幕截图下来,然后将图片传出去,这种方式很简单但是性能要低不少,一般采用ffmpeg来做桌面推流的居多,毕竟如果不采用代码直

    2024年02月03日
    浏览(21)
  • Qt/C++监控推流设备推流/延迟极低/实时性极高/rtsp/rtmp推流/hls/flv/webrtc拉流/调整分辨率降低带宽

    算下来这个推流的项目作品写了有四年多了,最初第一个版本只有文件点播的功能,用的纯QTcpSocket通信实现,属于比较简单的功能。由于文件点播只支持文件形式的推流,不支持网络流或者本地设备采集,所以迫切需要打破这个瓶颈,而后加入核心的网络推流功能,这也是本

    2024年04月13日
    浏览(15)
  • 11 | 如何修改TCP缓冲区才能兼顾并发数量与传输速度?

    我们在[第 8 课] 中讲了如何从 C10K 进一步到 C10M,不过,这也意味着 TCP 占用的内存翻了一千倍,服务器的内存资源会非常紧张。 如果你在 Linux 系统中用 free 命令查看内存占用情况,会发现一栏叫做 buff/cache,它是系统内存,似乎与应用进程无关。但每当进程新建一个 TCP 连接

    2024年01月22日
    浏览(19)
  • Flink实时大数据处理性能测试

    Flink是一个开源的流处理框架,用于实时大数据处理。它可以处理大量数据,提供低延迟和高吞吐量。Flink的性能测试是一项重要的任务,可以帮助我们了解其在实际应用中的表现。在本文中,我们将讨论Flink实时大数据处理性能测试的背景、核心概念、算法原理、代码实例、

    2024年03月18日
    浏览(29)
  • 流式计算中的多线程处理:如何使用Kafka实现高效的实时数据处理

    作者:禅与计算机程序设计艺术 Apache Kafka 是 Apache Software Foundation 下的一个开源项目,是一个分布式的、高吞吐量的、可扩展的消息系统。它最初由 LinkedIn 开发并于 2011 年发布。与其他一些类似产品相比,Kafka 有着更强大的功能和活跃的社区支持。因此,越来越多的人开始使

    2024年02月12日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包