DeerFlow多智能体项目分析-依赖LangGraph实现条件路由和中断机制的源码解析
摘要 DeerFlow基于LangGraph框架实现了多智能体工作流控制,核心机制包括条件路由和中断功能。条件路由通过状态检查函数(如continue_to_running_research_team)实现动态节点跳转,根据计划步骤类型路由到不同执行节点(研究员/编程员)。协调器节点支持多轮澄清对话,通过Command对象控制流程走向,包含人工反馈机制。该架构支持复杂业务逻辑判断(最大迭代次数检查
DeerFlow依赖LangGraph实现条件路由和中断机制的源码解析
概述
DeerFlow基于LangGraph构建了复杂的条件路由和中断机制,实现了智能的工作流控制和人机交互。本文档深入分析这两个核心机制的实现逻辑和关键代码。这部分也同样是langGraph的核心,通过使用LangGraph的特色功能,实现智能体交互中的中断和人工校准(辅助更精准的计划制定,其实也可以理解为任务拆解)。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
1. 条件路由机制
1.1 条件路由架构设计
条件路由是DeerFlow工作流控制的核心,通过状态检查和业务逻辑判断来决定下一个执行节点。
1.1.1 路由决策函数
源码文件路径: src/graph/builder.py
def continue_to_running_research_team(state: State):
"""研究团队路由决策函数 - 核心路由逻辑"""
current_plan = state.get("current_plan")
# 检查1: 计划是否存在
if not current_plan or not current_plan.steps:
return "planner"
# 检查2: 所有步骤是否完成
if all(step.execution_res for step in current_plan.steps):
return "planner" # 返回规划器生成报告
# 检查3: 根据未完成步骤类型路由
for step in current_plan.steps:
if not step.execution_res: # 找到第一个未完成步骤
if step.step_type == StepType.RESEARCH:
return "researcher" # 路由到研究员
if step.step_type == StepType.PROCESSING:
return "coder" # 路由到编程员
return "planner" # 默认返回规划器
1.1.2 条件边配置
源码文件路径: src/graph/builder.py
def _build_base_graph():
"""构建基础状态图 - 条件边配置"""
builder = StateGraph(State)
# 核心条件边: 研究团队路由
builder.add_conditional_edges(
"research_team", # 源节点
continue_to_running_research_team, # 路由决策函数
["planner", "researcher", "coder"], # 可能的目标节点
)
# 协调器条件边: 支持澄清功能
builder.add_conditional_edges(
"coordinator",
lambda state: state.get("goto", "planner"), # 动态路由
["planner", "background_investigator", "coordinator", END],
)
return builder
1.2 协调器动态路由
1.2.1 澄清功能路由逻辑
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""协调器节点 - 复杂的条件路由实现"""
enable_clarification = state.get("enable_clarification", False)
# 路由分支1: 澄清功能禁用
if not enable_clarification:
# 直接路由到规划器
goto = "planner"
if state.get("enable_background_investigation"):
goto = "background_investigator"
# 路由分支2: 澄清功能启用
else:
clarification_rounds = state.get("clarification_rounds", 0)
max_clarification_rounds = state.get("max_clarification_rounds", 3)
# 子路由1: 继续澄清对话
if (not response.tool_calls and response.content and
clarification_rounds < max_clarification_rounds):
goto = "__end__" # 中断等待用户输入
# 子路由2: 澄清完成或达到最大轮次
else:
goto = "planner"
if state.get("enable_background_investigation"):
goto = "background_investigator"
# 返回Command对象控制路由
return Command(
update={
"goto": goto,
# ... 其他状态更新
},
goto=goto,
)
1.2.2 澄清状态检查函数
源码文件路径: src/graph/nodes.py
def needs_clarification(state: dict) -> bool:
"""澄清需求检查 - 路由决策辅助函数"""
# 检查1: 澄清功能是否启用
if not state.get("enable_clarification", False):
return False
# 检查2: 获取澄清状态
clarification_rounds = state.get("clarification_rounds", 0)
is_clarification_complete = state.get("is_clarification_complete", False)
max_clarification_rounds = state.get("max_clarification_rounds", 3)
# 检查3: 综合判断是否需要继续澄清
return (
clarification_rounds > 0 # 已开始澄清
and not is_clarification_complete # 澄清未完成
and clarification_rounds <= max_clarification_rounds # 未超过最大轮次
)
1.3 Command对象路由控制
1.3.1 规划器节点路由
源码文件路径: src/graph/nodes.py
def planner_node(state: State, config: RunnableConfig):
"""规划器节点 - Command路由控制"""
plan_iterations = state.get("plan_iterations", 0)
# 路由条件1: 超过最大迭代次数
if plan_iterations >= configurable.max_plan_iterations:
return Command(goto="reporter")
# 路由条件2: 计划包含足够上下文
if isinstance(curr_plan, dict) and curr_plan.get("has_enough_context"):
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": new_plan,
},
goto="reporter", # 直接生成报告
)
# 路由条件3: 需要人工反馈
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": full_response,
},
goto="human_feedback", # 等待人工审核
)
1.3.2 人工反馈路由
源码文件路径: src/graph/nodes.py
def human_feedback_node(state):
"""人工反馈节点 - 基于反馈内容的路由"""
auto_accepted_plan = state.get("auto_accepted_plan", False)
if not auto_accepted_plan:
feedback = interrupt("请审核计划。")
# 路由分支1: 编辑计划
if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
return Command(
update={"messages": [HumanMessage(content=feedback, name="feedback")]},
goto="planner", # 返回规划器修改
)
# 路由分支2: 接受计划
elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
goto = "research_team" # 开始执行研究
# 计划验证和路由
try:
new_plan = json.loads(repair_json_output(current_plan))
return Command(
update={
"current_plan": Plan.model_validate(new_plan),
"plan_iterations": plan_iterations + 1,
},
goto=goto,
)
except json.JSONDecodeError:
# 错误处理路由
if plan_iterations > 1:
return Command(goto="reporter")
else:
return Command(goto="__end__")
2. 中断机制
2.1 中断机制架构
中断机制允许工作流在特定点暂停,等待外部输入或处理,是实现人机交互的关键。
2.1.1 基础中断实现
源码文件路径: src/graph/nodes.py
from langgraph.types import interrupt
def human_feedback_node(state):
"""基础中断机制 - 等待人工反馈"""
auto_accepted_plan = state.get("auto_accepted_plan", False)
if not auto_accepted_plan:
# 核心中断调用
feedback = interrupt("请审核计划。")
# 中断后的处理逻辑
if feedback:
# 根据反馈内容决定后续流程
if str(feedback).upper().startswith("[EDIT_PLAN]"):
# 处理编辑请求
return Command(
update={"messages": [HumanMessage(content=feedback, name="feedback")]},
goto="planner",
)
elif str(feedback).upper().startswith("[ACCEPTED]"):
# 处理接受请求
logger.info("用户接受了计划。")
2.1.2 澄清功能中断
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""澄清功能中断 - 复杂中断处理"""
enable_clarification = state.get("enable_clarification", False)
if enable_clarification:
clarification_rounds = state.get("clarification_rounds", 0)
max_clarification_rounds = state.get("max_clarification_rounds", 3)
# 中断条件: LLM提出澄清问题
if not response.tool_calls and response.content:
if clarification_rounds < max_clarification_rounds:
# 使用__interrupt__键实现中断
return Command(
update={
"messages": state_messages,
"clarification_rounds": clarification_rounds + 1,
"is_clarification_complete": False,
"__interrupt__": [("coordinator", response.content)], # 中断标记
},
goto="__end__", # 结束当前流程等待输入
)
else:
# 达到最大轮次,不再中断
logger.warning(f"达到最大澄清轮次({max_clarification_rounds})。移交给规划器。")
goto = "planner"
2.2 中断恢复机制
2.2.1 澄清对话恢复
源码文件路径: src/workflow.py
async def run_agent_workflow_async(user_input: str, **kwargs):
"""中断恢复 - 澄清对话继续"""
# 执行工作流
async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):
final_state = s
# ... 处理流式输出
# 检查是否需要澄清 - 中断恢复逻辑
if final_state and isinstance(final_state, dict):
from src.graph.nodes import needs_clarification
if needs_clarification(final_state):
# 等待用户输入 - 中断点
clarification_rounds = final_state.get("clarification_rounds", 0)
max_clarification_rounds = final_state.get("max_clarification_rounds", 3)
user_response = input(
f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): "
).strip()
if not user_response:
logger.warning("空回复,结束澄清")
return final_state
# 恢复工作流 - 递归调用
current_state = final_state.copy()
current_state["messages"] = final_state["messages"] + [
{"role": "user", "content": user_response}
]
# 递归恢复执行
return await run_agent_workflow_async(
user_input=user_response,
initial_state=current_state, # 传递中断状态
**kwargs
)
2.2.2 中断状态管理
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""中断状态管理"""
# 澄清历史管理
clarification_history = state.get("clarification_history", [])
# 处理用户响应 - 中断恢复后的状态更新
if clarification_rounds > 0:
last_message = state.get("messages", [])[-1] if state.get("messages") else None
# 提取用户响应内容
if last_message:
if isinstance(last_message, dict) and last_message.get("role") == "user":
clarification_history.append(last_message["content"])
elif hasattr(last_message, "content"):
clarification_history.append(last_message.content)
# 构建澄清上下文
clarification_context = f"""继续澄清(第{clarification_rounds}/{max_clarification_rounds}轮):
用户最新回复: {current_response}
询问剩余缺失的维度。不要重复问题或开始新话题。"""
messages.append({"role": "system", "content": clarification_context})
2.3 工具调用中断
2.3.1 工具定义
源码文件路径: src/graph/nodes.py
from langchain_core.tools import tool
from typing import Annotated
@tool
def handoff_to_planner(
research_topic: Annotated[str, "要移交的研究任务主题"],
locale: Annotated[str, "用户检测到的语言区域设置"],
):
"""移交给规划器智能体进行计划制定 - 工具中断"""
# 工具不返回实际内容,仅用于信号传递
return
@tool
def handoff_after_clarification(
locale: Annotated[str, "用户检测到的语言区域设置"],
):
"""澄清轮次完成后移交给规划器 - 澄清结束信号"""
return
2.3.2 工具调用处理
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""工具调用中断处理"""
# 绑定工具
if enable_clarification:
tools = [handoff_to_planner, handoff_after_clarification]
else:
tools = [handoff_to_planner]
# 调用LLM with工具
response = (
get_llm_by_type(AGENT_LLM_MAP["coordinator"])
.bind_tools(tools)
.invoke(messages)
)
# 处理工具调用 - 中断信号检测
if response.tool_calls:
try:
for tool_call in response.tool_calls:
tool_name = tool_call.get("name", "")
tool_args = tool_call.get("args", {})
if tool_name in ["handoff_to_planner", "handoff_after_clarification"]:
logger.info("移交给规划器")
goto = "planner"
# 提取工具参数
if tool_args.get("locale") and tool_args.get("research_topic"):
locale = tool_args.get("locale")
research_topic = tool_args.get("research_topic")
break
except Exception as e:
logger.error(f"处理工具调用时出错: {e}")
goto = "planner"
3. 路由和中断的协同工作
3.1 状态驱动的协同
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):
"""路由和中断的协同工作示例"""
# 状态检查
enable_clarification = state.get("enable_clarification", False)
clarification_rounds = state.get("clarification_rounds", 0)
# 协同逻辑1: 中断条件检查影响路由
if (enable_clarification and
not response.tool_calls and
response.content and
clarification_rounds < max_clarification_rounds):
# 中断: 暂停工作流
return Command(
update={
"clarification_rounds": clarification_rounds + 1,
"__interrupt__": [("coordinator", response.content)],
},
goto="__end__", # 路由: 结束等待输入
)
# 协同逻辑2: 工具调用决定路由
if response.tool_calls:
goto = "planner" # 路由: 移交给规划器
else:
goto = "__end__" # 路由: 结束流程
# 协同逻辑3: 背景调查路由修正
if goto == "planner" and state.get("enable_background_investigation"):
goto = "background_investigator" # 路由: 先进行背景调查
return Command(
update={
"goto": goto,
"is_clarification_complete": goto != "coordinator",
},
goto=goto,
)
3.2 错误处理中的路由
源码文件路径: src/graph/nodes.py
def human_feedback_node(state):
"""错误处理中的路由决策"""
try:
# 尝试解析计划
current_plan = repair_json_output(current_plan)
new_plan = json.loads(current_plan)
# 成功路由
return Command(
update={"current_plan": Plan.model_validate(new_plan)},
goto="research_team",
)
except json.JSONDecodeError:
logger.warning("规划器响应不是有效的JSON")
# 错误路由决策
if plan_iterations > 1:
return Command(goto="reporter") # 路由: 生成报告
else:
return Command(goto="__end__") # 路由: 结束流程
4. 核心实现总结
4.1 条件路由核心要素
- 状态检查: 基于State对象的字段进行条件判断
- 业务逻辑: 根据计划完成度、步骤类型等业务状态路由
- Command控制: 使用Command对象精确控制路由和状态更新
- 条件边配置: 在图构建时定义条件边和路由函数
4.2 中断机制核心要素
- interrupt函数: LangGraph提供的中断原语
- __interrupt__键: 在Command更新中标记中断点
- 状态恢复: 通过递归调用和状态传递实现中断恢复
- 工具信号: 使用工具调用作为中断和恢复的信号
4.3 协同工作模式
- 状态驱动: 路由决策基于状态,中断影响状态
- 信号传递: 工具调用、LLM响应作为路由和中断信号
- 递归恢复: 中断后通过递归调用恢复工作流
- 错误处理: 异常情况下的路由降级和恢复策略
这种设计使得DeerFlow能够实现复杂的人机交互和智能工作流控制,同时保持系统的稳定性和可维护性。
更多推荐

所有评论(0)