有时候,我们在使用SQLAlchemy操作某些表的时候,需要使用外键关系来实现一对多或者多对多的关系引用,以及对多表的联合查询,有序列的uuid值或者自增id值,字符串的分拆等常见处理操作。

1、在 SQLAlchemy 中定义具有嵌套
children
关系的表

要在 SQLAlchemy 中定义具有嵌套
children
关系的表,如表中包含
id

pid
字段,可以使用
relationship

ForeignKey
来建立父子关系。

首先,你需要定义一个模型类,其中包含
id

pid
字段。
id
是主键,
pid
是指向父记录的外键。然后,你使用
relationship
来建立父子关系。

from sqlalchemy importColumn, Integer, String, ForeignKeyfrom sqlalchemy.orm importrelationship, declarative_basefrom sqlalchemy.ext.asyncio importcreate_async_engine, AsyncSessionfrom sqlalchemy.orm importsessionmaker

Base
=declarative_base()classDictTypeInfo(Base):__tablename__ = 'dict_type_info'id= Column(Integer, primary_key=True, index=True)
name
= Column(String, index=True)
code
=Column(String)
remark
=Column(String)
seq
=Column(Integer)
pid
= Column(Integer, ForeignKey('dict_type_info.id')) #外键指向父节点的 id #定义 parent 关系 parent = relationship("DictTypeInfo", remote_side=[id], back_populates="children")#定义 children 关系 children = relationship("DictTypeInfo", back_populates="parent")

例子使用代码如下所示。

#创建异步引擎和会话
DATABASE_URL = "mysql+asyncmy://username:password@localhost/mydatabase"engine= create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal
= sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async
definit_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
#示例:如何插入数据并进行查询 async defexample_usage():
async with AsyncSessionLocal() as session:
async with session.begin():
#插入数据 parent_node = DictTypeInfo(name="Parent", code="P001", remark="Parent Node", seq=1)
child_node1
= DictTypeInfo(name="Child1", code="C001", remark="First Child", seq=1, parent=parent_node)
child_node2
= DictTypeInfo(name="Child2", code="C002", remark="Second Child", seq=2, parent=parent_node)
session.add(parent_node)
session.add(child_node1)
session.add(child_node2)
#查询数据 async with session.begin():
result
=await session.execute("SELECT * FROM dict_type_info WHERE pid IS NULL")
parent_nodes
=result.scalars().all()for node inparent_nodes:print(f"Parent Node: {node.name}, Children: {[child.name for child in node.children]}")

代码说明

  1. 定义模型类
    (
    DictTypeInfo
    ):


    • id
      : 主键。
    • pid
      : 外键,指向同一个表的
      id
      ,表示父节点。
    • parent
      : 父关系,通过
      remote_side
      设定本模型的外键指向自身的主键。
    • children
      : 子关系,
      back_populates
      用于双向关系的映射。
  2. 创建异步引擎和会话
    :


    • 使用
      create_async_engine

      AsyncSession
      创建数据库引擎和会话,以支持异步操作。
  3. 插入和查询数据
    :


    • 插入数据示例展示了如何创建父节点和子节点,并将子节点关联到父节点。
    • 查询数据示例展示了如何查询所有父节点以及它们的子节点。

注意事项

  • remote_side
    : 在
    relationship
    中,
    remote_side
    是指定哪些字段是远程的一方(即子节点关系的目标)。
  • 确保在模型中定义了正确的外键约束。在你提供的模型中,
    pid
    列需要指向同一表中的
    id
    列。确保
    ForeignKey
    设置正确。
  • 异步操作
    : 使用
    AsyncSession

    asyncio
    进行异步数据库操作。
  • 创建表
    : 在初始化数据库时,确保表结构是正确的。

要使用
selectinload
加载某个
pid
下的对象及其子列表,可以通过 SQLAlchemy 的
selectinload
来优化加载子关系。
selectinload
可以减少 SQL 查询的数量,特别是在加载具有层次结构的数据时。

async defget_tree(pid: int):
async with AsyncSessionLocal() as session:
#通过 selectinload 加载所有子节点 stmt = select(DictTypeInfo).filter(DictTypeInfo.pid ==pid).options(selectinload(DictTypeInfo.children))
result
=await session.execute(stmt)
nodes
=result.scalars().all()return nodes

这样,调用
get_tree
函数获取指定
pid
的节点及其子节点,代码如下。

async defexample_usage():
nodes
= await get_tree(pid=1)for node innodes:print(f"Node: {node.name}, Children: {[child.name for child in node.children]}")

selectinload
:
selectinload
可以减少 N+1 查询问题,它通过一条额外的查询来加载相关对象。这适合用于层次结构数据的加载。通过这种方式,你可以使用 SQLAlchemy 的
selectinload
来高效地加载具有父子关系的对象,并优化数据库查询性能。

同样,我们在 SQLAlchemy 中实现多对多关系也是类似的处理方式。

在 SQLAlchemy 中,实现多对多关系通常需要创建一个关联表(association table),该表将存储两个相关联表的外键,从而实现多对多关系。以下是一个实现多对多关系的详细步骤。

1) 定义多对多关系的关联表

首先,需要定义一个关联表,该表包含两个外键,分别指向两端的主表。这通常使用
Table
对象来实现。

from sqlalchemy importTable, Column, Integer, ForeignKeyfrom sqlalchemy.ext.declarative importdeclarative_base

Base
=declarative_base()

association_table
= Table('association', Base.metadata,
Column(
'left_id', Integer, ForeignKey('left_table.id')),
Column(
'right_id', Integer, ForeignKey('right_table.id'))
)

在这个例子中,
association_table
是一个包含两个外键的中间表:
left_id

right_id
分别指向
left_table

right_table
的主键。

2)定义两端的模型并添加关系

在两端的模型中,使用
relationship
来定义多对多关系,并指定
secondary
参数为关联表。

from sqlalchemy.orm importrelationshipclassLeftModel(Base):__tablename__ = 'left_table'id= Column(Integer, primary_key=True)
name
= Column(String(50))
rights
= relationship("RightModel", secondary=association_table, back_populates="lefts")classRightModel(Base):__tablename__ = 'right_table'id= Column(Integer, primary_key=True)
name
= Column(String(50))
lefts
= relationship("LeftModel", secondary=association_table, back_populates="rights")
  • rights

    LeftModel
    中定义的关系属性,它将连接到
    RightModel
  • lefts

    RightModel
    中定义的关系属性,它将连接到
    LeftModel
  • secondary=association_table
    告诉 SQLAlchemy 使用
    association_table
    作为连接表。
  • back_populates
    用于双向关系的对称引用。

3)创建数据库并插入数据

下面的代码展示了如何创建数据库、插入数据并查询多对多关系。

from sqlalchemy importcreate_enginefrom sqlalchemy.orm importsessionmaker#创建数据库引擎
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

Session
= sessionmaker(bind=engine)
session
=Session()#创建模型实例 left1 = LeftModel(name="Left 1")
right1
= RightModel(name="Right 1")
right2
= RightModel(name="Right 2")#设置多对多关系 left1.rights =[right1, right2]#添加到会话并提交 session.add(left1)
session.commit()
#查询并打印关系 for right inleft1.rights:print(right.name) #输出: Right 1, Right 2 for left inright1.lefts:print(left.name) #输出: Left 1

你可以像操作普通列表一样来处理这些关系,例如添加、删除关联等:

#添加关系
left1.rights.append(RightModel(name="Right 3"))
session.commit()
#删除关系 left1.rights.remove(right2)
session.commit()

通过这些步骤,你可以在 SQLAlchemy 中实现和操作多对多关系。

2、在 SQLAlchemy 中联合多个表进行记录关联查询

例如,在我的框架中,字典大类和字典项目是不同的表进管理的,因此如果需要根据大类名称进行字典项目的查询,那么就需要联合两个表进行处理。

具体操作如下:创建一个查询,将
DictDataInfo
表与
DictTypeInfo
表联接(通过
DictType_ID

Id
列)

from sqlalchemy.future importselectfrom sqlalchemy.orm importaliasedfrom sqlalchemy.ext.asyncio importAsyncSessionfrom sqlalchemy.ext.asyncio importcreate_async_enginefrom sqlalchemy.orm importsessionmaker#假设你的数据库模型是 DictDataInfo 和 DictTypeInfo#需要提前定义好这两个模型类
DATABASE_URL= "mysql+asyncmy://username:password@localhost/mydatabase"engine= create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal
= sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async
defget_dict_data(dict_type_name: str):
async with AsyncSessionLocal() as session:
#创建别名 DictData =aliased(DictDataInfo)
DictType
=aliased(DictTypeInfo)#联合查询并根据条件过滤 stmt =(
select(DictData)
.join(DictType, DictData.DictType_ID
==DictType.id)
.filter(DictType.name
==dict_type_name)
)

result
=await session.execute(stmt)
dict_data
=result.scalars().all()returndict_data#示例用法 importasyncio

async
defexample_usage():
dict_type_name
= "some_type_name"dict_data=await get_dict_data(dict_type_name)for data indict_data:print(data)

代码说明

  1. aliased
    : 使用
    aliased
    创建表的别名,这样可以方便地在查询中引用这些表。

  2. join
    : 使用
    join
    进行表连接。这里
    DictDataInfo
    表的
    DictType_ID
    列与
    DictTypeInfo
    表的
    id
    列连接。

  3. filter
    : 使用
    filter
    来添加条件筛选,筛选出
    DictTypeInfo
    表中
    name
    列等于
    dict_type_name
    的记录。

  4. select
    : 使用
    select
    语句来选择
    DictDataInfo
    表中的记录,这对应于
    Select(d => d)

  5. 异步操作: 由于使用的是 SQLAlchemy 的异步模式,所有数据库操作都在
    async with

    await
    语句中进行,以确保异步执行。

如果我们需要将获得的数据进行对象转换,我们可以使用下面的处理代码实现。

#定义 CListItem 类
classCListItem:def __init__(self, name, value):
self.name
=name
self.value
=value#定义示例列表和转换操作 defconvert_list_items(list_items):
dict_list
=[]if list_items: #确保 list_items 不是 None for info inlist_items.Items:
dict_list.append(CListItem(info.Name, info.Value))
return dict_list

3、使用sqlalchemy插入数据的时候,如何判断为非自增类型的时候,id赋值一个有序列的uuid值

有时候,我们的数据表主键是用字符串的,这种适用于很广的用途,比较容易在插入的时候就确定好id键的值,从而可以处理相关的内容。

但是,有时候我们可以让后端进行确定一个有序的ID值,那么使用SQLAlchemy 我们应该如何实现?

首先,确保你已经导入了
uuid
库,这是用于生成 UUID 的 Python 标准库。

有序 UUID 通常是基于时间的 UUID。你可以使用
uuid.uuid1()
来生成基于时间的 UUID。

defgenerate_sequential_uuid():return uuid.uuid1()  #基于时间生成有序UUID

在定义 SQLAlchemy 模型时,可以将
id
字段设置为使用该函数生成的 UUID。通常在模型中通过
default
参数设置默认值。

from sqlalchemy importColumn, Stringfrom sqlalchemy.ext.declarative importdeclarative_base

Base
=declarative_base()classMyModel(Base):__tablename__ = 'my_table'id= Column(String(36), primary_key=True, default=generate_sequential_uuid, nullable=False)#其他字段...

在插入新数据时,如果
id
字段为空,它将自动使用
generate_sequential_uuid
函数生成一个基于时间的 UUID。

这样就可以确保在插入数据时,非自增类型的
id
字段会被赋值为一个有序列的 UUID 值。

对于自增的整型
id
,SQLAlchemy 提供了自动处理机制。你只需要在模型中将
id
字段定义为
Integer
类型,并设置
primary_key=True
,SQLAlchemy 就会自动为该字段设置自增属性。

from sqlalchemy importColumn, Integer, Stringfrom sqlalchemy.ext.declarative importdeclarative_base

Base
=declarative_base()classMyModel(Base):__tablename__ = 'my_table'id= Column(Integer, primary_key=True, autoincrement=True)
name
= Column(String(50))#其他字段..

默认情况下,SQLAlchemy 会使用数据库的原生自增机制(如 MySQL 的
AUTO_INCREMENT
或 PostgreSQL 的
SERIAL
)。如果你需要使用自定义的自增策略,可以通过设置
Sequence
来实现(适用于支持
Sequence
的数据库,如 PostgreSQL)。

from sqlalchemy importSequenceclassMyModel(Base):__tablename__ = 'my_table'id= Column(Integer, Sequence('my_sequence'), primary_key=True)
name
= Column(String(50))

在上述代码中,
Sequence('my_sequence')
定义了一个序列,SQLAlchemy 将使用该序列生成自增的
id
值。

通过这些步骤,你可以轻松处理整型自增
id
字段,SQLAlchemy 会自动为每个新记录分配唯一的自增
id

4、在插入记录的时候,对字符串的数据处理

在批量插入数据字典的时候,我希望根据用户输入内容(多行数据)进行转化,把每行的数据分拆进行判断,如果符合条件的进行处理插入。

在 Python 中,可以使用字符串的
splitlines()
方法来实现相同的功能。

#假设 Data 和 input.Seq 是从输入中获取的
Data = "example\nline1\nline2\n"  #示例数据
input_seq = "123"  #示例序列字符串

#将 Data 按行拆分,并移除空行
array_items = [line for line in Data.splitlines() ifline]#初始化变量
int_seq = -1seq_length= 3str_seq=input_seq#尝试将 str_seq 转换为整数
ifstr_seq.isdigit():
int_seq
=int(str_seq)
seq_length
=len(str_seq)#打印结果 print(f"Array Items: {array_items}")print(f"int_seq: {int_seq}")print(f"seq_length: {seq_length}")
  • Python 的
    splitlines()
    方法将字符串按行分割,同时自动处理各种换行符(包括
    \n

    \r\n
    )。
  • 列表推导式
    [line for line in Data.splitlines() if line]
    移除了空行,类似于 C# 中的
    StringSplitOptions.RemoveEmptyEntries
  • 使用
    str_seq.isdigit()
    检查
    str_seq
    是否全部由数字组成,这类似于 C# 的
    int.TryParse

在 Python 中,可以使用
re.split()
函数来按照正则表达式分割字符串。以下是对应的 Python 代码:

importre#假设 info 是一个包含 Name 和 Value 属性的对象
classInfo:def __init__(self):
self.Name
= ""self.Value= ""info=Info()#dictData 是输入的字符串 dict_data = "example_name example_value" #使用正则表达式按照空白字符分割字符串 array = re.split(r'\s+', dict_data)#赋值给 info 对象的属性 info.Name =array[0]
info.Value
= array[1] if len(array) > 1 elsearray[0]#打印结果 print(f"Name: {info.Name}")print(f"Value: {info.Value}")

使用
re.split()
函数根据空白字符(包括空格、制表符等)分割字符串
dict_data

r'\s+'
是一个正则表达式,表示一个或多个空白字符。

如果你需要根据多个分隔符来分割字符串,同样可以使用正则表达式(
re
模块)的
re.split()
方法。

str_item = "1,2,3;4;5/6/7、8、9;10"

importre

result
= re.split(r"[;,|/,;、]+", str_item.strip())print(result)

结果输出:['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

解释:

  • re.split(r'[;,|/,;、]', text) 中的 r'[;,|/,;、]' 是一个正则表达式模式:
    [] 表示字符类,表示匹配字符类中的任意一个字符。
    ;,|/,;、 分别表示分号、逗号,竖线,中文逗号,中文分号,和空格,这些字符都将作为分隔符。

使用正则表达式可以灵活处理多个分隔符,适用于更复杂的分割需求。

标签: none

添加新评论