go-zero数据库连接池 database/sql 源码学习

这篇具有很好参考价值的文章主要介绍了go-zero数据库连接池 database/sql 源码学习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

database/sql 中接口的层级关系https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-database-sql/


database/sql源码地址
https://github.com/golang/go/tree/release-branch.go1.17/src/database/sql

go-zero数据库连接池源码地址

https://github.com/zeromicro/go-zero/blob/master/core/stores/sqlx/sqlmanager.go

滑动验证页面

一.go zero 中数据库连接池

gozero默认 最大空闲连接数64个,最大连接数64个,连接空闲时间 一分钟;

const (
	maxIdleConns = 64
	maxOpenConns = 64
	maxLifetime  = time.Minute
)

新建连接

func newDBConnection(driverName, datasource string) (*sql.DB, error) {
	conn, err := sql.Open(driverName, datasource)
	if err != nil {
		return nil, err
	}

	// we need to do this until the issue https://github.com/golang/go/issues/9851 get fixed
	// discussed here https://github.com/go-sql-driver/mysql/issues/257
	// if the discussed SetMaxIdleTimeout methods added, we'll change this behavior
	// 8 means we can't have more than 8 goroutines to concurrently access the same database.
	conn.SetMaxIdleConns(maxIdleConns)
	conn.SetMaxOpenConns(maxOpenConns)
	conn.SetConnMaxLifetime(maxLifetime)

	if err := conn.Ping(); err != nil {
		_ = conn.Close()
		return nil, err
	}

	return conn, nil
}

二 数据库句柄DB结构

  • sql包自动创建和释放连接,并且是线程安;
  • 它还维护空闲连接的空闲池,能自动创建和释放连接。
  • 调用DB.Begin返回的Tx绑定到单个连接。一旦提交或对事务调用回滚,该事务的连接返回到DB的空闲连接池。
  • 连接池池大小可以使用SetMaxIdleConns进行控制。
type DB struct {
	// Total time waited for new connections.
	waitDuration atomic.Int64

	connector driver.Connector
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
	numClosed atomic.Uint64

	mu           sync.Mutex    // protects following fields
	freeConn     []*driverConn // free connections ordered by returnedAt oldest to newest
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // Next key to use in connRequests.
	numOpen      int    // number of opened and pending open connections
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
	openerCh          chan struct{}
	closed            bool
	dep               map[finalCloser]depSet
	lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
	maxIdleCount      int                    // zero means defaultMaxIdleConns; negative means 0
	maxOpen           int                    // <= 0 means unlimited
	maxLifetime       time.Duration          // maximum amount of time a connection may be reused
	maxIdleTime       time.Duration          // maximum amount of time a connection may be idle before being closed
	cleanerCh         chan struct{}
	waitCount         int64 // Total number of connections waited for.
	maxIdleClosed     int64 // Total number of connections closed due to idle count.
	maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
	maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.

	stop func() // stop cancels the connection opener.
}

三.获取数据库句柄

sql.Open 接收 driverName 和 dataSourceName 作为入参,前者用于在全局 driver map 中查找对应的驱动实现,

func Open(driverName, dataSourceName string) (*DB, error) {
	driversMu.RLock()
	driveri, ok := drivers[driverName]
	driversMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
	}

	if driverCtx, ok := driveri.(driver.DriverContext); ok {
		connector, err := driverCtx.OpenConnector(dataSourceName)
		if err != nil {
			return nil, err
		}
		return OpenDB(connector), nil
	}

	return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

2.1 driver map

sql.OpenDBdriveri map 中存储数据库对应的驱动实现,通过Register 函数来写入的 。


var (
	driversMu sync.RWMutex
	drivers   = make(map[string]driver.Driver)
)

// nowFunc returns the current time; it's overridden in tests.
var nowFunc = time.Now

// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, driver driver.Driver) {
	driversMu.Lock()
	defer driversMu.Unlock()
	if driver == nil {
		panic("sql: Register driver is nil")
	}
	if _, dup := drivers[name]; dup {
		panic("sql: Register called twice for driver " + name)
	}
	drivers[name] = driver
}
2.2 sql.OpenDB

sql.OpenDB 做的事情很简单,只验证参数,不会创建数据库连接。要验证数据源名称是否有效,请调用ping。

再另起一个 goroutine 调用 sql.DB 的 connectionOpener 方法后就结束。

方法结束,返回DB,返回的DB对于多个goroutine并发使用是安全的,并且维护其自己的空闲连接池。

func OpenDB(c driver.Connector) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	db := &DB{
		connector:    c,
		openerCh:     make(chan struct{}, connectionRequestQueueSize),
		lastPut:      make(map[*driverConn]string),
		connRequests: make(map[uint64]chan connRequest),
		stop:         cancel,
	}

	go db.connectionOpener(ctx)

	return db
}

四.获取连接

sql.DB 的连接是延迟建立的,需要用到连接时才会去创建一条连接。通常是通过 sql.DB数据库交互的时候才会创建连接,这里的交互指的是pingContext 、queryContext 、exexContext。

func (db *DB) Ping() error {
	return db.PingContext(context.Background())
}
3.1 PingContext
func (db *DB) PingContext(ctx context.Context) error {
	var dc *driverConn
	var err error

	err = db.retry(func(strategy connReuseStrategy) error {
		dc, err = db.conn(ctx, strategy)
		return err
	})

	if err != nil {
		return err
	}

	return db.pingDC(ctx, dc, dc.releaseConn)
}

先最多尝试 maxBadConnRetries 次以 cachedOrNewConn 这个策略调用一个非导出函数,如果均失败且失败原因是 driver.ErrBadConn,那么尝试以 alwaysNewConn 这个策略调用同样的函数。

const maxBadConnRetries = 2

func (db *DB) retry(fn func(strategy connReuseStrategy) error) error {
	for i := int64(0); i < maxBadConnRetries; i++ {
		err := fn(cachedOrNewConn)
		// retry if err is driver.ErrBadConn
		if err == nil || !errors.Is(err, driver.ErrBadConn) {
			return err
		}
	}

	return fn(alwaysNewConn)
}

连接成功后,会设置连接的空闲时间,并把连接放入空闲连接数组(DB.freeConn)。

五.将连接放回连接池

当 DB.PingDC 结束时,releaseConn 就会被调用,而这个方法的逻辑很简单,仅仅只是调用DB.putConn方法。

func (dc *driverConn) releaseConn(err error) {
	dc.db.putConn(dc, err, true)
}
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	if !errors.Is(err, driver.ErrBadConn) {
		if !dc.validateConnection(resetSession) {
			err = driver.ErrBadConn
		}
	}
	db.mu.Lock()
	if !dc.inUse {
		db.mu.Unlock()
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}

	if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
		db.maxLifetimeClosed++
		err = driver.ErrBadConn
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
	dc.inUse = false
	dc.returnedAt = nowFunc()

	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

	if errors.Is(err, driver.ErrBadConn) {
		// Don't reuse bad connections.
		// Since the conn is considered bad and is being discarded, treat it
		// as closed. Don't decrement the open count here, finalClose will
		// take care of that.
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()

	if !added {
		dc.Close()
		return
	}
}

六.新建连接 DB.conn

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Check if the context is expired.
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer a free connection, if possible.
	last := len(db.freeConn) - 1
	if strategy == cachedOrNewConn && last >= 0 {
		// Reuse the lowest idle time connection so we can close
		// connections which remain idle as soon as possible.
		conn := db.freeConn[last]
		db.freeConn = db.freeConn[:last]
		conn.inUse = true
		if conn.expired(lifetime) {
			db.maxLifetimeClosed++
			db.mu.Unlock()
			conn.Close()
			return nil, driver.ErrBadConn
		}
		db.mu.Unlock()

		// Reset the session if required.
		if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
			conn.Close()
			return nil, err
		}

		return conn, nil
	}

	// Out of free connections or we were asked not to use one. If we're not
	// allowed to open any more connections, make a request and wait.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Make the connRequest channel. It's buffered so that the
		// connectionOpener doesn't block while waiting for the req to be read.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := nowFunc()

		// Timeout the connection request with the context.
		select {
		case <-ctx.Done():
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			db.waitDuration.Add(int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			db.waitDuration.Add(int64(time.Since(waitStart)))

			if !ok {
				return nil, errDBClosed
			}
			// Only check if the connection is expired if the strategy is cachedOrNewConns.
			// If we require a new connection, just re-use the connection without looking
			// at the expiry time. If it is expired, it will be checked when it is placed
			// back into the connection pool.
			// This prioritizes giving a valid connection to a client over the exact connection
			// lifetime, which could expire exactly after this point anyway.
			if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
				db.mu.Lock()
				db.maxLifetimeClosed++
				db.mu.Unlock()
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}

			// Reset the session if required.
			if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
				ret.conn.Close()
				return nil, err
			}
			return ret.conn, ret.err
		}
	}

	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
		inUse:      true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

七.过期连接清理

当通过 DB.SetConnMaxLifetime 设置 DB.maxLifetime 或通过 DB.SetConnMaxIdleTime 设置 db.maxIdleTime 时,DB均会调用 DB.startCleanerLocked,这个函数的作用是按需初始化 DB.cleanerCh,然后新起一个协程调用 DB.connectionCleaner。文章来源地址https://www.toymoban.com/news/detail-738779.html

// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
	if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
		db.cleanerCh = make(chan struct{}, 1)
		go db.connectionCleaner(db.shortestIdleTimeLocked())
	}
}

func (db *DB) connectionCleaner(d time.Duration) {
	const minInterval = time.Second

	if d < minInterval {
		d = minInterval
	}
	t := time.NewTimer(d)

	for {
		select {
		case <-t.C:
		case <-db.cleanerCh: // maxLifetime was changed or db was closed.
		}

		db.mu.Lock()

		d = db.shortestIdleTimeLocked()
		if db.closed || db.numOpen == 0 || d <= 0 {
			db.cleanerCh = nil
			db.mu.Unlock()
			return
		}

		d, closing := db.connectionCleanerRunLocked(d)
		db.mu.Unlock()
		for _, c := range closing {
			c.Close()
		}

		if d < minInterval {
			d = minInterval
		}

		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(d)
	}
}

总结 

到了这里,关于go-zero数据库连接池 database/sql 源码学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【go-zero】go-zero分布式锁实战 | apifox测试go-zero分布式锁方式

    包地址:github.com/zeromicro/go-zero/core/stores/redis 使用场景: 为了防止并发的下载相同的excel 我们通过redis锁来控制请求相同的excel下载 个人思路: req为API传入的请求参数 然后加密成md5的字符串,这样可以处理 相同的请求

    2024年02月15日
    浏览(41)
  • IDEA自带的Database连接达梦数据库

    达梦数据库无法使用Navicat,只能使用自带的数据库管理工具,但是非常难用,所以想使用IDEA自带的Database连接达梦数据库。 首先下载达梦jdbc驱动包 选择Driver 确定达梦驱动 从datasource选择达梦数据库 填写数据源信息 上面的URL就是jdbc连接的url。 点击测试连接 成功,即可连接

    2024年02月13日
    浏览(46)
  • 【go-zero】docker镜像直接部署go-zero的API与RPC服务 如何实现注册发现?docker network 实现 go-zero 注册发现

    使用docker直接部署go-zero微服务会发现API无法找到RPC服务 用docker直接部署 我们会发现API无法注册发现RPC服务 原因是我们缺少了docker的network网桥 RPC服务运行正常 API服务启动,通过docker logs 查看日志还是未发现RPC API的yaml配置 RPC服务的IP是 127.0.0.1 与对应的端口 下图为改成了定

    2024年02月13日
    浏览(45)
  • Database Connection Pool 数据库连接池-01-概览及简单手写实现

    第一节 从零开始手写 mybatis(一)MVP 版本。 第二节 从零开始手写 mybatis(二)mybatis interceptor 插件机制详解 第三节 从零开始手写 mybatis(三)jdbc pool 从零实现数据库连接池 第四节 从零开始手写 mybatis(四)- mybatis 事务管理机制详解 由于数据库连接得到重用,避免了频繁创

    2024年03月14日
    浏览(92)
  • Tomcat DBCP(Database Connection Pool) 数据库连接池入门介绍

    从零开始手写 mybatis (三)jdbc pool 如何从零手写实现数据库连接池 dbcp? 万字长文深入浅出数据库连接池 HikariCP/Commons DBCP/Tomcat/c3p0/druid 对比 Database Connection Pool 数据库连接池概览 c3p0 数据池入门使用教程 alibaba druid 入门介绍 数据库连接池 HikariCP 性能为什么这么快? Apache

    2024年03月16日
    浏览(63)
  • 【go-zero】go-zero阿里云oss 前端上传文件到go-zero API服务 并在k8s pod中创建文件 并推送到阿里云oss 最佳实践

    问题:在本地通过上传文件,然后将文件推送到aliyun的oss中,是没问题的 但是部署到了k8s中,则出现了问题,一直报错没有创建的权限 思路:开始认为应该将该文件挂载到configmap中,然后通过这种方式修改了deployment和dockerfile。最终发现应该是go的创建文件路径方式搞错了,

    2024年02月13日
    浏览(46)
  • go-zero系列:接入Prometheus

    参考文档:https://zhuanlan.zhihu.com/p/463418864 https://prometheus.io/download/ 进入下载文件夹,比如prometheus-2.44.0.windows-amd64。 然后双击Prometheus.exe启动软件。 启动后,可以访问 http://127.0.0.1:9090/。就能查看Prometheus后台。 然后重启go-zero项目,能看到输出日志:Starting prometheus agent at 0.0.

    2024年02月16日
    浏览(39)
  • go-zero学习 第一章 基础

    因官网重新改版,本文是基于官网最新版本的文档并整合旧文档重新进行全面总结、归纳。 本文主要对官网 快速开始 进行提炼总结,未涉及部分将在后续章节陆续补充完善。 go-zero 的 goctl 工具下载 验证 goctl 的安装结果: goctl 一键安装 protoc 、 protoc-gen-go 、 protoc-gen-go-grp

    2024年02月09日
    浏览(47)
  • 使用go-zero快速构建微服务

    本文是对 使用go-zero快速构建微服务 [1] 的亲手实践 编写API Gateway代码 mkdir api goctl api -o api/bookstore.api cd api goctl api go -api bookstore.api -dir . go run bookstore.go -f etc/bookstore-api.yaml 启动API Gateway服务,默认侦听在8888端口 因为默认生成的 api/etc/bookstore-api.yml 为: 按提示下载,再次运行

    2024年02月13日
    浏览(64)
  • go-zero 开发之安装 etcd

    本文只涉及 Linux 上的安装。 二进制安装 下载二进制安装包 下载地址示例: 解压二进制安装包 删除二进制安装包 版本检查 启动 etcd 往 etcd 写读数据 Docker 安装 etcd 主要使用 Google 容器注册表(gcr.io)下的 gcr.io/etcd-development/etcd 仓库来存储其容器镜像。作为次要选项,它还使

    2024年02月04日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包