关于golang锁的一点东西

这篇具有很好参考价值的文章主要介绍了关于golang锁的一点东西。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文基于go 1.19.3

最近打算再稍微深入地看下golang的源码,先从简单的部分入手。正巧前段时间读了操作系统同步机制的一点东西,那么golang这里就从锁开始好了。

在这部分内容中,可能不会涉及到太多的细节的讲解。更多的内容会聚焦在我感兴趣的一些点,以及整体的设计方面。

那么,接下来就是我感兴趣的第一个点:golang的锁是什么级别的锁?

golang的锁是什么级别的锁

通常而言,操作系统会提供多种同步机制,常见的包括的原子操作、自旋锁、互斥锁。相较于自旋锁,互斥锁是我们在日常开发中最常使用的锁。操作系统提供的互斥锁状态变化时阻塞和唤醒影响的是操作系统级别的最小执行流–线程。因此,我在这里比较粗糙地称呼操作系统提供的锁或者编程语言基于系统调用封装的锁为线程级别的锁。

那么golang的锁呢?我们知道golang中最小的执行流为goroutine,并且在runtime中完整地实现了基于goroutine的调度机制。那么golang的锁在是什么级别的锁呢?获取锁时是会阻塞线程,还是仅会阻塞协程?下面我们以sync.Mutex为例来看下其实现。

加锁

sync.Mutex在加锁时会尝试自旋提前占位,自旋是发生在用户态的,直接跳过自旋部分,来到我关注的地方:sync/Mutex.go 171 runtime_SemacquireMutex调用。

// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
   waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

看runtime_SemacquireMutex的注释,runtime_SemacquireMutex的作用和runtime_Semacquire基本一样,都会阻塞地等待,直到信号量的值(传入的s参数)大于0,然后将值减少。从这点上讲,和linux提供的信号量的功能描述基本上是一致的。

runtime_SemacquireMutex相比runtime_Semacquire还多了两个参数。lifo参数会影响阻塞队列的行为,当lifo为true时,会将当前执行流(这里我们还没确定是goroutine还是线程)置于等待队列的头部,后入先出嘛,对不对。skipframes是跳过的调用栈的层数,看起来是在调试或者观测时起作用,我们不去深究。

// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)

// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

sync_runtime_SemacquireMutex调用了semacquire1函数。semacquire1除了信号量、lifo、skipframs参数外,还多了一个profile参数,其值有semaBlockProfile和semaMutexProfile。看这个两个值,应该和pprof的mutex和block有关系?这里同样不去深究,保证主线,后面有空再看。

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
   semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {}

进入semacquire1方法,我们的疑问似乎要得到解答了。

  • 首先会尝试获取锁,easy path,如果成功,就直接返回。
  • 构建sudog,并且根据传入信号量的地址获取到对应的阻塞队列。
  • 如果不能获取锁,就将sudog加入到root的阻塞队列中,同时调用gopark阻塞当前goroutine。

看到这里,我们应该差不多确定golang的sync.Mutex阻塞的是goroutine。当获取锁时,如果抢占失败,会将当前的goroutine阻塞,挂起到锁的阻塞队列上。但是问题似乎不是这么简单,因为root对象也有锁并且有加锁解锁的行为。那我们再来看看其实现是怎样的。也就是在实现sync.Mutex过程中会遇到临界区的问题,这种情况下通常的做法是采用更底层的同步机制取解决,比如原子操作,比如操作系统的锁。

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
   gp := getg()
   if gp != gp.m.curg {
      throw("semacquire not on the G stack")
   }

   // Easy case.
   if cansemacquire(addr) {
      return
   }

   // Harder case:
   // increment waiter count
   // try cansemacquire one more time, return if succeeded
   // enqueue itself as a waiter
   // sleep
   // (waiter descriptor is dequeued by signaler)
   s := acquireSudog()
   root := semtable.rootFor(addr)
   t0 := int64(0)
   s.releasetime = 0
   s.acquiretime = 0
   s.ticket = 0
   if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
      t0 = cputicks()
      s.releasetime = -1
   }
   if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
      if t0 == 0 {
         t0 = cputicks()
      }
      s.acquiretime = t0
   }
   for {
      lockWithRank(&root.lock, lockRankRoot)
      // Add ourselves to nwait to disable "easy case" in semrelease.
      atomic.Xadd(&root.nwait, 1)
      // Check cansemacquire to avoid missed wakeup.
      if cansemacquire(addr) {
         atomic.Xadd(&root.nwait, -1)
         unlock(&root.lock)
         break
      }
      // Any semrelease after the cansemacquire knows we're waiting
      // (we set nwait above), so go to sleep.
      root.queue(addr, s, lifo)
      goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
      if s.ticket != 0 || cansemacquire(addr) {
         break
      }
   }
   if s.releasetime > 0 {
      blockevent(s.releasetime-t0, 3+skipframes)
   }
   releaseSudog(s)
}

接下来看下sematable相关的内容,也就是信号量相关的实现。信号量本身只是一个整数,对其操作只需要原子操作就OK,非常简单。但抢占信号量失败需要阻塞队列,同一个阻塞队列会面临并发访问,这是sematable的实现解决的问题。

semaTable是一个初始化好的长度为251的数组,数组的元素为semaRoot。

数组的实现可以认为是分片数为251的分段锁。操作时根据对应信号量的地址取模拿到对应的semaRoot,以此减少临界区的粒度。

var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
   root semaRoot
   pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

func (t *semTable) rootFor(addr *uint32) *semaRoot {
   return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

semaRoot是真正的阻塞队列。每个semaRoot对应一组信号量,这组信号量的addr%251的值相等。阻塞在同一信号量上的goroutine以链表的形式组织,阻塞同一semaRoot中不同信号量的goroutine之间以平衡二叉树(红黑or二叉)的形式组织。

semaRoot就是一个临界区,golang使用rumtime2.go中的mutex进行并发保护。

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {
   lock  mutex
   treap *sudog // root of balanced tree of unique waiters.
   nwait uint32 // Number of waiters. Read w/o the lock.
}

看注释,rumtime2.go中的mutex在有竞争的条件下是内核级别的锁,on the contention path they sleep in the kernel,会导致内核级的阻塞。

// Mutual exclusion locks.  In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
// Initialization is helpful for static lock ranking, but not required.
type mutex struct {
   // Empty struct if lock ranking is disabled, otherwise includes the lock rank
   lockRankStruct
   // Futex-based impl treats it as uint32 key,
   // while sema-based impl as M* waitm.
   // Used to be a union, but unions break precise GC.
   key uintptr
}

mutex的实现没有采用oop的方式,runtime2.go中同时提供了lock2和unlock2两个函数来对mutex来进行加锁和解锁。

先看加锁的实现。

首先会确保当前运行的m上创建一个锁和一个condition,这两个对象都是调用c库函数实现,为线程级别的对象。然后尝试自旋获取mutex。如果获取成功,则正常返回;否则,将当前的m加入到阻塞队列的最前端,mutex的key值为阻塞队列首个m的指针,然后调用semasleep方法。

func lock2(l *mutex) {
   gp := getg()
   if gp.m.locks < 0 {
      throw("runtime·lock: lock count")
   }
   gp.m.locks++

   // Speculative grab for lock.
   if atomic.Casuintptr(&l.key, 0, locked) {
      return
   }
   semacreate(gp.m)

   // On uniprocessor's, no point spinning.
   // On multiprocessors, spin for ACTIVE_SPIN attempts.
   spin := 0
   if ncpu > 1 {
      spin = active_spin
   }
Loop:
   for i := 0; ; i++ {
      v := atomic.Loaduintptr(&l.key)
      if v&locked == 0 {
         // Unlocked. Try to lock.
         if atomic.Casuintptr(&l.key, v, v|locked) {
            return
         }
         i = 0
      }
      if i < spin {
         procyield(active_spin_cnt)
      } else if i < spin+passive_spin {
         osyield()
      } else {
         // Someone else has it.
         // l->waitm points to a linked list of M's waiting
         // for this lock, chained through m->nextwaitm.
         // Queue this M.
         for {
            gp.m.nextwaitm = muintptr(v &^ locked)
            if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
               break
            }
            v = atomic.Loaduintptr(&l.key)
            if v&locked == 0 {
               continue Loop
            }
         }
         if v&locked != 0 {
            // Queued. Wait.
            semasleep(-1)
            i = 0
         }
      }
   }
}

semasleep方法是将当前的线程阻塞的方法,其使用了condition来进行调度。当前会阻塞直至condition被唤醒,或者在传入的睡眠时间大于等于0,则只睡眠传入的时间间隔。当由mutex解锁时,会从其阻塞队列中获取m,并唤醒其condition。

func semasleep(ns int64) int32 {
   var start int64
   if ns >= 0 {
      start = nanotime()
   }
   mp := getg().m
   pthread_mutex_lock(&mp.mutex)
   for {
      if mp.count > 0 {
         mp.count--
         pthread_mutex_unlock(&mp.mutex)
         return 0
      }
      if ns >= 0 {
         spent := nanotime() - start
         if spent >= ns {
            pthread_mutex_unlock(&mp.mutex)
            return -1
         }
         var t timespec
         t.setNsec(ns - spent)
         err := pthread_cond_timedwait_relative_np(&mp.cond, &mp.mutex, &t)
         if err == _ETIMEDOUT {
            pthread_mutex_unlock(&mp.mutex)
            return -1
         }
      } else {
         pthread_cond_wait(&mp.cond, &mp.mutex)
      }
   }
}

解锁时,如果当前的阻塞队列不为空,则唤醒头部的m。同时将当前线程持有的锁的数量减少,只有当前线程持有的锁的数量为0时,才可以对当前线程中运行的goroutine进行调度。

func unlock2(l *mutex) {
   gp := getg()
   var mp *m
   for {
      v := atomic.Loaduintptr(&l.key)
      if v == locked {
         if atomic.Casuintptr(&l.key, locked, 0) {
            break
         }
      } else {
         // Other M's are waiting for the lock.
         // Dequeue an M.
         mp = muintptr(v &^ locked).ptr()
         if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {
            // Dequeued an M.  Wake it.
            semawakeup(mp)
            break
         }
      }
   }
   gp.m.locks--
   if gp.m.locks < 0 {
      throw("runtime·unlock: lock count")
   }
   if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
      gp.stackguard0 = stackPreempt
   }
}

解锁

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int) {}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
   semrelease1(addr, handoff, skipframes)
}

解锁相对比较简单,从阻塞队列中取出sudog,并将其置为ready状态。

func semrelease1(addr *uint32, handoff bool, skipframes int) {
   root := semtable.rootFor(addr)
   atomic.Xadd(addr, 1)

   // Easy case: no waiters?
   // This check must happen after the xadd, to avoid a missed wakeup
   // (see loop in semacquire).
   if atomic.Load(&root.nwait) == 0 {
      return
   }

   // Harder case: search for a waiter and wake it.
   lockWithRank(&root.lock, lockRankRoot)
   if atomic.Load(&root.nwait) == 0 {
      // The count is already consumed by another goroutine,
      // so no need to wake up another goroutine.
      unlock(&root.lock)
      return
   }
   s, t0 := root.dequeue(addr)
   if s != nil {
      atomic.Xadd(&root.nwait, -1)
   }
   unlock(&root.lock)
   if s != nil { // May be slow or even yield, so unlock first
      acquiretime := s.acquiretime
      if acquiretime != 0 {
         mutexevent(t0-acquiretime, 3+skipframes)
      }
      if s.ticket != 0 {
         throw("corrupted semaphore ticket")
      }
      if handoff && cansemacquire(addr) {
         s.ticket = 1
      }
      readyWithTime(s, 5+skipframes)
      if s.ticket == 1 && getg().m.locks == 0 {
         // Direct G handoff
         // readyWithTime has added the waiter G as runnext in the
         // current P; we now call the scheduler so that we start running
         // the waiter G immediately.
         // Note that waiter inherits our time slice: this is desirable
         // to avoid having a highly contended semaphore hog the P
         // indefinitely. goyield is like Gosched, but it emits a
         // "preempted" trace event instead and, more importantly, puts
         // the current G on the local runq instead of the global one.
         // We only do this in the starving regime (handoff=true), as in
         // the non-starving case it is possible for a different waiter
         // to acquire the semaphore while we are yielding/scheduling,
         // and this would be wasteful. We wait instead to enter starving
         // regime, and then we start to do direct handoffs of ticket and
         // P.
         // See issue 33747 for discussion.
         goyield()
      }
   }
}

整体设计

承接上文,golang的锁可以说是goroutine级别的锁,或者runtime级别的锁。但是在涉及锁的阻塞队列时会面临更底层的临界区问题,golang使用了runtime2.go mutex来保护临界区,这时可能会涉及到线程的调度。

为什么说可能呢?因为在mutex中使用了自旋来提升性能。我们知道,如果持有锁的时间很短的话,自旋锁的性能是要高于互斥锁的。所以在一些快速操作中会选择用自旋锁,比如,中断上下文,当然,这也和中断上下文中不能阻塞有关。

回到mutex中,在mutex的操作中,对阻塞队列的读写确实是非耗时操作,那么自旋行为确实能提升整体的性能。

包括在更上层的sync.Mutex中,也有自旋的情况,当然在sync.Mutex中,自旋的条件更加苛刻。

所以在仔细了解后,会发现golang的锁还是非常有意思的。


如果觉得本文对您有帮助,可以请博主喝杯咖啡~
关于golang锁的一点东西,golang,锁,同步机制,设计
关于golang锁的一点东西,golang,锁,同步机制,设计文章来源地址https://www.toymoban.com/news/detail-621847.html

到了这里,关于关于golang锁的一点东西的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flutter关于StatefulWidget中State刷新时机的一点实用理解

    刚入门flutter开发,使用StatefulWidget踩了很多坑,就我遇到典型问题谈谈见解。 1.initState方法只会在控件初始化的时候执行一遍。 2.控件内部执行setState方法,则会每次执行build方法。 3.控件销毁会执行dispose方法,所以一些未释放的资源可以在该方法中执行,例如计时器。 4.使

    2024年02月10日
    浏览(32)
  • [数学建模] 0、关于数学建模的一点看法&付费专栏食用说明

    1、前言 在大学期间,几次参加数学建模的经历给我带来的印象还是很深

    2024年02月15日
    浏览(29)
  • 关于credal set和credal decision tree的一点思考(其实就是论文笔记)

    阅读Abellán老师的Credal-C4.5时,发现好难。。。然后又额外补充了一些论文,终于稍微懂一点点了,所以记录如下。 credal set在DS theory的定义如下 [1]: 这句话的意思是(证据理论中的)credal set是一个概率的凸集,这里面的概率p(x)受到上界pl函数和下界bel函数的控制(约束),

    2024年02月12日
    浏览(31)
  • 关于工作流开发前端选型的一点个人见解(bpmn.js与LogicFlow)

    掘金2023年度人气创作者打榜中,快来帮我打榜吧~ https://activity.juejin.cn/rank/2023/writer/747323639208391?utm_campaign=annual_2023utm_medium=self_web_shareutm_source=MiyueFE 首先需要明确的一点是,本文的出发点 纯粹是针对工作流开发 的场景的选型对比,其他业务场景下建议重新调研。 什么是工作

    2024年02月20日
    浏览(36)
  • 基于微信小程序 安卓APP的物流服务系统设计与实现,给后辈的一点建议

    本系统的用户可分为前台模块和后台管理员模块两个界面组成。一个界面用于管理员登录,管理员可以管理系统内所有功能,主要有首页、个人中心、客户管理、员工管理、物流服务管理、货物信息管理、货物运输管理、系统管理等功能;另外一个界面用于客户和员工登录,

    2024年04月14日
    浏览(34)
  • Golang 关于反射机制(一文了解)

    前言: Golang 反射比 C++ RTTI 要强大的多,但是比 .NET C#/VB/C++ 来说,它大约属于低阶反射支持的范畴。 但是 Golang 语言提供了相对强大的反射。 它总要比没有提供易用反射支持的要好的多,C++ 之中我们基本只能依赖模板、宏通过元编程来实现相对强大的反射机制。 Golang 反射

    2024年01月23日
    浏览(28)
  • 关于Linux同步机制知识点整理

    在Linux系统中,同步机制是操作系统中非常重要的一部分,以下是一些基本要点: 互斥锁 互斥锁是一种「独占锁」,比如当线程 A 加锁成功后,此时互斥锁已经被线程 A 独占了,只要线程 A 没有释放手中的锁,线程 B 加锁就会失败,失败的线程B于是就会释放 CPU 让给其他线程

    2024年02月11日
    浏览(36)
  • 【Consul】基于Golang实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制

    大家好 我是寸铁👊 总结了一篇【Consul】基于Go实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制✨ 这应该是目前全网最全的使用golang手搓Consul服务信息机制✨ 喜欢的小伙伴可以点点关注 💝 consul常常被用来作服务注册与服务发现,而它的wa

    2024年04月09日
    浏览(33)
  • 背包问题的一点看法

    背包问题已经被人讲得很透彻了,上古大神写的《背包九讲》已经相当详细的阐述了背包问题,本文不会过多赘述,主要总结一些有关背包的有趣的玩意。 01背包和完全背包是非常类似的问题,01背包的特点是每种物品最多只能取一个,而完全背包每种物品都可以任意取。 以

    2024年02月12日
    浏览(25)
  • 找实习、工作的一点浅见

    一、实习的必要性。 为什么需要去实习?1、实习能帮助自己增进对于具体职场的认识,包括具体工作的职责、内容、工作氛围、是否有较大压力等等;2、通过一段时间的实习经历,能帮助自己作出未来是否能胜任类似的工作的判断,如果有留用,是否考虑留下,如果没有留

    2024年02月10日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包