SQLAlchemy(CRUD)操作
这段代码是聊天系统的数据访问层核心实现,通过封装数据库操作函数,提供了消息的“增、查、改”功能,并自动维护消息与会话、用户的关联关系。装饰器简化了会话管理,异步特性确保了高并发场景下的性能,整体设计符合现代 Web 应用的数据库操作最佳实践。在这段代码(以及整个聊天系统的上下文)中,“会话”(对应模型和相关逻辑)指的是“用户与AI的一次连续交互过程”,可以理解为“聊天对话框”或“聊天线程”,是用来
这段代码是一个基于 SQLAlchemy 异步模式的数据库操作工具类,主要用于管理聊天系统中的消息和会话数据。下面从注解、函数功能、核心方法等方面进行详细解释:
一、基础依赖与注解说明
# 导入异步会话管理工具,用于处理数据库连接与会话生命周期
from server.db.session import with_async_session, async_session_scope
# 导入类型提示工具,用于规范函数参数和返回值类型
from typing import Dict, List
# 用于生成唯一ID(UUID)
import uuid
# 导入数据模型,对应数据库表结构
from server.db.models.message_model import MessageModel
from server.db.models.conversation_model import ConversationModel
from server.db.models.user_model import UserModel
# 导入SQLAlchemy异步查询工具
from sqlalchemy.future import select
# 导入FastAPI异常处理类,用于返回HTTP错误响应
from fastapi import HTTPException
关键注解说明:
@with_async_session
:这是一个自定义装饰器(核心作用是自动管理数据库会话),被装饰的函数无需手动创建/关闭会话,装饰器会自动注入session
参数(数据库会话对象),并在函数执行后自动提交或回滚事务,释放资源。- 类型提示(如
Dict, List
):用于明确函数参数和返回值的类型,提高代码可读性和IDE类型检查能力(例如metadata: Dict
表示metadata
必须是字典类型)。
二、核心函数详解
1. add_message_to_db
:新增聊天记录(核心函数)
@with_async_session # 自动注入数据库会话
async def add_message_to_db(
session, # 数据库会话对象(由装饰器自动注入)
user_id: str, # 用户ID(消息所属用户)
conversation_id: str, # 会话ID(消息所属会话)
conversation_name: str, # 会话名称(新建会话时使用)
prompt_name: str, # 聊天类型(如"普通对话"、"知识库对话")
query: str, # 用户的提问内容
response="", # 模型的回答内容(可空,例如刚发送提问时还没回答)
metadata: Dict={}, # 附加元数据(如知识库ID、搜索关键词等)
message_id=None, # 消息ID(可选,不提供则自动生成UUID)
):
"""新增聊天记录"""
# 1. 检查会话是否存在(通过会话ID查询)
conversation = await session.get(ConversationModel, conversation_id)
# 2. 若会话不存在,则创建新会话(确保消息有归属的会话)
if not conversation:
conversation = ConversationModel(
id=conversation_id, # 使用传入的会话ID
user_id=user_id, # 关联到当前用户
name=conversation_name, # 会话名称
chat_type=prompt_name # 会话类型(与消息类型一致)
)
session.add(conversation) # 将新会话添加到会话中
# 3. 提交会话(确保会话被保存到数据库,避免后续消息关联失败)
await session.commit()
# 4. 生成消息ID(若未指定则自动生成UUID)
if not message_id:
message_id = str(uuid.uuid4()) # 生成36位唯一UUID
# 5. 创建消息记录对象
m = MessageModel(
id=message_id, # 消息ID
conversation_id=conversation_id, # 关联的会话ID
chat_type=prompt_name, # 聊天类型
query=query, # 用户提问
response=response, # 模型回答
metadata=metadata # 附加元数据
)
# 6. 将消息添加到会话并提交
session.add(m) # 添加消息到会话(无需await)
await session.commit() # 异步提交事务(保存消息到数据库)
return m.id # 返回新增消息的ID
核心功能:
- 同时处理“会话”和“消息”的创建:如果消息所属的会话不存在,会自动创建新会话。
- 确保数据关联性:消息必须属于某个会话,会话必须属于某个用户。
- 灵活性:支持手动指定消息ID(用于特殊场景),默认自动生成UUID确保唯一性。
2. filter_message
:按会话ID查询消息
@with_async_session
async def filter_message(
session,
conversation_id: str, # 会话ID(筛选条件)
limit: int = 10 # 最多返回的消息数量(默认10条)
):
"""按会话ID异步筛选消息,限制返回数量"""
# 执行异步查询:筛选指定会话的消息,排除无回复的记录,按时间倒序
result = await session.execute(
select(MessageModel) # 查询MessageModel表
.filter_by(conversation_id=conversation_id) # 条件1:属于指定会话
.filter(MessageModel.response != '') # 条件2:排除无回复的消息(response不为空)
.order_by(MessageModel.create_time.desc()) # 按创建时间倒序(最新的消息在前)
.limit(limit) # 限制返回数量
)
# 返回查询结果(转换为MessageModel对象列表)
return result.scalars().all()
核心功能:
- 按会话ID批量查询消息,常用于“加载历史聊天记录”场景(例如用户打开某个会话时,展示该会话的历史消息)。
- 过滤逻辑:只返回有回复的消息(避免展示未处理的提问),并按时间倒序排列(符合聊天记录的展示习惯)。
3. get_message_by_id
:按ID查询单条消息
@with_async_session
async def get_message_by_id(
session,
message_id # 消息ID(唯一标识)
) -> MessageModel: # 类型提示:返回MessageModel对象
"""通过ID异步查询单条聊天记录"""
# 执行查询:按消息ID筛选
result = await session.execute(
select(MessageModel).filter_by(id=message_id)
)
# 返回第一条匹配结果(ID唯一,因此最多一条)
return result.scalars().first()
核心功能:
- 精确查询单条消息,用于“查看消息详情”“更新消息内容”等场景(例如后续的
update_message
函数依赖此方法查询消息)。
4. update_message
:更新消息内容
@with_async_session
async def update_message(
session,
message_id, # 要更新的消息ID
response: str = None, # 可选:更新后的回复内容
metadata: Dict = None # 可选:更新后的元数据
):
# 1. 先查询消息是否存在
m = await get_message_by_id(message_id) # 调用上面的查询函数
# 2. 若消息存在,更新字段
if m is not None:
if response is not None:
m.response = response # 更新回复内容(例如模型生成回答后更新)
if isinstance(metadata, dict):
m.meta_data = metadata # 更新元数据(例如补充知识库引用信息)
session.add(m) # 将更新后的对象添加到会话
await session.commit() # 提交事务(保存更新)
return m.id # 返回更新后的消息ID
else:
# 3. 若消息不存在,抛出404异常(符合FastAPI的错误处理规范)
raise HTTPException(status_code=404, detail="Message not found")
核心功能:
- 支持更新消息的回复内容和元数据,主要用于“模型生成回答后补充回复内容”场景(例如用户发送提问后,先保存空回复的消息,模型生成回答后再调用此函数更新)。
- 异常处理:消息不存在时抛出404错误,便于前端展示“消息不存在”的提示。
三、测试函数 main
与执行入口
# 主测试函数:验证上述数据库操作函数的正确性
async def main():
# 测试1:查询指定会话的消息(最多4条)
test_conversation_id = 'edcrfv33'
messages = await filter_message(test_conversation_id, limit=4)
messages = list(reversed(messages)) # 反转列表(按时间正序排列)
# 测试2:查询单条消息
message = await get_message_by_id(message_id="041c8958055a4a62827cb39a789e3603")
print(messages) # 打印查询结果
# 测试3:更新消息的回复内容
updated_id = await update_message(
message_id="041c8958055a4a62827cb39a789e3603",
response="这是最新更新的回复"
)
print(updated_id) # 打印更新后的消息ID
# 脚本执行入口:当直接运行该脚本时,执行测试函数
if __name__ == '__main__':
import asyncio
asyncio.run(main()) # 运行异步测试函数
作用:
- 用于验证上述数据库操作函数的正确性,确保新增、查询、更新功能正常工作。
- 通过
asyncio.run(main())
启动异步事件循环,执行异步测试逻辑(因为所有数据库操作函数都是异步的)。
四、整体设计逻辑与应用场景
-
数据关联设计:
消息(MessageModel
)→ 会话(ConversationModel
)→ 用户(UserModel
)形成三级关联,确保数据归属清晰(用户拥有多个会话,会话包含多条消息)。 -
异步操作优势:
所有函数均为异步(async def
),配合 SQLAlchemy 的异步查询(await session.execute(...)
),适合 FastAPI 等异步 Web 框架,能高效处理并发请求(例如同时有多个用户发送消息)。 -
典型应用流程:
- 用户发送提问 → 调用
add_message_to_db
保存提问(response
为空)。 - 模型生成回答 → 调用
update_message
更新该消息的response
字段。 - 用户查看历史记录 → 调用
filter_message
按会话ID加载消息。
- 用户发送提问 → 调用
总结
这段代码是聊天系统的数据访问层核心实现,通过封装数据库操作函数,提供了消息的“增、查、改”功能,并自动维护消息与会话、用户的关联关系。装饰器 @with_async_session
简化了会话管理,异步特性确保了高并发场景下的性能,整体设计符合现代 Web 应用的数据库操作最佳实践。
在这段代码(以及整个聊天系统的上下文)中,“会话”(对应 ConversationModel
模型和 conversation
相关逻辑)指的是“用户与AI的一次连续交互过程”,可以理解为“聊天对话框”或“聊天线程”,是用来组织和管理多条相关消息的“容器”。
为什么需要“会话”?
想象你用聊天软件(如微信、ChatGPT)的场景:你可能会和AI聊“如何做蛋糕”“推荐电影”“调试代码”等不同话题——每个话题就是一个独立的“会话”,每个会话下包含多条你和AI的交互消息(你的提问、AI的回答)。
会话的核心作用是 “分组管理消息”,避免不同话题的消息混乱在一起。比如:
- “蛋糕制作”会话下,消息是“材料有哪些?”“步骤是什么?”“烤焦了怎么办?”;
- “电影推荐”会话下,消息是“推荐科幻片”“有没有豆瓣9分以上的?”“《星际穿越》好看吗?”;
- 通过会话ID(
conversation_id
),可以快速筛选出某一话题下的所有消息。
代码中“会话”的具体体现
结合你之前看到的 ConversationModel
和 MessageModel
逻辑,会话的核心属性和关联关系如下:
1. 会话的核心信息(ConversationModel
模型)
每个会话都有自己的“身份标识”和“基础属性”,比如:
id
:会话唯一ID(如UUID,对应message
表的conversation_id
外键);user_id
:所属用户ID(确保“用户A的会话”不会被“用户B”看到,实现数据隔离);name
:会话名称(如“蛋糕制作”“电影推荐”,方便用户在列表中识别);chat_type
:会话类型(如“普通对话”“知识库对话”“Agent对话”,对应不同功能场景)。
2. 会话与消息的关联(关键逻辑)
代码中通过 conversation_id
实现“会话-消息”的绑定:
- 每条消息(
MessageModel
)都有conversation_id
字段,指向它所属的会话; - 一个会话可以包含多条消息(“一对多”关系),比如
conversation.messages
可以获取该会话下的所有消息; - 新增消息时(
add_message_to_db
函数),会先检查会话是否存在:- 若会话不存在(比如用户新建了一个话题),则自动创建新会话;
- 若会话已存在(比如用户继续聊之前的话题),则直接将消息添加到该会话下。
3. 会话的实际用途(代码逻辑)
在你提供的工具函数中,会话的作用随处可见:
add_message_to_db
:新增消息前先确认会话,确保消息“有归属”;filter_message
:通过conversation_id
筛选某会话下的所有消息(比如用户点击“蛋糕制作”会话,展示该话题的历史消息);get_message_by_id
:虽然是查单条消息,但消息的conversation_id
仍能关联到它所属的会话(比如显示消息时,标注它属于哪个话题)。
举个实际场景例子
假设用户操作流程如下,你能更直观理解会话:
- 用户打开聊天页面,点击“新建会话”,输入会话名称“调试Python代码”;
- 系统创建一个新会话:
id="conv_123"
,user_id="user_456"
,name="调试Python代码"
; - 用户发送第一条消息:“为什么我的代码报SyntaxError?” → 消息的
conversation_id="conv_123"
; - AI回复:“可能是括号不匹配,检查一下if/for语句” → 这条回复消息的
conversation_id
也是"conv_123"
; - 用户后续继续提问“括号检查过了,还是报错” → 消息仍绑定到
"conv_123"
会话; - 当用户切换到“电影推荐”会话时,系统通过
conversation_id="conv_789"
筛选出该会话下的所有消息,展示给用户。
总结
简单来说,代码中的“会话”就是 “用户与AI的一个独立聊天话题容器”,它通过唯一ID绑定多条相关消息,实现消息的分组管理、用户数据隔离、历史记录追溯等核心功能,是聊天系统中不可或缺的“消息组织单元”。
更多推荐
所有评论(0)