方案一:
直接套用脚本,需可以看懂一些脚本逻辑
改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。
import cx_Oracle
class OracleConnector:
def __init__(self, databases):
self.connections = {}
for db in databases:
conn = cx_Oracle.connect(
user=db['username'],
password=db['password'],
dsn=db['dsn'],
encoding=db.get('encoding', 'UTF-8')
)
self.connections[db['name']] = conn
def execute_query(self, database_name, query, params=None):
with self.connections[database_name].cursor() as cursor:
cursor.execute(query, params)
result = cursor.fetchall()
return result
def execute_non_query(self, database_name, query, params=None):
with self.connections[database_name].cursor() as cursor:
cursor.execute(query, params)
self.connections[database_name].commit()
return cursor.rowcount
def execute_query_all(self, databases, query, params=None):
results = {}
for db in databases:
database_name = db['name']
results[database_name] = self.execute_query(database_name, query, params)
return results
def execute_non_query_all(self, databases, query, params=None):
results = {}
for db in databases:
database_name = db['name']
try:
res = self.execute_non_query(database_name, query, params)
results[database_name] = res
except cx_Oracle.DatabaseError as e:
print(f"Error occurred when running query on {database_name}: {e}")
self.connections[database_name].rollback()
return results
以下是代码解释:
import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。
class OracleConnector::定义一个名为 OracleConnector 的类。
def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。
self.connections = {}:初始化一个字典,用于存储数据库连接。
for db in databases::遍历数据库列表中的每个数据库信息。
conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。
self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。
def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。
with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。
cursor.execute(query, params):执行查询语句。
result = cursor.fetchall():获取查询结果集。
return result:返回查询结果。
def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。
database_name:数据库名称。
query:要执行的非查询语句。
params=None:可选的参数。
with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。
cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。
self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。
return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。
def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。
databases:要执行查询的数据库列表。
query:要执行的查询语句。
params=None:可选的查询参数。
results = {}:初始化一个字典,用于存储查询结果。
for db in databases::遍历数据库列表中的每个数据库。
database_name = db['name']:获取数据库名称。
results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。
return results:返回包含查询结果的字典。
def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。
databases:要执行非查询操作的数据库列表。
query:要执行的非查询语句。
params=None:可选的参数。
results = {}:初始化一个字典,用于存储操作结果。
for db in databases::遍历数据库列表中的每个数据库。
database_name = db['name']:获取数据库名称。
try::捕获可能发生的异常。
res = self.execute_non_query(database_name, query, params):执行非查询操作。
results[database_name] = res:将操作结果存储到字典中。
except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。
print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。
self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。
return results:返回包含操作结果的字典。
这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。
方案二:直接调用封装脚本(写用例,执行脚本即可)
脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作
目录介绍:
Oracle.yaml 编写基本数据库信息
Orcale:
username: user
password: pwd
ip: 127.0.0.1
port: 1521
sid: orcl
PublicConfig.py脚本: 配置读取信息,方便调用
import os
from Public_Utils.util_yaml import YamlReader
class YamlPath:
def __init__(self):
current = os.path.abspath(__file__)
self.base_dir = os.path.dirname(os.path.dirname(current))
self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"
def get_oracle_file(self):
self._config_file = self._config_path + os.sep + "Oracle.yaml"
return self._config_file
class ConfigYaml:
def __init__(self): #初始yaml读取配置文件
self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data()
def get_oracle_yaml(self):
return self.oracle_config['Orcale']
if __name__ == '__main__':
pass
Oracle.py执行脚本文章来源:https://www.toymoban.com/news/detail-771377.html
# coding=utf-8
import cx_Oracle
import os
import json
from Public_Config.PublicConfig import ConfigYaml
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class Oracle:
def __init__(self):
user = ConfigYaml().get_oracle_yaml()['username']
pwd = ConfigYaml().get_oracle_yaml()['password']
ip = ConfigYaml().get_oracle_yaml()['ip']
port = ConfigYaml().get_oracle_yaml()['port']
sid = ConfigYaml().get_oracle_yaml()['sid']
self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid))
# self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}') # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字
self.cursor = self.connect.cursor() # 使用cursor()方法获取操作游标
def GetData(self,sql):
self.cursor.execute(sql)
# 使用Rowfactory更改查询结果,更直观查看数据
columns = [col[0] for col in self.cursor.description]
self.cursor.rowfactory = lambda *args: dict(zip(columns, args))
# fetchall()一次取完所有结果
# fetchone()一次取一行结果
data = self.cursor.fetchall()
return data
def select(self, sql): #查询
list = []
self.cursor.execute(sql) # 使用execute方法执行SQL语句
result = self.cursor.fetchall() # fetchall()一次取完所有结果,fetchone()一次取一行结果
col_name = self.cursor.description
for row in result:
dict = {}
for col in range(len(col_name)):
key = col_name[col][0]
value = row[col]
dict[key] = value
list.append(dict)
js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))
'''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串
json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关'''
return js #将结果返回为一个字符串
def selectlist(self,sql):
list = []
self.cursor.execute(sql)
result = self.cursor.fetchall()
col_name = self.cursor.description
for row in result:
dict = {}
for col in range(len(col_name)):
key = col_name[col][0]
value = row[col]
dict[key] = value
list.append(dict)
return list #将结果以列表返回
def disconnect(self): #未连接
self.cursor.close()
self.connect.close()
def insert(self, sql, list_param): #插入
try:
self.cursor.executemany(sql, list_param)
self.connect.commit()
print("插入ok")
except Exception as e:
print(e)
finally:
self.disconnect()
def update(self, sql): #更新
try:
self.cursor.execute(sql)
self.connect.commit()
except Exception as e:
print(e)
finally:
self.disconnect()
def delete(self, sql): #删除
try:
self.cursor.execute(sql)
self.connect.commit()
print("delete ok")
except Exception as e:
print(e)
finally:
self.disconnect()
if __name__ == '__main__':
print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' "))
pass
util_yaml.py文章来源地址https://www.toymoban.com/news/detail-771377.html
import os
import yaml
class YamlReader:
#初始化,判断文件是否存在
def __init__(self,yaml_file):
if os.path.exists(yaml_file):
self.yaml_file = yaml_file
else:
raise FileNotFoundError("yaml文件不存在")
self._data = None
self._data_all = None
def yaml_data(self): #yaml文件读取 --单个文档读取
#第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据
if not self._data:
with open(self.yaml_file,'rb') as f:
self._data = yaml.safe_load(f)
return self._data
def yaml_data_all(self): #多个文档的读取
if not self._data_all:
with open(self.yaml_file,'rb') as f:
self._data_all = yaml.safe_load_all(f)
return self._data_all
到了这里,关于Python优雅地操作Oracle数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!