2 个人工作记录——FinAgent 里的 LangGraph 与路由编排
2 FinAgent 里的 LangGraph 与路由编排
本篇文章记录我学习 LangGraph 并将其在项目里面跑起来并接到 API/前端,这期间解决了很多问题,如:图怎么搭、状态怎么传、工具调用怎么路由、任务怎么被 API 管起来、前端怎么轮询进度。
本文对应的核心代码入口:
-
backend/app/graph/runner.py:主流程 LangGraph(主图)与任务执行/落盘 -
backend/app/graph/analyst_subgraph.py:“分析师子图”(ReAct + 工具循环) -
backend/app/graph/state.py:主图状态AnalysisGraphState与阶段日志stage_log -
backend/app/api/routes/analysis.py:任务创建、轮询、取结果 -
backend/app/dataflows/interface.py:工具路由到具体数据源实现
一、怎么理解 LangGraph:它在项目里扮演什么角色?
在 FinAgent 里,LangGraph 主要负责两件事:
-
把“多阶段分析流程”写成图(节点、边、条件、子图)
-
把“运行中的状态”持续传递和更新(方便前端看进度)
相比之前我采用的“一个超长 orchestrator 函数”硬写 if/else,LangGraph 的收益主要是:编排清晰、状态可追踪(想给前端展示到哪一步了,本质就是 state 的一部分)、子图可复用。
二、主图怎么搭:FinAgent 的主流程是怎样被串起来的?
主图在 backend/app/graph/runner.py 的 build_analysis_graph() 里根据之前写的主链路构建,这里为了方便运行调试,还设置了每个节点可跳过:
-
节点(Node):
-
analyst:分析师子图(市场/舆情/新闻/基本面) -
research_debate:研究辩论 -
trader:交易员 -
risk_review:风控循环 -
portfolio_manager:组合经理 -
build_report:把 state 汇总为report_json+report_markdown
-
-
边(Edge):严格线性串起来
START -> analyst -> research_debate -> trader -> risk_review -> portfolio_manager -> build_report -> END
三、state:不只服务 LLM,也服务 API/前端
AnalysisGraphState 可以理解成“任务总账本”。
它不是只给节点之间传值用的,还是 API 和前端的事实来源。
里面大体分四类信息(有些字段后续可以弃用):
-
基础信息:
task_id、ticker、analysis_date、user_id -
阶段产物:
market_report、news_report、social_report、fundamentals_report、trader_output、risk_section、portfolio_output -
最终报告:
report_json、report_markdown -
运行日志:
stage_log、memory_log
其中前端最依赖的是 stage_log。
因为它记录了每个阶段的 pending/running/succeeded/skipped/failed,还带开始/结束时间和摘要。
所以前端想展示“当前跑到哪了”,直接读 stage_log 就行,不需要解析 LangGraph 内部执行细节。
在主流程中:
runner.py 里每个阶段节点几乎都遵循同一个模式:
-
先
mark_stage(..., status="running", started=True) -
执行本阶段逻辑(调用子图或下游函数)
-
成功后
mark_stage(..., status="succeeded", finished=True, summary=..., output=...) -
调
_persist_running_snapshot(state, updates)持久化快照
_persist_running_snapshot 会把关键字段(如 task_id、status、stage_log、memory_log、error_message)写到状态文件。
这一层主要是方便目前的调试。
四、分析师子图怎么跑:ReAct 循环
analyst_subgraph.py 里的 compile_analyst_subgraph() 把四位分析师按顺序组织起来:market、social、news、fundamentals
每位分析师都用同一套编排模板:
-
一个 analyst 节点(Prompt +
bind_tools):Prompt告诉模型“你是谁,你要分析什么”,bind_tools告诉模型“你可以调用哪些工具” -
一个工具节点
ToolNode(tools):执行节点,负责执行模型刚才请求的工具,然后把工具结果封装成消息返回给图。 -
一条条件边
tools_condition:这是“分流开关”。它会检查 analyst 节点刚输出的消息里有没有tool_calls:- 有 → 说明还在取证,流向
ToolNode; - 没有 → 说明准备收口,流向下一个分析师(或结束)。
- 有 → 说明还在取证,流向
行为规则:
-
这轮有
tool_calls:先去工具节点执行,再回 analyst 节点 -
这轮没有
tool_calls:把内容当该分析师最终结论,流转到下一位分析师
这个规则很重要,它避免了“工具还没调完就提前收口”的问题。
另外,子图还做了两件很实用的事:
-
保存
analyst_react_summaries,便于主图拼接摘要和审计 -
按角色限制工具集合,避免模型在无关工具里乱选
这套子图结构的核心目的,是把“数据收集”和“结论产出”分开,保证每个分析师先拿到证据,再下结论。
六、API 路由怎么和 LangGraph 对接:任务接口与前端轮询
这部分可以按“创建任务 → 后台执行 → 前端轮询”的顺序看:
-
创建任务(入口):前端调用
POST /api/analysis/tasks。 -
后端立刻返回可轮询对象:
-
先生成
task_id; -
立即写入
running初始状态; -
初始状态里已经包含
stage_log(进度骨架)。
-
-
后台异步执行图:LangGraph 在后台任务里运行,不阻塞创建接口。
-
preseeded=True的作用:-
表示“状态壳子已经提前写好”;
-
runner 直接在这份状态上持续更新;
-
前端拿到
task_id后马上就能轮询,不用等第一阶段结束。
-
前端常用的读取接口如下:
-
GET /api/analysis/tasks/{task_id}:查看任务状态和stage_log(进度)。 -
GET /api/analysis/tasks/{task_id}/result:任务完成后拿report_json/report_markdown。 -
GET /api/analysis/{task_id}/report:单独读取 JSON 报告。 -
GET /api/analysis/{task_id}/report/markdown:单独读取 Markdown 报告。 -
DELETE /api/analysis/tasks/{task_id}:删除任务及其落盘产物。
八、数据工具“路由”在哪里:抽象工具到 vendor 实现
在图里我们看到的是 get_stock_data、get_news 这种抽象工具名。
真正怎么调用 AkShare、失败后怎么回退,是 dataflows/interface.py 负责的。
这个模块的核心是:
-
TOOLS_CATEGORIES:先给工具分组,便于统一管理; -
VENDOR_METHODS:把“抽象方法”映射到“具体 vendor 的实现”; -
get_vendor(...):按配置选当前优先 vendor; -
route_to_vendor(...):真正执行调用,并在异常时按回退链重试下一个 vendor。
当前策略是:遇到异常会继续尝试回退链里的下一个 vendor(如果有)。
这个设计把“数据源不稳定”的复杂性集中在 dataflows 层,图层就能保持干净。当然近期我们对dataflows层也进行了一次大修改,具体可见另一组员的博客:组员博客:数据层
九、总结
目前阶段已经实现了从任务触发到结果返回的完整主流程:基于 LangGraph 的多阶段执行链路可以稳定推进,API 路由已支持任务创建与查询,状态落盘机制能够让前端持续感知任务进度;在工程实现上,已落地任务创建即写入 running、节点级 stage_log 记录、图内节点职责拆分、分析师子图化封装,以及图层与 dataflows 层的工具分层与 vendor 回退策略。
当前正在完善analyst节点,目前已接入llm,但输出不佳,具体工作在下一篇博文中汇报。
更多推荐

所有评论(0)