golang并发安全-select

这篇具有很好参考价值的文章主要介绍了golang并发安全-select。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前面说了golang的channel, 今天我们看看golang select 是怎么实现的。

数据结构

type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // 数据
}

select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel

处理流程

golang并发安全-select,golang,开发语言,后端

select case 场景

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。

没有case

代码示例

func main() {
    select {}
}

如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic

golang并发安全-select,golang,开发语言,后端

源码解读

在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。

golang并发安全-select,golang,开发语言,后端

同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数

golang并发安全-select,golang,开发语言,后端

只有一个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
	}()
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	}
}

如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)

源码解读

如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	// optimization: one-case select: single op.
	if ncas == 1 {
		cas := cases[0] //获取case
		ir.SetPos(cas)
		l := cas.Init()
		if cas.Comm != nil { // 不是默认
			n := cas.Comm // 获取case的条件语句
			l = append(l, ir.TakeInit(n)...)
			switch n.Op() {
			default:
				base.Fatalf("select %v", n.Op())

			case ir.OSEND: // 如果是 send, 无须处理
				// already ok

			case ir.OSELRECV2:
				r := n.(*ir.AssignListStmt)
                // 如果不是 data, ok := <- ch 类型,处理成<- ch
				if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
					n = r.Rhs[0]
					break
				}
                // 是的话, op设置成data, ok := <- ch形式
				r.SetOp(ir.OAS2RECV)
			}

			l = append(l, n)
		}
        // 将case 条件后要执行的语句加入带执行的列表
		l = append(l, cas.Body...)
        // 加入 break类型,跳出select-case 
		l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		return l
	}
    // convert case value arguments to addresses.
	// this rewrite is used by both the general code and the next optimization.
	var dflt *ir.CommClause
	for _, cas := range cases {
		ir.SetPos(cas)
		n := cas.Comm
		if n == nil {
			dflt = cas
			continue
		}
		switch n.Op() {
		case ir.OSEND:
			n := n.(*ir.SendStmt)
			n.Value = typecheck.NodAddr(n.Value)
			n.Value = typecheck.Expr(n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[0]) {
				n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
				n.Lhs[0] = typecheck.Expr(n.Lhs[0])
			}
		}
	}
}

两个case(一个default)

代码示例

func main() {
	ch := make(chan int)
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	default:
		fmt.Println("default")
	}
}

如果写入就走<- 读取,反之走默认

源码解读

如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的

func walkSelectCases(cases []*ir.CommClause) []ir.Node){
	// optimization: two-case select but one is default: single non-blocking op.
	if ncas == 2 && dflt != nil {
		cas := cases[0]
		if cas == dflt { // 如果是default 放在 cases[1]
			cas = cases[1]
		}

		n := cas.Comm
		ir.SetPos(n)
		r := ir.NewIfStmt(base.Pos, nil, nil, nil)
		r.SetInit(cas.Init())
		var cond ir.Node
		switch n.Op() {
		default:
			base.Fatalf("select %v", n.Op())

		case ir.OSEND:
            // 调用selectnbsend(c, v)
			// if selectnbsend(c, v) { body } else { default body }
			n := n.(*ir.SendStmt)
			ch := n.Chan
			cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			recv := n.Rhs[0].(*ir.UnaryExpr)
			ch := recv.X
			elem := n.Lhs[0]
			if ir.IsBlank(elem) { //空的话 elem= NodNil
				elem = typecheck.NodNil()
			}
			cond = typecheck.Temp(types.Types[types.TBOOL])
            // 调用 selectnbrecv
			fn := chanfn("selectnbrecv", 2, ch.Type())
			call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
			as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
			r.PtrInit().Append(typecheck.Stmt(as))
		}

		r.Cond = typecheck.Expr(cond)
		r.Body = cas.Body
		r.Else = append(dflt.Init(), dflt.Body...)
		return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
	}    
}

每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // block:false
    // chan将 select准换if-else
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	// block:false
    // chan将 select准换if-else
    return chanrecv(c, elem, false)
}

多个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		tempArr := []int{1,2,3,4,5,6}
		for i := range tempArr {
			ch <- i
		}
	}()
	go func() {
		for {
			select {
			case i := <-ch:
				println("first: ", i)
			case i := <-ch:
				println("second", i)
			}
		}
	}()
	time.Sleep(3 * time.Second)

}

golang并发安全-select,golang,开发语言,后端

可以看到多个case,会随机选取一个case执行

源码解读

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	ncas := len(cases)
	sellineno := base.Pos
	if dflt != nil {
		ncas--
	}
    // 定义casorder为ncas大小的case语句的数组
	casorder := make([]*ir.CommClause, ncas)
    // 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
	nsends, nrecvs := 0, 0
    // 多case编译后待执行的语句列表
	var init []ir.Node

	// generate sel-struct
	base.Pos = sellineno
    // 定义selv为长度为ncas的scase类型的数组
    // scasetype()函数返回的就是scase结构体,包含c和elem两个字段
	selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
	init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

	// No initialization for order; runtime.selectgo is responsible for that.
	// 定义order为2倍的ncas长度的TUINT16类型的数组
    // 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
    order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))

	var pc0, pcs ir.Node
	if base.Flag.Race {
		pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
		pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
	} else {
		pc0 = typecheck.NodNil()
	}

	// register cases 遍历case生成scase对象放到selv中
	for _, cas := range cases {
		ir.SetPos(cas)

		init = append(init, ir.TakeInit(cas)...)

		n := cas.Comm
		if n == nil { // default:
			continue
		}

		var i int
		var c, elem ir.Node
		switch n.Op() { // 根据类型获取chan, elem的值
		default:
			base.Fatalf("select %v", n.Op())
		case ir.OSEND: // 发送chan类型,i从0开始递增
			n := n.(*ir.SendStmt)
			i = nsends
			nsends++
			c = n.Chan
			elem = n.Value
		case ir.OSELRECV2: // 接收chan,i从ncas开始递减
			n := n.(*ir.AssignListStmt)
			nrecvs++
			i = ncas - nrecvs
			recv := n.Rhs[0].(*ir.UnaryExpr)
			c = recv.X
			elem = n.Lhs[0]
		}

		casorder[i] = cas
       // 定义一个函数,写入c或elem到selv数组
		setField := func(f string, val ir.Node) {
            // 放到selv数组
			r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
            // 添加到带执行列表
			init = append(init, typecheck.Stmt(r))
		}

		c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
		setField("c", c)
		if !ir.IsBlank(elem) {
			elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
			setField("elem", elem)
		}

		// TODO(mdempsky): There should be a cleaner way to
		// handle this.
		if base.Flag.Race {
			r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
			init = append(init, r)
		}
	}
    // 如果发送chan和接收chan的个数不等于ncas,直接报错
	if nsends+nrecvs != ncas {
		base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
	}

	// run the select  开始执行select动作
	base.Pos = sellineno
    // 定义chosen, recvOK作为selectgo()函数的两个返回值
    // chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
	chosen := typecheck.Temp(types.Types[types.TINT])
	recvOK := typecheck.Temp(types.Types[types.TBOOL])
	r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
	r.Lhs = []ir.Node{chosen, recvOK}
    // 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
	fn := typecheck.LookupRuntime("selectgo")
	var fnInit ir.Nodes
	r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
	init = append(init, fnInit...)
	init = append(init, typecheck.Stmt(r))

	// selv and order are no longer alive after selectgo.
    // 执行完selectgo()函数后,销毁selv和order数组.
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
	if base.Flag.Race {
		init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
	}

	// dispatch cases 
    //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
	dispatch := func(cond ir.Node, cas *ir.CommClause) {
		cond = typecheck.Expr(cond)
		cond = typecheck.DefaultLit(cond, nil)

		r := ir.NewIfStmt(base.Pos, cond, nil, nil)

		if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[1]) {
				x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
				r.Body.Append(typecheck.Stmt(x))
			}
		}

		r.Body.Append(cas.Body.Take()...)
		r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		init = append(init, r)
	}
    // 如果多case中有default分支,并且chosen小于0,执行该default分支
	if dflt != nil {
		ir.SetPos(dflt)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
	}
    // 如果有chosen选中的case分支,即chosen等于i,则执行该分支
	for i, cas := range casorder {
		ir.SetPos(cas)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
	}

	return init
}

从上面代码可以看出:

1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序

2- 遍历所有的case ,将case放到带执行列表(不包括default)

3- 调用runtime。selectgo并将selv和order作为入参传入selectgo

4- 根据selectgo返回的chosen来生成if语句,执行对应的case

解锁加锁

加锁的顺序和解锁的顺序相反。


func sellock(scases []scase, lockorder []uint16) {
	var c *hchan
	for _, o := range lockorder {
		c0 := scases[o].c
		if c0 != c {
			c = c0
			lock(&c.lock)
		}
	}
}

func selunlock(scases []scase, lockorder []uint16) {
	// 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
    //考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。
    //一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,
    //并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。
	for i := len(lockorder) - 1; i >= 0; i-- {
		c := scases[lockorder[i]].c
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
		unlock(&c.lock)
	}
}

selectgo

selectgo 处理逻辑

golang并发安全-select,golang,开发语言,后端

// cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}
    //==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。
  // 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
	// NOTE: In order to maintain a lean stack size, the number of scases
	// is capped at 65536.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // ncases个数 = 发送chan个数+ 接收chan个数
	ncases := nsends + nrecvs
    // scases是cas1数组的前ncases个元素
	scases := cas1[:ncases:ncases]
    // 顺序列表pollorder是order1的0- ncases个元素
	pollorder := order1[:ncases:ncases]
    // 加锁列表lockorder是order1的ncase到 2 ncases 个元素
	lockorder := order1[ncases:][:ncases:ncases]
	// NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。
	// Even when raceenabled is true, there might be select
	// statements in packages compiled without -race (e.g.,
	// ensureSigM in runtime/signal_unix.go).
	var pcs []uintptr
	if raceenabled && pc0 != nil {
		pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
		pcs = pc1[:ncases:ncases]
	}
	casePC := func(casi int) uintptr {
		if pcs == nil {
			return 0
		}
		return pcs[casi]
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

    // 生成排列顺序
    norder := 0
	for i := range scases {
		cas := &scases[i]

		// Omit cases without channels from the poll and lock orders.
        // 处理case中channel为空的情况
		if cas.c == nil {
			cas.elem = nil // 便于GC
			continue
		}
       // 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
		j := fastrandn(uint32(norder + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	// 重新生成列表
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

    // 根据chan地址确定lockorder加锁排序列表的顺序
    // 简单的堆排序,以保证nlogn时间复杂度完成排序
	for i := range lockorder {
		j := i
        // 从轮询顺序开始,在同一channel上排序。
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	for i := len(lockorder) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}

	if debugSelect {
		for i := 0; i+1 < len(lockorder); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}

    // 锁定select中涉及的所有channel
	sellock(scases, lockorder)

	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

	// === pass 1 - 查找可以等待处理的channel
	var casi int
	var cas *scase
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool
	for _, casei := range pollorder {
		casi = int(casei) // case的索引
		cas = &scases[casi]  
		c = cas.c
		if casi >= nsends { // 处理接收channel的case
			sg = c.sendq.dequeue()
			if sg != nil {
                // 如果当前channel的sendq上有等待的goroutine,
                // 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置
				goto recv
			}
			if c.qcount > 0 {
                //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
				goto bufrecv
			}
			if c.closed != 0 {
                //如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;
				goto rclose
			}
		} else { // 处理发送channel的case
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			if c.closed != 0 {
                // 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
                // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
				goto send
			}
			if c.qcount < c.dataqsiz {
                // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
				goto bufsend
			}
		}
	}

	if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}

	// === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上
	gp = getg() // 获取当前的groutine
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
        // 获取sudog,将当前goroutine绑定到sudog上
		sg.g = gp
		sg.isSelect = true
        // 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
        // 按锁定顺序构建waiting list 。
		*nextp = sg
		nextp = &sg.waitlink
        // 加入相应等待队列
		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
	gp.param = nil
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
    // 挂起当前goroutine
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	gp.activeStackChans = false
    // 加锁所有的channel
	sellock(scases, lockorder)

	gp.selectDone = 0
	sg = (*sudog)(gp.param)
     // param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
	gp.param = nil

	// === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
    //dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
    // 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。


	casi = -1
	cas = nil
	caseSuccess = false
    // 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
	sglist = gp.waiting
    // 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
    // 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
	gp.waiting = nil

	for _, casei := range lockorder {
		k = &scases[casei]
        // 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
		if sg == sglist {
            // sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
			casi = int(casei)
			cas = k
			caseSuccess = sglist.success
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else {
            // 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
         // 释放当前case的sudog,然后处理下一个case的sudog
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		throw("selectgo: bad wakeup")
	}

	c = cas.c

	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	}

	if casi < nsends {
		if !caseSuccess {
			goto sclose
		}
	} else {
		recvOK = caseSuccess
	}

	if raceenabled {
		if casi < nsends {
			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
		} else if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
	}
	if msanenabled {
		if casi < nsends {
			msanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		}
	}
	if asanenabled {
		if casi < nsends {
			asanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			asanwrite(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
    // 能从buffer获取数据
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
		racenotify(c, c.recvx, nil)
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if asanenabled && cas.elem != nil {
		asanwrite(cas.elem, c.elemtype.size)
	}
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// 发送数据到缓存
	if raceenabled {
		racenotify(c, c.sendx, nil)
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
    // 从休眠sender(sg)接收
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true
	goto retc

rclose:
    // 读取结束的channel
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send:
	// 想休眠的接收房发送数据
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK

sclose:
	// 向关闭的channel发送数据
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

总结

简单总结下select对case处理逻辑:

1- 空的case 会被golang监听到无法唤醒的协程,会panic

2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)

3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)

4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:

4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中

4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case

selectgo 函数:

1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)

2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒

3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的文章来源地址https://www.toymoban.com/news/detail-819410.html

到了这里,关于golang并发安全-select的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Golang 中的 map 为什么是并发不安全的?

      golang 中的 map 是并发不安全的,多个 go 协程同时对同一个 map 进行读写操作时,会导致数据竞争(data race)问题,程序会 panic。   如果一个协程正在写入 map,而另一个协程正在读取或写入 map,那么就有可能出现一些未定义的行为,例如:读取到的值可能是过期的、不

    2024年02月05日
    浏览(59)
  • golang实现webgis后端开发

    目录 前言 二、实现步骤 1.postgis数据库和model的绑定 2.将pg库中的要素转换为geojson (1)几何定义 (2)将wkb解析为几何类型 (3)定义geojson类型 (4)数据转换 (5)数据返回  2.前端传入的geojson储存到数据库 3、其他功能实现 总结         停更了接近一个月都在研究一门新语言gola

    2024年02月08日
    浏览(50)
  • Go 语言 map 是并发安全的吗?

    原文链接: Go 语言 map 是并发安全的吗? Go 语言中的 map 是一个非常常用的数据结构,它允许我们快速地存储和检索键值对。然而,在并发场景下使用 map 时,还是有一些问题需要注意的。 本文将探讨 Go 语言中的 map 是否是并发安全的,并提供三种方案来解决并发问题。 先来

    2024年02月06日
    浏览(37)
  • 建站系列(六)--- 后端开发语言

    建站系列(一)— 网站基本常识 建站系列(二)— 域名、IP地址、URL、端口详解 建站系列(三)— 网络协议 建站系列(四)— Web服务器之Apache、Nginx 建站系列(五)— 前端开发语言之HTML、CSS、JavaScript 建站系列(六)— 后端开发语言 建站系列(七)— 常用前后端框架

    2024年02月09日
    浏览(41)
  • 【Golang】go编程语言适合哪些项目开发?

    前言 在当今数字化时代,软件开发已成为各行各业的核心需求之一。 而选择适合的编程语言对于项目的成功开发至关重要。 本文将重点探讨Go编程语言适合哪些项目开发,以帮助读者在选择合适的编程语言时做出明智的决策。 Go 编程语言适合哪些项目开发? Go是由Google开发

    2024年02月04日
    浏览(80)
  • select实现服务器并发

     

    2024年02月07日
    浏览(50)
  • 后端开发必知的11个线程安全小技巧

      对于从事后端开发的同学来说,线程安全问题是我们每天都需要考虑的问题。   线程安全问题通俗地讲主要是在多线程的环境下,不同线程同时读和写公共资源(临界资源)导致的数据异常问题。   比如:变量a=0,线程1给该变量+1,线程2也给该变量+1。此时,线程3获取

    2024年02月15日
    浏览(42)
  • Go学习圣经:Go语言实现高并发CRUD业务开发

    现在 拿到offer超级难 ,甚至连面试电话,一个都搞不到。 尼恩的技术社群中(50+),很多小伙伴凭借 “左手云原生+右手大数据”的绝活,拿到了offer,并且是非常优质的offer, 据说年终奖都足足18个月 。 第二个案例就是:前段时间,一个2年小伙伴希望涨薪到18K, 尼恩把

    2024年02月11日
    浏览(53)
  • 【Golang】VsCode下开发Go语言的环境配置(超详细图文详解)

    📓推荐网站(不断完善中):个人博客 📌个人主页:个人主页 👉相关专栏:CSDN专栏、个人专栏 🏝立志赚钱,干活想躺,瞎分享的摸鱼工程师一枚 ​ 话说在前,Go语言的编码方式是 UTF-8 ,理论上你直接使用文本进行编辑也是可以的,当然为了提升我们的开发效率我们还是需

    2024年02月07日
    浏览(85)
  • 后端开发有哪几种语言? - 易智编译EaseEditing

    后端开发是构建应用程序的一部分,负责处理服务器端的逻辑、数据库交互和数据处理。有许多编程语言可用于后端开发,以下是一些常见的后端开发语言: Java: Java是一种广泛使用的面向对象编程语言,具有强大的跨平台能力。在后端开发中,Java通常与Java EE(Java Platfor

    2024年02月11日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包