(二十七)Flask之数据库连接池DBUtils库

这篇具有很好参考价值的文章主要介绍了(二十七)Flask之数据库连接池DBUtils库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

每篇前言:

  • 🏆🏆作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者

  • 🔥🔥本文已收录于Flask框架从入门到实战专栏:《Flask框架从入门到实战》
  • 🔥🔥热门专栏推荐:《Python全栈系列教程》、《爬虫从入门到精通系列教程》、《爬虫进阶+实战系列教程》、《Scrapy框架从入门到实战》、《Flask框架从入门到实战》、《Django框架从入门到实战》、《Tornado框架从入门到实战》、《前端系列教程》。
  • 📝​📝本专栏面向广大程序猿,为的是大家都做到Python全栈技术从入门到精通,穿插有很多实战优化点。
  • 🎉🎉订阅专栏后可私聊进一千多人Python全栈交流群(手把手教学,问题解答); 进群可领取Python全栈教程视频 + 多得数不过来的计算机书籍:基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。
  • 🚀🚀加入我一起学习进步,一个人可以走的很快,一群人才能走的更远!

(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql

引子:

在上一个demo项目中,登录部分验证是直接写死的,本文模拟实际生产,查询MySQL数据库做验证。

一个很low的方法是:
项目根目录下创建utils/sql.py:

import pymysql


class SQLHelper(object):

    @staticmethod
    def open():
        conn = pymysql.connect(host='127.0.0.1', port=3306, password='123456', db='UserInfo')
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    @staticmethod
    def close(conn, cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls, sql, args):
        conn, cursor = cls.open()
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn, cursor)
        return obj

(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql

上面这种用法很严重且明显的一个问题是: 每次登录一次都要和数据库创建一个连接!

解决方法就是使用DBUtils三方库:

DBUtils库

pip install DBUtils==1.3

DBUtils 是一套用于管理数据库连接池的Python包,为高频度高并发的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装。

这种连接池有两种连接模式:

  1. PersistentDB:提供线程专用的数据库连接,并自动管理连接。

    为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭!

  2. PooledDB:提供线程间可共享的数据库连接,并自动管理连接。

    创建一批连接到连接池,供所有线程共享使用。

    PS:由于pymysql的threadsafety值为1,而DBUtils库用的内部又是用的pymysql,所以该模式连接池中的线程会被所有线程共享。

实测证明 PersistentDB 的速度是最高的(即第一种模式),但是在某些特殊情况下,数据库的连接过程可能异常缓慢,而此时的PooledDB(即模式二,所以推荐这个)则可以提供相对来说平均连接时间比较短的管理方式。

模式一(底层使用threading.local实现):

  • 了解即可~~~
from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PersistentDB(
    creator=pymysql,       # 连接数据库的模块
    maxusage=None,         # 一个数据库连接最多被重复使用的次数,None表示无限制
    setsession=[],         # 开始会话前执行的命令列表。如["set time zone ..."]
    ping=0,                # ping MySQL客户端,检查服务是否可用。  【一般用0,4,7】
    # 0 = None = never;  1 = default = whenever if is requested;  2 = when a cursor is created; 4 = when a query is executed;  7 = always
    closeable=False,       # False:conn.close()实际上被忽略,供下次使用,在线程关闭时,才会自动关闭连接;True:conn.close()则关闭连接,再次调用就是一个新的连接了
    threadlocal=None,      # 本线程独享值的对象,用于保存链接对象,如果链接对象被重置,也清除
    host='127.0.0.1',
    port=6379,
    user='root',
    password='123456',
    database='UserInfo',
    charset='utf8'
)

使用:

def demo():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from users')
    result = cursor.fetchall()
    cursor.close()  
    conn.close()
    

模式二:

  • 用这个~~~
from DBUtils.PooledDB import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,       # 连接数据库的模块
    maxconnections=6,      # 连接池允许的最大连接数, 0和None表示不限制
    mincached=2,           # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
    maxcached=5,           # 连接池中最多闲置的连接,0和None不限制
    maxshared=3,           # 连接池中最多共享的连接数量,0和None表示全部共享【默认为0,而且哪怕设置别的值也无用!!!下面会将为啥】
    blocking=True,         # 连接池中如果没有可用连接后,是否阻塞等待。True:等待;False:不等待直接报错
    maxusage=None,         # 一个连接最多被重复使用的次数,None表示无限制
    setsession=[],         # 开始会话前执行的命令列表。如["set time zone ..."]
    ping=0,                # ping MySQL客户端,检查服务是否可用。  【一般用0,4,7】
    # 0 = None = never;  1 = default = whenever if is requested;  2 = when a cursor is created; 4 = when a query is executed;  7 = always

    host='127.0.0.1',
    port=6379,
    user='root',
    password='123456',
    database='UserInfo',
    charset='utf8'
)


  • 为什么设置maxshared参数的值是无用的!因为DBUtils使用的pymysql,而pymysql内部threadsafety值为1。看源码:

    import pymysql
    

    (二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql

    DBUtils源码:

    from DBUtils.PooledDB import PooledDB
    

    (二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql

使用:

def demo():
	"""
	检测当前正在运行连接数是否小于最大连接数;如果不小于则等待或报错:raise TooManyConnection异常
	否则则优先去初始化时创建的连接中获取连接:SteadyDBConnection
	然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
	如果最开始创建的连接没有连接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
	
	一旦关闭连接后,连接就返回到连接池让后续线程继续使用~
	"""
    conn = POOL.connection()
    
    cursor = conn.cursor()
    cursor.execute('select * from users')
    result = cursor.fetchall()
    conn.close()

Flask中使用

方式一:直接将DBUtils初始化放到settings.py文件中

(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql
(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql

方式二:从utils文件夹中导入

(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql
(二十七)Flask之数据库连接池DBUtils库,多种技术合集,flask,数据库,python,DBUtils库,mysql文章来源地址https://www.toymoban.com/news/detail-849901.html

脚本使用DBUtils代码demo:

# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
"""
import datetime
import pymysql
from DBUtils.PooledDB import PooledDB


class MysqlClient(object):
    def __init__(self, **kwargs):
        self.pool = self.create_pool(**kwargs)
        self.connection = None
        self.cursor = None

    def create_pool(self, **kwargs):
        return PooledDB(
            pymysql,
            mincached=kwargs.get('mincached', 10),
            maxcached=kwargs.get('maxcached', 20),
            maxshared=kwargs.get('maxshared', 10),
            maxconnections=kwargs.get('maxconnections', 200),
            blocking=kwargs.get('blocking', True),
            maxusage=kwargs.get('maxusage', 100),
            setsession=kwargs.get('setsession', None),
            reset=kwargs.get('reset', True),
            host=kwargs.get('host', '127.0.0.1'),
            port=kwargs.get('port', 3306),
            db=kwargs.get('db', 'mysqldemo'),
            user=kwargs.get('user', 'root'),
            passwd=kwargs.get('passwd', '123456'),
            charset=kwargs.get('charset', 'utf8mb4'),
            cursorclass=pymysql.cursors.DictCursor
        )

    def get_conn_cursor(self):
        self.connection = self.pool.connection()
        self.cursor = self.connection.cursor()

    def close(self):
        try:
            if self.cursor:
                self.cursor.close()
            if self.connection:
                self.connection.close()
        except Exception as e:
            print(e)

    def execute(self, sql, param=()):
        try:
            self.get_conn_cursor()
            count = self.cursor.execute(sql, param)
            print(count)
            return count
        finally:
            self.close()

    def __dict_datetime_obj_to_str(self, result_dict):
        """把字典里面的datetime对象转成字符串,使json转换不出错"""
        if result_dict:
            return {k: v.__str__() if isinstance(v, datetime.datetime) else v for k, v in result_dict.items()}
        return result_dict

    def select_one(self, sql, param=()):
        """查询单个结果"""
        try:
            self.get_conn_cursor()
            count = self.execute(sql, param)
            result = self.cursor.fetchone()
            result = self.__dict_datetime_obj_to_str(result)
            return count, result
        finally:
            self.close()

    def select_many(self, sql, param=()):
        """查询多个结果"""
        try:
            self.get_conn_cursor()
            count = self.execute(sql, param)
            result = self.cursor.fetchall()
            result = [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
            return count, result
        finally:
            self.close()

    def begin(self):
        """开启事务"""
        try:
            self.get_conn_cursor()
            self.connection.autocommit(False)
        except Exception as e:
            print(e)

    def end(self, option='commit'):
        """结束事务"""
        try:
            if option == 'commit':
                self.connection.commit()
            else:
                self.connection.rollback()
        except Exception as e:
            print(e)
        finally:
            self.connection.autocommit(True)


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM customers  WHERE  customerNumber = 103'
    result1 = mc.select_one(sql1)
    print(result1[1])

    sql2 = 'SELECT * FROM customers  WHERE  customerNumber IN (%s,%s,%s)'
    param = (103, 144, 145)
    print(mc.select_many(sql2, param)[1])

到了这里,关于(二十七)Flask之数据库连接池DBUtils库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【Flask 连接数据库,使用Flask-Migrate实现数据库迁移及问题汇总】

    Flask 连接数据库,使用Flask-Migrate实现数据库迁移 安装Flask-Migrate插件 使用Flask-Migrate步骤 app.py主要用于数据库连接 model.py 中导入了 db,作用是存储一个User类 ,用于生成表头。 manager.py用于数据迁移管理,运行后将生成一个文件夹。 Flask-Migrate运行 问题汇总 问题一:flask_mig

    2024年01月16日
    浏览(25)
  • flask创建数据库连接池

    flask创建数据库连接池

    flask创建数据库连接池 在Python中,您可以使用 Flask-SQLAlchemy 这个扩展来创建一个数据库连接池。Flask-SQLAlchemy 是一个用于 Flask 框架的 SQLAlchemy 操作封装,实现了 ORM(Object Relational Mapper)。ORM 主要用于将类与数据库中的表建立映射关系,使得我们可以非常方便地通过操作类来操作

    2024年02月15日
    浏览(8)
  • flask笔记 02 | Flask数据库连接(sqlite、mysql)

    flask笔记 02 | Flask数据库连接(sqlite、mysql)

    Flask没有指定使用的数据库,不像django提供了orm数据库抽象层,可以直接采用对象的方式操作数据库。但为了开发效率,在开发Flask项目中一般会选择 SQLALchemy 来操作数据库,类似django的ORM. SQLALchemy实际是对数据库的抽象,让开发者不直接使用sql语句进行开发,而是通过Pytho

    2024年01月25日
    浏览(10)
  • flask-sqlalchemy连接数据库

    flask-sqlalchemy连接数据库

    1、安装flask_sqlalchemy和pymysql包 2、进行配置 使用Flask-SQLAlchemy扩展操作数据库,首先需要通过URL建立数据库连接,必须保存到Flask配置对象的SQLALCHEMY_DATABASE_URI中。 其中HOSTNAME为路由地址 PORT为端口号,我在创建数据库时没有更改,所以用的默认的3306端口号 DATABASE为数据库名字

    2024年02月06日
    浏览(10)
  • flask-session、数据库连接池

    flask-session、数据库连接池

    flask 自带session---》以cookie的形式放到了浏览器中---》加密 真正的session,是在服务端存储     -django中存在djangosession表中     -flask中,使用第三方,保存在---》redis中---》flask-session 使用步骤 高版本的flask出现一个问题:         用高版本:在app中放一个参数  app.sessio

    2024年02月14日
    浏览(6)
  • 如何使用Flask-SQLAlchemy来管理数据库连接和操作数据?

    首先,我们需要安装Flask-SQLAlchemy。你可以使用pip来安装它,就像这样: 好了,现在我们已经有了一个可以操作数据库的工具,接下来让我们来看看如何使用它吧! 首先,我们需要创建一个Flask应用对象,并配置数据库的连接字符串和师: 现在,我们已经创建了一个数据库,

    2024年02月15日
    浏览(11)
  • 一百四十七、Kettle——Linux上安装的kettle8.2连接ClickHouse数据库

    一百四十七、Kettle——Linux上安装的kettle8.2连接ClickHouse数据库

    kettle8.2在Linux安装好后,需要与ClickHouse数据库建立连接 https://pan.baidu.com/s/1iqGyXsTaQSCHEbjj7yX7AA 提取码: mvzd   注意 : clickhouse-plugins文件里就是自定义的驱动jar包 注意: 要知道Linux系统架构是64位还是32位, 它们所属的Linux文件夹不同 到这里,Linux安装的kettle8.2就可以与ClickHou

    2024年02月13日
    浏览(15)
  • 将网页数据读入数据库+将数据库数据读出到网页——基于python flask实现网页与数据库的交互连接【全网最全】

    将网页数据读入数据库+将数据库数据读出到网页——基于python flask实现网页与数据库的交互连接【全网最全】

    【全网最全!保姆级教学!!】 本篇博客基于flask技术,实现数据库和网页端的交互。 实现效果:在网页端输入数据,能够将数据存入数据库。反向操作时,能将数据库的数据取出,显示在网页端。不仅如此,还支持数据的查询和修改。  读取网页数据存入数据库,效果如下

    2024年02月13日
    浏览(13)
  • python Flask项目使用SQLalchemy连接数据库时,出现RuntimeError:Working outside of application context.的解决过程记录

    在使用python的Flask框架跟着教程编写项目时,我跟着教程使用了三个文件来组织,分别是main.py(主程序),module.py(数据库模型),controller.py(蓝图模块程序,用Blueprint衔接) 在主程序中,创建app、SQLalchemy实例对象db并将二者绑定 在module.py中,导入主程序中的db和app,创建

    2024年02月09日
    浏览(8)
  • 数据库——多种方法导入Excel数据

    数据库——多种方法导入Excel数据

    接下来就一直点击NEXT,直到完成 此时EXCEL的数据就被导入进SQL Server了: 这个技巧就是直接使用复制粘贴的方式: 注意:这种方法只适用于添加少量数据,如果是 几十万行 的数据,是无法这样导入的。 如图,我们在Excel当中直接复制数据: 右键PTYPES表,选择编辑前2002行:

    2024年02月04日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包