免费的网站后台管理系统,建站公司常见提成比例,wordpress网站类型,seowhy是什么意思中文原文#xff1a;towardsdatascience.com/how-to-use-sqlalchemy-to-make-database-requests-asynchronously-e90a4c8c11b1 数据库请求是一个典型的 I/O 密集型任务#xff0c;因为它大部分时间都在等待数据库服务器的响应。因此#xff0c;如果你的应用程序进行了大量的数据…原文towardsdatascience.com/how-to-use-sqlalchemy-to-make-database-requests-asynchronously-e90a4c8c11b1数据库请求是一个典型的 I/O 密集型任务因为它大部分时间都在等待数据库服务器的响应。因此如果你的应用程序进行了大量的数据库请求那么通过并发执行它们性能可以得到显著提升这是 SQLAlchemy一个多功能的 Python SQL 工具包和对象关系映射器所支持的。此外异步编程在 Python 中越来越受欢迎尤其是在使用 FastAPI 进行 Web 开发时我们经常需要在协程中执行数据库请求即在用async def语句定义的函数中。不幸的是我们不能使用经典的同步版本的 SQLAlchemy而需要创建引擎、连接和会话的异步版本。在本文中我们将介绍如何在不同的场景下使用 SQLAlchemy 异步即使用简单的 SQL 查询、Core 和 ORM。重要的是我们将介绍如何在多个异步任务中并发使用它如果使用得当可以显著提高 I/O 密集型应用程序的效率。准备工作我们将使用 Docker 在本地启动一个 MySQL 服务器在其中我们将创建用于演示的数据库和表# Create a volume to persist the data.$ docker volume create mysql8-data# Create the container for MySQL.$ docker run--name mysql8-d-e MYSQL_ROOT_PASSWORDroot-p13306:3306-v mysql8-data:/var/lib/mysql mysql:8# Connect to the local MySQL server in Docker.$ dockerexec-it mysql8 mysql-u root-proot mysqlSELECT VERSION();-----------|VERSION()|-----------|8.3.0|-----------1rowinset(0.00sec)CREATE DATABASE sales;CREATE TABLE sales.customers(id SMALLINT NOT NULL AUTO_INCREMENT,name VARCHAR(50)NOT NULL,job VARCHAR(50)DEFAULT,PRIMARY KEY(id),UNIQUE UQ_name(name));INSERT INTO sales.customers(name,job)VALUES(Lynn,Backend Developer);然后让我们创建一个 虚拟环境这样我们就可以尝试最新的 Python 和库版本conda create-n sql python3.12conda activate sql pip install-Usqlalchemy[asyncio]2.0,2.1pip install-Uaiomysql0.2,0.3pip install-Ucryptography42.0,42.1sqlalchemy[asyncio]– SQLAlchemy 与greenlet依赖项一起安装这是一个 SQLAlchemy 用于异步工作的库。aiomysql– 一个从 asyncio 框架访问 MySQL 数据库的驱动程序它背后使用 PyMySQL。cryptography– 由 SQLAlchemy 用于身份验证。异步执行简单的 SQL 查询要使用 SQLAlchemy 异步运行 SQL 查询我们首先需要使用create_async_engine()创建一个异步引擎。然后在创建连接、执行查询和处置引擎时我们需要使用awaitimportasynciofromsqlalchemyimporttextfromsqlalchemy.ext.asyncioimportcreate_async_engineasyncdefmain():# Create an asynchronous engine.async_enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)# Insert new data with a transation.asyncwithasync_engine.begin()asconn:insert_querytext( INSERT INTO sales.customers (name, job) VALUES (:name, :job) )awaitconn.execute(insert_query,{name:Hans,job:Data Engineer})# Check the data afer its inserted.asyncwithasync_engine.connect()asconn:select_querytext( SELECT * FROM sales.customers WHERE name :name )resultawaitconn.execute(select_query,{name:Hans})print(result.fetchall())# Close and clean-up pooled connections.awaitasync_engine.dispose()asyncio.run(main())注意当异步执行如上所示的简单 SQL 查询时我们需要使用字典传递变量而不是像同步版本那样使用关键字参数。当运行上面的代码时你将看到以下结果打印出来[(2,Hans,Data Engineer))]当你想快速开始使用 SQLAlchemy 而又不了解 Core 和 ORM 功能时使用简单的 SQL 查询是一个不错的选择。然而如你所见它并不太符合 Python 风格因为它使用了自由风格的简单 SQL 查询。当你对 SQLAlchemy 有更多经验时你可能想使用 Core 或 ORM 功能。使用 SQLAlchemy Core 异步执行在 SQLAlchemy 2.0 中核心功能通常意味着直接与Table对象交互现在非常强大。它实际上与 ORM 功能混合到了非常高的程度。例如select操作符可以用于核心和 ORM。对于核心使用我们还需要创建一个异步引擎然后使用它来异步创建连接。基本工作流程与普通查询相同不同之处在于语句是通过核心操作符如insert和select构建的。importasynciofromsqlalchemyimportColumn,Integer,insertfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_dataMetaData()tableTable(customers,meta_data,Column(id,Integer,primary_keyTrue),Column(name,String(50),nullableFalse),Column(job,String(50),default),)asyncdefmain():# Create an asynchronous engine.async_enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)# Insert new data with a transation.asyncwithengine.begin()asconn:stmtinsert(table).values(nameJack,jobFrontend Developer)awaitconn.execute(stmt)# Check the data afer its inserted.asyncwithengine.connect()asconn:resultawaitconn.execute(select(table).where(table.c.nameJack))print(result.fetchall())# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时将显示以下结果[(3,Jack,Frontend Developer)]使用 SQLAlchemy ORM 异步使用 SQLAlchemy ORM 的 ORM 功能要复杂一些尤其是在 2.0 版本中ORM 类的创建语法发生了显著变化。特别是Mapped[]用于指定类型mapped_column()构造其他列属性。fromsqlalchemyimportStringfromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__customersid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50),nullableFalse,uniqueTrue)job:Mapped[str|None]mapped_column(String(50),default)要异步处理 ORM我们需要使用async_sessionmaker()创建一个异步会话工厂然后使用with来创建异步会话实例# Create an asynchronous session.async_sessionasync_sessionmaker(engine,expire_on_commitFalse)# Create an async session instance.asyncwithasync_session()assession:...处理 ORM 的完整异步代码如下importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__customersid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50),nullableFalse,uniqueTrue)job:Mapped[str|None]mapped_column(String(50),default)asyncdefmain():# Create an asynchronous engine.enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)# Create an asynchronous session.async_sessionasync_sessionmaker(engine,expire_on_commitFalse)# Create an async session instance.asyncwithasync_session()assession:# Insert new data with a transation.asyncwithsession.begin():session.add(Customer(nameStephen,jobManager))# Check the data afer its inserted.asyncwithasync_session()assession:resultawaitsession.execute(select(Customer).where(Customer.nameStephen))customerresult.scalars().one()print(fname {customer.name}, job {customer.job})# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时将显示以下结果nameStephen,jobManager使用 SQLAlchemy Core 在多个异步任务中在多个异步任务中并发使用 SQLAlchemy Core 简单因为连接对象可以直接在多个异步任务中传递和使用importasynciofrompprintimportpprintfromsqlalchemyimportColumn,IntegerfromsqlalchemyimportMetaDatafromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemyimportTablefromsqlalchemy.ext.asyncioimportcreate_async_engine meta_dataMetaData()tableTable(customers,meta_data,Column(id,Integer,primary_keyTrue),Column(name,String(50),nullableFalse),Column(job,String(50),default),)asyncdefget_customer(name,conn):resultawaitconn.execute(select(table).where(table.c.namename))returnresult.fetchone()asyncdefmain():# Create an asynchronous engine.enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)names[Lynn,Hans,Jack,Stephen]tasks[]# Check the data afer its inserted.asyncwithengine.connect()asconn:fornameinnames:tasks.append(get_customer(name,conn))resultsawaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时你会看到以下结果打印出来[(1,Lynn,Backend Developer),(2,Hans,Data Engineer),(3,Jack,Frontend Developer),(4,Stephen,Manager)]在多个异步任务中使用 SQLAlchemy ORM另一方面在多个异步任务中使用 SQLAlchemy ORM 要复杂一些因为不能直接在并发任务中使用相同的AsyncSession实例。让我们直接尝试使用它并看看会发生什么importasynciofromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_columnclassBase(DeclarativeBase):passclassCustomer(Base):__tablename__customersid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50),nullableFalse,uniqueTrue)job:Mapped[str|None]mapped_column(String(50),default)asyncdefget_customer(name,session):resultawaitsession.execute(select(Customer).where(Customer.namename))customerresult.scalars().one()return{name:customer.name,job:customer.job}asyncdefmain():# Create an asynchronous engine.enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)# Create an asynchronous session.async_sessionasync_sessionmaker(engine,expire_on_commitFalse)names[Lynn,Hans,Jack,Stephen]tasks[]# Check the data afer its inserted.asyncwithasync_session()assession:fornameinnames:tasks.append(get_customer(name,session))resultsawaitasyncio.gather(*tasks)print(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当运行上述代码时你会看到以下错误sqlalchemy.exc.InvalidRequestError:This sessionisprovisioning a new connection;concurrent operations arenotpermitted这个错误意味着单个AsyncSession实例不能在多个并发任务例如使用asyncio.gather()之类的函数之间共享。如果你想深入了解这个话题可以查看这个参考。解决这个问题的简单可行方案是在每个任务中创建一个AsyncSession实例。我们将重构代码以全局创建engine和async_session_factory然后在每个任务中调用async_session_factory()来创建一个独立的会话importasynciofrompprintimportpprintfromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportasync_sessionmakerfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ormimportDeclarativeBasefromsqlalchemy.ormimportMappedfromsqlalchemy.ormimportmapped_column# Create an asynchronous engine.enginecreate_async_engine(mysqlaiomysql://root:rootlocalhost:13306/sales)# Create an asynchronous session.async_session_factoryasync_sessionmaker(engine,expire_on_commitFalse)classBase(DeclarativeBase):passclassCustomer(Base):__tablename__customersid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50),nullableFalse,uniqueTrue)job:Mapped[str|None]mapped_column(String(50),default)asyncdefget_customer(name):# Create an async session instance.asyncwithasync_session_factory()assession:resultawaitsession.execute(select(Customer).where(Customer.namename))customerresult.scalars().one()return{name:customer.name,job:customer.job}asyncdefmain():names[Lynn,Hans,Jack,Stephen]tasks[]# Check the data afer its inserted.fornameinnames:tasks.append(get_customer(name))resultsawaitasyncio.gather(*tasks)pprint(results)# Close and clean-up pooled connections.awaitengine.dispose()asyncio.run(main())当代码运行时你会看到以下结果打印出来[{job:Backend Developer,name:Lynn},{job:Data Engineer,name:Hans},{job:Frontend Developer,name:Jack},{job:Manager,name:Stephen}]就像 HTTP 请求一样数据库请求也是 I/O 密集型任务因为它们大部分时间都在等待数据库服务器的响应。因此我们可以通过并发而不是顺序地执行数据库请求来显著提高应用程序的效率。另一方面异步地执行数据库请求也越来越重要因为异步编程在 Python 中变得越来越流行尤其是在使用 FastAPI 进行 Web 开发时这也突出了学习这个主题的必要性。在这篇文章中我们介绍了如何在不同的场景下使用 SQLAlchemy 进行异步操作即使用纯 SQL 查询、Core 和 ORM。你可以简单地调整代码以适应你的特定使用。我们特别介绍了如何在多个异步任务中并发使用 SQLAlchemy如果应用程序需要并发执行大量数据库请求这将提高应用程序的效率。相关文章学习基础知识并开始使用 SQLAlchemy ORM如何在 Python 中使用 SQLAlchemy 执行纯 SQL 查询