【实操实录】构建事实提取代理(FactExtractorAgent)的测试验证流程,正则表达式提取函数、Pydantic数据模型定义和异步流式处理机制,内存服务简化依赖,通过流式事件实时观察代理处理结
本文介绍了构建事实提取代理(FactExtractorAgent)的测试验证流程。通过初始化测试环境、配置代理模板、创建执行单元、构造测试用例等步骤,验证代理能否从文档中正确提取用户查询的关键信息。核心实现包括正则表达式提取函数、Pydantic数据模型定义和异步流式处理机制。测试采用内存服务简化依赖,通过流式事件实时观察代理提取结果,最终验证了文档到答案的提取功能。整体流程体现了"配置
一、概述
今天笔者将围绕基于agent和异步编程输出流搭建的问答系统中的事实提取模块,构建的验证事实提取代理(FactExtractorAgent
)展开讲述,核心逻辑是 “构建测试环境→构造测试用例→执行代理并验证结果”,用来验证能否正确从文档中提取用户查询的关键信息
二、核心目标:
🫧给定包含事实的文档和用户查询,验证代理能否从文档中提取出与查询匹配的答案。整体流程可分为 4 个阶段:
- 初始化测试环境:创建必要的服务(会话、存储等)和模板配置
- 构建测试用例:定义包含文档内容和用户查询的输入消息
- 执行代理处理:通过运行器(
Runner
)调用事实提取代理处理输入 - 输出结果验证:打印代理返回的流式事件,观察是否提取到正确信息
三、分步骤实现细节
1. 初始化测试环境(准备运行依赖)
🫧这一步的目的是搭建代理运行所需的基础组件,包括模板配置、会话管理、存储服务等,确保代理能正常工作
# 1. 创建模板构建器,用于生成代理所需的模板数据
template_builder = TemplateBuilder()
# 2. 处理请求参数(假设data是输入的原始请求数据,此处省略定义)
unified_params = await platform_gateway.process_request(data)
# 3. 用统一参数构建模板数据(包含代理运行的基础配置)
template_data = await template_builder.build_template(unified_params)
# 4. 创建内存会话服务(测试场景用,非持久化,避免依赖数据库)
memory_session_service = InMemorySessionService()
🐋通过模板构建器和参数处理,将原始请求转换为代理可识别的配置数据;使用内存服务简化测试环境(无需外部存储)
2. 配置事实提取代理模板(定义代理行为)
🫧基于模板类(FactExtractorAgentTemplate
)配置代理的核心参数,包括响应语言、存储方式、工具集等,确保代理按预期行为运行。
template = FactExtractorAgentTemplate(
response_language='chinese', # 强制代理返回中文结果
session_service=memory_session_service, # 关联会话服务(管理会话生命周期)
memory_service=InMemoryMemoryService(), # 内存存储上下文(多轮对话时用)
artifact_service=InMemoryArtifactService(), # 内存存储生成的内容(如提取结果)
template_data=template_data, # 传入之前构建的模板数据
toolsets=[] # 不使用额外工具(仅用基础提取功能)
)
- 创建
TemplateBuilder
实例(模板构建器,用于生成代理所需的模板数据) - 调用
platform_gateway.process_request(data)
处理请求参数(await
表示等待异步处理完成) - 用构建器生成模板数据(
template_data
) - 创建内存会话服务(
InMemorySessionService
),用于存储会话信息(非持久化,仅在内存中)
- 模板类封装了代理的默认配置(如之前定义的
system_prompt
和输出格式),这里通过参数指定运行时需求(如语言、存储),实现 “配置即代码”
3. 创建代理、会话和运行器(准备执行单元)
🫧将模板转换为可运行的代理实例,创建会话(标识一次测试会话),并通过运行器(Runner
)关联代理和会话,为执行测试做准备
# 1. 从模板异步构建代理实例(模板中已预设模型、指令等核心参数)
agent = await template._build_agent()
# 2. 生成唯一会话ID(用UUID确保唯一性,避免会话冲突)
session_id = uuid.uuid4().hex
# 3. 创建会话(关联应用名称、用户ID和会话ID,用于跟踪测试上下文)
session = memory_session_service.create_session_sync(
app_name=template.get_template_name(), # 应用名称=模板名(标识测试的代理类型)
user_id="1", # 测试用固定用户ID
session_id=session_id
)
# 4. 创建运行器(代理的执行入口,负责调度代理、管理会话状态)
runner = Runner(
agent=agent, # 要运行的代理
app_name=template.get_template_name(), # 关联应用名称
session_service=memory_session_service, # 关联会话服务(获取/更新会话状态)
)
🐋会话(session
)用于隔离不同测试用例的上下文;运行器(Runner
)是代理的 “启动器”,封装了调用代理的细节(如参数传递、状态管理)
4. 构造测试用例(定义输入数据)
🫧手动定义包含 “文档内容” 和 “用户查询” 的输入消息,作为代理的处理对象。测试用例需明确:文档中包含什么事实?用户想查询什么?
# 1. 格式化用户消息(用模板类中预设的提示词格式,确保代理能解析)
msg = FactExtractorAgentTemplate.next_prompt[0].format(
content="Document [1]: 沙发是大厅里常见的家具,柔软宽大的靠背和坐垫让人一坐下就不想起来。它通常摆在客厅中央,正对着电视,是家人聚会、朋友闲聊的核心位置。", # 文档内容(包含事实“沙发是大厅常见家具”)
user_query="大厅里常见的家具是什么?" # 用户查询(目标:从文档中提取对应事实)
)
# 2. 封装测试用例(角色为用户,内容为格式化后的消息)
test_case = {
"messages": {"role": "user", "content": msg},
}
🐋用模板类的next_prompt
格式化消息,确保输入格式符合代理的预期(代理依赖固定格式解析文档和查询);文档和查询需一一对应(文档包含查询的答案)
5. 执行代理并输出结果(验证提取功能)
🫧通过运行器异步执行代理,传入测试用例的输入消息,以流式方式接收结果并打印,观察代理是否正确提取事实
# 异步迭代代理返回的流式事件
async for event in runner.run_async(
user_id="1", # 关联用户ID
session_id=session.id, # 关联会话ID(确保上下文正确)
# 封装用户消息(符合代理要求的Content格式)
new_message=types.Content(
parts=[types.Part(text=test_case["messages"]["content"])], # 消息文本=测试用例内容
role="user" # 标识消息来自用户
),
# 配置流式输出模式(SSE:服务器推送事件,实时返回中间结果)
run_config=RunConfig(streaming_mode=StreamingMode.SSE)
):
print(event) # 打印每个事件(包含提取的中间结果或最终结果)
🐋以流式模式(SSE
)运行,确保能实时看到代理的处理过程;通过async for
迭代事件,逐个打印结果(正常情况下,最终应提取出 “沙发”)
6.启动测试(运行事件循环)
🫧通过asyncio.run()
启动异步事件循环,执行整个测试流程
asyncio.run(run()) # 启动异步函数run(),自动管理事件循环
四、代码具体实现详细解析
1. 提取函数 extract_function
def extract_function(text):
# 匹配所有 "..." 中的内容
pattern = r'"(.*?)"'
matches = re.findall(pattern, text)
extra_key = ['query', 'document_index', 'extracted_info', 'result', 'has_result']
matches = [s for s in matches if s not in extra_key]
return matches
- 功能:从文本中提取双引号内的字符串,但排除特定关键字
- 逻辑步骤:
- 使用正则表达式
r'"(.*?)"'
匹配所有双引号中的内容(.*?
是非贪婪模式,避免匹配多个引号间的内容) re.findall()
返回所有匹配结果的列表- 定义需要排除的关键字列表
extra_key
- 通过列表推导式过滤掉在
extra_key
中存在的元素
- 使用正则表达式
- 语法点:
- 正则表达式:
r''
表示原始字符串,避免转义字符冲突 - 列表推导式:
[s for s in matches if s not in extra_key]
高效过滤列表元素
2. 数据模型类(基于 Pydantic)
class ExtractedItem(BaseModel): query: str document_index: int extracted_info: str class ExtractedOutput(BaseModel): result: List[ExtractedItem] has_result: bool
- 功能:定义结构化数据模型,用于数据验证和格式化输出(依赖 Pydantic 库)
- 字段说明:
ExtractedItem
:单条提取结果,包含查询文本(query
)、文档索引(document_index
)、提取的信息(extracted_info
)ExtractedOutput
:整体提取结果,包含结果列表(result
)和是否有结果的标识(has_result
)
- 设计原理:
- 继承
BaseModel
后,自动获得数据验证功能(如类型检查、必填项校验) - 标准化输出格式,便于下游系统处理
- 继承
- 语法点:
- 类属性类型注解:
query: str
指定字段类型,Pydantic 会据此验证数据 List[ExtractedItem]
:泛型类型,表示元素为ExtractedItem
的列表(需从typing
导入List
)
- 类属性类型注解:
- 正则表达式:
3.1类属性定义
class FactExtractorAgent(BaseAgent):
fact_extractor: Agent # 事实提取子代理
model: Union[str, BaseLlm] = '' # 语言模型(名称或实例)
instruction: Union[str, InstructionProvider] = '' # 指令(字符串或提供者)
generate_content_config: Optional[types.GenerateContentConfig] = None # 内容生成配置
output_schema: Optional[type[BaseModel]] = None # 输出数据模型
include_contents: Literal['default', 'none'] = 'default' # 是否包含内容
Union[str, BaseLlm]
:表示变量可以是str
或BaseLlm
类型Optional[T]
:等价于Union[T, None]
,表示变量可以是T
类型或None
Literal['default', 'none']
:表示变量只能取这两个字符串中的一个(需从typing
导入Literal
)
3.2初始化__init__
def __init__(self, name: str, model: Union[str, BaseLlm], instruction: Union[str, InstructionProvider], generate_content_config: Optional[types.GenerateContentConfig], output_schema: Optional[type[BaseModel]], include_contents: Literal['default', 'none']):
# 创建子代理
fact_extractor = Agent(
name=name,
instruction=instruction,
model=model,
generate_content_config=generate_content_config,
output_schema=output_schema,
include_contents=include_contents
)
# 子代理列表
sub_agents_list = [fact_extractor]
# 调用父类初始化
super().__init__(
name=name,
sub_agents=sub_agents_list,
fact_extractor=fact_extractor,
output_schema=output_schema
)
4.异步运行方法 _run_async_impl
- 使用
streamingjson.Lexer
逐步解析流式返回的 JSON 文本(适合处理不完整的增量数据) - 异步迭代子代理的输出事件(
async for
) - 对中间结果(
event.partial=True
):累加文本并解析,当结果列表长度变化时,提取新增的信息并返回 - 对完整结果(
event.partial=False
):提取最终结果并返回 - 异步函数:
async def
定义,内部可使用await
和async for
- 异步生成器:
yield
关键字返回事件,允许调用方逐步获取结果(返回类型AsyncGenerator[Event, None]
) - 异常处理:
try-except
捕获 JSON 解析错误,避免程序崩溃 - 链式条件判断:
if event.content and event.content.parts and ...
确保访问的属性存在,防止空指针错误
致谢
🐋谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,
可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!
请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!
更多推荐
所有评论(0)