2 FinAgent 里的 LangGraph 与路由编排

本篇文章记录我学习 LangGraph 并将其在项目里面跑起来并接到 API/前端,这期间解决了很多问题,如:图怎么搭、状态怎么传、工具调用怎么路由、任务怎么被 API 管起来、前端怎么轮询进度。

本文对应的核心代码入口:

  • backend/app/graph/runner.py:主流程 LangGraph(主图)与任务执行/落盘

  • backend/app/graph/analyst_subgraph.py:“分析师子图”(ReAct + 工具循环)

  • backend/app/graph/state.py:主图状态 AnalysisGraphState 与阶段日志 stage_log

  • backend/app/api/routes/analysis.py:任务创建、轮询、取结果

  • backend/app/dataflows/interface.py:工具路由到具体数据源实现


一、怎么理解 LangGraph:它在项目里扮演什么角色?

在 FinAgent 里,LangGraph 主要负责两件事:

  • 把“多阶段分析流程”写成图(节点、边、条件、子图)

  • 把“运行中的状态”持续传递和更新(方便前端看进度)

相比之前我采用的“一个超长 orchestrator 函数”硬写 if/else,LangGraph 的收益主要是:编排清晰、状态可追踪(想给前端展示到哪一步了,本质就是 state 的一部分)、子图可复用。


二、主图怎么搭:FinAgent 的主流程是怎样被串起来的?

主图在 backend/app/graph/runner.pybuild_analysis_graph() 里根据之前写的主链路构建,这里为了方便运行调试,还设置了每个节点可跳过:

  • 节点(Node):

    • analyst:分析师子图(市场/舆情/新闻/基本面)

    • research_debate:研究辩论

    • trader:交易员

    • risk_review:风控循环

    • portfolio_manager:组合经理

    • build_report:把 state 汇总为 report_json + report_markdown

  • 边(Edge):严格线性串起来

    • START -> analyst -> research_debate -> trader -> risk_review -> portfolio_manager -> build_report -> END

START

analyst
分析师子图

research_debate
研究辩论

trader
交易员

risk_review
风控循环

portfolio_manager
组合经理

build_report
汇总报告

END


三、state:不只服务 LLM,也服务 API/前端

AnalysisGraphState 可以理解成“任务总账本”。

它不是只给节点之间传值用的,还是 API 和前端的事实来源。

里面大体分四类信息(有些字段后续可以弃用):

  • 基础信息task_idtickeranalysis_dateuser_id

  • 阶段产物market_reportnews_reportsocial_reportfundamentals_reporttrader_outputrisk_sectionportfolio_output

  • 最终报告report_jsonreport_markdown

  • 运行日志stage_logmemory_log

其中前端最依赖的是 stage_log

因为它记录了每个阶段的 pending/running/succeeded/skipped/failed,还带开始/结束时间和摘要。

所以前端想展示“当前跑到哪了”,直接读 stage_log 就行,不需要解析 LangGraph 内部执行细节。

在主流程中:

runner.py 里每个阶段节点几乎都遵循同一个模式:

  1. mark_stage(..., status="running", started=True)

  2. 执行本阶段逻辑(调用子图或下游函数)

  3. 成功后 mark_stage(..., status="succeeded", finished=True, summary=..., output=...)

  4. _persist_running_snapshot(state, updates) 持久化快照

_persist_running_snapshot 会把关键字段(如 task_idstatusstage_logmemory_logerror_message)写到状态文件。

这一层主要是方便目前的调试。


四、分析师子图怎么跑:ReAct 循环

analyst_subgraph.py 里的 compile_analyst_subgraph() 把四位分析师按顺序组织起来:marketsocialnewsfundamentals

单个 Analyst 的 ReAct 循环

有 tool_calls

无 tool_calls

Analyst Node
Prompt + bind_tools

tools_condition

ToolNode
执行工具

输出该分析师结论

下一个分析师 / 子图结束

每位分析师都用同一套编排模板:

  • 一个 analyst 节点(Prompt + bind_tools):Prompt告诉模型“你是谁,你要分析什么”,bind_tools告诉模型“你可以调用哪些工具”

  • 一个工具节点 ToolNode(tools):执行节点,负责执行模型刚才请求的工具,然后把工具结果封装成消息返回给图。

  • 一条条件边 tools_condition:这是“分流开关”。它会检查 analyst 节点刚输出的消息里有没有 tool_calls

    • 有 → 说明还在取证,流向 ToolNode
    • 没有 → 说明准备收口,流向下一个分析师(或结束)。

行为规则:

  • 这轮有 tool_calls:先去工具节点执行,再回 analyst 节点

  • 这轮没有 tool_calls:把内容当该分析师最终结论,流转到下一位分析师

这个规则很重要,它避免了“工具还没调完就提前收口”的问题。

另外,子图还做了两件很实用的事:

  • 保存 analyst_react_summaries,便于主图拼接摘要和审计

  • 按角色限制工具集合,避免模型在无关工具里乱选

这套子图结构的核心目的,是把“数据收集”和“结论产出”分开,保证每个分析师先拿到证据,再下结论。


六、API 路由怎么和 LangGraph 对接:任务接口与前端轮询

这部分可以按“创建任务 → 后台执行 → 前端轮询”的顺序看:

  • 创建任务(入口):前端调用 POST /api/analysis/tasks

  • 后端立刻返回可轮询对象

    • 先生成 task_id

    • 立即写入 running 初始状态;

    • 初始状态里已经包含 stage_log(进度骨架)。

  • 后台异步执行图:LangGraph 在后台任务里运行,不阻塞创建接口。

  • preseeded=True 的作用

    • 表示“状态壳子已经提前写好”;

    • runner 直接在这份状态上持续更新;

    • 前端拿到 task_id 后马上就能轮询,不用等第一阶段结束。

前端常用的读取接口如下:

  • GET /api/analysis/tasks/{task_id}:查看任务状态和 stage_log(进度)。

  • GET /api/analysis/tasks/{task_id}/result:任务完成后拿 report_json/report_markdown

  • GET /api/analysis/{task_id}/report:单独读取 JSON 报告。

  • GET /api/analysis/{task_id}/report/markdown:单独读取 Markdown 报告。

  • DELETE /api/analysis/tasks/{task_id}:删除任务及其落盘产物。


八、数据工具“路由”在哪里:抽象工具到 vendor 实现

在图里我们看到的是 get_stock_dataget_news 这种抽象工具名。

真正怎么调用 AkShare、失败后怎么回退,是 dataflows/interface.py 负责的。

这个模块的核心是:

  • TOOLS_CATEGORIES:先给工具分组,便于统一管理;

  • VENDOR_METHODS:把“抽象方法”映射到“具体 vendor 的实现”;

  • get_vendor(...):按配置选当前优先 vendor;

  • route_to_vendor(...):真正执行调用,并在异常时按回退链重试下一个 vendor。

当前策略是:遇到异常会继续尝试回退链里的下一个 vendor(如果有)。

这个设计把“数据源不稳定”的复杂性集中在 dataflows 层,图层就能保持干净。当然近期我们对dataflows层也进行了一次大修改,具体可见另一组员的博客:组员博客:数据层


九、总结

目前阶段已经实现了从任务触发到结果返回的完整主流程:基于 LangGraph 的多阶段执行链路可以稳定推进,API 路由已支持任务创建与查询,状态落盘机制能够让前端持续感知任务进度;在工程实现上,已落地任务创建即写入 running、节点级 stage_log 记录、图内节点职责拆分、分析师子图化封装,以及图层与 dataflows 层的工具分层与 vendor 回退策略。

当前正在完善analyst节点,目前已接入llm,但输出不佳,具体工作在下一篇博文中汇报。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐