C#中使用CAS实现无锁算法

这篇具有很好参考价值的文章主要介绍了C#中使用CAS实现无锁算法。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

CAS 的基本概念

CAS(Compare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。

CompareAndSwap(内存地址,期望的旧值,新值)

CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。

CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。

CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。

相比于传统的锁机制,CAS 有一些优势:

  • 原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。

  • 无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。

  • 适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。

C# 中如何使用 CAS

在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。

Interlocked 类提供了一组 CompareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。

public static int CompareExchange(ref int location1, int value, int comparand);
public static long CompareExchange(ref long location1, long value, long comparand);
// ... 省略其他重载方法
public static object CompareExchange(ref object location1, object value, object comparand);
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;

CompareExchange 方法将 location1 内存地址处的值与 comparand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。

通过判断方法返回值与 comparand 是否相等,我们就可以知道 CompareExchange 方法是否执行成功。

算法示例

在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:

while (true)
{
    // 读取数据
    oldValue = ...;
    // 计算新值
    newValue = ...;
    // CAS 更新数据
    result = CompareExchange(ref location, newValue, oldValue);
    // 判断 CAS 是否成功
    if (result == oldValue)
    {
        // CAS 成功,执行后续操作
        break;
    }
}

在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。

示例1:计数器

下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。

public class Counter
{
    private int _value;

    public int Increment()
    {
        while (true)
        {
            int oldValue = _value;
            int newValue = oldValue + 1;
            int result = Interlocked.CompareExchange(ref _value, newValue, oldValue);
            if (result == oldValue)
            {
                return newValue;
            }
        }
    }
}

CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。
https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446

internal void EnsureThreadRequested()
{
    //
    // If we have not yet requested #procs threads, then request a new thread.
    //
    // CoreCLR: Note that there is a separate count in the VM which has already been incremented
    // by the VM by the time we reach this point.
    //
    int count = _separated.numOutstandingThreadRequests;
    while (count < Environment.ProcessorCount)
    {
        int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
        if (prev == count)
        {
            ThreadPool.RequestWorkerThread();
            break;
        }
        count = prev;
    }
}

示例2:队列

下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。

这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。

public class ConcurrentQueue<T>
{
    // _head 和 _tail 是两个伪节点,_head._next 指向队列的第一个节点,_tail 指向队列的最后一个节点。
    // _head 和 _tail 会被多个线程修改和访问,所以要用 volatile 修饰。
    private volatile Node _head;
    private volatile Node _tail;
    
    public ConcurrentQueue()
    {
        _head = new Node(default);
        // _tail 指向 _head 时,队列为空。
        _tail = _head;
    }

    public void Enqueue(T item)
    {
        var node = new Node(item);
        while (true)
        {
            Node tail = _tail;
            Node next = tail._next;
            // 判断给 next 赋值的这段时间,是否有其他线程修改过 _tail
            if (tail == _tail)
            {
                // 如果 next 为 null,则说明从给 tail 赋值到给 next 赋值这段时间,没有其他线程修改过 tail._next,
                if (next == null)
                {
                    // 如果 tail._next 为 null,则说明从给 tail 赋值到这里,没有其他线程修改过 tail._next,
                    // tail 依旧是队列的最后一个节点,我们就可以直接将 node 赋值给 tail._next。                                
                    if (Interlocked.CompareExchange(ref tail._next, node, null) == null)
                    {
                        // 如果_tail == tail,则说明从上一步 CAS 操作到这里,没有其他线程修改过 _tail,也就是没有其他线程执行过 Enqueue 操作。
                        // 那么当前线程 Enqueue 的 node 就是队列的最后一个节点,我们就可以直接将 node 赋值给 _tail。
                        Interlocked.CompareExchange(ref _tail, node, tail);
                        break;
                    }
                }
                // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,
                else
                {
                    // 如果没有其他线程修改过 _tail,那么 next 就是队列的最后一个节点,我们就可以直接将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
            }
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            Node head = _head;
            Node tail = _tail;
            Node next = head._next;
            // 判断 _head 是否被修改过
            // 如果没有被修改过,说明从给 head 赋值到给 next 赋值这段时间,没有其他线程执行过 Dequeue 操作。          
            if (head == _head)
            {
                // 如果 head == tail,说明队列为空
                if (head == tail)
                {
                    // 虽然上面已经判断过队列是否为空,但是在这里再判断一次
                    // 是为了防止在给 tail 赋值到给 next 赋值这段时间,有其他线程执行过 Enqueue 操作。
                    if (next == null)
                    {
                        item = default;
                        return false;
                    }

                    // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,也就是有其他线程执行过 Enqueue 操作。
                    // 那么 next 就可能是队列的最后一个节点,我们尝试将 next 赋值给 _tail。
                    Interlocked.CompareExchange(ref _tail, next, tail);
                }
                // 如果 head != tail,说明队列不为空
                else
                {
                    item = next._item;
                    if (Interlocked.CompareExchange(ref _head, next, head) == head)
                    {
                        // 如果 _head 没有被修改过
                        // 说明从给 head 赋值到这里,没有其他线程执行过 Dequeue 操作,上面的 item 就是队列的第一个节点的值。
                        // 我们就可以直接返回。
                        break;
                    }
                    // 如果 _head 被修改过
                    // 说明从给 head 赋值到这里,有其他线程执行过 Dequeue 操作,上面的 item 就不是队列的第一个节点的值。
                    // 我们就需要重新执行 Dequeue 操作。
                }
            }
        }

        return true;
    }

    private class Node
    {
        public readonly T _item;
        public Node _next;

        public Node(T item)
        {
            _item = item;
        }
    }
}

我们可以通过以下代码来进行测试

using System.Collections.Concurrent;

var queue = new ConcurrentQueue<int>();
var results = new ConcurrentBag<int>();
int dequeueRetryCount = 0;

var enqueueTask = Task.Run(() =>
{
    // 确保 Enqueue 前 dequeueTask 已经开始运行
    Thread.Sleep(10);
    Console.WriteLine("Enqueue start");
    Parallel.For(0, 100000, i => queue.Enqueue(i));
    Console.WriteLine("Enqueue done");
});

var dequeueTask = Task.Run(() =>
{
    Thread.Sleep(10);
    Console.WriteLine("Dequeue start");
    Parallel.For(0, 100000, i =>
    {
        while (true)
        {
            if (queue.TryDequeue(out int result))
            {
                results.Add(result);
                break;
            }

            Interlocked.Increment(ref dequeueRetryCount);
        }
    });
    Console.WriteLine("Dequeue done");
});

await Task.WhenAll(enqueueTask, dequeueTask);
Console.WriteLine(
    $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}");

var hashSet = results.ToHashSet();
for (int i = 0; i < 100000; i++)
{
    if (!hashSet.Contains(i))
    {
        Console.WriteLine("Error, missing " + i);
        break;
    }
}

Console.WriteLine("Done");

输出结果:

Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done

上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。

当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。

总结

CAS 操作是一种乐观锁,它假设没有其他线程修改过数据,如果没有修改过,那么就直接修改数据,如果修改过,那么就重新获取数据,再次尝试修改。

在借助 CAS 实现较为复杂的数据结构时,我们不光要依靠 CAS 操作,还需要注意每次操作的数据是否被其他线程修改过,考虑各个可能的分支,以及在不同的分支中,如何处理数据。

欢迎关注个人技术公众号
C#中使用CAS实现无锁算法文章来源地址https://www.toymoban.com/news/detail-419842.html

到了这里,关于C#中使用CAS实现无锁算法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java 代理模式的基本概念、使用场景、应用示例和实现方法

    代理模式是一种常见的设计模式,在 Java 开发中被广泛应用。它允许我们通过添加一个代理对象来控制对另一个对象的访问,从而提供了一种间接访问实际对象的方法。本文将详细介绍 Java 代理模式的基本概念、使用场景、应用示例和实现方法等相关内容。 代理模式是一种结

    2024年02月05日
    浏览(57)
  • AI教我学编程之C#类的基本概念(1)

    在AI教我学编程之C#类型 中,我们学习了C#类型的的基础知识,而 类 正是 类型 的一种. 区分类和类型 什么是类? 对话AI 追问 实操 追踪属性的使用 AI登场 逐步推进 提出疑问 药不能停 终于实现 探索事件的使用 异步/交互操作 耗时操作 提示 总结 在 C# 中,类是类型的一种。

    2024年01月18日
    浏览(34)
  • 多系统使用CAS实现SSO登录案例

    实现SSO有很多种方案,比较简单且可行的一般都是如上两种再加上CAS,一般企业内部多个系统之间如果想实现SSO,比较推荐的方式就是今天要介绍的CAS,Central Authentication Service,中央身份认证服务。 CAS方案主要包含如下两个部分: CAS Server,作为认证服务的中心,需要单独部

    2024年02月15日
    浏览(39)
  • flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

    flink1.13.6 + flink mysql cdc 1.4.0 flink 1.16.0 + flink mysql cdc 2.3.0 flink 1.16.0 + flink mysql cdc 2.4.0 flink 1.16.0 + flink postgresql cdc 2.3.0 flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题 1、调整chunck大小 : scan.incremental.snapshot.chunk.size 2、设置cdc模式:scan.startup.mode【initial(

    2024年02月16日
    浏览(31)
  • 16. 蒙特卡洛强化学习基本概念与算法框架

    蒙特卡洛强化学习(简称MC强化学习)是一种 无模型 强化学习算法,该算法无需知道马尔科夫决策环境模型,即不需要提前获得立即回报期望矩阵R(维度为(nS,nA))、状态转移概率数组P(维度为(nA,nS,nS)),而是通过与环境的反复交互,使用统计学方法,利用交互数据直接进行

    2024年01月21日
    浏览(45)
  • 云计算的基本概念术语说明和核心算法原理

    作者:禅与计算机程序设计艺术 云计算的基本概念是分布式处理、并行处理和网格计算的发展,是一种新兴的商业计算模型。它通过使计算分布在大量的分布式计算机上,而非本地计算机或远程服务器中,使得企业数据中心的运行更与互联网相似。云计算的核心算法原理包括

    2024年02月14日
    浏览(47)
  • DQN基本概念和算法流程(附Pytorch代码)

    DQN,Deep Q Network本质上还是Q learning算法,它的算法精髓还是让 Q 估计 Q_{估计} Q 估计 ​ 尽可能接近 Q 现实 Q_{现实} Q 现实 ​ ,或者说是让当前状态下预测的Q值跟基于过去经验的Q值尽可能接近。在后面的介绍中 Q 现实 Q_{现实} Q 现实 ​ 也被称为TD Target 再来回顾下DQN算法和核

    2024年02月15日
    浏览(41)
  • 深入了解目标检测技术--从基本概念到算法入门

    前言: Hello大家好,我是Dream。 众所周知,目标检测是计算机视觉领域中的重要任务之一,其目的是 识别图像或视频中包含的物体实例并将其定位 。实现目标检测可以帮助人们在 自动驾驶、机器人导航、安防监控 等领域中更好地理解和应用图像信息。接下来Dream将带大家一

    2024年02月03日
    浏览(34)
  • 【动态规划】动态规划算法基本概念,原理应用和示例代码

             动态规划(Dynamic Programming,简称DP)是一种解决多阶段决策问题的数学优化方法。它将原问题分解成若干个子问题,通过解决子问题只需解决一次并将结果保存下来,从而避免了重复计算,提高了算法效率。         通俗来讲,动态规划算法是解决一类具有重叠

    2024年01月21日
    浏览(45)
  • 【数据结构与算法】一、数据结构的基本概念

    抽象数据类型(ADT)定义举例:Circle的定义 如何处理杂乱无章且多样化的数据: 数据元素 :数据中的个体被称为数据元素。 数据对象 :性质相同的数据元素组成的集合。 数据结构 :数据元素加上数据元素之间的关系,就形成了数据结构。 逻辑结构 :数据结构的逻辑模型。

    2023年04月17日
    浏览(96)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包