当我们考虑直接与数据库交互的软件时,许多人设想了一种类似的管理连接、显式提交查询的模式,以及同样令人厌烦的样板文件。像PyMySQL 和Psycopg2这样的 Python 库完全符合这种范式,并维持了我们所接受的现状。SQLAlchemy通过抽象软件中平凡且琐碎的数据库,已成为 Python 最具标志性的库之一。
它也是我曾经不幸浏览过的 Python 库中最糟糕的文档的所在地。
SQLAlchemy 的 API 提供了一种更好、更简单、更快的方式来使用关系数据库。对于那些试图理解伪装成SQLAlchemy 文档的深奥术语的人来说,这毫无用处,这些术语打了你一巴掌,说:“我才不在乎你呢。”
我在这里想说的是,我确实很关心你,并且只想以 SQLAlchemy 教程的形式表达这些感受。
SQL 还是 ORM?
SQLAlchemy 可以实现两个目的:使 SQL 交互更容易并充当成熟的 ORM。即使您对实现 ORM 不感兴趣(或者不知道 ORM 是什么),SQLAlchemy 作为简化 SQL 数据库连接和执行原始查询的工具也是非常宝贵的。我以为这是我写的原始语句,但事实证明 SQLAlchemy 是这样描述自己的:
SQLAlchemy 由两个不同的组件组成,称为核心和ORM。Core 本身是一个功能齐全的 SQL 抽象工具包,为各种 DBAPI 实现和行为提供平滑的抽象层,以及允许通过生成式 Python 表达式来表达 SQL 语言的 SQL 表达式语言。
连接到数据库
SQLAlchemy 为我们提供了一些管理数据库连接的选项,但它们都始于引擎的概念。“引擎”是代表数据库的 Python 对象。通过传入目标数据库的连接信息来预先创建引擎。
创建引擎后,我们可以随时通过与引擎对象交互来快速使用我们的数据库。将 SQL 语句直接传递给引擎可以让 SQLAlchemy 处理打开新连接、执行并立即关闭连接的操作。对于需要维持大量数据传输的复杂 Web 应用程序,最好通过会话保持持续的数据库连接。我们将在下一篇文章中讨论这一点。
创建引擎的语法非常简单。create_engine()需要一个位置参数,它是一个表示连接到数据库的连接信息的字符串:
初始化 SQLAlchemy 数据库引擎:
"""创建数据库连接。""" from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://user:password@host:3306/database", )
传入的字符串create_engine()是一个连接 URI。假设您有一个数据库可供使用,我们将弄清楚您的 URI 应该是什么样子。
数据库连接 URI
将 URI 分解为几个部分,如下所示:
数据库连接URI结构:
[DB_TYPE]+[DB_CONNECTOR]://[USERNAME]:[PASSWORD]@[HOST]:[PORT]/[DB_NAME]
[DB_TYPE]:指定我们要连接的数据库的类型(方言)。SQLAlchemy 可以与所有主流类型的关系数据库接口。根据您要连接的数据库,替换[DB_TYPE]为匹配的方言:
MySQL:mysql
PostgreSQL:postgresql
SQLite:sqlite
甲骨文(呃):oracle
Microsoft SQL(稍微不那么恼怒的“呃”):mssql
[DB_CONNECTOR]:为了管理您的数据库连接,SQLAlchemy 利用您选择使用的任何 Python 数据库连接库。如果您不确定这意味着什么,以下是我针对每种方言推荐的库:
MySQL:pymysql、mysqldb
PostgreSQL:psycopg2、pg8000
SQLite:(不需要)
甲骨文:cx_oracle
微软 SQL:pymssql、pyodbc
pymysql:https://pypi.org/project/PyMySQL/?ref=toymoban.com mysqldb:https://pypi.org/project/MySQL-python/?ref=toymoban.com psycopg2:https://pypi.org/project/psycopg2/?ref=toymoban.com pg8000:https://pypi.org/project/pg8000/?ref=toymoban.com pymssql:https://pypi.org/project/pymssql/?ref=toymoban.com pyodbc:https://pypi.org/project/pyodbc/?ref=toymoban.com
之后的变量应该看起来都很熟悉;这些指的是目标数据库的 URL、数据库用户、该用户的密码等。
完成此操作后,我们可以将 SQLAlchemy 与您的连接器一起安装:
#安装 SQLAlchemy 和数据库连接器库 $ pip install sqlalchemy pymysql
附加发动机配置
除了连接 URI 之外,还可以配置引擎来满足数据库设置的特定需求(或首选项)。我要求通过 SSL 进行连接;这需要提供一个 PEM 密钥以提高安全性,我们可以connect_args在创建引擎时通过关键字参数传递该密钥:
#SQLAlchemy 数据库引擎 """创建数据库连接。""" from sqlalchemy import create_engine from config import SQLALCHEMY_DATABASE_PEM engine = create_engine( "mysql+pymysql://user:password@host:3306/database", connect_args={"ssl": {"key": SQLALCHEMY_DATABASE_PEM}}, echo=True, )
如果您是好奇的类型,您还可以传递echo=True给您的引擎,它将在发生时将正在执行的所有SQL 语句打印到您的控制台(包括连接、表创建等)。这对于了解新来者的情况非常有用,但它很快就会变得烦人和垃圾邮件。
执行查询
我们可以使用 直接在引擎对象上运行即席查询engine.execute("SELECT * FROM mytable")。当我们调用execute()引擎时,SQLAlchemy 会处理:
打开与我们的数据库的连接。
在我们的数据库上执行原始 SQL 并返回结果。
立即关闭用于运行此查询的数据库连接以避免挂起连接。
这种与数据库交互的方式称为显式无连接执行。这种类型的操作是“显式”的,因为查询在执行时自动提交(与传统的 Python 库相比,传统的 Python 库遵守PEP-249,除非后面跟着 a ,否则不会运行查询commit())。
#PEP-249 https://www.python.org/dev/peps/pep-0249/?ref=toymoban.com#id3
当我们想要按需运行即席查询时(即:几乎所有不是面向用户的应用程序的情况),这非常有用。我们可以获取或改变我们想要的数据,而不必担心额外的东西。
选择数据
我为此示例设置了一个虚拟数据数据库。为了演示使用 SQLAlchemy 有多么容易,我将首先从名为nyc_jobs 的表中获取一些数据:
#执行SELECT查询 """针对 SQLAlchemy 引擎执行原始 SQL 查询。""" from typing import List, Optional import json from sqlalchemy import text from sqlalchemy.engine.base import Engine from sqlalchemy.exc import SQLAlchemyError from logger import LOGGER def fetch_job_listings(engine: Engine) -> Optional[List[dict]]: """ Select rows from database and parse as list of dicts. :param Engine engine: Database engine to handle raw SQL queries. :return: Optional[List[dict]] """ try: with engine.begin() as conn: result = conn.execute( text( "SELECT job_id, agency, business_title, \ salary_range_from, salary_range_to \ FROM nyc_jobs ORDER BY RAND() LIMIT 10;" ), ) results = result.fetchall() LOGGER.info(f"Selected rows: {results}" except SQLAlchemyError as e: LOGGER.error(f"SQLAlchemyError while fetching records: {e}") except Exception as e: LOGGER.error(f"Unexpected error while fetching records: {e}")
我已经传递了一个简单的 SQL 查询conn.execute(),其中包含我的nyc_jobsSELECT表中的几列(以随机顺序返回)。让我们检查一下输出:
#查询返回一个Result对象 <sqlalchemy.engine.result.Result object at 0x10c59bdc0>
天啊,这到底是什么?!什么是Result?这一切是不是好得令人难以置信?
从 SQLAlchemy 1.4.X 开始,该对象ResultProxy已重命名为Result. 检查您的 SQLAlchemy 版本,了解在获取结果时要排除哪个对象。
解析查询结果
Result(是一个有用的数据结构,它总结了我们的查询结果并包装了结果本身。该Result对象使我们可以轻松地从高层次上了解我们的查询是如何成功的,方法是让我们访问以下属性rowcount:
#查询返回的行数 ... with engine.begin() as conn: result = conn.execute( text( "SELECT job_id, agency, business_title, \ salary_range_from, salary_range_to \ FROM nyc_jobs ORDER BY RAND();" ), ) results = result.fetchall() LOGGER.info(f"Selected {result.rowcount} rows.")
这给了我们...
#输出results.rowcount Selected 3123 rows.
...但已经够闲聊了。让我们通过调用fetchall()我们的来看看我们的数据是什么样的Result:
#打印从查询中获取的行 ... with engine.begin() as conn: result = conn.execute( text( "SELECT job_id, agency, business_title, \ salary_range_from, salary_range_to \ FROM nyc_jobs ORDER BY RAND() LIMIT 5;" ), ) results = result.fetchall() print(f"Selected {result.rowcount} rows.") for row in result.fetchall(): print(row)
现在我们正在谈论:
#查询结果 Selected 5 rows. (399274, 'NYC HOUSING AUTHORITY', 'Vice President Portfolio Planning, Project Development and Asset Mgt', 86346, 217244) (399276, 'BOROUGH PRESIDENT-QUEENS', 'Director of Graphic Design', 52524, 81535) (399276, 'BOROUGH PRESIDENT-QUEENS', 'Director of Graphic Design', 52524, 81535) (399300, 'NYC HOUSING AUTHORITY', 'VICE PRESIDENT TRANSACTIONS', 86346, 217244) ...
嘿,这些看起来像是包含职位列表的行!
大多数 SQLAlchemy 用户可能更喜欢将获取的行格式化为不同的数据类型,而不是 SQLAlchemy 对象。一个常见的场景是将行解析为 Python 字典。幸运的是,SQLAlchemy对象有一个用于此目的的Row内置方法:_asdict()
#将获取的数据解析为字典列表 ... results_dict = [row._asdict() for row in results] LOGGER.info( f"Selected {result.rowcount} rows: {results_dict}" )
输出:
Selected 5 rows: [{'job_id': 312335, 'agency': 'DEPT OF ENVIRONMENT PROTECTION', 'business_title': 'Dynamic CRM Developer', 'salary_range_from': 85823, 'salary_range_to': 121363}, {'job_id': 385305, 'agency': 'POLICE DEPARTMENT', 'business_title': 'Case Management Nurse', 'salary_range_from': 81653, 'salary_range_to': 81653}, {'job_id': 384016, 'agency': 'DEPT OF INFO TECH & TELECOMM', 'business_title': '.NET Developer/Programmer Analyst', 'salary_range_from': 56990, 'salary_range_to': 110000}, {'job_id': 221045, 'agency': 'NYC EMPLOYEES RETIREMENT SYS', 'business_title': 'CERTIFIED IT ADMINISTRATOR (WAN)', 'salary_range_from': 89383, 'salary_range_to': 134914}, {'job_id': 397017, 'agency': 'DEPARTMENT OF BUILDINGS', 'business_title': 'Operations Associate', 'salary_range_from': 31893, 'salary_range_to': 36677}]
非常接近,但有点难以阅读。使用 Python 的内置json库可以帮助我们:
#将行解析为字典列表 ... import json results_dict = [row._asdict() for row in results] LOGGER.info( f"Selected {result.rowcount} rows: {json.dumps(results_dict, indent=2)}" )
现在,无论您想要实现什么目标,您都拥有了一种肯定能满足您需求的格式:
[ ... { "job_id": 398955, "agency": "DEPT OF INFO TECH & TELECOMM", "business_title": "Cyber Threat Analyst", "salary_range_from": 76500, "salary_range_to": 95000 }, { "job_id": 380613, "agency": "DEPT OF INFO TECH & TELECOMM", "business_title": "Vulnerability Manager", "salary_range_from": 69940, "salary_range_to": 165000 }, { "job_id": 390870, "agency": "DEPT OF INFO TECH & TELECOMM", "business_title": "Cyber Threat Analyst", "salary_range_from": 75000, "salary_range_to": 95000 }, { "job_id": 356355, "agency": "DEPT OF ENVIRONMENT PROTECTION", "business_title": "Engineering Technician I", "salary_range_from": 36239, "salary_range_to": 41675 }, ... ]
更新行
我们可以像运行 SELECT 查询一样运行 UPDATE 查询,但需要注意的是,存在错误的UPDATE 查询。让我们确保避免这些。
我从表中随机选择一行进行更新。我将更改第229837行,它的 JSON 形式如下所示:
#SELECT * FROM nyc_jobs WHERE job_id = 229837; [ { "job_id": 229837, "agency": "DEPT OF INFO TECH & TELECOMM", "business_title": "Senior Quality Oversight Analyst", "job_category": "Information Technology & Telecommunications", "salary_range_from": 58675, "salary_range_to": 125000, "salary_frequency": "Annual", "work_location": "75 Park Place New York Ny", "division": "General Counsel", "created_at": "2016-02-17T00:00:00.000", "updated_at": "2016-02-17T00:00:00.000" } ]
让我们编写一个潜在危险的 SQL 查询,它会引入一些有问题的字符:
#UPDATE包含引号和表情符号的查询 UPDATE nyc_jobs SET business_title = 'Senior QA Scapegoat 🏆', job_category = 'Info <>!#%%Technology%%#^&%* & Telecom' WHERE job_id = 229837;
如果值包含肯定会中断操作的运算符,则尝试更新是一个坏主意。让我们看看发生了什么:
#执行有问题的 UPDATE 查询 with engine.begin() as conn: result = engine.execute( text( "UPDATE nyc_jobs SET business_title = 'Senior QA Scapegoat 🏆', \ job_category = 'Information? <>!#%%Technology!%%#^&%* & Telecom' \ WHERE job_id = 229837;" ) ) LOGGER.info(result.rowcount)
运行这个会惹恼一切:
#错误 SQL UPDATE 查询的堆栈跟踪 2021-01-09 15:44:14,122 INFO sqlalchemy.engine.base.Engine UPDATE nyc_jobs SET business_title = 'Senior QA Scapegoat 🏆', job_category = 'Information <>!#%%Technology%%#^&%* & Telecommunications' WHERE job_id = 229837; 2021-01-09 15:44:14,123 INFO sqlalchemy.engine.base.Engine {} 2021-01-09 15:44:14,123 INFO sqlalchemy.engine.base.Engine ROLLBACK Traceback (most recent call last): File "/Users/toddbirchard/projects/sqlalchemy-tutorial/main.py", line 4, in <module> init_script() File "/Users/toddbirchard/projects/sqlalchemy-tutorial/sqlalchemy_tutorial/__init__.py", line 17, in init_script rows_updated = update_job_listing(db) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/sqlalchemy_tutorial/queries.py", line 18, in update_job_listing result = db.execute( File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2235, in execute return connection.execute(statement, *multiparams, **params) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1003, in execute return self._execute_text(object_, multiparams, params) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1172, in _execute_text ret = self._execute_context( File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context self._handle_dbapi_exception( File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1514, in _handle_dbapi_exception util.raise_(exc_info[1], with_traceback=exc_info[2]) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_ raise exception File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context self.dialect.do_execute( File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute cursor.execute(statement, parameters) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/pymysql/cursors.py", line 146, in execute query = self.mogrify(query, args) File "/Users/toddbirchard/projects/sqlalchemy-tutorial/.venv/lib/python3.8/site-packages/pymysql/cursors.py", line 125, in mogrify query = query % self._escape_args(args, conn) TypeError: * wants int
对我们来说幸运的是,SQLAlchemy 有一个名为 的方法text(),它可以转义在此类查询中发现的危险字符。始终将您的查询包装在该方法中:
#将查询包装在text() ... from sqlalchemy import text with engine.begin() as conn: result = conn.execute( text( "UPDATE nyc_jobs SET business_title = 'Senior QA Scapegoat 🏆',\ job_category = 'Information <>!#%%Technology%%#^&%* & Telecom' \ WHERE job_id = 229837;" ) ) LOGGER.success(f"Updated {result.rowcount} row: {result}")
输出:
08-31-2023 08:11:10 | SUCCESS: Updated 1 row: <sqlalchemy.engine.cursor.CursorResult object at 0x107021780>
现在,我们没有收到任何错误,并且返回计数1以指示更新了单行。文章来源:https://www.toymoban.com/diary/sql/581.html
关键词:SQLAlchemy,Python数据库交互,引擎,连接URI,数据库连接文章来源地址https://www.toymoban.com/diary/sql/581.html
到此这篇关于使用SQLAlchemy简化Python数据库交互的文章就介绍到这了,更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!