从源码分析 Go 语言使用 cgo 导致的线程增长

这篇具有很好参考价值的文章主要介绍了从源码分析 Go 语言使用 cgo 导致的线程增长。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。

转换 cgo 代码

对 driver-go/wrapper/taosc.go 进行转换

go tool cgo taosc.go

执行后生成 _obj 文件夹

go 代码分析

taosc.cgo1.goTaosResetCurrentDB 为例来分析。

// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
    func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
}

//go:linkname _cgoCheckPointer runtime.cgoCheckPointer
func _cgoCheckPointer(interface{}, interface{})

//go:cgo_unsafe_args
func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
    _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
    if _Cgo_always_false {
        _Cgo_use(p0)
    }
    return
}

//go:linkname _cgo_runtime_cgocall runtime.cgocall
func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32

//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)

  • TaosResetCurrentDB 首先调用 _cgoCheckPointer 检查传入参数是否为 nil
  • //go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法实现是 runtime.cgoCheckPointer,如果传入参数是 nil 程序将会 panic
  • 接着调用 _Cfunc_taos_reset_current_db
  • Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在运行时会是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
    • _cgo_runtime_cgocall 实现是 runtime.cgocall 这个会重点分析。
    • _cgo_453a0cad50ef_Cfunc_taos_reset_current_db 由上方最后代码块可以看出是 taos_reset_current_db 方法指针。
    • uintptr(unsafe.Pointer(&p0)) 表示 p0 的指针地址。
    • 由上面可以看出这句意思是调用 runtime.cgocall,参数为方法指针和参数的指针地址。

分析 runtime.cgocall

基于 golang 1.20.4 分析该方法

func cgocall(fn, arg unsafe.Pointer) int32 {
    if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
        throw("cgocall unavailable")
    }

    if fn == nil {
        throw("cgocall nil")
    }

    if raceenabled {
        racereleasemerge(unsafe.Pointer(&racecgosync))
    }

    mp := getg().m // 获取当前 goroutine 的 M
    mp.ncgocall++  // 总 cgo 计数 +1
    mp.ncgo++      // 当前 cgo 计数 +1

    mp.cgoCallers[0] = 0 // 重置追踪

    entersyscall() // 进入系统调用,保存上下文, 标记当前 goroutine 独占 m, 跳过垃圾回收

    osPreemptExtEnter(mp) // 标记异步抢占, 使异步抢占逻辑失效

    mp.incgo = true // 修改状态
    errno := asmcgocall(fn, arg) // 真正进行方法调用的地方

    mp.incgo = false // 修改状态
    mp.ncgo-- // 当前 cgo 调用-1

    osPreemptExtExit(mp) // 恢复异步抢占

    exitsyscall() // 退出系统调用,恢复调度器控制


    if raceenabled {
        raceacquire(unsafe.Pointer(&racecgosync))
    }

    // 避免 GC 过早回收
    KeepAlive(fn)
    KeepAlive(arg)
    KeepAlive(mp)

    return errno
}

其中两个主要的方法 entersyscallasmcgocall,接下来对这两个方法进行着重分析。

分析 entersyscall

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

entersyscall 直接调用的 reentersyscall,关注下 reentersyscall 注释中的一段:

// If the syscall does not block, that is it, we do not emit any other events.
// If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;

如果 syscall 调用没有阻塞则不会触发任何事件,如果被阻塞 retaker 会触发 traceGoSysBlock,那需要了解一下多长时间被认为是阻塞,先跟到 retaker 方法。

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        pp := allp[i]
        if pp == nil {
            continue
        }
        pd := &pp.sysmontick
        s := pp.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            t := int64(pp.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                preemptone(pp)
                sysretake = true
            }
        }
        // 从系统调用中抢占P
        if s == _Psyscall {
            // 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 P
            t := int64(pp.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            unlock(&allpLock)
            incidlelocked(-1)
            if atomic.Cas(&pp.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(pp)
                    traceProcStop(pp)
                }
                n++
                pp.syscalltick++
                handoffp(pp)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

从上面可以看到系统调用阻塞 20 多微秒会被抢占 P,cgo 被迫 handoffp,接下来分析 handoffp 方法

func handoffp(pp *p) {
    // ...
    // 没有任务且没有自旋和空闲的 M 则需要启动一个新的 M
    if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
        sched.needspinning.Store(0)
        startm(pp, true)
        return
    }
    // ...
}

handoffp 方法会调用 startm 来启动一个新的 M,跟到 startm 方法。

func startm(pp *p, spinning bool) {
    // ...
    nmp := mget()
    if nmp == nil {
        // 没有M可用,调用newm
        id := mReserveID()
        unlock(&sched.lock)

        var fn func()
        if spinning {
            fn = mspinning
        }
        newm(fn, pp, id)
        releasem(mp)
        return
    }
    // ...
}

此时如果没有 M startm 会调用 newm 创建一个新的 M,接下来分析 newm 方法。

func newm(fn func(), pp *p, id int64) {
    acquirem()
    mp := allocm(pp, fn, id)
    mp.nextp.set(pp)
    mp.sigmask = initSigmask
    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
        lock(&newmHandoff.lock)
        if newmHandoff.haveTemplateThread == 0 {
            throw("on a locked thread with no template thread")
        }
        mp.schedlink = newmHandoff.newm
        newmHandoff.newm.set(mp)
        if newmHandoff.waiting {
            newmHandoff.waiting = false
            notewakeup(&newmHandoff.wake)
        }
        unlock(&newmHandoff.lock)
        releasem(getg().m)
        return
    }
    newm1(mp)
    releasem(getg().m)
}

func newm1(mp *m) {
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        if asanenabled {
            asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock()
        // 创建新线程
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock()
    newosproc(mp)
    execLock.runlock()
}

newm 看出如果线程都在阻塞中则调用 newm1newm1 调用 _cgo_thread_start 创建新线程。

由以上分析得出当高并发调用 cgo 且执行时间超过 20 微秒时会创建新线程。

分析 asmcgocall

只分析 amd64
asm_amd64.s

TEXT ·asmcgocall(SB),NOSPLIT,$0-20
    MOVQ    fn+0(FP), AX
    MOVQ    arg+8(FP), BX

    MOVQ    SP, DX

    // 考虑是否需要切换到 m.g0 栈
    // 也用来调用创建新的 OS 线程,这些线程已经在 m.g0 栈中了
    get_tls(CX)
    MOVQ    g(CX), DI
    CMPQ    DI, $0
    JEQ nosave
    MOVQ    g_m(DI), R8
    MOVQ    m_gsignal(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    MOVQ    m_g0(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    
    // 切换到系统栈
    CALL    gosave_systemstack_switch<>(SB)
    MOVQ    SI, g(CX)
    MOVQ    (g_sched+gobuf_sp)(SI), SP

    // 于调度栈中(pthread 新创建的栈)
    // 确保有足够的空间给四个 stack-based fast-call 寄存器
    // 为使得 windows amd64 调用服务
    SUBQ    $64, SP
    ANDQ    $~15, SP // 为 gcc ABI 对齐
    MOVQ    DI, 48(SP) // 保存 g
    MOVQ    (g_stack+stack_hi)(DI), DI
    SUBQ    DX, DI
    MOVQ    DI, 40(SP) // 保存栈深 (不能仅保存 SP,因为栈可能在回调时被复制)
    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
    MOVQ    BX, CX  // CX = Win64 第一个参数
    CALL    AX  // 调用 fn

    // 恢复寄存器、 g、栈指针
    get_tls(CX)
    MOVQ    48(SP), DI
    MOVQ    (g_stack+stack_hi)(DI), SI
    SUBQ    40(SP), SI
    MOVQ    DI, g(CX)
    MOVQ    SI, SP

    MOVL    AX, ret+16(FP)
    RET

nosave:
    // 在系统栈上运行,可能没有 g
    // 没有 g 的情况发生在线程创建中或线程结束中(比如 Solaris 平台上的 needm/dropm)
    // 这段代码和上面类似,但没有保存和恢复 g,且没有考虑栈的移动问题(因为我们在系统栈上,而非 goroutine 栈)
    // 如果已经在系统栈上,则上面的代码可被直接使用,在 Solaris 上会进入下面这段代码。
    // 使用这段代码来为所有 "已经在系统栈" 的调用进行服务,从而保持正确性。
    SUBQ    $64, SP
    ANDQ    $~15, SP // ABI 对齐
    MOVQ    $0, 48(SP) // 上面的代码保存了 g, 确保 debug 时可用
    MOVQ    DX, 40(SP) // 保存原始的栈指针
    MOVQ    BX, DI  // DI = AMD64 ABI 第一个参数
    MOVQ    BX, CX  // CX = Win64 第一个参数
    CALL    AX
    MOVQ    40(SP), SI // 恢复原来的栈指针
    MOVQ    SI, SP
    MOVL    AX, ret+16(FP)
    RET

这段就是将当前栈移到系统栈去执行,因为 C 需要无穷大的栈,在 Go 的栈上执行 C 函数会导致栈溢出。

产生问题

cgo 调用会将当前栈移到系统栈,并且当 cgo 高并发调用且阻塞超过 20 微秒时会新建线程。而 Go 并不会销毁线程,由此造成线程增长。

解决方案

限制 Go 程序最大线程数,默认为 cpu 核数。

runtime.GOMAXPROCS(runtime.NumCPU())

使用 channel 限制 cgo 最大并发数为 cpu 核数

package thread

import "runtime"

var c chan struct{}

func Lock() {
    c <- struct{}{}
}

func Unlock() {
    <-c
}

func init() {
    c = make(chan struct{}, runtime.NumCPU())
}

针对超过 20 微秒的 cgo 调用进行限制:文章来源地址https://www.toymoban.com/news/detail-471679.html

thread.Lock()
wrapper.TaosFreeResult(result)
thread.Unlock()

到了这里,关于从源码分析 Go 语言使用 cgo 导致的线程增长的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Go坑:time.After可能导致的内存泄露问题分析

    Go 中 time.After 可能导致的内存泄露 go v1.20.4 time 包中有 3 个比较常用的定时函数:NewTicker,NewTimer 和 time.After: NewTimer : 表示在一段时间后才执行,默认情况下执行一次。如果想再次执行,需要调用 time.Reset() 方法,这时类似于 NewTicker 定时器了。可以调用 stop 方法停止执行。

    2024年02月02日
    浏览(43)
  • 连接sqlite3报错:go-sqlite3 requires cgo to work. This is a stub

    register db Ping default , Binary was compiled with ‘CGO_ENABLED=0’, go-sqlite3 requires cgo to work. This is a stub 报错信息: Failed to build the application: # runtime/cgo cgo: C compiler “gcc” not found: exec: “gcc”: executable file not found in %PATH% 3.1 进入Sqlite3官网 github 3.2 找到Windows部分 3.2 点击下载链接 3.3 按自

    2024年02月03日
    浏览(31)
  • 线程池使用不规范导致线程数大以及@Async的规范使用

    文章详细内容来自:线程数突增!领导:谁再这么写就滚蛋! 下面是看完后文章的,一个总结 线程池的使用不规范,导致程序中线程数不下降,线程数量大。 临时变量的接口,通过下面简单的线程池执行, 线程不被GC回收,主要是线程池的gc root还是有可达路径的。这里讲个

    2024年02月10日
    浏览(28)
  • 【Go】cgo: C compiler “gcc“ not found: exec: “gcc“: executable file not found in %PATH%

     cgo: C compiler \\\"gcc\\\" not found: exec: \\\"gcc\\\": executable file not found in %PATH% 下载符合自己系统版本的压缩包,MinGW-w64 - for 32 and 64 bit Windows - Browse /mingw-w64 at SourceForge.net  下载这个版本:  解压缩放入C:Program Files目录下:  配置环境变量: 设置环境变量 path  打开cmd窗口, 输入 gcc -v 能看

    2024年02月13日
    浏览(38)
  • 深入分析 Java、Kotlin、Go 的线程和协程

    Go 语言比 Java 语言性能优越的一个原因,就是轻量级线程 Goroutines (协程Coroutine)。本篇文章深入分析下 Java 的线程和 Go 的协程。 协程并不是 Go 提出来的新概念,其他的一些编程语言,例如:Go、Python 等都可以在语言层面上实现协程,甚至是 Java,也可以通过使用扩展库来

    2024年02月01日
    浏览(31)
  • 【调试记录】QT中使用多线程导致的死锁

    原因在于第18行采用阻塞队列的连接方式。 子线程在第17行获取到锁,主线程刚好运行到24行准备获取锁。此时子线程执行第18行,阻塞调用等待主线程执行 qDebug() \\\"invokeMethod:\\\" ++count_; 完成。 子线程已经获取到锁,主线程等待获取锁,子线程又等待主线程事件循环执行函数,由

    2023年04月16日
    浏览(30)
  • CountDownLatch使用错误+未最终断开连接导致线程池资源耗尽

            我设置了CountDownLatch对线程的协作做出了一些限制,但是我发现运行一段时间以后便发现定时任务不运行了。 具体代码: 报错以后定时任务不运行了   打印线程日志发现定时任务的线程在第86行代码停着不动了。 正常的线程日志应该是这样的。 查看第86行代码,

    2024年04月24日
    浏览(28)
  • GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程

    技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!

    2024年02月11日
    浏览(32)
  • Java/Python/Go不同开发语言在进程、线程和协程的设计差异

    在多线程项目开发时,最常用、最常遇到的问题是 1,线程、协程安全 2,线程、协程间的通信和控制 本文主要探讨不同开发语言go、java、python在进程、线程和协程上的设计和开发方式的异同。 进程 进程是 操作系统进行资源分配的基本单位,每个进程都有自己的独立内存空

    2024年01月23日
    浏览(38)
  • 线程池ThreadPoolExecutor底层原理源码分析

    ThreadPoolExecutor中提供了两种执行任务的方法: void execute(Runnable command) Future? submit(Runnable task) 实际上submit中最终还是调用的execute()方法,只不过会返回⼀个Future对象,用来获取任务执行结果: execute(Runnable command)方法执行时会分为三步: 注意:提交⼀个Runnable时,不管当前线程

    2024年02月06日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包