Python开发中,SQLAlchemy 的同步操作和异步操作封装,以及常规CRUD的处理。
SQLAlchemy
是一个非常不错的ORM工具,通过它我们可以很好的实现多种数据库的统一模型接入,而且它提供了非常多的特性,通过结合不同的数据库驱动,我们可以实现同步或者异步的处理封装。
1、SQLAlchemy介绍
是一个功能强大且灵活的 Python SQL 工具包和对象关系映射(ORM)库。它被广泛用于在 Python 项目中处理关系型数据库的场景,既提供了高级的 ORM 功能,又保留了对底层 SQL 语句的强大控制力。
SQLAlchemy
允许开发者通过 Python 代码与数据库进行交互,而无需直接编写 SQL 语句,同时也支持直接使用原生 SQL 进行复杂查询。下面是
SQLAlchemy
和我们常规数据库对象的对应关系说明。
1)
数据库表 (Database Table)
- SQLAlchemy
: 使用
Table
对象或
Declarative Base
中的类来表示。 - 对应关系
: 数据库中的每一个表对应于SQLAlchemy中的一个类,该类继承自
declarative_base()
。
from sqlalchemy importColumn, Integer, String, create_enginefrom sqlalchemy.ext.declarative importdeclarative_base
Base=declarative_base()classUser(Base):__tablename__ = 'users' #数据库表名 id = Column(Integer, primary_key=True)
name=Column(String)
email= Column(String)
2)
数据库列 (Database Column)
- SQLAlchemy
: 使用
Column
对象来表示。 - 对应关系
: 每个数据库表中的列在SQLAlchemy中表示为
Column
对象,并作为类的属性定义。
id = Column(Integer, primary_key=True)
name= Column(String(50))
3)
数据库行 (Database Row)
- SQLAlchemy
: 每个数据库表的一个实例(对象)代表数据库表中的一行。 - 对应关系
: 在SQLAlchemy中,通过实例化模型类来表示数据库表中的一行。
new_user = User(id=1, name='John Doe', email='john@example.com')
4)
主键 (Primary Key)
- SQLAlchemy
: 使用
primary_key=True
参数定义主键。 - 对应关系
: 在数据库表中定义主键列,这列在SQLAlchemy中也需要明确标注。
id = Column(Integer, primary_key=True)
5)
外键 (Foreign Key)
- SQLAlchemy
: 使用
ForeignKey
对象来表示。 - 对应关系
: 在SQLAlchemy中使用
ForeignKey
指定关系,指向另一个表的主键列。
from sqlalchemy importForeignKeyfrom sqlalchemy.orm importrelationshipclassAddress(Base):__tablename__ = 'addresses'id= Column(Integer, primary_key=True)
user_id= Column(Integer, ForeignKey('users.id'))
user= relationship('User')
6)
关系 (Relationships)
- SQLAlchemy
: 使用
relationship
对象来表示。 - 对应关系
: 数据库中表与表之间的关系在SQLAlchemy中通过
relationship
来定义。
addresses = relationship("Address", back_populates="user")
7)
会话 (Session)
- SQLAlchemy
: 使用
Session
对象进行事务性操作(如查询、插入、更新、删除)。 - 对应关系
:
Session
对象类似于数据库连接对象,用于与数据库进行交互。
from sqlalchemy.orm importsessionmaker
Session= sessionmaker(bind=engine)
session=Session()
session.add(new_user)
session.commit()
通过以上对应关系,SQLAlchemy允许开发者以面向对象的方式与数据库交互,提供了一个Pythonic的接口来操作数据库。
2、SQLAlchemy 的同步操作
SQLAlchemy 提供了同步和异步两种操作方式,分别适用于不同的应用场景。以下是如何封装 SQLAlchemy 的同步和异步操作的方法说明:
在同步操作中,SQLAlchemy 使用传统的阻塞方式进行数据库操作。首先,定义一个基础的
Session
和
Engine
对象:
from sqlalchemy importcreate_enginefrom sqlalchemy.orm importdeclarative_base, sessionmakerfrom typing importGeneratorfrom core.config importsettings#常规同步处理 engine =create_engine(settings.DB_URI)
SessionLocal= sessionmaker(autocommit=False, autoflush=False, bind=engine)def get_db() ->Generator:"""创建一个 SQLAlchemy 数据库会话-同步处理.""" try:
db=SessionLocal()yielddbfinally:
db.close()
前面说了,使用SQLAlchemy可以实现不同数据库的统一模型的处理,我们可以对应创建不同数据库的连接(engine),如下是常规几种关系型数据库的连接处理。
#mysql 数据库引擎 engine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/WinFramework",
pool_recycle=3600,#echo=True, )#Sqlite 数据库引擎 engine = create_engine("sqlite:///testdir//test.db")#PostgreSQL 数据库引擎 engine =create_engine("postgresql+psycopg2://postgres:123456@localhost:5432/winframework",#echo=True, )#SQLServer 数据库引擎 engine =create_engine("mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0",#echo=True, )
我们可以根据数据库的CRUD操作方式,封装一些操作,如下所示。
classCRUDOperations:def __init__(self, model):
self.model=modeldefcreate(self, db, obj_in):
db_obj= self.model(**obj_in.dict())
db.add(db_obj)
db.commit()
db.refresh(db_obj)returndb_objdefget(self, db, id):return db.query(self.model).filter(self.model.id ==id).first()defupdate(self, db, db_obj, obj_in):
obj_data= obj_in.dict(exclude_unset=True)for field inobj_data:
setattr(db_obj, field, obj_data[field])
db.commit()
db.refresh(db_obj)returndb_objdefremove(self, db, id):
obj=db.query(self.model).get(id)
db.delete(obj)
db.commit()return obj
使用时,构建数据访问类进行操作,如下测试代码所示。
crud_user =CRUDOperations(User)#Create with get_db() as db:
user=crud_user.create(db, user_data)#Read with get_db() as db:
user=crud_user.get(db, user_id)#Update with get_db() as db:
user=crud_user.update(db, user, user_data)#Delete with get_db() as db:
crud_user.remove(db, user_id)
3、SQLAlchemy 的异步操作封装
对于异步操作,SQLAlchemy 使用
AsyncSession
来管理异步事务。
首先,定义一个异步的
Session
和
Engine
对象:
from sqlalchemy importcreate_engine, URLfrom sqlalchemy.ext.asyncio importAsyncSession, async_sessionmaker, create_async_enginefrom typing importAsyncGeneratordef create_engine_and_session(url: str |URL):try:#数据库引擎 engine = create_async_engine(url, pool_pre_ping=True)exceptException as e:print("❌ 数据库链接失败 {}", e)
sys.exit()else:
db_session=async_sessionmaker(
bind=engine, autoflush=False, expire_on_commit=False
)returnengine, db_session#异步处理 async_engine, async_session =create_engine_and_session(settings.DB_URI_ASYNC)
asyncdef get_db() ->AsyncGenerator[AsyncSession, None]:"""创建一个 SQLAlchemy 数据库会话-异步处理."""async with async_session() as session:yield session
和同步的处理类似,不过是换了一个对象来实现,并且函数使用了async await的组合来实现异步操作。
为了实现我的SQLSugar开发框架类似的封装模式,我们参考SQLSugar开发框架中基类CRUD的定义方式来实现多种接口的封装处理。
参照上面的实现方式,我们来看看Python中使用泛型的处理封装类的代码。
ModelType = TypeVar("ModelType", bound=Base)
PrimaryKeyType= TypeVar("PrimaryKeyType", int, str, float) #限定主键的类型 PageDtoType = TypeVar("PageDtoType", bound=BaseModel)
DtoType= TypeVar("DtoType", bound=BaseModel)classBaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]):"""基础CRUD操作类""" def __init__(self, model: Type[ModelType]):"""数据库访问操作的基类对象(CRUD).
**Parameters**
* `model`: A SQLAlchemy model class"""self.model= model
这样,我们就可以通过泛型定义不同的类型,以及相关的处理类的信息。
该基类函数中,异步定义get_all的返回所有的数据接口如下所示。
async defget_all(
self, sorting: Optional[str], db: AsyncSession
)-> List[ModelType] |None:"""根据ID字符串列表获取对象列表
:param sorting: 格式:name asc 或 name asc,age desc"""query=select(self.model)ifsorting:
query=self.apply_sorting(query, sorting)
result=await db.execute(query)
items=result.scalars().all()return items
而对应获得单个对象的操作函数,如下所示。
async def get(self, id: PrimaryKeyType, db: AsyncSession) ->Optional[ModelType]:"""根据主键获取一个对象"""query= select(self.model).filter(self.model.id ==id)
result=await db.execute(query)
item=result.scalars().first()return item
而创建对象的操作函数,如下所示。
async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) ->bool:"""创建对象,使用 kwargs 时可以扩展创建对象时的字段。
:param obj_in: 对象输入数据
:param kwargs: 扩展字段,如格式: is_deleted=0, is_active=1""" try:ifkwargs:
instance= self.model(**obj_in.model_dump(), **kwargs)else:
instance= self.model(**obj_in.model_dump()) #type: ignore db.add(instance)
await db.commit()returnTrueexceptSQLAlchemyError as e:print(e)
await db.rollback()return False
这个异步函数
create
旨在通过 SQLAlchemy 在数据库中创建一个对象,同时允许通过
kwargs
参数动态扩展创建对象时的字段。
async def
: 表明这是一个异步函数,可以与
await
一起使用。self
: 这是一个类的方法,因此
self
引用类的实例。obj_in: DtoType
:
obj_in
是一个数据传输对象(DTO),它包含了需要插入到数据库中的数据。
DtoType
是一个泛型类型,用于表示 DTO 对象。db: AsyncSession
:
db
是一个 SQLAlchemy 的异步会话(
AsyncSession
),用于与数据库进行交互。**kwargs
: 接受任意数量的关键字参数,允许在对象创建时动态传入额外的字段。
obj_in.model_dump()
: 假设
obj_in
是一个 Pydantic 模型或类似结构,它可以通过
model_dump()
方法转换为字典格式,用于创建 SQLAlchemy 模型实例。self.model(**obj_in.model_dump(), **kwargs)
: 使用
obj_in
中的字段以及通过
kwargs
传入的扩展字段来实例化 SQLAlchemy 模型对象。如果
kwargs
非空,它们会被解包并作为额外的字段传入模型构造函数。
db.add(instance)
: 将新创建的对象添加到当前的数据库会话中。await db.commit()
: 提交事务,将新对象保存到数据库。
SQLAlchemyError
: 捕获所有 SQLAlchemy 相关的错误。await db.rollback()
: 在发生异常时,回滚事务,以防止不完整或错误的数据被提交。
通过上面的封装,我们可以测试调用的处理例子
from crud.customer importcustomer as customer_crudfrom models.customer importCustomerfrom pydantic importBaseModelfrom schemas.customer importCustomerDto, CustomerPageDto
asyncdeftest_list_customer():
async with get_db() as db:print("get_list")
totalCount, items=await customer_crud.get_list(
CustomerPageDto(skipCount=0, maxResultCount=10, name="test"),
db,
)print(totalCount, items)for customer incustomers:print(customer.name, customer.age)print("get_by_name")
name= "test"customer=await customer_crud.get_by_name(
name,
db,
)ifcustomer:print(customer.name, customer.age)else:print(f"{name} not found")print("soft delete")
result= await customer_crud.delete_byid(customer.id, db, is_deleted=1)print("操作结果:", result)print("soft delete_byids")
result=await customer_crud.delete_byids(
["11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1)print(f"Soft delete successful: {result}")print("update_by_column")
result=await customer_crud.update_by_column("id", customer.id, {"age": 30}, db
)print("操作结果:", result)
await db.close()
同步和异步处理的差异:
- 同步操作
适用于传统的阻塞式应用场景,比如命令行工具或简单的脚本。 - 异步操作
更适合异步框架如
FastAPI
,可以提高高并发场景下的性能。
通过封装数据库操作,可以让代码更具复用性和可维护性,支持不同类型的操作场景。