.NET开源分布式锁DistributedLock

这篇具有很好参考价值的文章主要介绍了.NET开源分布式锁DistributedLock。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、线程锁和分布式锁

线程锁通常在单个进程中使用,以防止多个线程同时访问共享资源。

在我们.NET中常见的线程锁有:

  • 自旋锁:当线程尝试获取锁时,它会重复执行一些简单的指令,直到锁可用
  • 互斥锁: Mutex,可以跨进程使用。Mutex 类定义了一个互斥体对象,可以使用 WaitOne() 方法等待对象上的锁
  • 混合锁:Monitor,可以通过 lock 关键字来使用
  • 读写锁:允许多个线程同时读取共享资源,但只允许单个线程写入共享资源
  • 信号量:Semaphore,它允许多个线程同时访问同一个资源

更多的线程同步锁,可以看这篇文章:https://www.cnblogs.com/Z7TS/p/16463494.html

分布式锁是一种用于协调多个进程/节点之间的并发访问的机制,某个资源在同一时刻只能被一个应用所使用,可以通过一些共享的外部存储系统来实现跨进程的同步和互斥

常见的分布式锁实现:

  • Redis 分布式锁
  • ZooKeeper 分布式锁
  • Mysql 分布式锁
  • SqlServer 分布式锁
  • 文件分布式锁

DistributedLock开源项目中有多种实现方式,我们今天主要讨论Redis中的分布式锁实现。

二、Redis分布式锁的实现原理

基础实现

Redis 本身可以被多个客户端共享访问,正好就是一个共享存储系统,可以用来保存分布式锁,而且 Redis 的读写性能高,可以应对高并发的锁操作场景。

Redis 的 SET 命令有个 NX 参数可以实现「key不存在才插入」,所以可以用它来实现分布式锁:

  • 如果 key 不存在,则显示插入成功,可以用来表示加锁成功;
  • 如果 key 存在,则会显示插入失败,可以用来表示加锁失败。
SET lock_keyunique_value NX PX 10000 
  • lock_key 就是 key 键;
  • unique_value 是客户端生成的唯一的标识,区分来自不同客户端的锁操作;
  • NX 代表只在 lock_key 不存在时,才对 lock_key 进行设置操作;
  • PX 10000 表示设置 lock_key 的过期时间为 10s,这是为了避免客户端发生异常而无法释放锁。

释放锁的时候需要删除key,或者使用lua脚本来保证原子性。

// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

续租机制

基于上文中的实现方式,我们在设置key过期时间时,不能准确的描述业务处理时间。为了防止因为业务处理时间较长导致锁过期而提前释放锁,通过不断更新锁的过期时间来保持锁的有效性,避免了因锁过期而导致的并发问题。

关于这个问题,目前常见的解决方法有两种:

1、实现自动续租机制:额外起一个线程,定期检查线程是否还持有锁,如果有则延长过期时间。DistributedLock里面就实现了这个方案,使用“看门狗”定期检查(每1/3的锁时间检查1次),如果线程还持有锁,则刷新过期时间。

2、实现快速失败机制:当我们解锁时发现锁已经被其他线程获取了,说明此时我们执行的操作已经是“不安全”的了,此时需要进行回滚,并返回失败。

以下是使用StackExchange.Redis 库实现分布式锁和续租机制的示例代码:

public class RedisLock
{
    private readonly IDatabase _database;
    private readonly string _lockKey;
    private string _lockValue;
    private readonly TimeSpan _lockTimeout;

    private readonly TimeSpan _renewInterval;
    private bool _isLocked;

    public RedisLock(IDatabase database, string lockKey, TimeSpan lockTimeout, TimeSpan renewInterval)
    {
        _database = database;
        _lockKey = lockKey;
        _lockTimeout = lockTimeout;
        _renewInterval = renewInterval;
    }

    //尝试获取锁,如果成功,则启动一个续租线程
    public async Task<bool> AcquireAsync()
    {
        _lockValue = Guid.NewGuid().ToString();
        var acquired = await _database.StringSetAsync(_lockKey, _lockValue, _lockTimeout, When.NotExists);
        if (acquired)
        {
            _isLocked = true;
            StartRenewal();
        }
        return acquired;
    }

    //定期使用 KeyExpireAsync 命令重置键的过期时间,从而实现续租机制
    private async void StartRenewal()
    {
        while (_isLocked)
        {
            await Task.Delay(_renewInterval);
            await _database.KeyExpireAsync(_lockKey, _lockTimeout);
        }
    }
}

RedLock

Redlock 是一种分布式锁实现方案,它的设计目标是解决 Redis 集群模式下的分布式锁并发控制问题。

它是基于多个 Redis 节点的分布式锁,即使有节点发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作

Redlock 算法加锁三个过程:

  1. 客户端获取当前时间(t1)。
  2. 客户端按顺序依次向 N 个 Redis 节点(官方推荐是至少部署 5 个 Redis 节点)执行加锁操作:
  • 加锁操作使用 SET 命令,带上 NX,EX/PX 选项,以及带上客户端的唯一标识。
  • 如果某个 Redis 节点发生故障了,为了保证在这种情况下,Redlock 算法能够继续运行,我们需要给「加锁操作」设置一个超时时间(不是对「锁」设置超时时间,而是对「加锁操作」设置超时时间),加锁操作的超时时间需要远远地小于锁的过期时间,一般也就是设置为几十毫秒。
  1. 一旦客户端从超过半数(大于等于 N/2+1)的 Redis 节点上成功获取到了锁,就再次获取当前时间(t2),然后计算计算整个加锁过程的总耗时(t2-t1)。如果 t2-t1 < 锁的过期时间,此时,认为客户端加锁成功,否则认为加锁失败。

加锁成功后,客户端需要重新计算这把锁的有效时间,计算的结果是「锁最初设置的过期时间」减去「客户端从大多数节点获取锁的总耗时(t2-t1)」。如果计算的结果已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成数据操作,锁就过期了的情况。

加锁失败后,客户端向所有 Redis 节点发起释放锁的操作,释放锁的操作和在单节点上释放锁的操作一样,只要执行释放锁的 Lua 脚本就可以了。

三、DistributedLock开源项目简介

项目介绍

DistributedLock 是一个 .NET 库,它基于各种底层技术提供强大且易于使用的分布式互斥体、读写器锁和信号量。

DistributedLock 包含基于各种技术的实现;可以单独安装实现包,也可以只安装 DistributedLock NuGet 包,这是一个“元”包,其中包含所有实现作为依赖项。请注意,每个包都根据 SemVer 独立进行版本控制。

基础使用

以下两种方法,都是基于RedLock来实现的,在单机上,使用了续租机制,更多细节可以自己观看源码,下文中会简单介绍源码。

  • Acquire 方法

Acquire 方法返回一个代表持有锁的“句柄”对象。当句柄被处理时,锁被释放:

  var redisDistributedLock = new RedisDistributedLock(name, connectionString); 
  using (redisDistributedLock.Acquire())
  {
      //持有锁
  } //释放锁及相关资源
  • TryAcquire 方法

虽然 Acquire 将阻塞直到锁可用,但还有一个 TryAcquire 变体,如果无法获取锁(由于在别处持有),则返回 null :

using (var handle = redisDistributedLock.TryAcquire())
{
    if (handle != null)
    {
        // 我们获得锁
    }
    else
    {
        // 别人获得锁
    }
}

支持异步和依赖注入,依赖注入:

// Startup.cs:
services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(myConnectionString));
services.AddTransient<SomeService>();

// SomeService.cs
public class SomeService
{
    private readonly IDistributedLockProvider _synchronizationProvider;

    public SomeService(IDistributedLockProvider synchronizationProvider)
    {
        this._synchronizationProvider = synchronizationProvider;
    }

    public void InitializeUserAccount(int id)
    {
        // 通过provider构造lock
        var @lock = this._synchronizationProvider.CreateLock($"UserAccount{id}");
        using (@lock.Acquire())
        {
            // 
        }
      
        using (this._synchronizationProvider.AcquireLock($"UserAccount{id}"))
        {
            // 
        }
    }
}

四、浅析DistributedLock的Redis实现

源码地址

https://github.com/madelson/DistributedLock

目录解析

.NET开源分布式锁DistributedLock

  • DistributedLock.Core 是项目的抽象类库,基础分布式锁、读写锁、信号量的Provider和接口。
  • 其它几个类库是用不同存储系统的具体实现

Redis的实现过程

以下代码对源码,进行了删减和修改,只想简单的讲述一下实现过程。

定义一个工厂接口,返回IDistributedLock,在依赖注入场景中,使用这个工厂接口可能会更加方便

public interface IDistributedLockProvider
{
    IDistributedLock CreateLock(string name);
}

IDistributedLock:定义了控制并发访问的基本操作。该接口支持同步和异步方式获取锁,并提供超时和取消功能,以适应各种情况

public interface IDistributedLock
{
    // 唯一Name
    string Name { get; }
    // 获取锁的方法
    IDistributedSynchronizationHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default);

    //......
}

DistributedLock.Redis类库,对Acquire的具体实现,该方法是尝试获取Redis分布式锁实例。

  private async ValueTask<RedisDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken)
  {
      // 初始化Redis连接和相关参数
      //CreateLockId = $"{Environment.MachineName}_{currentProcess.Id}_" + Guid.NewGuid().ToString("n")
      var primitive = new RedisMutexPrimitive(this.Key, RedLockHelper.CreateLockId(), this._options.RedLockTimeouts);

      // 获取和设置锁
      var tryAcquireTasks = await new RedLockAcquire(primitive, this._databases, cancellationToken).TryAcquireAsync().ConfigureAwait(false);

      // 成功后,RedLockHandle这个里边实现了续租机制
      return tryAcquireTasks != null 
          ? new RedisDistributedLockHandle(new RedLockHandle(primitive, tryAcquireTasks, extensionCadence: this._options.ExtensionCadence, expiry: this._options.RedLockTimeouts.Expiry)) 
          : null;
  }

根据当前线程是否在同步上下文,对单库和多库实现进行区分和实现

// 该方法用于尝试获取分布式锁,并返回一个表示各个数据库节点获取锁状态的任务字典
public async ValueTask<Dictionary<IDatabase, Task<bool>>?> TryAcquireAsync()
{
    // 检查当前线程是否在同步上下文中执行,以便根据不同情况采取不同的获取锁策略
    if (SyncViaAsync.IsSynchronous&& this._databases.Count == 1)
        return this.TrySingleFullySynchronousAcquire();

    // 创建一个任务字典,将每个数据库连接和其对应的获取锁任务关联起来
    var tryAcquireTasks = this._databases.ToDictionary(
        db => db,
        db => Helpers.SafeCreateTask(state => state.primitive.TryAcquireAsync(state.db), (primitive, db))
    );

    // 等待所有获取锁任务完成,并返回一个表示整体状态的任务
    var waitForAcquireTask = this.WaitForAcquireAsync(tryAcquireTasks).AwaitSyncOverAsync().ConfigureAwait(false);

    // 执行清理操作 
 
    // 返回结果
    return succeeded ? tryAcquireTasks : null;
}

单库获取Redis分布式锁,就是通过set nx 设置值,返回bool,失败就释放资源,成功检查是否超时。不超时就返回任务字典

private Dictionary<IDatabase, Task<bool>>? TrySingleFullySynchronousAcquire()
{
    var database = this._databases.Single();

    bool success;
    var stopwatch = Stopwatch.StartNew();

    // 通过StackExchange.Redis的StringSet进行无值设置key(set nx)
    try { success = this._primitive.TryAcquire(database); }
    catch
    {
        // 确保释放锁,以便防止出现死锁等问题。然后重新抛出异常
    }

    if (success)
    {
        // 检查是否在超时时间内,并返回一个包含成功状态的任务字典;否则继续释放锁并返回null
    }

    return null;
}

多库中是否获取到分布式锁

private async Task<bool> WaitForAcquireAsync(IReadOnlyDictionary<IDatabase, Task<bool>> tryAcquireTasks)
{
    // 超时或取消时自动停止等待

    using var timeout = new TimeoutTask(this._primitive.AcquireTimeout, this._cancellationToken);
    var incompleteTasks = new HashSet<Task>(tryAcquireTasks.Values) { timeout.Task };

    // 计数器
    var successCount = 0;
    var failCount = 0;
    var faultCount = 0;

    while (true)
    {
        // 不断等待任务完成,如果任务为timeout,则表示超时;否则需要根据任务的状态和信号来判断是否成功获取锁
        var completed = await Task.WhenAny(incompleteTasks).ConfigureAwait(false);

        if (completed == timeout.Task)
            return false; // 超时

        // 判断是否超过成功或者失败的阀值,是否超过1/2
        if (completed.Status == TaskStatus.RanToCompletion)
        {
            var result = await ((Task<bool>)completed).ConfigureAwait(false);

            if (result)
            {
                ++successCount;
                // 是否超过1/2的库
                if (RedLockHelper.HasSufficientSuccesses(successCount, this._databases.Count)) { return true; }
            }
            else
            {
                ++failCount;
                if (RedLockHelper.HasTooManyFailuresOrFaults(failCount, this._databases.Count)) { return false; }
            }
        }
        else 
        {      
            ++faultCount;
            // ......          
        }
        // ......
    }
}

截止到目前,我们就知道如何获取和设置分布式锁了。接下来我们就看下是如何实现续租机制的。就是LeaseMonitor这个对象。

private static Task CreateMonitoringLoopTask(WeakReference<LeaseMonitor> weakMonitor, TimeoutValue monitoringCadence, CancellationToken disposalToken)
{
    // 创建监视任务
    return Task.Run(() => MonitoringLoop());

    async Task MonitoringLoop()
    {
        var leaseLifetime = Stopwatch.StartNew();
        do
        {
            await Task.Delay(monitoringCadence.InMilliseconds, disposalToken).TryAwait();
        }
        // 检查RedLock租约的状态和可用性
        while (!disposalToken.IsCancellationRequested && await RunMonitoringLoopIterationAsync(weakMonitor, leaseLifetime).ConfigureAwait(false));
    }
}

RunMonitoringLoopIterationAsync 里边最终调用了续时的lua脚本

.NET开源分布式锁DistributedLock

你们在公司中,都是如何实现分布式锁的呢?可以在评论区留下您宝贵的建议。文章来源地址https://www.toymoban.com/news/detail-428100.html

到了这里,关于.NET开源分布式锁DistributedLock的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • .NET CORE开源 DDD微服务 支持 多租户 单点登录 多级缓存、自动任务、分布式、日志、授权和鉴权 、网关 、注册与发现 系统架构 docker部署

    源代码地址https://github.com/junkai-li/NetCoreKevin 基于NET6搭建跨平台DDD思想WebApi架构、IDS4单点登录、多缓存、自动任务、分布式、多租户、日志、授权和鉴权、CAP、SignalR、 docker部署  如需简约项目可直接去除项目引用 解耦设计都可以单独引用 架构默认全部引用并启动 项目启动时

    2023年04月24日
    浏览(48)
  • 分布式链路追踪如何跨线程

    我们希望实现全链路信息,但是代码中一般都会异步的线程处理。 我们可以对以前的 Runable 和 Callable 进行增强。 可以使用 ali 已经存在的实现方式。 TransmittableThreadLocal (TTL) 解决异步执行时上下文传递的问题 核心的实现思路如下: 1)异步执行前,把当前线程的 MDC 信息放入

    2024年02月07日
    浏览(43)
  • 多线程、分布式运行用例

    python多线程 多线程实例 输出: 多线程跑unittest框架中的用例:基于一个原则先将需要处理的内容保存在list中,基于list的元素来进行线程的添加,在unittest中可以通过 discover = unittest.defaultTestLoader.discover(start_dir=path, pattern=\\\'test*.py\\\') 发现所有的测试套件,然后使用 unittest.TextT

    2024年02月21日
    浏览(59)
  • 【分布式事务】Seata 开源的分布式事务解决方案

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 阿里巴巴作为国内最早一批进行应用分布式(微服务化)改造的企业,很早就遇到微服务架构下

    2024年02月02日
    浏览(53)
  • .NET分布式Orleans - 6 - 事件溯源

    基本概念 事件溯源(Event Sourcing)是一种设计模式,它记录并存储了应用程序状态变化的所有事件。 其核心思想是将系统中的每次状态变化都视为一个事件,并将这些事件以时间顺序的方式持久化存储。 这样,通过重放这些事件,我们可以重建系统在任何特定时间点的状态

    2024年03月28日
    浏览(42)
  • 分布式集群与多线程高并发

      后台数据的处理语言有很多,Java 是对前端采集的数据的一种比较常见的开发语言。互联网移动客户端的用户量特别大,大量的数据处理需求应运而生。可移动嵌入式设备的表现形式   很多,如 PC 端,手机移动端,智能手表,Google  眼镜等。Server2client 的互联网开发模式比

    2024年02月08日
    浏览(48)
  • [Python系列] 线程、协程、进程和分布式

            我们在写脚本的时候,经常是单线程跑完了全部,毕竟自顶向下按照我们约定的方法运行下去是最规范的。但是很多时候,比如说合法地爬取一些网页信息,图片和资料啊,或者说一些合法的网络请求,读写文件之类的。如果还是单线程地one by one,那么将会影响我们

    2024年02月16日
    浏览(36)
  • .NET分布式Orleans - 3 - Grain放置

    在Orleans 7中,Grain放置是指确定将Grain对象放置在Orleans集群中的哪些物理节点上的过程。 Grain是Orleans中的基本单位,代表应用程序中的逻辑单元或实体。Grain放置策略是一种机制,用于根据不同的因素,将Grain对象放置在合适的节点上,以实现负载均衡、最小化网络延迟和提高

    2024年03月25日
    浏览(44)
  • .NET分布式Orleans - 5 - 持久化

    在分布式系统中,数据的持久化是至关重要的一环。 Orleans 7 引入了强大的持久化功能,使得在分布式环境下管理数据变得更加轻松和可靠。 本文将介绍什么是 Orleans 7 的持久化,如何设置它以及相应的代码示例。 什么是 Orleans 7 的持久化? Orleans 7 的持久化是指将 Orleans 中的

    2024年03月27日
    浏览(45)
  • C++中实现多线程和分布式

    (2)对于 需要写入但不需要等待响应的请求,可以使用 BlockingQueue 完成,例如 log,由一个专门的线程去写入文件,其他线程只需要往 BlockingQueue 写入即可; (3)线程池大小的阻抗匹配原则 密集计算所占时间的比重为 P,系统一共有 C 个 CPU,线程池大小的经验公式为 T = C

    2024年01月20日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包