一、概述

今天笔者将围绕基于agent和异步编程输出流搭建的问答系统中的事实提取模块,构建的验证事实提取代理(FactExtractorAgent展开讲述,核心逻辑是 “构建测试环境→构造测试用例→执行代理并验证结果”,用来验证能否正确从文档中提取用户查询的关键信息

二、核心目标:

🫧给定包含事实的文档和用户查询,验证代理能否从文档中提取出与查询匹配的答案。整体流程可分为 4 个阶段:

  1. 初始化测试环境:创建必要的服务(会话、存储等)和模板配置
  2. 构建测试用例:定义包含文档内容和用户查询的输入消息
  3. 执行代理处理:通过运行器(Runner)调用事实提取代理处理输入
  4. 输出结果验证:打印代理返回的流式事件,观察是否提取到正确信息

三、分步骤实现细节

1. 初始化测试环境(准备运行依赖)

🫧这一步的目的是搭建代理运行所需的基础组件,包括模板配置、会话管理、存储服务等,确保代理能正常工作

# 1. 创建模板构建器,用于生成代理所需的模板数据
template_builder = TemplateBuilder()
# 2. 处理请求参数(假设data是输入的原始请求数据,此处省略定义)
unified_params = await platform_gateway.process_request(data)
# 3. 用统一参数构建模板数据(包含代理运行的基础配置)
template_data = await template_builder.build_template(unified_params)
# 4. 创建内存会话服务(测试场景用,非持久化,避免依赖数据库)
memory_session_service = InMemorySessionService()

🐋通过模板构建器和参数处理,将原始请求转换为代理可识别的配置数据;使用内存服务简化测试环境(无需外部存储)

2. 配置事实提取代理模板(定义代理行为)

🫧基于模板类(FactExtractorAgentTemplate)配置代理的核心参数,包括响应语言、存储方式、工具集等,确保代理按预期行为运行。

template = FactExtractorAgentTemplate(
    response_language='chinese',  # 强制代理返回中文结果
    session_service=memory_session_service,  # 关联会话服务(管理会话生命周期)
    memory_service=InMemoryMemoryService(),  # 内存存储上下文(多轮对话时用)
    artifact_service=InMemoryArtifactService(),  # 内存存储生成的内容(如提取结果)
    template_data=template_data,  # 传入之前构建的模板数据
    toolsets=[]  # 不使用额外工具(仅用基础提取功能)
)
  1. 创建TemplateBuilder实例(模板构建器,用于生成代理所需的模板数据)
  2. 调用platform_gateway.process_request(data)处理请求参数(await表示等待异步处理完成)
  3. 用构建器生成模板数据(template_data
  4. 创建内存会话服务(InMemorySessionService),用于存储会话信息(非持久化,仅在内存中)
  • 模板类封装了代理的默认配置(如之前定义的system_prompt和输出格式),这里通过参数指定运行时需求(如语言、存储),实现 “配置即代码”

3. 创建代理、会话和运行器(准备执行单元)

🫧将模板转换为可运行的代理实例,创建会话(标识一次测试会话),并通过运行器(Runner)关联代理和会话,为执行测试做准备

# 1. 从模板异步构建代理实例(模板中已预设模型、指令等核心参数)
agent = await template._build_agent()

# 2. 生成唯一会话ID(用UUID确保唯一性,避免会话冲突)
session_id = uuid.uuid4().hex
# 3. 创建会话(关联应用名称、用户ID和会话ID,用于跟踪测试上下文)
session = memory_session_service.create_session_sync(
    app_name=template.get_template_name(),  # 应用名称=模板名(标识测试的代理类型)
    user_id="1",  # 测试用固定用户ID
    session_id=session_id
)

# 4. 创建运行器(代理的执行入口,负责调度代理、管理会话状态)
runner = Runner(
    agent=agent,  # 要运行的代理
    app_name=template.get_template_name(),  # 关联应用名称
    session_service=memory_session_service,  # 关联会话服务(获取/更新会话状态)
)

🐋会话(session)用于隔离不同测试用例的上下文;运行器(Runner)是代理的 “启动器”,封装了调用代理的细节(如参数传递、状态管理)

4. 构造测试用例(定义输入数据)

🫧手动定义包含 “文档内容” 和 “用户查询” 的输入消息,作为代理的处理对象。测试用例需明确:文档中包含什么事实?用户想查询什么?

# 1. 格式化用户消息(用模板类中预设的提示词格式,确保代理能解析)
msg = FactExtractorAgentTemplate.next_prompt[0].format(
    content="Document [1]: 沙发是大厅里常见的家具,柔软宽大的靠背和坐垫让人一坐下就不想起来。它通常摆在客厅中央,正对着电视,是家人聚会、朋友闲聊的核心位置。",  # 文档内容(包含事实“沙发是大厅常见家具”)
    user_query="大厅里常见的家具是什么?"  # 用户查询(目标:从文档中提取对应事实)
)
# 2. 封装测试用例(角色为用户,内容为格式化后的消息)
test_case = {
    "messages": {"role": "user", "content": msg},
}

🐋用模板类的next_prompt格式化消息,确保输入格式符合代理的预期(代理依赖固定格式解析文档和查询);文档和查询需一一对应(文档包含查询的答案)

5. 执行代理并输出结果(验证提取功能)

🫧通过运行器异步执行代理,传入测试用例的输入消息,以流式方式接收结果并打印,观察代理是否正确提取事实

# 异步迭代代理返回的流式事件
async for event in runner.run_async(
    user_id="1",  # 关联用户ID
    session_id=session.id,  # 关联会话ID(确保上下文正确)
    # 封装用户消息(符合代理要求的Content格式)
    new_message=types.Content(
        parts=[types.Part(text=test_case["messages"]["content"])],  # 消息文本=测试用例内容
        role="user"  # 标识消息来自用户
    ),
    # 配置流式输出模式(SSE:服务器推送事件,实时返回中间结果)
    run_config=RunConfig(streaming_mode=StreamingMode.SSE)
):
    print(event)  # 打印每个事件(包含提取的中间结果或最终结果)

🐋以流式模式(SSE)运行,确保能实时看到代理的处理过程;通过async for迭代事件,逐个打印结果(正常情况下,最终应提取出 “沙发”)

6.启动测试(运行事件循环)

🫧通过asyncio.run()启动异步事件循环,执行整个测试流程

asyncio.run(run())  # 启动异步函数run(),自动管理事件循环

四、代码具体实现详细解析

1. 提取函数 extract_function

def extract_function(text):
    # 匹配所有 "..." 中的内容
    pattern = r'"(.*?)"'
    matches = re.findall(pattern, text)
    extra_key = ['query', 'document_index', 'extracted_info', 'result', 'has_result']
    matches = [s for s in matches if s not in extra_key]
    return matches
  • 功能:从文本中提取双引号内的字符串,但排除特定关键字
  • 逻辑步骤
    1. 使用正则表达式 r'"(.*?)"' 匹配所有双引号中的内容(.*? 是非贪婪模式,避免匹配多个引号间的内容)
    2. re.findall() 返回所有匹配结果的列表
    3. 定义需要排除的关键字列表 extra_key
    4. 通过列表推导式过滤掉在 extra_key 中存在的元素
  • 语法点
    • 正则表达式:r'' 表示原始字符串,避免转义字符冲突
    • 列表推导式:[s for s in matches if s not in extra_key] 高效过滤列表元素

    2. 数据模型类(基于 Pydantic)

    class ExtractedItem(BaseModel):
        query: str
        document_index: int
        extracted_info: str
    
    class ExtractedOutput(BaseModel):
        result: List[ExtractedItem]
        has_result: bool
    • 功能:定义结构化数据模型,用于数据验证和格式化输出(依赖 Pydantic 库)
    • 字段说明
      • ExtractedItem:单条提取结果,包含查询文本(query)、文档索引(document_index)、提取的信息(extracted_info
      • ExtractedOutput:整体提取结果,包含结果列表(result)和是否有结果的标识(has_result
    • 设计原理
      • 继承 BaseModel 后,自动获得数据验证功能(如类型检查、必填项校验)
      • 标准化输出格式,便于下游系统处理
    • 语法点
      • 类属性类型注解:query: str 指定字段类型,Pydantic 会据此验证数据
      • List[ExtractedItem]:泛型类型,表示元素为 ExtractedItem 的列表(需从 typing 导入 List

3.1类属性定义

class FactExtractorAgent(BaseAgent):
    fact_extractor: Agent  # 事实提取子代理
    model: Union[str, BaseLlm] = ''  # 语言模型(名称或实例)
    instruction: Union[str, InstructionProvider] = ''  # 指令(字符串或提供者)
    generate_content_config: Optional[types.GenerateContentConfig] = None  # 内容生成配置
    output_schema: Optional[type[BaseModel]] = None  # 输出数据模型
    include_contents: Literal['default', 'none'] = 'default'  # 是否包含内容
  • Union[str, BaseLlm]:表示变量可以是 str 或 BaseLlm 类型
  • Optional[T]:等价于 Union[T, None],表示变量可以是 T 类型或 None
  • Literal['default', 'none']:表示变量只能取这两个字符串中的一个(需从 typing 导入 Literal

3.2初始化__init__

def __init__(self, name: str, model: Union[str, BaseLlm], instruction: Union[str, InstructionProvider], generate_content_config: Optional[types.GenerateContentConfig], output_schema: Optional[type[BaseModel]], include_contents: Literal['default', 'none']):
    # 创建子代理
    fact_extractor = Agent(
        name=name,
        instruction=instruction,
        model=model,
        generate_content_config=generate_content_config,
        output_schema=output_schema,
        include_contents=include_contents
    )
    # 子代理列表
    sub_agents_list = [fact_extractor]
    # 调用父类初始化
    super().__init__(
        name=name,
        sub_agents=sub_agents_list,
        fact_extractor=fact_extractor,
        output_schema=output_schema
    )

4.异步运行方法 _run_async_impl

  1. 使用 streamingjson.Lexer 逐步解析流式返回的 JSON 文本(适合处理不完整的增量数据)
  2. 异步迭代子代理的输出事件(async for
  3. 对中间结果(event.partial=True):累加文本并解析,当结果列表长度变化时,提取新增的信息并返回
  4. 对完整结果(event.partial=False):提取最终结果并返回
  5. 异步函数:async def 定义,内部可使用 await 和 async for
  6. 异步生成器:yield 关键字返回事件,允许调用方逐步获取结果(返回类型 AsyncGenerator[Event, None]
  7. 异常处理:try-except 捕获 JSON 解析错误,避免程序崩溃
  8. 链式条件判断:if event.content and event.content.parts and ... 确保访问的属性存在,防止空指针错误

致谢

🐋谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,

可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!

请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐