技术方案:主 Agent 调用子 Agent 架构设计
本文档详细介绍了主Agent调用子Agent的架构设计方案。主要内容包括: 背景与目标:校园数据智能体需要主Agent作为统一入口调度多个垂直领域子Agent(如选课助手、校园通知等),主Agent负责意图识别和子Agent调用。 架构设计: 采用FastAPI+PostgreSQL技术栈 主Agent通过function_tool机制动态注册子Agent 全链路X-Request-Id实现请求追
技术方案:主 Agent 调用子 Agent 架构设计
本文档面向 AI 实现者,目标是:读完本文即可从零实现整个「主 Agent 通过 function_tool 调用子 Agent」的服务端链路。
所有代码片段均可直接复制,无需再二次联想接口。
1. 背景与目标
1.1 业务背景
校园数据智能体需要统一入口调度多个垂直领域子 Agent(选课助手、校园通知、学工、教务等)。各子 Agent 是独立部署的 FastAPI 服务,各自持有独立的数据域、RAG 知识库和领域 prompt。主 Agent 职责只有两个:
- 理解用户意图,选择合适的子 Agent 并调用;
- 把子 Agent 的输出回流给用户(SSE 流式透传)。
1.2 目标
- 主 Agent 使用 OpenAI Agents SDK(Python)的
function_tool机制注册子 Agent,LLM 根据用户输入自动选路; - 主 → 子 通过 HTTP + SSE 调用,协议层零改造新增子 Agent(只需往
sub-agents.yaml追加一行); - 全链路
X-Request-Id贯通:网关 → 主 Agent → LLM → function_tool → HTTP → 子 Agent → 子 Agent LLM,日志可通过单个request_id串联排错; - 用户校园 token(campus_token)透传到子 Agent 完成数据鉴权;
- 子 Agent 故障、超时不阻断主 Agent 流式响应,能降级返回友好提示。
1.3 非目标
- 子 Agent 内部实现不在本文范围;
- 前端 SSE 消费细节见另一份文档;
- SSO 登录换 campus_token 见另一份文档。
2. 架构总览
2.1 组件拓扑
2.2 端到端时序
2.3 技术栈
| 层 | 技术 |
|---|---|
| 语言/框架 | Python 3.11+ / FastAPI / Uvicorn |
| Agent 框架 | OpenAI Agents SDK (agents 包) |
| LLM | 阿里云 DashScope(OpenAI 兼容模式),模型 qwen-plus |
| HTTP 客户端 | httpx.AsyncClient (SSE 流式) |
| 数据库 | PostgreSQL 17(异步驱动 asyncpg + SQLAlchemy 2.0) |
| 日志 | loguru(双 sink:stdout + 文件)+ InterceptHandler 桥接 stdlib |
| 鉴权 | JWT(python-jose)+ HTTPBearer |
| 配置 | pydantic-settings + .env + sub-agents.yaml |
3. 核心概念
3.1 主 Agent(Main Agent)
- 单例
Agent对象,SDK 提供; - 持有所有已注册子 Agent 作为
tools; - 依赖
AgentContext(见 3.3)在 tool_call 期间读取运行期变量。
3.2 子 Agent(Sub Agent)
- 独立 FastAPI 服务;
- 暴露统一契约
POST /api/v1/chat,接收{"message": "..."}; - 返回 SSE 流,事件类型
chunk/tool_call/tool_result/done/error; - 鉴权头
Authorization: Bearer <campus_token>+ 追踪头X-Request-Id。
3.3 AgentContext
这是主 Agent 给 LLM tool_call 注入运行期数据的唯一通道。LLM 只看得见 message 字符串;campus_token、request_id、conversation_id 这些绝对不能走 prompt,必须走 RunContextWrapper。
@dataclass
class AgentContext:
campus_token: str = "" # 校园 SSO token,透传给子 Agent
request_id: str = "-" # 全链路追踪 id
conversation_id: str = "" # 主会话 id,便于子 Agent 关联父会话
3.4 function_tool
OpenAI Agents SDK 的装饰器,把 Python 函数暴露给 LLM 调用。本项目中每个子 Agent 都动态生成一个 function_tool,工具名 = call_<sub_agent_name>(横杠转下划线)。
3.5 request_id 双通道传递
由于 function_tool 实际执行时 SDK 内部用了独立 executor,Python 的 ContextVar 不会自动跨越 executor 边界。本项目严格遵守"两手抓"原则:
- 显式字段:
AgentContext.request_id—— SDK 保证 context 对象传到 tool; - ContextVar 回填:tool 入口
request_id_ctx.set(agent_ctx.request_id)—— 保证 tool 内部调用的第三方库(httpx、sqlalchemy)日志也带 request_id。
详见 §10。
4. 项目目录结构
campus-agent/
├── backend/
│ ├── app/
│ │ ├── main.py # FastAPI 入口;注册中间件、生命周期
│ │ ├── api/
│ │ │ └── v1/
│ │ │ ├── __init__.py # v1_router 聚合
│ │ │ └── chat.py # POST /api/v1/chat 入口
│ │ ├── agents/
│ │ │ ├── main_agent.py # 主 Agent 单例 + AgentContext
│ │ │ ├── tools_loader.py # 读 YAML 构造 function_tool
│ │ │ └── stream_handler.py # Runner.run_streamed → 统一 SSE
│ │ ├── sub_agents_client/
│ │ │ └── http_client.py # httpx SSE 客户端
│ │ ├── core/
│ │ │ ├── config.py # Settings(pydantic-settings)
│ │ │ ├── context.py # request_id_ctx: ContextVar
│ │ │ ├── log.py # loguru 双 sink + InterceptHandler
│ │ │ ├── middleware.py # 函数式 request_id 中间件
│ │ │ ├── security.py # JWT 签发/解析 + get_current_user
│ │ │ └── db.py # SQLAlchemy async engine + Base
│ │ ├── models/
│ │ │ ├── conversation.py # 会话表
│ │ │ └── message.py # 消息表(含 request_id 列)
│ │ └── schemas/
│ │ └── chat.py # ChatRequest Pydantic
│ ├── config/
│ │ └── sub-agents.yaml # 子 Agent 清单
│ ├── logs/ # 运行期日志落盘
│ ├── .env
│ └── pyproject.toml
└── docs/
├── technical-design-sub-agent-calling.md ← 本文件
└── technical-design-logging-observability.md
5. 认证与鉴权 token 流转(简版)
本节只描述主 Agent ↔ 子 Agent 调用相关的 token 处理。完整 SSO 登录流程见另一份文档。
5.1 三种 token
| 名称 | 签发方 | 持有方 | 作用 | 传递头 |
|---|---|---|---|---|
| campus_token | 智慧校园 SSO | 用户浏览器 → 主 Agent 登录接口 | 数据域鉴权,子 Agent 调用校园 API 的凭证 | — |
| access_token (JWT) | 主 Agent 登录接口 | 前端 localStorage | 主 Agent 自身 API 鉴权;内部嵌入 campus_token 作为 claim | Authorization: Bearer <jwt> |
| campus_token(透传) | 主 Agent(从 JWT claim 取出) | 子 Agent | 子 Agent 调用校园 API 的凭证 | Authorization: Bearer <campus_token> |
5.2 流转时序
5.3 关键代码契约
JWT 签发(app/core/security.py):
def create_access_token(user_id: UUID, roles: list[str], campus_token: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(hours=settings.jwt_ttl_hours)
payload = {
"sub": str(user_id),
"roles": roles,
"campus_token": campus_token, # ← 嵌入
"exp": expire,
}
return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
依赖:解析 JWT 并把 campus_token 放到 request.state:
async def get_current_user(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(_bearer_scheme),
) -> dict:
if credentials is None:
raise HTTPException(status_code=401, detail={"code": 40101, "message": "token_missing"})
payload = decode_access_token(credentials.credentials)
request.state.campus_token = payload.get("campus_token", "")
return payload
chat 端点取出 campus_token 放进 AgentContext:
campus_token = getattr(request.state, "campus_token", "")
# ... 后续传入 run_agent_stream(campus_token=...)
5.4 子 Agent 侧鉴权
子 Agent 必须实现:
- 解析
Authorization: Bearer <campus_token>头; - 用 campus_token 调校园 API 时,如返回 401 → SSE error 事件
{"code": "40102", "message": "campus_token_expired"}; - 禁止信任前端直接传过来的请求,必须
Authorization头为 Bearer 格式且非空。
5.5 本版约定的不做项
- 暂不做主 Agent ↔ 子 Agent 之间的服务间 mTLS(内网 K8s 集群内 SVC 调用,先用 Bearer + 网络隔离);
- 暂不做 JWT refresh token(TTL 24h + 前端过期重新登录);
- 暂不做 campus_token 撤销列表;
- 暂不做子 Agent 侧 JWT 校验(子 Agent 只校验 campus_token,不校验主 Agent JWT)。
6. 配置 schema(sub-agents.yaml)
6.1 完整字段
# backend/config/sub-agents.yaml
agents:
- name: food-agent
description: >-
食堂菜品推荐。当用户询问食堂、菜品、吃什么、美食推荐、
口味偏好、餐厅价格、素食、辣度、套餐搭配等相关话题时调用此工具。
base_url: http://food-agent-svc.default.svc.cluster.local:8000
domain: food
timeout_sec: 30
6.2 字段定义
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name |
string | ✔ | 工具名(转下划线后作为 LLM 可见的 function_tool 名)。示例:food-agent → call_food_agent |
description |
string | ✔ | LLM 决策依据。重要:必须包含触发关键词,LLM 读不到其它上下文 |
base_url |
string | ✔ | 子 Agent 根 URL,拼接 /api/v1/chat 得完整地址 |
domain |
string | 推荐 | 领域标签,用于日志、前端侧边栏、hint_domain 路由 |
timeout_sec |
int | 默认 30 | 单次调用超时 |
6.3 description 写作规范
LLM 只看 description 决定是否调用。规范:
- 开头一句话概括能力;
- 接"当用户询问 A、B、C、D 等相关话题时调用此工具"的触发关键词列表;
- 不要写"我可以…"、"支持…"这种营销话术,LLM 不需要;
- 关键词覆盖同义词(“食堂”、“吃什么”、"美食推荐"是三个触发点)。
6.4 扩展方式
新增子 Agent 只需:
- 在 yaml 追加一项;
- 无需改任何 Python 代码;
- 重启主 Agent。
7. 主 Agent 实现细节
7.1 AgentContext 定义
# app/agents/main_agent.py
from dataclasses import dataclass
@dataclass
class AgentContext:
campus_token: str = ""
request_id: str = "-"
conversation_id: str = ""
7.2 LLM 客户端单例
from openai import AsyncOpenAI
from agents import set_tracing_disabled
set_tracing_disabled(True) # 关闭 SDK 默认 tracing(避免发数据到 OpenAI)
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
_llm_client: AsyncOpenAI | None = None
def _get_llm_client() -> AsyncOpenAI:
global _llm_client
if _llm_client is None:
if not settings.dashscope_api_key:
raise RuntimeError("DASHSCOPE_API_KEY 未配置")
_llm_client = AsyncOpenAI(
api_key=settings.dashscope_api_key,
base_url=DASHSCOPE_BASE_URL,
)
return _llm_client
7.3 SYSTEM_PROMPT
SYSTEM_PROMPT = """你是校园数据智能体的主调度 Agent,名叫"校园小智"。
你的职责:
1. 理解用户意图,选择合适的子 Agent 并调用对应工具;
2. 工具返回内容原样整理后回复用户,不要编造工具未返回的信息;
3. 工具调用失败或返回错误时,直接告诉用户"该领域暂时查询失败,请稍后重试"。
可用工具见 tools 列表,每个工具的 description 说明何时调用。
未命中任何工具时,用通用中文礼貌回答。
"""
7.4 function_tool 工厂
# app/agents/main_agent.py
from agents import function_tool, RunContextWrapper
from app.agents.tools_loader import invoke_sub_agent
from app.core.context import request_id_ctx
def _create_tool_fn(tool_config: dict):
@function_tool(
name_override=f"call_{tool_config['name'].replace('-', '_')}",
description_override=tool_config["description"],
)
async def tool_fn(
ctx: RunContextWrapper[AgentContext],
message: str,
) -> str:
agent_ctx = ctx.context
# ⚠️ 两手抓:SDK tool executor 可能不继承外层 ContextVar,
# 必须在 tool 入口立刻把 request_id 回填到当前任务的 ContextVar,
# 否则 tool 内部调用的 httpx、sqlalchemy 等库的日志会丢 request_id。
token = request_id_ctx.set(agent_ctx.request_id)
try:
result = await invoke_sub_agent(
tool_config=tool_config,
message=message,
campus_token=agent_ctx.campus_token,
request_id=agent_ctx.request_id,
conversation_id=agent_ctx.conversation_id,
)
return result
finally:
request_id_ctx.reset(token)
return tool_fn
7.5 Agent 创建与单例
from agents import Agent, OpenAIChatCompletionsModel, ModelSettings
from app.agents.tools_loader import build_sub_agent_tools
def create_main_agent() -> Agent:
tool_configs = build_sub_agent_tools()
tools = [_create_tool_fn(cfg) for cfg in tool_configs]
agent = Agent(
name="campus-main-agent",
instructions=SYSTEM_PROMPT,
tools=tools,
model=OpenAIChatCompletionsModel(
model=settings.llm_model,
openai_client=_get_llm_client(),
),
model_settings=ModelSettings(temperature=0.7),
)
logger.info(f"main agent created tools={[t.name for t in tools]}")
return agent
_agent: Agent | None = None
def get_main_agent() -> Agent:
global _agent
if _agent is None:
_agent = create_main_agent()
return _agent
7.6 Runner.run_streamed 与事件映射
见 §14.4 的 stream_handler.py 完整代码。
主要事件类型处理:
| SDK 事件 | 前端收到的 SSE 事件 | 说明 |
|---|---|---|
raw_response_event + response.output_text.delta |
{"type": "chunk", "content": "..."} |
LLM 逐 token 输出 |
run_item_stream_event + ToolCallItem |
{"type": "tool_call", "tool": "call_food_agent"} |
开始调工具 |
run_item_stream_event + ToolCallOutputItem |
{"type": "tool_result", "ok": true} |
工具返回 |
agent_updated_stream_event |
{"type": "agent_update", "agent_name": "..."} |
切换 agent(handoff) |
| (流结束后补发) | {"type": "done", "full_text": "..."} |
主 Agent 自己补的完结事件 |
| 异常捕获 | {"type": "error", "code": "50002", "message": "..."} |
7.7 会话与历史消息
- 会话建立:首次请求
conversation_id=null→ 新建Conversation行;后续请求携带已有 id; - 历史取最近 20 条(
MAX_HISTORY = 20),按created_at升序传给 LLM; - 用户消息持久化在 LLM 调用前(先入库再调 LLM,保证请求可追溯);
- Assistant 消息持久化在 SSE
done后(拿到full_text再入库)。
8. 子 Agent 标准契约
所有子 Agent 必须遵守以下契约,否则主 Agent 无法接入。
8.1 HTTP 接口
POST /api/v1/chat
Content-Type: application/json
Authorization: Bearer <campus_token>
X-Request-Id: <request_id>
X-Parent-Conversation-Id: <main_conversation_id>
{
"message": "今天中午吃啥"
}
可选字段(主 Agent 通过 extra 传入,子 Agent 按需读取):
domain:来自 YAML 的 domain 字段;hint_*:意图提示字段。
8.2 响应:SSE
Content-Type: text/event-stream,逐行 data: <json>\n\n。
事件类型(枚举,新增类型必须同步主 Agent 的 stream_handler):
| type | payload | 说明 |
|---|---|---|
session |
{"conversation_id": "...", "request_id": "..."} |
子 Agent 自身的 session,可选 |
chunk |
{"content": "..."} |
增量文本,必须累计拼成回复 |
tool_call |
{"tool": "...", "args": "..."} |
子 Agent 内部工具调用(展示用,可选) |
tool_result |
{"ok": true} |
子 Agent 内部工具结束 |
done |
{"full_text": "...", "chart": {...}} |
必须发,标志本次响应结束 |
error |
{"code": "50002", "message": "..."} |
出错时发,随后仍需发 done |
8.3 错误码对齐
| code | 含义 | 主 Agent 处理 |
|---|---|---|
| 40101 | token 缺失 | 主 Agent 已经卡在自己这一层,不会发生 |
| 40102 | campus_token 过期/无效 | 主 Agent 降级文本提示用户重新登录 |
| 50001 | 子 Agent 内部错误 | 工具结果返回"子 Agent 返回错误: xxx" |
| 50002 | 通用错误 | 同上 |
| 50003 | 下游依赖错误 | 同上 |
8.4 超时策略
- 子 Agent 单次响应硬超时 ≤ 30s(YAML 里每个 agent 可独立配置);
- 主 Agent
httpx.AsyncClient(timeout=timeout_sec)到时直接断流; - 子 Agent 内部若需调 LLM,必须自己设更短的超时(建议 25s),给 SSE 收尾留余量。
8.5 幂等性
- 子 Agent 不要求幂等,但必须能在同一
conversation_id上被多次调用(不能 400); - 重复的
request_id视为新请求(主 Agent 也不会主动重试)。
9. HTTP 通信契约
9.1 SSE 客户端实现要点
见 §14.7 的完整代码。核心约束:
- 使用
httpx.AsyncClient+client.stream("POST", ...); - 不要
resp.text,必须aiter_lines(); - 只处理以
data:开头的行,去前缀后 yield; - 超时分三类捕获:
httpx.TimeoutException→ “子 Agent 响应超时,请稍后重试。”httpx.HTTPError(连接失败、DNS、RST)→ “调用子 Agent 失败: {e}”- 非 200 状态码 → “子 Agent 返回错误({status}),请稍后重试。”
- 不要对 5xx 做自动重试——重试 SSE 流意味着用户可能看到两段半吊子回复。
9.2 Header 约定
| Header | 方向 | 必填 | 内容 |
|---|---|---|---|
Authorization |
主→子 | ✔ | Bearer <campus_token> |
X-Request-Id |
主→子 | ✔ | 16 字符 hex,来源于主 Agent 的 request_id_ctx |
X-Parent-Conversation-Id |
主→子 | 推荐 | 主会话 id,便于日志关联 |
Content-Type |
主→子 | ✔ | application/json |
X-Request-Id |
子→主 | 推荐 | 回显,方便双向对账 |
9.3 payload
{
"message": "<用户原始 message 或 LLM 重写过的 message>",
"extra_field_1": "..."
}
message 是 LLM 决策调用工具时填入 call_food_agent(message="...") 的那个值,可能与用户原话不一致(LLM 会重写)。这是 OpenAI Agents SDK 的正常行为,子 Agent 把它当作用户提问即可。
10. request_id 全链路传递
10.1 为什么需要专门一节
本项目最大的排错痛点就是「日志对不上」。必须做到从接入网关到子 Agent 内部 LLM 调用,都能用同一个 request_id 过滤出完整一条链路。
10.2 全链路关键节点
10.3 十处 set/get 清单
| 位置 | 操作 | 代码 |
|---|---|---|
| 1. nginx | 保留或生成 | proxy_set_header X-Request-Id $request_id; |
| 2. middleware 入口 | 读 header 或生成新 id | rid = request.headers.get("X-Request-Id") or uuid.uuid4().hex[:16] |
| 3. middleware 入口 | 写 ContextVar | token = request_id_ctx.set(rid) |
| 4. middleware 入口 | 写 request.state | request.state.request_id = rid |
| 5. chat endpoint | 读 request.state | request_id = getattr(request.state, "request_id", "-") |
| 6. stream_handler | 放进 AgentContext | AgentContext(request_id=request_id, ...) |
| 7. function_tool 入口 | 重新 set ContextVar | token = request_id_ctx.set(agent_ctx.request_id) |
| 8. http_client | 放进 header | headers = {"X-Request-Id": request_id, ...} |
| 9. middleware 出口 | reset ContextVar | request_id_ctx.reset(token) |
| 10. middleware 出口 | 回显 response header | response.headers["X-Request-Id"] = request_id |
10.4 最常踩的两个坑
- 只做了 AgentContext,没在 tool 入口 re-set ContextVar → tool 外层日志(主 Agent 打的)带 request_id,tool 内部
httpx/sqlalchemy的日志全是request_id=-。修正:§7.4 的 try/finally 模式。 - middleware 用
BaseHTTPMiddleware子类 → anyio task group 破坏 ContextVar 传递,StreamingResponse 下游日志全丢。修正:只能用函数式@app.middleware("http")。
10.5 request_id 格式
- 16 字符 hex,取
uuid.uuid4().hex[:16](不是str(uuid.uuid4())的 36 字符); - 短便于日志里肉眼对账;
- 冲突概率 16^16 ≈ 10^19,对单日请求量远远够用;
messages.request_id数据库列String(32)预留长度即可。
11. 日志实现
本节遵循
@technical-scheme-design:references/logging-standards.md。以下是落到本项目的具体实现,不是规范复述。
11.1 loguru 双 sink 配置清单
app/core/log.py 完整代码见 §14.1。关键决策:
# 控制台 sink
_raw_logger.add(
sys.stdout,
level=settings.log_level,
format=FMT,
enqueue=False, # Windows + uvicorn --reload 下 True 会吞日志
backtrace=True,
diagnose=settings.debug,
colorize=True,
)
# 文件 sink(生产主依赖)
_raw_logger.add(
LOG_DIR / "app.log",
level=settings.log_level,
format=FMT_PLAIN,
rotation="50 MB",
retention="14 days",
encoding="utf-8",
enqueue=False,
backtrace=True,
diagnose=False,
)
为什么 enqueue=False:本项目开发以 Windows + uvicorn --reload 为主,Linux K8s pod 部署单进程单容器,两种场景下 enqueue=True 带来的问题都大于收益(Windows stdout 吞日志、多 worker 多进程共享文件 sink 竞争)。如果后续上 gunicorn 多 worker,文件 sink 改成按 pid 分文件,或切换成 sink=my_http_sink 推到日志中心。
11.2 格式串与字段定义
FMT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<magenta>request_id={extra[request_id]}</magenta> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
FMT_PLAIN = (
"{time:YYYY-MM-DD HH:mm:ss.SSS} | "
"{level: <8} | "
"request_id={extra[request_id]} | "
"{name}:{function}:{line} - "
"{message}"
)
字段含义:
| 字段 | 来源 | 备注 |
|---|---|---|
time |
loguru 自动 | 带毫秒,本地时区 |
level |
logger.info/warning/... |
左对齐 8 字符 |
request_id |
extra[request_id] ← patcher 注入 |
无 ContextVar 时默认 - |
name |
loguru 自动 | 调用方模块路径 |
function |
loguru 自动 | 函数名 |
line |
loguru 自动 | 行号 |
message |
f-string 拼好的消息 | 业务字段用 k=v 紧凑写法 |
11.3 InterceptHandler 桥接 stdlib
将 uvicorn/httpx/sqlalchemy 等使用 logging.Logger 的库全部重定向到 loguru:
class InterceptHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
level = _raw_logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = inspect.currentframe(), 0
while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
frame = frame.f_back
depth += 1
_raw_logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def setup_logging() -> None:
# ... sink 配置 ...
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
for name in (
"uvicorn", "uvicorn.error", "uvicorn.access",
"fastapi",
"asyncio",
"httpx", "httpcore",
"sqlalchemy.engine",
):
lg = logging.getLogger(name)
lg.handlers = [InterceptHandler()]
lg.propagate = False
已知现象:uvicorn.access 的日志依旧是 request_id=-,因为 ASGI access 日志是在 middleware 之外、协议层打的,中间件的 ContextVar 尚未 set。这是预期行为,排错时忽略这一行,从业务 logger.info("→ METHOD PATH") 开始看。可选优化:
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
11.4 patcher + ContextVar 自动注入
from app.core.context import request_id_ctx
def _patcher(record: dict) -> None:
record["extra"].setdefault("request_id", request_id_ctx.get())
_raw_logger.configure(patcher=_patcher)
- 每条日志发出前都会被 patcher 挂钩,从当前任务的 ContextVar 读 request_id;
setdefault而非set:如果调用方显式logger.bind(request_id="custom"),不覆盖。
11.5 本架构强制打的关键日志节点
下面每一条都是必打,排错靠它们。所有 logger.xxx 调用都会被 patcher 自动注入 request_id。
| 节点 | 代码位置 | 日志模板 | 级别 |
|---|---|---|---|
| 请求入口 | middleware | → {method} {path} |
INFO |
| 请求出口 | middleware | ← {method} {path} status={code} cost_ms={ms} |
INFO |
| 请求异常 | middleware | ✗ {method} {path} cost_ms={ms} |
ERROR (exception) |
| chat 入口 | chat endpoint | chat request conversation_id={cid} user_id={uid} roles={r} message={前50字} |
INFO |
| chat 出口 | chat endpoint | chat done conversation_id={cid} cost_ms={ms} reply_len={n} |
INFO |
| chat 异常 | chat endpoint | chat stream error conversation_id={cid} |
ERROR (exception) |
| main agent 初始化 | main_agent.py | main agent created tools={names} |
INFO |
| 子 Agent 注册 | tools_loader | registered sub-agent tool name={n} domain={d} base_url={url} |
INFO |
| 子 Agent 调用开始 | http_client | call sub-agent url={url} request_id={rid} |
INFO |
| 子 Agent 非 200 | http_client | sub-agent error status={code} url={url} body={前200字} |
ERROR |
| 子 Agent 超时 | http_client | sub-agent timeout url={url} timeout_sec={n} |
ERROR |
| 子 Agent HTTP 异常 | http_client | sub-agent http error url={url} error={e!r} |
ERROR (exception) |
| Agent stream 异常 | stream_handler | agent stream error request_id={rid} |
ERROR (exception) |
| 服务启动 | lifespan | starting service={name} |
INFO |
| 服务关闭 | lifespan | shutdown complete |
INFO |
11.6 敏感字段脱敏
禁止直接把 campus_token、JWT 原文打日志。需要时只打前后 4 字符:
def _redact(token: str) -> str:
if not token or len(token) < 12:
return "***"
return f"{token[:4]}...{token[-4:]}"
logger.info(f"chat token={_redact(campus_token)}")
其他敏感字段:
| 字段 | 处理 |
|---|---|
password |
禁打 |
Authorization 头原值 |
禁打 |
| 手机号 | 138****1234 |
| 身份证号 | 禁打 |
message 用户原话 |
截断前 50 字(见 chat endpoint) |
11.7 生产 vs 开发配置差异
| 维度 | 开发(Windows/WSL) | 生产(Linux K8s) |
|---|---|---|
log_level |
DEBUG 或 INFO | INFO |
| stdout sink | 开,彩色 | 开,彩色(K8s 收集 stdout) |
| 文件 sink | 开(stdout 被 PowerShell 吞时兜底) | 可关,若开则 PVC 挂载;主依赖 stdout + 日志中心 |
enqueue |
False | False(单进程单容器);多 worker 改文件按 pid |
diagnose |
True(debug 模式下) | False(堆栈可能泄敏) |
| uvicorn | --reload |
--workers N(根据 CPU) |
11.8 引用
完整规范见 @technical-scheme-design:references/logging-standards.md,重点关注:R14(禁 BaseHTTPMiddleware)、§4.4.1(SDK executor ContextVar 两手抓)、§6.6(dev pitfalls)、§7.2(uvicorn access log request_id=- 是预期)。
12. 数据库设计
12.1 表清单
| 表 | 说明 |
|---|---|
users |
用户(本文档不定义,由 SSO 文档定义) |
conversations |
主 Agent 会话 |
messages |
每条用户 / assistant 消息 |
12.2 conversations
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
title VARCHAR(128) NOT NULL DEFAULT '',
hint_domain VARCHAR(32),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ix_conversations_user_id ON conversations(user_id);
SQLAlchemy 模型:
class Conversation(Base):
__tablename__ = "conversations"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False, index=True)
title: Mapped[str] = mapped_column(String(128), default="")
hint_domain: Mapped[str | None] = mapped_column(String(32), nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
12.3 messages
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID NOT NULL REFERENCES conversations(id),
role VARCHAR(16) NOT NULL, -- 'user' | 'assistant' | 'tool'
content TEXT NOT NULL DEFAULT '',
chart JSONB, -- 子 Agent 返回的图表数据
agent_name VARCHAR(64), -- 哪个子 Agent 生成,null 表示主 Agent
request_id VARCHAR(32), -- 追踪 id,关联日志
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ix_messages_conversation_id ON messages(conversation_id);
SQLAlchemy 模型:
class Message(Base):
__tablename__ = "messages"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
conversation_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("conversations.id"), nullable=False, index=True)
role: Mapped[str] = mapped_column(String(16), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False, default="")
chart: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
agent_name: Mapped[str | None] = mapped_column(String(64), nullable=True)
request_id: Mapped[str | None] = mapped_column(String(32), nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
12.4 写入时机
| 时机 | 写什么 |
|---|---|
POST /chat 入口,LLM 调用前 |
messages(role='user', content=req.message, request_id=rid) |
SSE done 事件后,full_text 非空 |
messages(role='assistant', content=full_text, request_id=rid) |
首次请求(req.conversation_id is None) |
conversations(title=req.message[:50]) |
12.5 不做的事
- 不存工具调用中间结果(
role='tool'暂不写); - 不存 chart(子 Agent 返回 chart 时留给 assistant 消息的
chart列,但本版主 Agent 不提取,先 null); - 不做软删除,直接 DELETE。
13. 错误处理与重试
13.1 分层异常
| 层 | 异常 | 暴露给用户 |
|---|---|---|
| middleware | 任意未捕获 | 500 json {code: "50000", message: "internal error"} |
| JWT 解析 | JWTError |
401 json {code: "40101", message: "token_invalid"} |
| DB 操作 | SQLAlchemyError |
500 SSE error 事件 |
| LLM 调用 | OpenAI SDK 异常 | SSE error 事件 {code: "50002"} |
| 子 Agent HTTP 异常 | httpx.TimeoutException |
工具返回字符串给 LLM:“子 Agent 响应超时,请稍后重试。” |
| 子 Agent HTTP 异常 | httpx.HTTPError |
工具返回字符串:“调用子 Agent 失败: {e}” |
| 子 Agent 非 200 | status != 200 | 工具返回字符串:“子 Agent 返回错误({status}),请稍后重试。” |
| 子 Agent SSE error 事件 | event.type == “error” | 工具返回字符串:“子 Agent 返回错误: {msg}” |
13.2 重试策略
- 不做自动重试。理由:
- 用户等待的是 SSE 流式回复,重试意味着等二倍时间;
- LLM 工具调用失败,LLM 会自己决定要不要"换个说法再试"(由 prompt 控制);
- 自动重试掩盖子 Agent 的真实故障率。
- 幂等性有保证的只读接口未来可酌情加,但本版不做。
13.3 降级
- 子 Agent 全挂 → LLM 无工具可用,走通用回答 prompt(“该领域暂时查询失败…”);
- DashScope 挂 → 直接 SSE error + done,前端显示"AI 服务暂时不可用";
- 数据库挂(会话持久化失败) → 本版直接 500。未来可改成先走 LLM,持久化失败落消息队列异步补写。
13.4 熔断
本版不做熔断,子 Agent 错误率监控见独立 observability 文档。
14. 关键文件完整示例代码
以下代码是本项目实际可运行版本,实现时可直接抄。
14.1 app/core/log.py
import inspect
import logging
import sys
from pathlib import Path
from loguru import logger as _raw_logger
from app.core.config import settings
from app.core.context import request_id_ctx
FMT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<magenta>request_id={extra[request_id]}</magenta> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
FMT_PLAIN = (
"{time:YYYY-MM-DD HH:mm:ss.SSS} | "
"{level: <8} | "
"request_id={extra[request_id]} | "
"{name}:{function}:{line} - "
"{message}"
)
LOG_DIR = Path("logs")
def _patcher(record: dict) -> None:
record["extra"].setdefault("request_id", request_id_ctx.get())
class InterceptHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
level = _raw_logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = inspect.currentframe(), 0
while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
frame = frame.f_back
depth += 1
_raw_logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def setup_logging() -> None:
_raw_logger.remove()
_raw_logger.configure(patcher=_patcher)
_raw_logger.add(
sys.stdout,
level=settings.log_level,
format=FMT,
enqueue=False,
backtrace=True,
diagnose=settings.debug,
colorize=True,
)
LOG_DIR.mkdir(parents=True, exist_ok=True)
_raw_logger.add(
LOG_DIR / "app.log",
level=settings.log_level,
format=FMT_PLAIN,
rotation="50 MB",
retention="14 days",
encoding="utf-8",
enqueue=False,
backtrace=True,
diagnose=False,
)
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
for name in (
"uvicorn", "uvicorn.error", "uvicorn.access",
"fastapi",
"asyncio",
"httpx", "httpcore",
"sqlalchemy.engine",
):
lg = logging.getLogger(name)
lg.handlers = [InterceptHandler()]
lg.propagate = False
logger = _raw_logger
14.2 app/core/context.py
from contextvars import ContextVar
request_id_ctx: ContextVar[str] = ContextVar("request_id", default="-")
14.3 app/core/middleware.py
import time
import uuid
from collections.abc import Awaitable, Callable
from fastapi import FastAPI, Request
from starlette.responses import Response
from app.core.context import request_id_ctx
from app.core.log import logger
REQUEST_ID_HEADER = "X-Request-Id"
def register_request_id_middleware(app: FastAPI) -> None:
"""函数式 @app.middleware('http') 注册 request_id。
禁止用 BaseHTTPMiddleware 子类,anyio task group 破坏 ContextVar。"""
@app.middleware("http")
async def request_id_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
request_id = request.headers.get(REQUEST_ID_HEADER) or uuid.uuid4().hex[:16]
token = request_id_ctx.set(request_id)
request.state.request_id = request_id
start = time.perf_counter()
try:
logger.info(f"→ {request.method} {request.url.path}")
response = await call_next(request)
cost_ms = int((time.perf_counter() - start) * 1000)
logger.info(
f"← {request.method} {request.url.path} "
f"status={response.status_code} cost_ms={cost_ms}"
)
response.headers[REQUEST_ID_HEADER] = request_id
return response
except Exception:
cost_ms = int((time.perf_counter() - start) * 1000)
logger.exception(
f"✗ {request.method} {request.url.path} cost_ms={cost_ms}"
)
raise
finally:
request_id_ctx.reset(token)
14.4 app/agents/main_agent.py
from dataclasses import dataclass
from agents import (
Agent,
ModelSettings,
OpenAIChatCompletionsModel,
RunContextWrapper,
function_tool,
set_tracing_disabled,
)
from openai import AsyncOpenAI
from app.agents.tools_loader import build_sub_agent_tools, invoke_sub_agent
from app.core.config import settings
from app.core.context import request_id_ctx
from app.core.log import logger
@dataclass
class AgentContext:
campus_token: str = ""
request_id: str = "-"
conversation_id: str = ""
set_tracing_disabled(True)
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
_llm_client: AsyncOpenAI | None = None
def _get_llm_client() -> AsyncOpenAI:
global _llm_client
if _llm_client is None:
if not settings.dashscope_api_key:
raise RuntimeError("DASHSCOPE_API_KEY 未配置")
_llm_client = AsyncOpenAI(
api_key=settings.dashscope_api_key,
base_url=DASHSCOPE_BASE_URL,
)
return _llm_client
SYSTEM_PROMPT = """你是校园数据智能体的主调度 Agent,名叫"校园小智"。
你的职责:
1. 理解用户意图,选择合适的子 Agent 并调用对应工具;
2. 工具返回内容原样整理后回复用户,不要编造工具未返回的信息;
3. 工具调用失败或返回错误时,直接告诉用户"该领域暂时查询失败,请稍后重试"。
可用工具见 tools 列表,每个工具的 description 说明何时调用。
未命中任何工具时,用通用中文礼貌回答。
"""
def _create_tool_fn(tool_config: dict):
@function_tool(
name_override=f"call_{tool_config['name'].replace('-', '_')}",
description_override=tool_config["description"],
)
async def tool_fn(
ctx: RunContextWrapper[AgentContext],
message: str,
) -> str:
agent_ctx = ctx.context
# 两手抓:re-set ContextVar,让 tool 内部 httpx/sqlalchemy 日志带 rid
token = request_id_ctx.set(agent_ctx.request_id)
try:
result = await invoke_sub_agent(
tool_config=tool_config,
message=message,
campus_token=agent_ctx.campus_token,
request_id=agent_ctx.request_id,
conversation_id=agent_ctx.conversation_id,
)
return result
finally:
request_id_ctx.reset(token)
return tool_fn
def create_main_agent() -> Agent:
tool_configs = build_sub_agent_tools()
tools = [_create_tool_fn(cfg) for cfg in tool_configs]
agent = Agent(
name="campus-main-agent",
instructions=SYSTEM_PROMPT,
tools=tools,
model=OpenAIChatCompletionsModel(
model=settings.llm_model,
openai_client=_get_llm_client(),
),
model_settings=ModelSettings(temperature=0.7),
)
logger.info(f"main agent created tools={[t.name for t in tools]}")
return agent
_agent: Agent | None = None
def get_main_agent() -> Agent:
global _agent
if _agent is None:
_agent = create_main_agent()
return _agent
14.5 app/agents/tools_loader.py
import json
from pathlib import Path
from typing import Any
import yaml
from app.core.config import settings
from app.core.log import logger
from app.sub_agents_client.http_client import call_sub_agent_stream
def _load_sub_agents_config() -> list[dict]:
config_path = Path(settings.sub_agents_config_path)
if not config_path.exists():
logger.warning(f"sub-agents config not found path={config_path}")
return []
with open(config_path, encoding="utf-8") as f:
data = yaml.safe_load(f)
return data.get("agents", [])
def build_sub_agent_tools() -> list[dict[str, Any]]:
configs = _load_sub_agents_config()
tools = []
for cfg in configs:
name = cfg["name"]
description = cfg["description"]
base_url = cfg["base_url"]
domain = cfg.get("domain", "")
timeout_sec = cfg.get("timeout_sec", 30)
logger.info(
f"registered sub-agent tool name={name} domain={domain} base_url={base_url}"
)
tools.append({
"name": name,
"description": description,
"domain": domain,
"base_url": base_url,
"timeout_sec": timeout_sec,
})
return tools
async def invoke_sub_agent(
tool_config: dict,
message: str,
campus_token: str,
request_id: str,
conversation_id: str,
) -> str:
"""调用子 Agent,收集完整响应文本返回。"""
chunks: list[str] = []
async for raw_line in call_sub_agent_stream(
base_url=tool_config["base_url"],
message=message,
campus_token=campus_token,
request_id=request_id,
conversation_id=conversation_id,
timeout_sec=tool_config["timeout_sec"],
):
try:
event = json.loads(raw_line)
except (json.JSONDecodeError, TypeError):
continue
event_type = event.get("type")
if event_type == "chunk":
content = event.get("content", "")
if content:
chunks.append(content)
elif event_type == "error":
error_msg = event.get("message", "unknown error")
return f"子 Agent 返回错误: {error_msg}"
return "".join(chunks)
14.6 app/agents/stream_handler.py
import json
from collections.abc import AsyncGenerator
from agents import Runner
from app.agents.main_agent import AgentContext, get_main_agent
from app.core.log import logger
async def run_agent_stream(
messages: list[dict],
campus_token: str,
request_id: str,
conversation_id: str,
) -> AsyncGenerator[str, None]:
"""运行主 Agent 并 yield SSE data 行(不含 'data: ' 前缀)。"""
agent = get_main_agent()
context = AgentContext(
campus_token=campus_token,
request_id=request_id,
conversation_id=conversation_id,
)
full_text = ""
try:
result = Runner.run_streamed(agent, input=messages, context=context)
async for event in result.stream_events():
event_type = event.type
if event_type == "raw_response_event":
data = event.data
data_type = getattr(data, "type", "")
if data_type == "response.output_text.delta":
content = getattr(data, "delta", "") or ""
if content:
full_text += content
yield json.dumps(
{"type": "chunk", "content": content},
ensure_ascii=False,
)
elif event_type == "run_item_stream_event":
item = event.item
item_type = type(item).__name__
if item_type == "ToolCallItem":
raw = getattr(item, "raw_item", None)
tool_name = getattr(raw, "name", None) or (
raw.get("name", "unknown") if isinstance(raw, dict) else "unknown"
)
yield json.dumps(
{"type": "tool_call", "tool": tool_name, "args": ""},
ensure_ascii=False,
)
elif item_type == "ToolCallOutputItem":
yield json.dumps(
{"type": "tool_result", "ok": True},
ensure_ascii=False,
)
elif event_type == "agent_updated_stream_event":
agent_name = getattr(event, "new_agent", agent).name
yield json.dumps(
{"type": "agent_update", "agent_name": agent_name},
ensure_ascii=False,
)
except Exception:
logger.exception(f"agent stream error request_id={request_id}")
yield json.dumps(
{"type": "error", "code": "50002", "message": "internal error"},
ensure_ascii=False,
)
yield json.dumps({"type": "done", "full_text": full_text}, ensure_ascii=False)
14.7 app/sub_agents_client/http_client.py
from collections.abc import AsyncGenerator
import httpx
from app.core.log import logger
async def call_sub_agent_stream(
base_url: str,
message: str,
campus_token: str,
request_id: str,
conversation_id: str,
timeout_sec: int = 30,
extra: dict | None = None,
) -> AsyncGenerator[str, None]:
"""调用子 Agent 的 /api/v1/chat SSE 接口,逐行 yield SSE data 行。"""
payload = {"message": message}
if extra:
payload.update(extra)
headers = {
"Authorization": f"Bearer {campus_token}",
"X-Request-Id": request_id,
"X-Parent-Conversation-Id": conversation_id,
"Content-Type": "application/json",
}
url = f"{base_url.rstrip('/')}/api/v1/chat"
logger.info(f"call sub-agent url={url} request_id={request_id}")
try:
async with httpx.AsyncClient(timeout=timeout_sec) as client:
async with client.stream("POST", url, json=payload, headers=headers) as resp:
if resp.status_code != 200:
body = await resp.aread()
logger.error(
f"sub-agent error status={resp.status_code} "
f"url={url} body={body[:200]}"
)
yield f'{{"type":"error","code":"{resp.status_code}","message":"子 Agent 返回错误"}}'
return
async for line in resp.aiter_lines():
if not line.startswith("data: "):
continue
raw = line[6:]
yield raw
except httpx.TimeoutException:
logger.error(f"sub-agent timeout url={url} timeout_sec={timeout_sec}")
yield '{"type":"error","code":"50003","message":"子 Agent 响应超时"}'
except httpx.HTTPError as e:
logger.exception(f"sub-agent http error url={url} error={e!r}")
yield f'{{"type":"error","code":"50002","message":"调用子 Agent 失败: {e}"}}'
14.8 app/api/v1/chat.py
import json
import time
import uuid
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.stream_handler import run_agent_stream
from app.core.db import get_db
from app.core.log import logger
from app.core.security import get_current_user
from app.models.conversation import Conversation
from app.models.message import Message
from app.schemas.chat import ChatRequest
router = APIRouter(tags=["chat"])
MAX_HISTORY = 20
@router.post("/chat")
async def api_chat(
request: Request,
req: ChatRequest,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
user_id = uuid.UUID(current_user["sub"])
roles = current_user.get("roles", [])
campus_token = getattr(request.state, "campus_token", "")
request_id = getattr(request.state, "request_id", "-")
if req.conversation_id:
conversation_id = uuid.UUID(req.conversation_id)
else:
conversation_id = uuid.uuid4()
conv = Conversation(
id=conversation_id,
user_id=user_id,
title=req.message[:50],
hint_domain=req.hint_domain,
)
db.add(conv)
await db.commit()
user_msg = Message(
id=uuid.uuid4(),
conversation_id=conversation_id,
role="user",
content=req.message,
request_id=request_id,
)
db.add(user_msg)
await db.commit()
logger.info(
f"chat request conversation_id={conversation_id} "
f"user_id={user_id} roles={roles} message={req.message[:50]}"
)
stmt = (
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.desc())
.limit(MAX_HISTORY)
)
result = await db.execute(stmt)
history_rows = list(reversed(result.scalars().all()))
messages = [{"role": row.role, "content": row.content} for row in history_rows]
async def event_stream():
start = time.perf_counter()
full_text = ""
try:
session_event = json.dumps(
{
"type": "session",
"conversation_id": str(conversation_id),
"message_id": str(user_msg.id),
"request_id": request_id,
},
ensure_ascii=False,
)
yield f"data: {session_event}\n\n"
async for data_line in run_agent_stream(
messages=messages,
campus_token=campus_token,
request_id=request_id,
conversation_id=str(conversation_id),
):
try:
parsed = json.loads(data_line)
if parsed.get("type") == "done":
full_text = parsed.get("full_text", full_text)
except (json.JSONDecodeError, TypeError):
pass
yield f"data: {data_line}\n\n"
except Exception as e:
logger.exception(f"chat stream error conversation_id={conversation_id}")
yield f'data: {json.dumps({"type":"error","code":"50002","message":str(e)}, ensure_ascii=False)}\n\n'
yield f'data: {json.dumps({"type":"done"})}\n\n'
cost_ms = int((time.perf_counter() - start) * 1000)
if full_text:
assistant_msg = Message(
id=uuid.uuid4(),
conversation_id=conversation_id,
role="assistant",
content=full_text,
request_id=request_id,
)
db.add(assistant_msg)
await db.commit()
logger.info(
f"chat done conversation_id={conversation_id} "
f"cost_ms={cost_ms} reply_len={len(full_text)}"
)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
14.9 app/main.py
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.core.db import engine
from app.core.log import setup_logging
from app.core.middleware import register_request_id_middleware
from app.api.v1 import v1_router
setup_logging()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
from app.core.log import logger
logger.info(f"starting service={settings.app_name}")
yield
await engine.dispose()
logger.info("shutdown complete")
app = FastAPI(title="校园数据智能体", lifespan=lifespan)
register_request_id_middleware(app)
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
"http://localhost:3000",
"https://campus-agent-frontend.miaopinai.com",
],
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(v1_router)
14.10 config/sub-agents.yaml(新增子 Agent 示例)
agents:
- name: food-agent
description: >-
食堂菜品推荐。当用户询问食堂、菜品、吃什么、美食推荐、口味偏好、
餐厅价格、素食、辣度、套餐搭配等相关话题时调用此工具。
base_url: http://food-agent-svc.default.svc.cluster.local:8000
domain: food
timeout_sec: 30
- name: course-agent
description: >-
选课助手。当用户询问课程、学分、选课系统、教师评价、培养方案、
冲突检测、课程表查询等相关话题时调用此工具。
base_url: http://course-agent-svc.default.svc.cluster.local:8000
domain: course
timeout_sec: 30
附录 A:日志实例(真实一次请求)
2026-04-17 10:22:14.331 | INFO | request_id=9bfee368691a4dc3 | app.core.middleware:request_id_middleware:32 - → POST /api/v1/chat
2026-04-17 10:22:14.345 | INFO | request_id=9bfee368691a4dc3 | app.api.v1.chat:api_chat:59 - chat request conversation_id=... user_id=... message=今天中午吃啥
2026-04-17 10:22:14.411 | INFO | request_id=9bfee368691a4dc3 | app.agents.main_agent:create_main_agent:71 - main agent created tools=['call_food_agent']
2026-04-17 10:22:14.890 | INFO | request_id=9bfee368691a4dc3 | app.sub_agents_client.http_client:call_sub_agent_stream:30 - call sub-agent url=http://food-agent-svc...:8000/api/v1/chat request_id=9bfee368691a4dc3
2026-04-17 10:22:15.120 | INFO | request_id=9bfee368691a4dc3 | httpx._client:_send_single_request:1740 - HTTP Request: POST http://food-agent-svc...:8000/api/v1/chat "HTTP/1.1 200 OK"
2026-04-17 10:22:18.045 | INFO | request_id=9bfee368691a4dc3 | app.api.v1.chat:event_stream:136 - chat done conversation_id=... cost_ms=3714 reply_len=128
2026-04-17 10:22:18.046 | INFO | request_id=9bfee368691a4dc3 | app.core.middleware:request_id_middleware:36 - ← POST /api/v1/chat status=200 cost_ms=3715
所有行都以 request_id=9bfee368691a4dc3 串联,子 Agent 侧也会看到同一 request_id。这就是本架构的排错基准。
附录 B:实现 Checklist
-
app/core/context.py定义request_id_ctx -
app/core/log.py双 sink + patcher + InterceptHandler -
app/core/middleware.py函数式中间件(不用 BaseHTTPMiddleware) -
app/core/security.pyJWT 签发/解析 + HTTPBearer -
app/core/config.pySettings -
app/core/db.pyasync engine + Base -
app/models/conversation.py+app/models/message.py -
app/schemas/chat.py -
app/sub_agents_client/http_client.py -
app/agents/tools_loader.py -
app/agents/main_agent.py(tool_fn 入口必须 re-set request_id_ctx) -
app/agents/stream_handler.py -
app/api/v1/chat.py -
app/main.py装配 + 注册中间件 -
config/sub-agents.yaml -
.env:DASHSCOPE_API_KEY、JWT_SECRET、DATABASE_URL - 手工跑一次
POST /api/v1/chat,在日志里肉眼确认所有行都带同一request_id
更多推荐


所有评论(0)