1. 封装 sqlite3
1.1. 依赖包引入
# -*- coding: utf-8 -*-
#import os
import sys
import datetime
import logging
import sqlite3
1.2. 封装类
class SqliteTool(object):
#def __init__(self, host, port, user, password, database):
def __init__(self, host, database):
self._host = host
#self._port = port
#self._user = user
#self._password = password
self._database = database
self._pool = None
self._maxconns = 6 # 连接池中最多有多少个连接
print("__init__", self._database)
1.3. 连接池操作
def init_pool(self):
'''初始化连接池'''
try:
logging.info('Begin to create {0} postgresql pool on:{1}.\n'.format(self._host, datetime.datetime.now()))
pool = []
for _ in range(self._maxconns):
# check_same_thread=False 支持多线程
conn = sqlite3.connect(self._database, check_same_thread=False)
pool.append(conn)
self._pool = pool
#print("init_pool", self._maxconns, len(self._pool), self._pool)
logging.info('SUCCESS: create {0} postgresql pool success on {1}.\n'.format(self._host, datetime.datetime.now()))
except Exception as e:
logging.error('ERROR: create {0} postgresql pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))
self.close_pool()
sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
def close_pool(self):
'''关闭 pool'''
if self._pool != None:
for conn in self._pool:
conn.close()
def get_conn(self):
if not self._pool:
self.init_pool()
return self._pool.pop()
def close_conn(self, conn):
if self._pool:
self._pool.append(conn)
1.4. 增删改查
1.4.1. 创建表
# 创建数据表
def create_table(self, sql: str):
"""
创建表
:param sql: create sql语句
:return: True表示创建表成功
"""
print("create_table", sql)
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
print("[create table success]")
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in create_table'.format(sql, str(e)))
sys.exit('ERROR: create table from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
1.4.2. 删除表
# 删除数据表
def drop_table(self, sql: str):
"""
删除表
:param sql: drop sql语句
:return: True表示删除成功
"""
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
print("[drop table success]")
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in drop_table'.format(sql, str(e)))
sys.exit('ERROR: drop table from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
1.4.3. 插入数据
def exec_insert(self, sql):
'''执行插入'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql, str(e)))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
def exec_insert_plus(self, table: str, params: dict):
'''执行插入'''
result = False
try:
key_tup = tuple(params.keys())
key_str = ",".join(key_tup)
# different with psql
val_str = ",".join(("?",)*len(key_tup))
sql_str = "insert into " + table + " (" + key_str + ") values (" + val_str + ")"
#val_tup = tuple(params.values())
val_tup = ()
for item in params.values():
if type(item) == list:
val_tup += (json.dumps(item),)
elif type(item) == str:
val_tup += (item,)
elif type(item) == int:
val_tup += (item,)
else:
val_tup += (item,)
#val_tup.append(str(item))
#print("exec_insert_plus", sql_str, val_tup)
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql_str, val_tup)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_insert_plus'.format(table, str(e)))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
def exec_insert_many(self, table: str, params: List[Dict]):
def dict_to_str(tab: str, param: Dict):
key_tup = tuple(param.keys())
key_str = ",".join(key_tup)
# different with psql
val_str = ",".join(("?",)*len(key_tup))
sql_str = "insert into " + tab + " (" + key_str + ") values (" + val_str + ")"
return sql_str
def dict_to_tuple(param: Dict):
val_tup = ()
for item in param.values():
if type(item) == list:
val_tup += (json.dumps(item),)
elif type(item) == str:
val_tup += (item,)
elif type(item) == int:
val_tup += (item,)
else:
val_tup += (item,)
#val_tup.append(str(item))
return val_tup
'''执行插入'''
result = False
if len(params) <= 0:
return result
try:
sql_str = dict_to_str(table, params[0])
val_lst = []
for param in params:
val_lst.append(dict_to_tuple(param))
print("exec_insert_many", sql_str, val_lst)
conn = self.get_conn()
cursor = conn.cursor()
cursor.executemany(sql_str, val_lst)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_insert'.format(sql_str, str(e)))
sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
1.4.4. 删除数据
# sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"
def exec_delete(self, sql):
'''执行查询'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_delete'.format(sql, str(e)))
sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
1.4.5. 更新数据
# 修改单个值
# update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';
# 修改多个值
# update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';
def exec_update(self, sql):
'''执行更新'''
result = False
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = True
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_update'.format(sql, str(e)))
sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
conn.commit()
#conn.close()
self.close_conn(conn)
return result
1.4.6. 查询数据
# select * from users where user_name='hello';
def exec_select(self, sql):
'''执行查询'''
try:
conn = self.get_conn()
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
#result = cursor.fetchone()
except Exception as e:
logging.error('ERROR: execute {0} causes error {1} in exec_select'.format(sql, str(e)))
sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
finally:
cursor.close()
#conn.close()
#print("exec_select", len(self._pool), self._pool)
print("init_pool", self._maxconns, len(self._pool), self._pool)
self.close_conn(conn)
return result
1.4.7. 测试
def test_select(self, sql):
result = self.exec_select(sql)
print("test_select", result)
return result
2. 操作使用实例
# for test
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Field
class ......
if __name__ == '__main__':
dbhost = ""
dbdatabase = "./test.db"
db = SqliteTool(dbhost, dbdatabase)
class TaskInDB(BaseModel):
task_id: str
disabled: int
def create_tests_table(db):
sql_str = "create table if not exists tests("
sql_str += "task_id char(32) primary key,"
sql_str += "disabled int not null"
sql_str += ");"
return db.create_table(sql_str)
def drop_tests_table(db):
sql_str = "drop table if exists tests;"
return db.drop_table(sql_str)
def get_tests_indb(db, tasks: Union[List[str], str, None] = None):
if tasks == None:
sql_str = "select * from tests;"
if type(tasks) == list:
print("list", tasks)
key_str = ",".join(tasks)
sql_str = "select * from tests where task_id in (" + key_str + ");"
elif type(tasks) == str:
print("str", tasks)
sql_str = "select * from tests where task_id ='"+tasks+"';"
elif type(tasks) == None:
print("none")
ret_tasks = db.exec_select(sql_str)
return ret_tasks
def create_tests_indb(db, tasks: List[TaskInDB]):
#return db.exec_insert_plus("tests", task_indb.model_dump())
#return db.exec_insert_plus("tests", task_indb.dict())
params = []
for task in tasks:
print("create_tests_indb", task)
#params.append(task.model_dump())
params.append(task.dict())
return db.exec_insert_many("tests", params)
# 重建数据库
if not drop_tests_table(db) or not create_tests_table(db):
print("ERROR")
# 创建两条记录
task_indb1 = TaskInDB(task_id="11111111", disabled=1)
#create_tests_indb(db, task_indb)
task_indb2 = TaskInDB(task_id="22222222", disabled=0)
#create_tests_indb(db, task_indb)
task_indb3 = TaskInDB(task_id="33333333", disabled=0)
#create_tests_indb(db, task_indb)
create_tests_indb(db, [task_indb1, task_indb2, task_indb3])
# 查询记录
#key_tup = tuple(TaskInDB.model_fields.keys())
key_tup = tuple(TaskInDB.__fields__.keys())
#key_str = ",".join(key_tup)
# all
ret_tasks = get_tests_indb(db)
# str
#ret_tasks = get_tests_indb(db, tasks="11111111")
# list
#ret_tasks = get_tests_indb(db, tasks=["11111111", "22222222"])
for ret_task in ret_tasks:
print(ret_task)
task_indb = TaskInDB(**{key: ret_task[i] for i,key in enumerate(key_tup)})
print(task_indb)
print("OK")
3. ProgrammingError 单线程报错
python在执行数据库查询语句出现如下异常:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 65984 and this is thread id 61236.
错误原因:
因为在python操作某个数据库的数据的线程必须和创建数据库的线程是同一个线程,不然就会出现异常。简而言之,应该就是create语句和select语句应该都在同一个线程下执行才可以。
所以,要想解决这个问题有如下两种方法:
1.(这个方法有点麻烦,不建议,不过可以根据个人项目实际情况来决定)
把创建数据库对象的代码剪切到当前线程里来,同时把之前的已经创建成功的数据库删掉重新执行创建语句
2. (推荐使用)
把当前线程里的connect语句改为这种形式:conn=sqlite3.connect(“stocks.db”,check_same_thread = False)
即加上:check_same_thread = False
check_same_thread=False的作用: python sqlite3的线程模式默认串行, 如果需要再多线程中用一个句柄,需要加此参数,否则将出现上述错误。
这样在执行操纵数据库的语句时就不再检查线程是否相同的问题了。
参考:https://blog.csdn.net/AshleyXM/article/details/104812879
同时也需要加互斥锁, 否则在 execute 时偶现(多线程同时execute)地会报如下错误:
cannot commit - no transaction is active
参考:https://blog.csdn.net/weixin_43380311/article/details/120221910
4. 添加互斥锁
要在 Python 中锁定 sqlite3 数据库,可以使用 threading 模块中的 Lock 对象。下面是一个示例代码:文章来源:https://www.toymoban.com/news/detail-824922.html
import sqlite3
import threading
# 创建一个锁对象
lock = threading.Lock()
# 定义一个函数来执行数据库操作
def execute_query(query):
# 获取锁
lock.acquire()
try:
# 连接到sqlite3数据库
conn = sqlite3.connect('your_database.db')
cursor = conn.cursor()
# 执行查询操作
cursor.execute(query)
# 提交事务
conn.commit()
# 关闭数据库连接
conn.close()
finally:
# 释放锁
lock.release()
# 调用函数执行数据库查询
execute_query('SELECT * FROM your_table')
在上面的示例中,我们首先创建了一个 Lock 对象。然后,在执行数据库操作之前,我们使用 lock.acquire() 获取锁,确保只有一个线程可以执行数据库查询。在执行完数据库操作后,我们使用 lock.release() 释放锁,以允许其他线程获取锁并执行数据库操作。
这种方式可以确保在并发访问sqlite3数据库时,每次只有一个线程可以执行数据库操作,避免了数据竞争和不一致性的问题。文章来源地址https://www.toymoban.com/news/detail-824922.html
到了这里,关于python sqlite3 线程池封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!