深入理解 go sync.Waitgroup

这篇具有很好参考价值的文章主要介绍了深入理解 go sync.Waitgroup。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文基于 Go 1.19。

go 里面的 WaitGroup 是非常常见的一种并发控制方式,它可以让我们的代码等待一组 goroutine 的结束。
比如在主协程中等待几个子协程去做一些耗时的操作,如发起几个 HTTP 请求,然后等待它们的结果。

WaitGroup 示例

下面的代码展示了一个 goroutine 等待另外 2 个 goroutine 结束的例子:

func TestWaitgroup(t *testing.T) {
	var wg sync.WaitGroup
	// 计数器 +2
	wg.Add(2)

	go func() {
		sendHttpRequest("https://baidu.com")
		// 计数器 -1
		wg.Done()
	}()

	go func() {
		sendHttpRequest("https://baidu.com")
		// 计数器 -1
		wg.Done()
	}()

	// 阻塞。计数器为 0 的时候,Wait 返回
	wg.Wait()
}

// 发起 HTTP GET 请求
func sendHttpRequest(url string) (string, error) {
	method := "GET"

	client := &http.Client{}
	req, err := http.NewRequest(method, url, nil)

	if err != nil {
		return "", err
	}

	res, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return "", err
	}

	return string(body), err
}

在这个例子中,我们做了如下事情:

  • 定义了一个 WaitGroup 对象 wg,调用 wg.Add(2) 将其计数器 +2
  • 启动两个新的 goroutine,在这两个 goroutine 中,使用 sendHttpRequest 函数发起了一个 HTTP 请求。
  • 在 HTTP 请求返回之后,调用 wg.Done 将计数器 -1
  • 在函数的最后,我们调用了 wg.Wait,这个方法会阻塞,直到 WaitGroup 的计数器的值为 0 才会解除阻塞状态。

WaitGroup 基本原理

WaitGroup 内部通过一个计数器来统计有多少协程被等待。这个计数器的值在我们启动 goroutine 之前先写入(使用 Add 方法),
然后在 goroutine 结束的时候,将这个计数器减 1(使用 Done 方法)。除此之外,在启动这些 goroutine 的协程中,
会调用 Wait 来进行等待,在 Wait 调用的地方会阻塞,直到 WaitGroup 内部的计数器减到 0。
也就实现了等待一组 goroutine 的目的

背景知识

在操作系统中,有多种实现进程/线程间同步的方式,如:test_and_setcompare_and_swap、互斥锁等。
除此之外,还有一种是信号量,它的功能类似于互斥锁,但是它能提供更为高级的方法,以便进程能够同步活动。

信号量

一个信号量(semaphore)S是一个整型变量,它除了初始化外只能通过两个标准的原子操作:wait()signal() 来访问。
操作 wait() 最初称为 P(荷兰语 proberen,测试);操作 signal() 最初称为 V(荷兰语 verhogen,增加),可按如下来定义 wait()

PV 原语。

wait(S) {
    while (S <= 0)
        ; // 忙等待
    S--;
}

可按如下来定义 signal()

signal(S) {
    S++;
}

wait()signal() 操作中,信号量整数值的修改应不可分割地执行。也就是说,当一个进程修改信号量值时,没有其他进程能够同时修改同一信号量的值。

简单来说,信号量实现的功能是:

  • 当信号量>0 时,表示资源可用,则 wait 会对信号量执行减 1 操作。
  • 当信号量<=0 时,表示资源暂时不可用,获取信号量时,当前的进程/线程会阻塞,直到信号量为正时被唤醒。

WaitGroup 中的信号量

WaitGroup 中,使用了信号量来实现 goroutine 的阻塞以及唤醒:

  • 在调用 Wait 的地方,goroutine 会陷入阻塞,直到信号量大于等于 0 的时候解除阻塞状态,得以继续执行。
  • 在调用 Done 的时候,如果 WaitGroup 内的等待协程的计数器减到 0 的时候,信号量会进行递增,这样那些阻塞的协程会进行执行下去。

WaitGroup 数据结构

type WaitGroup struct {
	noCopy noCopy

	// 高 32 位为计数器,低 32 位为等待者数量
	state atomic.Uint64
	sema  uint32
}

noCopy

我们发现,WaitGroup 中有一个字段 noCopy,顾名思义,它的目的是防止复制。
这个字段在运行时是没有什么影响的,但是我们通过 go vet 可以发现我们对 WaitGroup 的复制。
为什么不能复制呢?因为一旦复制,WaitGroup 内的计数器就不再准确了,比如下面这个例子:

func test(wg sync.WaitGroup) {
	wg.Done()
}

func TestWaitGroup(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)
	test(wg)
	wg.Wait()
}

go 里面的函数参数传递是值传递。调用 test(wg) 的时候将 WaitGroup 复制了一份。

在这个例子中,程序会永远阻塞下去,因为 test 中调用 wg.Done() 的时候,只是将 WaitGroup 副本的计数器减去了 1,
TestWaitGroup 里面的 WaitGroup 的计数器并没有发生改变,因此 Wait 会永远阻塞。

我们如果需要将 WaitGroup 作为参数,请传递指针:

func test(wg *sync.WaitGroup) {
	wg.Done()
}

传递指针之后,我们在 test 中调用 wg.Done() 修改的就是 TestWaitGroup 里面同一个 WaitGroup
从而,Wait 方法可以正常返回。

state

WaitGroup 里面的 state 是一个 64 位的 atomic.Uint64 类型,它的高 32 位用来保存 counter(也就是上面说的计数器),低 32 位用来保存 waiter(也就是阻塞在 Wait 上的 goroutine 数量。)

深入理解 go sync.Waitgroup,go,golang,开发语言,后端

sema

WaitGroup 通过 sema 来记录信号量:

  • runtime_Semrelease 表示将信号量递增(对应信号量中的 signal 操作)
  • runtime_Semacquire 表示将信号量递减(对应信号量中的 wait 操作)

简单来说,在调用 runtime_Semacquire 的时候 goroutine 会阻塞,而调用 runtime_Semrelease 会唤醒阻塞在同一个信号量上的 goroutine。

WaitGroup 的三个基本操作

  • Add: 这会将 WaitGroup 里面的 counter 加上一个整数(也就是传递给 Add 的函数参数)。
  • Done: 这会将 WaitGroup 里面的 counter 减去 1。
  • Wait: 这会将 WaitGroup 里面的 waiter 加上 1,并且调用 Wait 的地方会阻塞。(有可能会有多个 goroutine 等待一个 WaitGroup

WaitGroup 的实现

Add 的实现

Add 做了下面两件事:

  1. delta 加到 state 的高 32 位上
  2. 如果 counter0 了,并且 waiter 大于 0,表示所有被等待的 goroutine 都完成了,而还有在等待的 goroutine,这会唤醒那些阻塞在 Wait 上的 goroutine。

源码实现:文章来源地址https://www.toymoban.com/news/detail-792535.html

func (wg *WaitGroup) Add(delta int) {
	// wg.state 的计数器加上 delta
	//(加到 state 的高 32 上)
	state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta
	v := int32(state >> 32)                    // 高 32 位(counter)
	w := uint32(state)                         // 低 32 位(waiter)
	// 计数器不能为负数(加上 delta 之后不能为负数,最小只能到 0)
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// 正常使用情况下,是先调用 Add 再调用 Wait 的,这种情况下,w 是 0,v > 0
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// v > 0,计数器大于 0
	// w == 0,没有在 Wait 的协程
	// 说明还没有到唤醒 waiter 的时候
	if v > 0 || w == 0 {
		return
	}

	// Add 负数的时候,v 会减去对应的数值,减到最后 v 是 0。
	// 计数器是 0,并且有等待的协程,现在要唤醒这些协程。

	// 存在等待的协程时,goroutine 已将计数器设置为0。
	// 现在不可能同时出现状态突变:
	// - Add 不能与 Wait 同时发生,
	// - 如果看到计数器==0,则 Wait 不会增加等待的协程。
	// 仍然要做一个廉价的健康检查,以检测 WaitGroup 的误用。
	if wg.state.Load() != state { // 不能在 Add 的同时调用 Wait
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// 将等待的协程数量设置为 0。
	wg.state.Store(0)
	for ; w != 0; w-- {
		// signal,调用 Wait 的地方会解除阻塞
		runtime_Semrelease(&wg.sema, false, 0) // goyield
	}
}

Done 的实现

WaitGroup 里的 Done 其实只是对 Add 的调用,但是它的效果是,将计数器的值减去 1
背后的含义是:一个被等待的协程执行完毕了

Wait 的实现

Wait 主要功能是阻塞当前的协程:

  1. Wait 会先判断计数器是否为 0,为 0 说明没有任何需要等待的协程,那么就可以直接返回了。
  2. 如果计数器还不是 0,说明有协程还没执行完,那么调用 Wait 的地方就需要被阻塞起来,等待所有的协程完成。

源码实现:

func (wg *WaitGroup) Wait() {
	for {
		// 获取当前计数器
		state := wg.state.Load()
		// 计数器
		v := int32(state >> 32)
		// waiter 数量
		w := uint32(state)
		// v 为 0,不需要等待,直接返回
		if v == 0 {
			// 计数器是 0,不需要等待
			return
		}

		// 增加 waiter 数量。
		// 调用一次 Wait,waiter 数量会加 1。
		if wg.state.CompareAndSwap(state, state+1) {
			// 这会阻塞,直到 sema (信号量)大于 0
			runtime_Semacquire(&wg.sema) // goparkunlock
			// state 不等 0
			// wait 还没有返回又继续使用了 WaitGroup
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			// 解除阻塞状态了,可以返回了
			return
		}
		// 状态没有修改成功(state 没有成功 +1),开始下一次尝试。
	}
}

总结

  • WaitGroup 使用了信号量来实现了并发资源控制,sema 字段表示信号量。
  • 使用 runtime_Semacquire 会使得 goroutine 阻塞直到计数器减少至 0,而使用 runtime_Semrelease 会使得信号量递增,这等于是通知之前阻塞在信号量上的协程,告诉它们可以继续执行了。
  • WaitGroup 作为参数传递的时候,需要传递指针作为参数,否则在被调用函数内对 Add 或者 Done 的调用,在 caller 里面调用的 Wait 会观测不到。
  • WaitGroup 使用一个 64 位的数来保存计数器(高 32 位)和 waiter(低 32 位,正在等待的协程的数量)。
  • WaitGroup 使用 Add 增加计数器,使用 Done 来将计数器减 1,使用 Wait 来等待 goroutine。Wait 会阻塞直到计数器减少到 0

到了这里,关于深入理解 go sync.Waitgroup的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Go 语言中的 iota

    iota是go语言的常量计数器,只能在常量表达式中使用,iota在const出现时将被重置为0,const中每新增一行常量声明将使iota计数一次,可理解为const语句块中的行索引。它会自动递增,从0开始。 尽管默认步长为1,但我们也可以通过在常量声明中显式赋值来修改 iota 的步长

    2024年02月10日
    浏览(46)
  • 【Golang星辰图】Go语言云计算SDK全攻略:深入Go云存储SDK实践

    在当今数字化时代,云计算和存储服务扮演着至关重要的角色,为应用程序提供高效、可靠的基础设施支持。本文将介绍几种流行的Go语言SDK,帮助开发者与AWS、Google Cloud、Azure、MinIO、 阿里云和腾讯云等各大云服务提供商的平台进行交互。 欢迎订阅专栏:Golang星辰图 1.1 提供

    2024年03月17日
    浏览(52)
  • 深入理解GO语言——GC垃圾回收二

    书接上回,无论怎么优化,Go V1.3都面临这个一个重要问题,就是mark-and-sweep 算法会暂停整个程序 。 Go是如何面对并这个问题的呢?接下来G V1.5版本 就用 三色并发标记法 来优化这个问题 Golang中的垃圾回收主要应用三色标记法,GC过程和其他用户goroutine可并发运行,但需要一

    2024年04月11日
    浏览(36)
  • 【Golang】go编程语言适合哪些项目开发?

    前言 在当今数字化时代,软件开发已成为各行各业的核心需求之一。 而选择适合的编程语言对于项目的成功开发至关重要。 本文将重点探讨Go编程语言适合哪些项目开发,以帮助读者在选择合适的编程语言时做出明智的决策。 Go 编程语言适合哪些项目开发? Go是由Google开发

    2024年02月04日
    浏览(76)
  • 100天精通Golang(基础入门篇)——第12天:深入解析Go语言中的集合(Map)及常用函数应用

    🌷 博主 libin9iOak带您 Go to Golang Language.✨ 🦄 个人主页——libin9iOak的博客🎐 🐳 《面试题大全》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 🌊 《IDEA开发秘籍》学会IDEA常用操作,工作效率翻倍~💐 🪁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批

    2024年02月12日
    浏览(47)
  • Go语言Sync包

    在处理 goroutine 时,确保它们不会同时访问资源是非常重要的,而 mutex 可以帮助我们做到这一点。 1.1 sync.Mutex 看看这个简单的例子,我没有使用互斥锁来保护我们的变量 a: 此代码的结果是不可预测的,如果幸运的话,您可能会得到 500,但通常结果会小于 500。现在,让我们

    2024年02月16日
    浏览(41)
  • 100天精通Golang(基础入门篇)——第15天:深入解析Go语言中函数的应用:从基础到进阶,助您精通函数编程!(进阶)

    🌷 博主 libin9iOak带您 Go to Golang Language.✨ 🦄 个人主页——libin9iOak的博客🎐 🐳 《面试题大全》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 🌊 《IDEA开发秘籍》学会IDEA常用操作,工作效率翻倍~💐 🪁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批

    2024年02月12日
    浏览(67)
  • 【Golang】VsCode下开发Go语言的环境配置(超详细图文详解)

    📓推荐网站(不断完善中):个人博客 📌个人主页:个人主页 👉相关专栏:CSDN专栏、个人专栏 🏝立志赚钱,干活想躺,瞎分享的摸鱼工程师一枚 ​ 话说在前,Go语言的编码方式是 UTF-8 ,理论上你直接使用文本进行编辑也是可以的,当然为了提升我们的开发效率我们还是需

    2024年02月07日
    浏览(83)
  • Go语言中sync.Map、sync.Pool和Context的用法

    目录 【sync.Map】 实现线程安全的 map 类型  使用 sync.Map 实现并发读写的map 【sync.Pool】 使用 带缓冲channel 实现对象池 使用 sync.Pool 创建临时对象池 【Context 上下文】 Context应用:实现带超时功能的远程调用 Context应用:监控指令  Context应用:取消关联任务 之前在 Go语言中ar

    2024年02月05日
    浏览(44)
  • golang waitgroup

    WaitGroup 可以解决一个 goroutine 等待多个 goroutine 同时结束的场景,这个比较常见的场景就是例如 后端 worker 启动了多个消费者干活,还有爬虫并发爬取数据,多线程下载等等。 我们这里模拟一个 worker 的例子 问题: 反过来支持多个 goroutine 等待一个 goroutine 完成后再干活吗?

    2024年02月16日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包