技术方案:主 Agent 调用子 Agent 架构设计

本文档面向 AI 实现者,目标是:读完本文即可从零实现整个「主 Agent 通过 function_tool 调用子 Agent」的服务端链路。
所有代码片段均可直接复制,无需再二次联想接口。


1. 背景与目标

1.1 业务背景

校园数据智能体需要统一入口调度多个垂直领域子 Agent(选课助手、校园通知、学工、教务等)。各子 Agent 是独立部署的 FastAPI 服务,各自持有独立的数据域、RAG 知识库和领域 prompt。主 Agent 职责只有两个:

  1. 理解用户意图,选择合适的子 Agent 并调用;
  2. 把子 Agent 的输出回流给用户(SSE 流式透传)。

1.2 目标

  • 主 Agent 使用 OpenAI Agents SDK(Python)的 function_tool 机制注册子 Agent,LLM 根据用户输入自动选路;
  • 主 → 子 通过 HTTP + SSE 调用,协议层零改造新增子 Agent(只需往 sub-agents.yaml 追加一行);
  • 全链路 X-Request-Id 贯通:网关 → 主 Agent → LLM → function_tool → HTTP → 子 Agent → 子 Agent LLM,日志可通过单个 request_id 串联排错;
  • 用户校园 token(campus_token)透传到子 Agent 完成数据鉴权;
  • 子 Agent 故障、超时不阻断主 Agent 流式响应,能降级返回友好提示。

1.3 非目标

  • 子 Agent 内部实现不在本文范围;
  • 前端 SSE 消费细节见另一份文档;
  • SSO 登录换 campus_token 见另一份文档。

2. 架构总览

2.1 组件拓扑

1. POST /api/v1/chat
Bearer JWT

2. messages + tools

3. tool_call
call_food_agent

4. POST /api/v1/chat
X-Request-Id + Bearer campus_token

5. SSE stream

6. 工具返回值

7. 最终文本

8. SSE stream

读写

前端浏览器

主 Agent
campus-agent-backend
FastAPI :8000

DashScope
qwen-plus

食堂子 Agent
food-agent :8000

选课子 Agent
course-agent :8000

PostgreSQL
会话/消息持久化

2.2 端到端时序

PostgreSQL 食堂子 Agent DashScope 主 Agent Nginx 网关 用户 PostgreSQL 食堂子 Agent DashScope 主 Agent Nginx 网关 用户 function_tool 入口 必须 re-set request_id_ctx POST /api/v1/chat Authorization: Bearer <jwt> 1 注入/透传 X-Request-Id 2 middleware: 解析 request_id, set request_id_ctx 3 JWT 解析 → user_id + campus_token 4 建会话 / 取历史消息 5 构造 AgentContext( campus_token, request_id, conversation_id) 6 Runner.run_streamed(messages, tools, context) 7 tool_call: call_food_agent(message="今天吃啥") 8 POST /api/v1/chat X-Request-Id + Bearer campus_token 9 SSE: chunk / chunk / done 10 tool_result (全文) 11 stream chunks (最终回复) 12 SSE: session / tool_call / chunk / done 13 SSE 透传 14 持久化 assistant 消息 15

2.3 技术栈

技术
语言/框架 Python 3.11+ / FastAPI / Uvicorn
Agent 框架 OpenAI Agents SDK (agents 包)
LLM 阿里云 DashScope(OpenAI 兼容模式),模型 qwen-plus
HTTP 客户端 httpx.AsyncClient (SSE 流式)
数据库 PostgreSQL 17(异步驱动 asyncpg + SQLAlchemy 2.0)
日志 loguru(双 sink:stdout + 文件)+ InterceptHandler 桥接 stdlib
鉴权 JWT(python-jose)+ HTTPBearer
配置 pydantic-settings + .env + sub-agents.yaml

3. 核心概念

3.1 主 Agent(Main Agent)

  • 单例 Agent 对象,SDK 提供;
  • 持有所有已注册子 Agent 作为 tools
  • 依赖 AgentContext(见 3.3)在 tool_call 期间读取运行期变量。

3.2 子 Agent(Sub Agent)

  • 独立 FastAPI 服务;
  • 暴露统一契约 POST /api/v1/chat,接收 {"message": "..."}
  • 返回 SSE 流,事件类型 chunk / tool_call / tool_result / done / error
  • 鉴权头 Authorization: Bearer <campus_token> + 追踪头 X-Request-Id

3.3 AgentContext

这是主 Agent 给 LLM tool_call 注入运行期数据的唯一通道。LLM 只看得见 message 字符串;campus_token、request_id、conversation_id 这些绝对不能走 prompt,必须走 RunContextWrapper

@dataclass
class AgentContext:
    campus_token: str = ""       # 校园 SSO token,透传给子 Agent
    request_id: str = "-"        # 全链路追踪 id
    conversation_id: str = ""    # 主会话 id,便于子 Agent 关联父会话

3.4 function_tool

OpenAI Agents SDK 的装饰器,把 Python 函数暴露给 LLM 调用。本项目中每个子 Agent 都动态生成一个 function_tool,工具名 = call_<sub_agent_name>(横杠转下划线)。

3.5 request_id 双通道传递

由于 function_tool 实际执行时 SDK 内部用了独立 executor,Python 的 ContextVar 不会自动跨越 executor 边界。本项目严格遵守"两手抓"原则:

  1. 显式字段AgentContext.request_id —— SDK 保证 context 对象传到 tool;
  2. ContextVar 回填:tool 入口 request_id_ctx.set(agent_ctx.request_id) —— 保证 tool 内部调用的第三方库(httpx、sqlalchemy)日志也带 request_id。

详见 §10。


4. 项目目录结构

campus-agent/
├── backend/
│   ├── app/
│   │   ├── main.py                    # FastAPI 入口;注册中间件、生命周期
│   │   ├── api/
│   │   │   └── v1/
│   │   │       ├── __init__.py        # v1_router 聚合
│   │   │       └── chat.py            # POST /api/v1/chat 入口
│   │   ├── agents/
│   │   │   ├── main_agent.py          # 主 Agent 单例 + AgentContext
│   │   │   ├── tools_loader.py        # 读 YAML 构造 function_tool
│   │   │   └── stream_handler.py      # Runner.run_streamed → 统一 SSE
│   │   ├── sub_agents_client/
│   │   │   └── http_client.py         # httpx SSE 客户端
│   │   ├── core/
│   │   │   ├── config.py              # Settings(pydantic-settings)
│   │   │   ├── context.py             # request_id_ctx: ContextVar
│   │   │   ├── log.py                 # loguru 双 sink + InterceptHandler
│   │   │   ├── middleware.py          # 函数式 request_id 中间件
│   │   │   ├── security.py            # JWT 签发/解析 + get_current_user
│   │   │   └── db.py                  # SQLAlchemy async engine + Base
│   │   ├── models/
│   │   │   ├── conversation.py        # 会话表
│   │   │   └── message.py             # 消息表(含 request_id 列)
│   │   └── schemas/
│   │       └── chat.py                # ChatRequest Pydantic
│   ├── config/
│   │   └── sub-agents.yaml            # 子 Agent 清单
│   ├── logs/                          # 运行期日志落盘
│   ├── .env
│   └── pyproject.toml
└── docs/
    ├── technical-design-sub-agent-calling.md  ← 本文件
    └── technical-design-logging-observability.md

5. 认证与鉴权 token 流转(简版)

本节只描述主 Agent ↔ 子 Agent 调用相关的 token 处理。完整 SSO 登录流程见另一份文档。

5.1 三种 token

名称 签发方 持有方 作用 传递头
campus_token 智慧校园 SSO 用户浏览器 → 主 Agent 登录接口 数据域鉴权,子 Agent 调用校园 API 的凭证
access_token (JWT) 主 Agent 登录接口 前端 localStorage 主 Agent 自身 API 鉴权;内部嵌入 campus_token 作为 claim Authorization: Bearer <jwt>
campus_token(透传) 主 Agent(从 JWT claim 取出) 子 Agent 子 Agent 调用校园 API 的凭证 Authorization: Bearer <campus_token>

5.2 流转时序

智慧校园 API 子 Agent 主 Agent 用户 智慧校园 API 子 Agent 主 Agent 用户 登录阶段(一次) 聊天阶段(每次请求) POST /auth/sso-callback?ticket=xxx 换 campus_token campus_token 签发 JWT, campus_token 作为 claim access_token (JWT) POST /api/v1/chat Authorization: Bearer <jwt> HTTPBearer 解析 JWT, 提取 campus_token 到 request.state AgentContext.campus_token = campus_token POST /api/v1/chat Authorization: Bearer <campus_token> X-Request-Id: <rid> 使用 campus_token 查数据 data SSE SSE

5.3 关键代码契约

JWT 签发app/core/security.py):

def create_access_token(user_id: UUID, roles: list[str], campus_token: str) -> str:
    expire = datetime.now(timezone.utc) + timedelta(hours=settings.jwt_ttl_hours)
    payload = {
        "sub": str(user_id),
        "roles": roles,
        "campus_token": campus_token,   # ← 嵌入
        "exp": expire,
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)

依赖:解析 JWT 并把 campus_token 放到 request.state

async def get_current_user(
    request: Request,
    credentials: HTTPAuthorizationCredentials | None = Depends(_bearer_scheme),
) -> dict:
    if credentials is None:
        raise HTTPException(status_code=401, detail={"code": 40101, "message": "token_missing"})
    payload = decode_access_token(credentials.credentials)
    request.state.campus_token = payload.get("campus_token", "")
    return payload

chat 端点取出 campus_token 放进 AgentContext

campus_token = getattr(request.state, "campus_token", "")
# ... 后续传入 run_agent_stream(campus_token=...)

5.4 子 Agent 侧鉴权

子 Agent 必须实现:

  1. 解析 Authorization: Bearer <campus_token> 头;
  2. 用 campus_token 调校园 API 时,如返回 401 → SSE error 事件 {"code": "40102", "message": "campus_token_expired"}
  3. 禁止信任前端直接传过来的请求,必须 Authorization 头为 Bearer 格式且非空。

5.5 本版约定的不做项

  • 暂不做主 Agent ↔ 子 Agent 之间的服务间 mTLS(内网 K8s 集群内 SVC 调用,先用 Bearer + 网络隔离);
  • 暂不做 JWT refresh token(TTL 24h + 前端过期重新登录);
  • 暂不做 campus_token 撤销列表;
  • 暂不做子 Agent 侧 JWT 校验(子 Agent 只校验 campus_token,不校验主 Agent JWT)。

6. 配置 schema(sub-agents.yaml)

6.1 完整字段

# backend/config/sub-agents.yaml
agents:
  - name: food-agent
    description: >-
      食堂菜品推荐。当用户询问食堂、菜品、吃什么、美食推荐、
      口味偏好、餐厅价格、素食、辣度、套餐搭配等相关话题时调用此工具。
    base_url: http://food-agent-svc.default.svc.cluster.local:8000
    domain: food
    timeout_sec: 30

6.2 字段定义

字段 类型 必填 说明
name string 工具名(转下划线后作为 LLM 可见的 function_tool 名)。示例:food-agentcall_food_agent
description string LLM 决策依据。重要:必须包含触发关键词,LLM 读不到其它上下文
base_url string 子 Agent 根 URL,拼接 /api/v1/chat 得完整地址
domain string 推荐 领域标签,用于日志、前端侧边栏、hint_domain 路由
timeout_sec int 默认 30 单次调用超时

6.3 description 写作规范

LLM 只看 description 决定是否调用。规范:

  • 开头一句话概括能力;
  • 接"当用户询问 A、B、C、D 等相关话题时调用此工具"的触发关键词列表;
  • 不要写"我可以…"、"支持…"这种营销话术,LLM 不需要;
  • 关键词覆盖同义词(“食堂”、“吃什么”、"美食推荐"是三个触发点)。

6.4 扩展方式

新增子 Agent 只需:

  1. 在 yaml 追加一项;
  2. 无需改任何 Python 代码;
  3. 重启主 Agent。

7. 主 Agent 实现细节

7.1 AgentContext 定义

# app/agents/main_agent.py
from dataclasses import dataclass

@dataclass
class AgentContext:
    campus_token: str = ""
    request_id: str = "-"
    conversation_id: str = ""

7.2 LLM 客户端单例

from openai import AsyncOpenAI
from agents import set_tracing_disabled

set_tracing_disabled(True)   # 关闭 SDK 默认 tracing(避免发数据到 OpenAI)
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

_llm_client: AsyncOpenAI | None = None

def _get_llm_client() -> AsyncOpenAI:
    global _llm_client
    if _llm_client is None:
        if not settings.dashscope_api_key:
            raise RuntimeError("DASHSCOPE_API_KEY 未配置")
        _llm_client = AsyncOpenAI(
            api_key=settings.dashscope_api_key,
            base_url=DASHSCOPE_BASE_URL,
        )
    return _llm_client

7.3 SYSTEM_PROMPT

SYSTEM_PROMPT = """你是校园数据智能体的主调度 Agent,名叫"校园小智"。

你的职责:
1. 理解用户意图,选择合适的子 Agent 并调用对应工具;
2. 工具返回内容原样整理后回复用户,不要编造工具未返回的信息;
3. 工具调用失败或返回错误时,直接告诉用户"该领域暂时查询失败,请稍后重试"。

可用工具见 tools 列表,每个工具的 description 说明何时调用。
未命中任何工具时,用通用中文礼貌回答。
"""

7.4 function_tool 工厂

# app/agents/main_agent.py
from agents import function_tool, RunContextWrapper
from app.agents.tools_loader import invoke_sub_agent
from app.core.context import request_id_ctx

def _create_tool_fn(tool_config: dict):
    @function_tool(
        name_override=f"call_{tool_config['name'].replace('-', '_')}",
        description_override=tool_config["description"],
    )
    async def tool_fn(
        ctx: RunContextWrapper[AgentContext],
        message: str,
    ) -> str:
        agent_ctx = ctx.context

        # ⚠️ 两手抓:SDK tool executor 可能不继承外层 ContextVar,
        # 必须在 tool 入口立刻把 request_id 回填到当前任务的 ContextVar,
        # 否则 tool 内部调用的 httpx、sqlalchemy 等库的日志会丢 request_id。
        token = request_id_ctx.set(agent_ctx.request_id)
        try:
            result = await invoke_sub_agent(
                tool_config=tool_config,
                message=message,
                campus_token=agent_ctx.campus_token,
                request_id=agent_ctx.request_id,
                conversation_id=agent_ctx.conversation_id,
            )
            return result
        finally:
            request_id_ctx.reset(token)

    return tool_fn

7.5 Agent 创建与单例

from agents import Agent, OpenAIChatCompletionsModel, ModelSettings
from app.agents.tools_loader import build_sub_agent_tools

def create_main_agent() -> Agent:
    tool_configs = build_sub_agent_tools()
    tools = [_create_tool_fn(cfg) for cfg in tool_configs]
    agent = Agent(
        name="campus-main-agent",
        instructions=SYSTEM_PROMPT,
        tools=tools,
        model=OpenAIChatCompletionsModel(
            model=settings.llm_model,
            openai_client=_get_llm_client(),
        ),
        model_settings=ModelSettings(temperature=0.7),
    )
    logger.info(f"main agent created tools={[t.name for t in tools]}")
    return agent

_agent: Agent | None = None

def get_main_agent() -> Agent:
    global _agent
    if _agent is None:
        _agent = create_main_agent()
    return _agent

7.6 Runner.run_streamed 与事件映射

见 §14.4 的 stream_handler.py 完整代码。

主要事件类型处理:

SDK 事件 前端收到的 SSE 事件 说明
raw_response_event + response.output_text.delta {"type": "chunk", "content": "..."} LLM 逐 token 输出
run_item_stream_event + ToolCallItem {"type": "tool_call", "tool": "call_food_agent"} 开始调工具
run_item_stream_event + ToolCallOutputItem {"type": "tool_result", "ok": true} 工具返回
agent_updated_stream_event {"type": "agent_update", "agent_name": "..."} 切换 agent(handoff)
(流结束后补发) {"type": "done", "full_text": "..."} 主 Agent 自己补的完结事件
异常捕获 {"type": "error", "code": "50002", "message": "..."}

7.7 会话与历史消息

  • 会话建立:首次请求 conversation_id=null → 新建 Conversation 行;后续请求携带已有 id;
  • 历史取最近 20 条(MAX_HISTORY = 20),按 created_at 升序传给 LLM;
  • 用户消息持久化在 LLM 调用前(先入库再调 LLM,保证请求可追溯);
  • Assistant 消息持久化在 SSE done 后(拿到 full_text 再入库)。

8. 子 Agent 标准契约

所有子 Agent 必须遵守以下契约,否则主 Agent 无法接入。

8.1 HTTP 接口

POST /api/v1/chat
Content-Type: application/json
Authorization: Bearer <campus_token>
X-Request-Id: <request_id>
X-Parent-Conversation-Id: <main_conversation_id>

{
    "message": "今天中午吃啥"
}

可选字段(主 Agent 通过 extra 传入,子 Agent 按需读取):

  • domain:来自 YAML 的 domain 字段;
  • hint_*:意图提示字段。

8.2 响应:SSE

Content-Type: text/event-stream,逐行 data: <json>\n\n

事件类型(枚举,新增类型必须同步主 Agent 的 stream_handler):

type payload 说明
session {"conversation_id": "...", "request_id": "..."} 子 Agent 自身的 session,可选
chunk {"content": "..."} 增量文本,必须累计拼成回复
tool_call {"tool": "...", "args": "..."} 子 Agent 内部工具调用(展示用,可选)
tool_result {"ok": true} 子 Agent 内部工具结束
done {"full_text": "...", "chart": {...}} 必须发,标志本次响应结束
error {"code": "50002", "message": "..."} 出错时发,随后仍需发 done

8.3 错误码对齐

code 含义 主 Agent 处理
40101 token 缺失 主 Agent 已经卡在自己这一层,不会发生
40102 campus_token 过期/无效 主 Agent 降级文本提示用户重新登录
50001 子 Agent 内部错误 工具结果返回"子 Agent 返回错误: xxx"
50002 通用错误 同上
50003 下游依赖错误 同上

8.4 超时策略

  • 子 Agent 单次响应硬超时 ≤ 30s(YAML 里每个 agent 可独立配置);
  • 主 Agent httpx.AsyncClient(timeout=timeout_sec) 到时直接断流;
  • 子 Agent 内部若需调 LLM,必须自己设更短的超时(建议 25s),给 SSE 收尾留余量。

8.5 幂等性

  • 子 Agent 不要求幂等,但必须能在同一 conversation_id 上被多次调用(不能 400);
  • 重复的 request_id 视为新请求(主 Agent 也不会主动重试)。

9. HTTP 通信契约

9.1 SSE 客户端实现要点

见 §14.7 的完整代码。核心约束:

  1. 使用 httpx.AsyncClient + client.stream("POST", ...)
  2. 不要 resp.text,必须 aiter_lines()
  3. 只处理以 data: 开头的行,去前缀后 yield;
  4. 超时分三类捕获:
    • httpx.TimeoutException → “子 Agent 响应超时,请稍后重试。”
    • httpx.HTTPError(连接失败、DNS、RST)→ “调用子 Agent 失败: {e}”
    • 非 200 状态码 → “子 Agent 返回错误({status}),请稍后重试。”
  5. 不要对 5xx 做自动重试——重试 SSE 流意味着用户可能看到两段半吊子回复。

9.2 Header 约定

Header 方向 必填 内容
Authorization 主→子 Bearer <campus_token>
X-Request-Id 主→子 16 字符 hex,来源于主 Agent 的 request_id_ctx
X-Parent-Conversation-Id 主→子 推荐 主会话 id,便于日志关联
Content-Type 主→子 application/json
X-Request-Id 子→主 推荐 回显,方便双向对账

9.3 payload

{
  "message": "<用户原始 message 或 LLM 重写过的 message>",
  "extra_field_1": "..."
}

message 是 LLM 决策调用工具时填入 call_food_agent(message="...") 的那个值,可能与用户原话不一致(LLM 会重写)。这是 OpenAI Agents SDK 的正常行为,子 Agent 把它当作用户提问即可。


10. request_id 全链路传递

10.1 为什么需要专门一节

本项目最大的排错痛点就是「日志对不上」。必须做到从接入网关到子 Agent 内部 LLM 调用,都能用同一个 request_id 过滤出完整一条链路。

10.2 全链路关键节点

子 Agent httpx client function_tool Agents SDK Runner stream_handler chat endpoint middleware Nginx 子 Agent httpx client function_tool Agents SDK Runner stream_handler chat endpoint middleware Nginx Header X-Request-Id: rid 1 request_id_ctx.set(rid) request.state.request_id = rid 2 call_next 3 从 request.state 读 rid 4 run_agent_stream(request_id=rid, ...) 5 AgentContext(request_id=rid) 6 Runner.run_streamed(context=AgentContext) 7 [内部 executor] 这里 ContextVar 丢失! 8 tool_fn(ctx, message) 9 agent_ctx = ctx.context ⚠️ request_id_ctx.set(agent_ctx.request_id) 10 call_sub_agent_stream(request_id=rid, ...) 11 Header X-Request-Id: rid 12 POST /api/v1/chat 13 自己的 middleware 复用该 rid 14

10.3 十处 set/get 清单

位置 操作 代码
1. nginx 保留或生成 proxy_set_header X-Request-Id $request_id;
2. middleware 入口 读 header 或生成新 id rid = request.headers.get("X-Request-Id") or uuid.uuid4().hex[:16]
3. middleware 入口 写 ContextVar token = request_id_ctx.set(rid)
4. middleware 入口 写 request.state request.state.request_id = rid
5. chat endpoint 读 request.state request_id = getattr(request.state, "request_id", "-")
6. stream_handler 放进 AgentContext AgentContext(request_id=request_id, ...)
7. function_tool 入口 重新 set ContextVar token = request_id_ctx.set(agent_ctx.request_id)
8. http_client 放进 header headers = {"X-Request-Id": request_id, ...}
9. middleware 出口 reset ContextVar request_id_ctx.reset(token)
10. middleware 出口 回显 response header response.headers["X-Request-Id"] = request_id

10.4 最常踩的两个坑

  1. 只做了 AgentContext,没在 tool 入口 re-set ContextVar → tool 外层日志(主 Agent 打的)带 request_id,tool 内部 httpx / sqlalchemy 的日志全是 request_id=-。修正:§7.4 的 try/finally 模式。
  2. middleware 用 BaseHTTPMiddleware 子类 → anyio task group 破坏 ContextVar 传递,StreamingResponse 下游日志全丢。修正:只能用函数式 @app.middleware("http")

10.5 request_id 格式

  • 16 字符 hex,取 uuid.uuid4().hex[:16](不是 str(uuid.uuid4()) 的 36 字符);
  • 短便于日志里肉眼对账;
  • 冲突概率 16^16 ≈ 10^19,对单日请求量远远够用;
  • messages.request_id 数据库列 String(32) 预留长度即可。

11. 日志实现

本节遵循 @technical-scheme-design:references/logging-standards.md。以下是落到本项目的具体实现,不是规范复述。

11.1 loguru 双 sink 配置清单

app/core/log.py 完整代码见 §14.1。关键决策:

# 控制台 sink
_raw_logger.add(
    sys.stdout,
    level=settings.log_level,
    format=FMT,
    enqueue=False,        # Windows + uvicorn --reload 下 True 会吞日志
    backtrace=True,
    diagnose=settings.debug,
    colorize=True,
)

# 文件 sink(生产主依赖)
_raw_logger.add(
    LOG_DIR / "app.log",
    level=settings.log_level,
    format=FMT_PLAIN,
    rotation="50 MB",
    retention="14 days",
    encoding="utf-8",
    enqueue=False,
    backtrace=True,
    diagnose=False,
)

为什么 enqueue=False:本项目开发以 Windows + uvicorn --reload 为主,Linux K8s pod 部署单进程单容器,两种场景下 enqueue=True 带来的问题都大于收益(Windows stdout 吞日志、多 worker 多进程共享文件 sink 竞争)。如果后续上 gunicorn 多 worker,文件 sink 改成按 pid 分文件,或切换成 sink=my_http_sink 推到日志中心。

11.2 格式串与字段定义

FMT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
    "<level>{level: <8}</level> | "
    "<magenta>request_id={extra[request_id]}</magenta> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)
FMT_PLAIN = (
    "{time:YYYY-MM-DD HH:mm:ss.SSS} | "
    "{level: <8} | "
    "request_id={extra[request_id]} | "
    "{name}:{function}:{line} - "
    "{message}"
)

字段含义:

字段 来源 备注
time loguru 自动 带毫秒,本地时区
level logger.info/warning/... 左对齐 8 字符
request_id extra[request_id] ← patcher 注入 无 ContextVar 时默认 -
name loguru 自动 调用方模块路径
function loguru 自动 函数名
line loguru 自动 行号
message f-string 拼好的消息 业务字段用 k=v 紧凑写法

11.3 InterceptHandler 桥接 stdlib

将 uvicorn/httpx/sqlalchemy 等使用 logging.Logger 的库全部重定向到 loguru:

class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        try:
            level = _raw_logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        frame, depth = inspect.currentframe(), 0
        while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1
        _raw_logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

def setup_logging() -> None:
    # ... sink 配置 ...
    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
    for name in (
        "uvicorn", "uvicorn.error", "uvicorn.access",
        "fastapi",
        "asyncio",
        "httpx", "httpcore",
        "sqlalchemy.engine",
    ):
        lg = logging.getLogger(name)
        lg.handlers = [InterceptHandler()]
        lg.propagate = False

已知现象uvicorn.access 的日志依旧是 request_id=-,因为 ASGI access 日志是在 middleware 之外、协议层打的,中间件的 ContextVar 尚未 set。这是预期行为,排错时忽略这一行,从业务 logger.info("→ METHOD PATH") 开始看。可选优化:

logging.getLogger("uvicorn.access").setLevel(logging.WARNING)

11.4 patcher + ContextVar 自动注入

from app.core.context import request_id_ctx

def _patcher(record: dict) -> None:
    record["extra"].setdefault("request_id", request_id_ctx.get())

_raw_logger.configure(patcher=_patcher)
  • 每条日志发出前都会被 patcher 挂钩,从当前任务的 ContextVar 读 request_id;
  • setdefault 而非 set:如果调用方显式 logger.bind(request_id="custom"),不覆盖。

11.5 本架构强制打的关键日志节点

下面每一条都是必打,排错靠它们。所有 logger.xxx 调用都会被 patcher 自动注入 request_id。

节点 代码位置 日志模板 级别
请求入口 middleware → {method} {path} INFO
请求出口 middleware ← {method} {path} status={code} cost_ms={ms} INFO
请求异常 middleware ✗ {method} {path} cost_ms={ms} ERROR (exception)
chat 入口 chat endpoint chat request conversation_id={cid} user_id={uid} roles={r} message={前50字} INFO
chat 出口 chat endpoint chat done conversation_id={cid} cost_ms={ms} reply_len={n} INFO
chat 异常 chat endpoint chat stream error conversation_id={cid} ERROR (exception)
main agent 初始化 main_agent.py main agent created tools={names} INFO
子 Agent 注册 tools_loader registered sub-agent tool name={n} domain={d} base_url={url} INFO
子 Agent 调用开始 http_client call sub-agent url={url} request_id={rid} INFO
子 Agent 非 200 http_client sub-agent error status={code} url={url} body={前200字} ERROR
子 Agent 超时 http_client sub-agent timeout url={url} timeout_sec={n} ERROR
子 Agent HTTP 异常 http_client sub-agent http error url={url} error={e!r} ERROR (exception)
Agent stream 异常 stream_handler agent stream error request_id={rid} ERROR (exception)
服务启动 lifespan starting service={name} INFO
服务关闭 lifespan shutdown complete INFO

11.6 敏感字段脱敏

禁止直接把 campus_token、JWT 原文打日志。需要时只打前后 4 字符:

def _redact(token: str) -> str:
    if not token or len(token) < 12:
        return "***"
    return f"{token[:4]}...{token[-4:]}"

logger.info(f"chat token={_redact(campus_token)}")

其他敏感字段:

字段 处理
password 禁打
Authorization 头原值 禁打
手机号 138****1234
身份证号 禁打
message 用户原话 截断前 50 字(见 chat endpoint)

11.7 生产 vs 开发配置差异

维度 开发(Windows/WSL) 生产(Linux K8s)
log_level DEBUG 或 INFO INFO
stdout sink 开,彩色 开,彩色(K8s 收集 stdout)
文件 sink 开(stdout 被 PowerShell 吞时兜底) 可关,若开则 PVC 挂载;主依赖 stdout + 日志中心
enqueue False False(单进程单容器);多 worker 改文件按 pid
diagnose True(debug 模式下) False(堆栈可能泄敏)
uvicorn --reload --workers N(根据 CPU)

11.8 引用

完整规范见 @technical-scheme-design:references/logging-standards.md,重点关注:R14(禁 BaseHTTPMiddleware)、§4.4.1(SDK executor ContextVar 两手抓)、§6.6(dev pitfalls)、§7.2(uvicorn access log request_id=- 是预期)。


12. 数据库设计

12.1 表清单

说明
users 用户(本文档不定义,由 SSO 文档定义)
conversations 主 Agent 会话
messages 每条用户 / assistant 消息

12.2 conversations

CREATE TABLE conversations (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL REFERENCES users(id),
    title         VARCHAR(128) NOT NULL DEFAULT '',
    hint_domain   VARCHAR(32),
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ix_conversations_user_id ON conversations(user_id);

SQLAlchemy 模型:

class Conversation(Base):
    __tablename__ = "conversations"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False, index=True)
    title: Mapped[str] = mapped_column(String(128), default="")
    hint_domain: Mapped[str | None] = mapped_column(String(32), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

12.3 messages

CREATE TABLE messages (
    id               UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id  UUID NOT NULL REFERENCES conversations(id),
    role             VARCHAR(16) NOT NULL,  -- 'user' | 'assistant' | 'tool'
    content          TEXT NOT NULL DEFAULT '',
    chart            JSONB,                 -- 子 Agent 返回的图表数据
    agent_name       VARCHAR(64),           -- 哪个子 Agent 生成,null 表示主 Agent
    request_id       VARCHAR(32),           -- 追踪 id,关联日志
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ix_messages_conversation_id ON messages(conversation_id);

SQLAlchemy 模型:

class Message(Base):
    __tablename__ = "messages"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("conversations.id"), nullable=False, index=True)
    role: Mapped[str] = mapped_column(String(16), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False, default="")
    chart: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    agent_name: Mapped[str | None] = mapped_column(String(64), nullable=True)
    request_id: Mapped[str | None] = mapped_column(String(32), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

12.4 写入时机

时机 写什么
POST /chat 入口,LLM 调用前 messages(role='user', content=req.message, request_id=rid)
SSE done 事件后,full_text 非空 messages(role='assistant', content=full_text, request_id=rid)
首次请求(req.conversation_id is None conversations(title=req.message[:50])

12.5 不做的事

  • 不存工具调用中间结果(role='tool' 暂不写);
  • 不存 chart(子 Agent 返回 chart 时留给 assistant 消息的 chart 列,但本版主 Agent 不提取,先 null);
  • 不做软删除,直接 DELETE。

13. 错误处理与重试

13.1 分层异常

异常 暴露给用户
middleware 任意未捕获 500 json {code: "50000", message: "internal error"}
JWT 解析 JWTError 401 json {code: "40101", message: "token_invalid"}
DB 操作 SQLAlchemyError 500 SSE error 事件
LLM 调用 OpenAI SDK 异常 SSE error 事件 {code: "50002"}
子 Agent HTTP 异常 httpx.TimeoutException 工具返回字符串给 LLM:“子 Agent 响应超时,请稍后重试。”
子 Agent HTTP 异常 httpx.HTTPError 工具返回字符串:“调用子 Agent 失败: {e}”
子 Agent 非 200 status != 200 工具返回字符串:“子 Agent 返回错误({status}),请稍后重试。”
子 Agent SSE error 事件 event.type == “error” 工具返回字符串:“子 Agent 返回错误: {msg}”

13.2 重试策略

  • 不做自动重试。理由:
    • 用户等待的是 SSE 流式回复,重试意味着等二倍时间;
    • LLM 工具调用失败,LLM 会自己决定要不要"换个说法再试"(由 prompt 控制);
    • 自动重试掩盖子 Agent 的真实故障率。
  • 幂等性有保证的只读接口未来可酌情加,但本版不做。

13.3 降级

  • 子 Agent 全挂 → LLM 无工具可用,走通用回答 prompt(“该领域暂时查询失败…”);
  • DashScope 挂 → 直接 SSE error + done,前端显示"AI 服务暂时不可用";
  • 数据库挂(会话持久化失败) → 本版直接 500。未来可改成先走 LLM,持久化失败落消息队列异步补写。

13.4 熔断

本版不做熔断,子 Agent 错误率监控见独立 observability 文档。


14. 关键文件完整示例代码

以下代码是本项目实际可运行版本,实现时可直接抄。

14.1 app/core/log.py

import inspect
import logging
import sys
from pathlib import Path

from loguru import logger as _raw_logger

from app.core.config import settings
from app.core.context import request_id_ctx

FMT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
    "<level>{level: <8}</level> | "
    "<magenta>request_id={extra[request_id]}</magenta> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)

FMT_PLAIN = (
    "{time:YYYY-MM-DD HH:mm:ss.SSS} | "
    "{level: <8} | "
    "request_id={extra[request_id]} | "
    "{name}:{function}:{line} - "
    "{message}"
)

LOG_DIR = Path("logs")


def _patcher(record: dict) -> None:
    record["extra"].setdefault("request_id", request_id_ctx.get())


class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        try:
            level = _raw_logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        frame, depth = inspect.currentframe(), 0
        while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1
        _raw_logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )


def setup_logging() -> None:
    _raw_logger.remove()
    _raw_logger.configure(patcher=_patcher)

    _raw_logger.add(
        sys.stdout,
        level=settings.log_level,
        format=FMT,
        enqueue=False,
        backtrace=True,
        diagnose=settings.debug,
        colorize=True,
    )

    LOG_DIR.mkdir(parents=True, exist_ok=True)
    _raw_logger.add(
        LOG_DIR / "app.log",
        level=settings.log_level,
        format=FMT_PLAIN,
        rotation="50 MB",
        retention="14 days",
        encoding="utf-8",
        enqueue=False,
        backtrace=True,
        diagnose=False,
    )

    logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
    for name in (
        "uvicorn", "uvicorn.error", "uvicorn.access",
        "fastapi",
        "asyncio",
        "httpx", "httpcore",
        "sqlalchemy.engine",
    ):
        lg = logging.getLogger(name)
        lg.handlers = [InterceptHandler()]
        lg.propagate = False


logger = _raw_logger

14.2 app/core/context.py

from contextvars import ContextVar

request_id_ctx: ContextVar[str] = ContextVar("request_id", default="-")

14.3 app/core/middleware.py

import time
import uuid
from collections.abc import Awaitable, Callable

from fastapi import FastAPI, Request
from starlette.responses import Response

from app.core.context import request_id_ctx
from app.core.log import logger

REQUEST_ID_HEADER = "X-Request-Id"


def register_request_id_middleware(app: FastAPI) -> None:
    """函数式 @app.middleware('http') 注册 request_id。
    禁止用 BaseHTTPMiddleware 子类,anyio task group 破坏 ContextVar。"""

    @app.middleware("http")
    async def request_id_middleware(
        request: Request,
        call_next: Callable[[Request], Awaitable[Response]],
    ) -> Response:
        request_id = request.headers.get(REQUEST_ID_HEADER) or uuid.uuid4().hex[:16]
        token = request_id_ctx.set(request_id)
        request.state.request_id = request_id

        start = time.perf_counter()
        try:
            logger.info(f"→ {request.method} {request.url.path}")
            response = await call_next(request)
            cost_ms = int((time.perf_counter() - start) * 1000)
            logger.info(
                f"← {request.method} {request.url.path} "
                f"status={response.status_code} cost_ms={cost_ms}"
            )
            response.headers[REQUEST_ID_HEADER] = request_id
            return response
        except Exception:
            cost_ms = int((time.perf_counter() - start) * 1000)
            logger.exception(
                f"✗ {request.method} {request.url.path} cost_ms={cost_ms}"
            )
            raise
        finally:
            request_id_ctx.reset(token)

14.4 app/agents/main_agent.py

from dataclasses import dataclass

from agents import (
    Agent,
    ModelSettings,
    OpenAIChatCompletionsModel,
    RunContextWrapper,
    function_tool,
    set_tracing_disabled,
)
from openai import AsyncOpenAI

from app.agents.tools_loader import build_sub_agent_tools, invoke_sub_agent
from app.core.config import settings
from app.core.context import request_id_ctx
from app.core.log import logger


@dataclass
class AgentContext:
    campus_token: str = ""
    request_id: str = "-"
    conversation_id: str = ""


set_tracing_disabled(True)
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

_llm_client: AsyncOpenAI | None = None


def _get_llm_client() -> AsyncOpenAI:
    global _llm_client
    if _llm_client is None:
        if not settings.dashscope_api_key:
            raise RuntimeError("DASHSCOPE_API_KEY 未配置")
        _llm_client = AsyncOpenAI(
            api_key=settings.dashscope_api_key,
            base_url=DASHSCOPE_BASE_URL,
        )
    return _llm_client


SYSTEM_PROMPT = """你是校园数据智能体的主调度 Agent,名叫"校园小智"。

你的职责:
1. 理解用户意图,选择合适的子 Agent 并调用对应工具;
2. 工具返回内容原样整理后回复用户,不要编造工具未返回的信息;
3. 工具调用失败或返回错误时,直接告诉用户"该领域暂时查询失败,请稍后重试"。

可用工具见 tools 列表,每个工具的 description 说明何时调用。
未命中任何工具时,用通用中文礼貌回答。
"""


def _create_tool_fn(tool_config: dict):
    @function_tool(
        name_override=f"call_{tool_config['name'].replace('-', '_')}",
        description_override=tool_config["description"],
    )
    async def tool_fn(
        ctx: RunContextWrapper[AgentContext],
        message: str,
    ) -> str:
        agent_ctx = ctx.context
        # 两手抓:re-set ContextVar,让 tool 内部 httpx/sqlalchemy 日志带 rid
        token = request_id_ctx.set(agent_ctx.request_id)
        try:
            result = await invoke_sub_agent(
                tool_config=tool_config,
                message=message,
                campus_token=agent_ctx.campus_token,
                request_id=agent_ctx.request_id,
                conversation_id=agent_ctx.conversation_id,
            )
            return result
        finally:
            request_id_ctx.reset(token)

    return tool_fn


def create_main_agent() -> Agent:
    tool_configs = build_sub_agent_tools()
    tools = [_create_tool_fn(cfg) for cfg in tool_configs]
    agent = Agent(
        name="campus-main-agent",
        instructions=SYSTEM_PROMPT,
        tools=tools,
        model=OpenAIChatCompletionsModel(
            model=settings.llm_model,
            openai_client=_get_llm_client(),
        ),
        model_settings=ModelSettings(temperature=0.7),
    )
    logger.info(f"main agent created tools={[t.name for t in tools]}")
    return agent


_agent: Agent | None = None


def get_main_agent() -> Agent:
    global _agent
    if _agent is None:
        _agent = create_main_agent()
    return _agent

14.5 app/agents/tools_loader.py

import json
from pathlib import Path
from typing import Any

import yaml

from app.core.config import settings
from app.core.log import logger
from app.sub_agents_client.http_client import call_sub_agent_stream


def _load_sub_agents_config() -> list[dict]:
    config_path = Path(settings.sub_agents_config_path)
    if not config_path.exists():
        logger.warning(f"sub-agents config not found path={config_path}")
        return []
    with open(config_path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return data.get("agents", [])


def build_sub_agent_tools() -> list[dict[str, Any]]:
    configs = _load_sub_agents_config()
    tools = []
    for cfg in configs:
        name = cfg["name"]
        description = cfg["description"]
        base_url = cfg["base_url"]
        domain = cfg.get("domain", "")
        timeout_sec = cfg.get("timeout_sec", 30)

        logger.info(
            f"registered sub-agent tool name={name} domain={domain} base_url={base_url}"
        )
        tools.append({
            "name": name,
            "description": description,
            "domain": domain,
            "base_url": base_url,
            "timeout_sec": timeout_sec,
        })
    return tools


async def invoke_sub_agent(
    tool_config: dict,
    message: str,
    campus_token: str,
    request_id: str,
    conversation_id: str,
) -> str:
    """调用子 Agent,收集完整响应文本返回。"""
    chunks: list[str] = []
    async for raw_line in call_sub_agent_stream(
        base_url=tool_config["base_url"],
        message=message,
        campus_token=campus_token,
        request_id=request_id,
        conversation_id=conversation_id,
        timeout_sec=tool_config["timeout_sec"],
    ):
        try:
            event = json.loads(raw_line)
        except (json.JSONDecodeError, TypeError):
            continue

        event_type = event.get("type")
        if event_type == "chunk":
            content = event.get("content", "")
            if content:
                chunks.append(content)
        elif event_type == "error":
            error_msg = event.get("message", "unknown error")
            return f"子 Agent 返回错误: {error_msg}"

    return "".join(chunks)

14.6 app/agents/stream_handler.py

import json
from collections.abc import AsyncGenerator

from agents import Runner

from app.agents.main_agent import AgentContext, get_main_agent
from app.core.log import logger


async def run_agent_stream(
    messages: list[dict],
    campus_token: str,
    request_id: str,
    conversation_id: str,
) -> AsyncGenerator[str, None]:
    """运行主 Agent 并 yield SSE data 行(不含 'data: ' 前缀)。"""
    agent = get_main_agent()
    context = AgentContext(
        campus_token=campus_token,
        request_id=request_id,
        conversation_id=conversation_id,
    )
    full_text = ""

    try:
        result = Runner.run_streamed(agent, input=messages, context=context)

        async for event in result.stream_events():
            event_type = event.type

            if event_type == "raw_response_event":
                data = event.data
                data_type = getattr(data, "type", "")
                if data_type == "response.output_text.delta":
                    content = getattr(data, "delta", "") or ""
                    if content:
                        full_text += content
                        yield json.dumps(
                            {"type": "chunk", "content": content},
                            ensure_ascii=False,
                        )

            elif event_type == "run_item_stream_event":
                item = event.item
                item_type = type(item).__name__

                if item_type == "ToolCallItem":
                    raw = getattr(item, "raw_item", None)
                    tool_name = getattr(raw, "name", None) or (
                        raw.get("name", "unknown") if isinstance(raw, dict) else "unknown"
                    )
                    yield json.dumps(
                        {"type": "tool_call", "tool": tool_name, "args": ""},
                        ensure_ascii=False,
                    )
                elif item_type == "ToolCallOutputItem":
                    yield json.dumps(
                        {"type": "tool_result", "ok": True},
                        ensure_ascii=False,
                    )

            elif event_type == "agent_updated_stream_event":
                agent_name = getattr(event, "new_agent", agent).name
                yield json.dumps(
                    {"type": "agent_update", "agent_name": agent_name},
                    ensure_ascii=False,
                )

    except Exception:
        logger.exception(f"agent stream error request_id={request_id}")
        yield json.dumps(
            {"type": "error", "code": "50002", "message": "internal error"},
            ensure_ascii=False,
        )

    yield json.dumps({"type": "done", "full_text": full_text}, ensure_ascii=False)

14.7 app/sub_agents_client/http_client.py

from collections.abc import AsyncGenerator

import httpx

from app.core.log import logger


async def call_sub_agent_stream(
    base_url: str,
    message: str,
    campus_token: str,
    request_id: str,
    conversation_id: str,
    timeout_sec: int = 30,
    extra: dict | None = None,
) -> AsyncGenerator[str, None]:
    """调用子 Agent 的 /api/v1/chat SSE 接口,逐行 yield SSE data 行。"""
    payload = {"message": message}
    if extra:
        payload.update(extra)

    headers = {
        "Authorization": f"Bearer {campus_token}",
        "X-Request-Id": request_id,
        "X-Parent-Conversation-Id": conversation_id,
        "Content-Type": "application/json",
    }

    url = f"{base_url.rstrip('/')}/api/v1/chat"
    logger.info(f"call sub-agent url={url} request_id={request_id}")

    try:
        async with httpx.AsyncClient(timeout=timeout_sec) as client:
            async with client.stream("POST", url, json=payload, headers=headers) as resp:
                if resp.status_code != 200:
                    body = await resp.aread()
                    logger.error(
                        f"sub-agent error status={resp.status_code} "
                        f"url={url} body={body[:200]}"
                    )
                    yield f'{{"type":"error","code":"{resp.status_code}","message":"子 Agent 返回错误"}}'
                    return

                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    raw = line[6:]
                    yield raw
    except httpx.TimeoutException:
        logger.error(f"sub-agent timeout url={url} timeout_sec={timeout_sec}")
        yield '{"type":"error","code":"50003","message":"子 Agent 响应超时"}'
    except httpx.HTTPError as e:
        logger.exception(f"sub-agent http error url={url} error={e!r}")
        yield f'{{"type":"error","code":"50002","message":"调用子 Agent 失败: {e}"}}'

14.8 app/api/v1/chat.py

import json
import time
import uuid

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.stream_handler import run_agent_stream
from app.core.db import get_db
from app.core.log import logger
from app.core.security import get_current_user
from app.models.conversation import Conversation
from app.models.message import Message
from app.schemas.chat import ChatRequest

router = APIRouter(tags=["chat"])
MAX_HISTORY = 20


@router.post("/chat")
async def api_chat(
    request: Request,
    req: ChatRequest,
    current_user: dict = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    user_id = uuid.UUID(current_user["sub"])
    roles = current_user.get("roles", [])
    campus_token = getattr(request.state, "campus_token", "")
    request_id = getattr(request.state, "request_id", "-")

    if req.conversation_id:
        conversation_id = uuid.UUID(req.conversation_id)
    else:
        conversation_id = uuid.uuid4()
        conv = Conversation(
            id=conversation_id,
            user_id=user_id,
            title=req.message[:50],
            hint_domain=req.hint_domain,
        )
        db.add(conv)
        await db.commit()

    user_msg = Message(
        id=uuid.uuid4(),
        conversation_id=conversation_id,
        role="user",
        content=req.message,
        request_id=request_id,
    )
    db.add(user_msg)
    await db.commit()

    logger.info(
        f"chat request conversation_id={conversation_id} "
        f"user_id={user_id} roles={roles} message={req.message[:50]}"
    )

    stmt = (
        select(Message)
        .where(Message.conversation_id == conversation_id)
        .order_by(Message.created_at.desc())
        .limit(MAX_HISTORY)
    )
    result = await db.execute(stmt)
    history_rows = list(reversed(result.scalars().all()))
    messages = [{"role": row.role, "content": row.content} for row in history_rows]

    async def event_stream():
        start = time.perf_counter()
        full_text = ""
        try:
            session_event = json.dumps(
                {
                    "type": "session",
                    "conversation_id": str(conversation_id),
                    "message_id": str(user_msg.id),
                    "request_id": request_id,
                },
                ensure_ascii=False,
            )
            yield f"data: {session_event}\n\n"

            async for data_line in run_agent_stream(
                messages=messages,
                campus_token=campus_token,
                request_id=request_id,
                conversation_id=str(conversation_id),
            ):
                try:
                    parsed = json.loads(data_line)
                    if parsed.get("type") == "done":
                        full_text = parsed.get("full_text", full_text)
                except (json.JSONDecodeError, TypeError):
                    pass
                yield f"data: {data_line}\n\n"

        except Exception as e:
            logger.exception(f"chat stream error conversation_id={conversation_id}")
            yield f'data: {json.dumps({"type":"error","code":"50002","message":str(e)}, ensure_ascii=False)}\n\n'
            yield f'data: {json.dumps({"type":"done"})}\n\n'

        cost_ms = int((time.perf_counter() - start) * 1000)
        if full_text:
            assistant_msg = Message(
                id=uuid.uuid4(),
                conversation_id=conversation_id,
                role="assistant",
                content=full_text,
                request_id=request_id,
            )
            db.add(assistant_msg)
            await db.commit()

        logger.info(
            f"chat done conversation_id={conversation_id} "
            f"cost_ms={cost_ms} reply_len={len(full_text)}"
        )

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

14.9 app/main.py

from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import settings
from app.core.db import engine
from app.core.log import setup_logging
from app.core.middleware import register_request_id_middleware
from app.api.v1 import v1_router

setup_logging()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    from app.core.log import logger
    logger.info(f"starting service={settings.app_name}")
    yield
    await engine.dispose()
    logger.info("shutdown complete")


app = FastAPI(title="校园数据智能体", lifespan=lifespan)
register_request_id_middleware(app)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://localhost:3000",
        "https://campus-agent-frontend.miaopinai.com",
    ],
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(v1_router)

14.10 config/sub-agents.yaml(新增子 Agent 示例)

agents:
  - name: food-agent
    description: >-
      食堂菜品推荐。当用户询问食堂、菜品、吃什么、美食推荐、口味偏好、
      餐厅价格、素食、辣度、套餐搭配等相关话题时调用此工具。
    base_url: http://food-agent-svc.default.svc.cluster.local:8000
    domain: food
    timeout_sec: 30

  - name: course-agent
    description: >-
      选课助手。当用户询问课程、学分、选课系统、教师评价、培养方案、
      冲突检测、课程表查询等相关话题时调用此工具。
    base_url: http://course-agent-svc.default.svc.cluster.local:8000
    domain: course
    timeout_sec: 30

附录 A:日志实例(真实一次请求)

2026-04-17 10:22:14.331 | INFO     | request_id=9bfee368691a4dc3 | app.core.middleware:request_id_middleware:32 - → POST /api/v1/chat
2026-04-17 10:22:14.345 | INFO     | request_id=9bfee368691a4dc3 | app.api.v1.chat:api_chat:59 - chat request conversation_id=... user_id=... message=今天中午吃啥
2026-04-17 10:22:14.411 | INFO     | request_id=9bfee368691a4dc3 | app.agents.main_agent:create_main_agent:71 - main agent created tools=['call_food_agent']
2026-04-17 10:22:14.890 | INFO     | request_id=9bfee368691a4dc3 | app.sub_agents_client.http_client:call_sub_agent_stream:30 - call sub-agent url=http://food-agent-svc...:8000/api/v1/chat request_id=9bfee368691a4dc3
2026-04-17 10:22:15.120 | INFO     | request_id=9bfee368691a4dc3 | httpx._client:_send_single_request:1740 - HTTP Request: POST http://food-agent-svc...:8000/api/v1/chat "HTTP/1.1 200 OK"
2026-04-17 10:22:18.045 | INFO     | request_id=9bfee368691a4dc3 | app.api.v1.chat:event_stream:136 - chat done conversation_id=... cost_ms=3714 reply_len=128
2026-04-17 10:22:18.046 | INFO     | request_id=9bfee368691a4dc3 | app.core.middleware:request_id_middleware:36 - ← POST /api/v1/chat status=200 cost_ms=3715

所有行都以 request_id=9bfee368691a4dc3 串联,子 Agent 侧也会看到同一 request_id。这就是本架构的排错基准。


附录 B:实现 Checklist

  • app/core/context.py 定义 request_id_ctx
  • app/core/log.py 双 sink + patcher + InterceptHandler
  • app/core/middleware.py 函数式中间件(不用 BaseHTTPMiddleware
  • app/core/security.py JWT 签发/解析 + HTTPBearer
  • app/core/config.py Settings
  • app/core/db.py async engine + Base
  • app/models/conversation.py + app/models/message.py
  • app/schemas/chat.py
  • app/sub_agents_client/http_client.py
  • app/agents/tools_loader.py
  • app/agents/main_agent.pytool_fn 入口必须 re-set request_id_ctx
  • app/agents/stream_handler.py
  • app/api/v1/chat.py
  • app/main.py 装配 + 注册中间件
  • config/sub-agents.yaml
  • .envDASHSCOPE_API_KEYJWT_SECRETDATABASE_URL
  • 手工跑一次 POST /api/v1/chat,在日志里肉眼确认所有行都带同一 request_id
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐