python sqlite3 线程池封装

这篇具有很好参考价值的文章主要介绍了python sqlite3 线程池封装。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 封装 sqlite3

1.1. 依赖包引入

# -*- coding: utf-8 -*-
#import os
import sys
import datetime
import logging

import sqlite3

1.2. 封装类

class SqliteTool(object):
    #def __init__(self, host, port, user, password, database):
    def __init__(self, host, database):
        self._host = host
        #self._port = port
        #self._user = user
        #self._password = password
        self._database = database
        self._pool = None
        self._maxconns = 6  # 连接池中最多有多少个连接
        print("__init__", self._database)

1.3. 连接池操作

    def init_pool(self):
        '''初始化连接池'''
        try:
            logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))
            pool = []
            for _ in range(self._maxconns):
                # check_same_thread=False 支持多线程
                conn = sqlite3.connect(self._database, check_same_thread=False)
                pool.append(conn)
            self._pool = pool
            #print("init_pool", self._maxconns, len(self._pool), self._pool)
            logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))
        except Exception as e:
            logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))
            self.close_pool()
            sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))

    def close_pool(self):
        '''关闭 pool'''
        if self._pool != None:
            for conn in self._pool:
                conn.close()

    def get_conn(self):
        if not self._pool:
            self.init_pool()
        return self._pool.pop()

    def close_conn(self, conn):
        if self._pool:
            self._pool.append(conn)

1.4. 增删改查

1.4.1. 创建表

    # 创建数据表
    def create_table(self, sql: str):
        """
        创建表
        :param sql: create sql语句
        :return: True表示创建表成功
        """
        print("create_table", sql)
        result = False
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            print("[create table success]")
            result = True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql, str(e)))
            sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

1.4.2. 删除表

    # 删除数据表
    def drop_table(self, sql: str):
        """
        删除表
        :param sql: drop sql语句
        :return: True表示删除成功
        """
        result = False
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            print("[drop table success]")
            result = True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql, str(e)))
            sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

1.4.3. 插入数据

    def exec_insert(self, sql):
        '''执行插入'''
        result = False
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql, str(e)))
            sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

    def exec_insert_plus(self, table: str, params: dict):
        '''执行插入'''
        result = False
        try:
            key_tup = tuple(params.keys())
            key_str = ",".join(key_tup)
            # different with psql
            val_str = ",".join(("?",)*len(key_tup))

            sql_str = "insert into " + table + " (" + key_str + ") values (" + val_str + ")"
            #val_tup = tuple(params.values())
            val_tup = ()
            for item in params.values():
                if type(item) == list:
                    val_tup += (json.dumps(item),)
                elif type(item) == str:
                    val_tup += (item,)
                elif type(item) == int:
                    val_tup += (item,)
                else:
                    val_tup += (item,)
                #val_tup.append(str(item))
            #print("exec_insert_plus", sql_str, val_tup)

            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql_str, val_tup)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table, str(e)))
            sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result
      
    def exec_insert_many(self, table: str, params: List[Dict]):
        def dict_to_str(tab: str, param: Dict):
            key_tup = tuple(param.keys())
            key_str = ",".join(key_tup)
            # different with psql
            val_str = ",".join(("?",)*len(key_tup))
            sql_str = "insert into " + tab + " (" + key_str + ") values (" + val_str + ")"
            return sql_str

        def dict_to_tuple(param: Dict):
            val_tup = ()
            for item in param.values():
                if type(item) == list:
                    val_tup += (json.dumps(item),)
                elif type(item) == str:
                    val_tup += (item,)
                elif type(item) == int:
                    val_tup += (item,)
                else:
                    val_tup += (item,)
                #val_tup.append(str(item))
            return val_tup
            
        '''执行插入'''
        result = False
        if len(params) <= 0:
            return result
        try:
            sql_str = dict_to_str(table, params[0])
            val_lst = []
            for param in params:
                val_lst.append(dict_to_tuple(param))
            print("exec_insert_many", sql_str, val_lst)

            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.executemany(sql_str, val_lst)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql_str, str(e)))
            sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

1.4.4. 删除数据

	# sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"
    def exec_delete(self, sql):
        '''执行查询'''
        result = False
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql, str(e)))
            sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

1.4.5. 更新数据

	# 修改单个值
    # update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';
    # 修改多个值
    # update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';
    def exec_update(self, sql):
        '''执行更新'''
        result = False
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql, str(e)))
            sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            #conn.close()
            self.close_conn(conn)
        return result

1.4.6. 查询数据

    # select * from users where user_name='hello';
    def exec_select(self, sql):
        '''执行查询'''
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            result = cursor.fetchall()
            #result = cursor.fetchone()
        except Exception as e:
            logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql, str(e)))
            sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            #conn.close()
            #print("exec_select", len(self._pool), self._pool)
            print("init_pool", self._maxconns, len(self._pool), self._pool)
            self.close_conn(conn)
        return result

1.4.7. 测试

    def test_select(self, sql):
        result = self.exec_select(sql)
        print("test_select", result)
        return result

2. 操作使用实例

# for test
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Field

class ......

if __name__ == '__main__':
    dbhost = ""
    dbdatabase = "./test.db"
    db = SqliteTool(dbhost, dbdatabase)

    class TaskInDB(BaseModel):
        task_id: str
        disabled: int

    def create_tests_table(db):
        sql_str = "create table if not exists tests("
        sql_str += "task_id char(32) primary key,"
        sql_str += "disabled int not null"
        sql_str += ");"
        return db.create_table(sql_str)

    def drop_tests_table(db):
        sql_str = "drop table if exists tests;"
        return db.drop_table(sql_str)

    def get_tests_indb(db, tasks: Union[List[str], str, None] = None):
        if tasks == None:
            sql_str = "select * from tests;"

        if type(tasks) == list:
            print("list", tasks)
            key_str = ",".join(tasks)
            sql_str = "select * from tests where task_id in (" + key_str + ");"
        elif type(tasks) == str:
            print("str", tasks)
            sql_str = "select * from tests where task_id ='"+tasks+"';"
        elif type(tasks) == None:
            print("none")

        ret_tasks = db.exec_select(sql_str)
        return ret_tasks
        
    def create_tests_indb(db, tasks: List[TaskInDB]):
        #return db.exec_insert_plus("tests", task_indb.model_dump())
        #return db.exec_insert_plus("tests", task_indb.dict())

        params = []
        for task in tasks:
            print("create_tests_indb", task)
            #params.append(task.model_dump())
            params.append(task.dict())
        return db.exec_insert_many("tests", params)

    #  重建数据库
    if not drop_tests_table(db) or not create_tests_table(db):
        print("ERROR")

    # 创建两条记录
    task_indb1 = TaskInDB(task_id="11111111", disabled=1)
    #create_tests_indb(db, task_indb)
    task_indb2 = TaskInDB(task_id="22222222", disabled=0)
    #create_tests_indb(db, task_indb)
    task_indb3 = TaskInDB(task_id="33333333", disabled=0)
    #create_tests_indb(db, task_indb)
    create_tests_indb(db, [task_indb1, task_indb2, task_indb3])

    # 查询记录
    #key_tup = tuple(TaskInDB.model_fields.keys())
    key_tup = tuple(TaskInDB.__fields__.keys())
    #key_str = ",".join(key_tup)

    # all
    ret_tasks = get_tests_indb(db)
    #  str
    #ret_tasks = get_tests_indb(db, tasks="11111111")
    # list
    #ret_tasks = get_tests_indb(db, tasks=["11111111", "22222222"])
    for ret_task in ret_tasks:
        print(ret_task)
        task_indb = TaskInDB(**{key: ret_task[i] for i,key in enumerate(key_tup)})
        print(task_indb)
    print("OK")

3. ProgrammingError 单线程报错

python在执行数据库查询语句出现如下异常:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 65984 and this is thread id 61236.
错误原因:
因为在python操作某个数据库的数据的线程必须和创建数据库的线程是同一个线程,不然就会出现异常。简而言之,应该就是create语句和select语句应该都在同一个线程下执行才可以。
所以,要想解决这个问题有如下两种方法:
1.(这个方法有点麻烦,不建议,不过可以根据个人项目实际情况来决定)
把创建数据库对象的代码剪切到当前线程里来,同时把之前的已经创建成功的数据库删掉重新执行创建语句
2. (推荐使用)
把当前线程里的connect语句改为这种形式:conn=sqlite3.connect(“stocks.db”,check_same_thread = False)
即加上:check_same_thread = False
check_same_thread=False的作用: python sqlite3的线程模式默认串行, 如果需要再多线程中用一个句柄,需要加此参数,否则将出现上述错误。
这样在执行操纵数据库的语句时就不再检查线程是否相同的问题了。
参考:https://blog.csdn.net/AshleyXM/article/details/104812879

同时也需要加互斥锁, 否则在 execute 时偶现(多线程同时execute)地会报如下错误:
cannot commit - no transaction is active
参考:https://blog.csdn.net/weixin_43380311/article/details/120221910

4. 添加互斥锁

要在 Python 中锁定 sqlite3 数据库,可以使用 threading 模块中的 Lock 对象。下面是一个示例代码:

import sqlite3
import threading

# 创建一个锁对象
lock = threading.Lock()

# 定义一个函数来执行数据库操作
def execute_query(query):
    # 获取锁
    lock.acquire()
    try:
        # 连接到sqlite3数据库
        conn = sqlite3.connect('your_database.db')
        cursor = conn.cursor()

        # 执行查询操作
        cursor.execute(query)
        # 提交事务
        conn.commit()
        # 关闭数据库连接
        conn.close()
    finally:
        # 释放锁
        lock.release()

# 调用函数执行数据库查询
execute_query('SELECT * FROM your_table')

在上面的示例中,我们首先创建了一个 Lock 对象。然后,在执行数据库操作之前,我们使用 lock.acquire() 获取锁,确保只有一个线程可以执行数据库查询。在执行完数据库操作后,我们使用 lock.release() 释放锁,以允许其他线程获取锁并执行数据库操作。
这种方式可以确保在并发访问sqlite3数据库时,每次只有一个线程可以执行数据库操作,避免了数据竞争和不一致性的问题。文章来源地址https://www.toymoban.com/news/detail-824922.html

到了这里,关于python sqlite3 线程池封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【C语言+sqlite3 API接口】实现水果超市

    假如我家开了个水果超市,有以下水果,想实现自动化管理,扫描二维码就能知道当前的水果状态,进货几天了, 好久需要再次进货,那些水果畅销,那些水果不畅销,那些水果春夏秋冬的价格波动,好,那么现在我想将这些信息保存在数据库中,那么我应该怎么做? 超市

    2024年02月17日
    浏览(27)
  • 嵌入式中如何用C语言操作sqlite3(07)

    sqlite3编程接口非常多,对于初学者来说,我们暂时只需要掌握常用的几个函数,其他函数自然就知道如何使用了。 本篇假设数据库为my.db,有数据表student。 no name score 4 嵌入式开发爱好者 89.0 创建表格语句如下: sqlite3_open sqlite3_close sqlite3_get_table 举例 下面比如我们要显示st

    2024年02月07日
    浏览(39)
  • 24 Python的sqlite3模块

    概述         在上一节,我们介绍了Python的shutil模块,包括:shutil模块中一些常用的函数。在这一节,我们将介绍Python的sqlite3模块。sqlite3模块是Python中的内置模块,用于与SQLite数据库交互。SQLite是一个轻量级的磁盘数据库,不需要单独的服务器进程。你可以在多个线程和

    2024年02月08日
    浏览(37)
  • python连接sqlite3工具类

    简单使用python连接sqlite3工具类,代码可根据场景自行抽象

    2024年01月16日
    浏览(34)
  • SQLite3移植STM32MP157 ARM开发板

    移植首先就得有源码,从SQLite官网下载最新版源码 下载地址 这里使用的环境为Ubuntu16 所以直接在Ubuntu下下载的。 下载完成后解压文件 进入解压后的目录 进入后可以看到解压出的源码文件如下 配置生成Makefile 在源码个目录下执行如下命令 –host为指定交叉编译器为arm-linux-

    2024年02月07日
    浏览(29)
  • Python3不支持sqlite3的解决方法

    先贴报错: 在网上查了一下,居然要我 先安装sqlite3,再重新编译安装python ,真无语了,奈何开发环境是Windows,话不多说,连上Centos开搞吧: 一、安装sqlite3的包 我这里把各个服务器都装上了,准备全都搞一遍,免得后面麻烦,请告诉我还是什么是不支持的吧,真想全都先

    2024年02月11日
    浏览(28)
  • Python数据库模块sqlite3操作实例(非常实用)

    当使用Python进行SQLite数据库操作时, sqlite3 模块是一个非常常用和强大的工具。它提供了一系列函数和方法,用于创建、连接、查询和管理数据库。下面是一些常见的用法示例:   sqlite-utils 、 sqlitebiter 和其他类似的库可以提供更高级的功能,包括预编译语句的执行优化和数

    2024年02月13日
    浏览(35)
  • Python - 嵌入式数据库Sqlite3的基本使用

    SQLite是一种轻量级的嵌入式关系型数据库管理系统,而Python标准库中提供了与SQLite交互的模块,sqlite3。下面是一个Python 3中使用sqlite3模块的详细示例与解析。 这个例子展示了如何使用sqlite3模块来创建或连接一个数据库,创建一个表格,插入一些数据,查询数据,提交更改并

    2024年02月15日
    浏览(38)
  • python3使用sqlite3构建本地持久化缓存

    环境:Windows 10_x64 python版本:3.9.2 sqlite3版本:3.34.0 日常python开发中会遇到数据持久化的问题,今天记录下如何使用sqlite3进行数据持久化,并提供示例代码及数据查看工具。 python应用程序在运行过程中被kill掉(比如版本升级等情况),内存中的运行数据将会丢失,如果能够

    2024年02月05日
    浏览(26)
  • 在Python中利用内置SQLite3模块进行数据库操作的完整指南

      在Python中,使用SQLite非常方便,Python内置了 SQLite3 模块,无需额外安装。SQLite 是一个轻量级的嵌入式数据库,适用于小型项目和单用户应用。以下是一个简单的示例,演示如何在 Python 中使用 SQLite,并提供了常见的查询、增加、修改和删除功能。 首先,确保你的 Python 安装

    2024年02月03日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包