DeerFlow依赖LangGraph实现条件路由和中断机制的源码解析

概述

DeerFlow基于LangGraph构建了复杂的条件路由和中断机制,实现了智能的工作流控制和人机交互。本文档深入分析这两个核心机制的实现逻辑和关键代码。这部分也同样是langGraph的核心,通过使用LangGraph的特色功能,实现智能体交互中的中断和人工校准(辅助更精准的计划制定,其实也可以理解为任务拆解)。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/

1. 条件路由机制

1.1 条件路由架构设计

条件路由是DeerFlow工作流控制的核心,通过状态检查和业务逻辑判断来决定下一个执行节点。

1.1.1 路由决策函数

源码文件路径: src/graph/builder.py

def continue_to_running_research_team(state: State):
    """研究团队路由决策函数 - 核心路由逻辑"""
    current_plan = state.get("current_plan")
    
    # 检查1: 计划是否存在
    if not current_plan or not current_plan.steps:
        return "planner"
    
    # 检查2: 所有步骤是否完成
    if all(step.execution_res for step in current_plan.steps):
        return "planner"  # 返回规划器生成报告
    
    # 检查3: 根据未完成步骤类型路由
    for step in current_plan.steps:
        if not step.execution_res:  # 找到第一个未完成步骤
            if step.step_type == StepType.RESEARCH:
                return "researcher"  # 路由到研究员
            if step.step_type == StepType.PROCESSING:
                return "coder"       # 路由到编程员
    
    return "planner"  # 默认返回规划器
1.1.2 条件边配置

源码文件路径: src/graph/builder.py

def _build_base_graph():
    """构建基础状态图 - 条件边配置"""
    builder = StateGraph(State)
    
    # 核心条件边: 研究团队路由
    builder.add_conditional_edges(
        "research_team",                    # 源节点
        continue_to_running_research_team,  # 路由决策函数
        ["planner", "researcher", "coder"], # 可能的目标节点
    )
    
    # 协调器条件边: 支持澄清功能
    builder.add_conditional_edges(
        "coordinator",
        lambda state: state.get("goto", "planner"),  # 动态路由
        ["planner", "background_investigator", "coordinator", END],
    )
    
    return builder

1.2 协调器动态路由

1.2.1 澄清功能路由逻辑

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):
    """协调器节点 - 复杂的条件路由实现"""
    enable_clarification = state.get("enable_clarification", False)
    
    # 路由分支1: 澄清功能禁用
    if not enable_clarification:
        # 直接路由到规划器
        goto = "planner"
        if state.get("enable_background_investigation"):
            goto = "background_investigator"
    
    # 路由分支2: 澄清功能启用
    else:
        clarification_rounds = state.get("clarification_rounds", 0)
        max_clarification_rounds = state.get("max_clarification_rounds", 3)
        
        # 子路由1: 继续澄清对话
        if (not response.tool_calls and response.content and 
            clarification_rounds < max_clarification_rounds):
            goto = "__end__"  # 中断等待用户输入
        
        # 子路由2: 澄清完成或达到最大轮次
        else:
            goto = "planner"
            if state.get("enable_background_investigation"):
                goto = "background_investigator"
    
    # 返回Command对象控制路由
    return Command(
        update={
            "goto": goto,
            # ... 其他状态更新
        },
        goto=goto,
    )
1.2.2 澄清状态检查函数

源码文件路径: src/graph/nodes.py

def needs_clarification(state: dict) -> bool:
    """澄清需求检查 - 路由决策辅助函数"""
    # 检查1: 澄清功能是否启用
    if not state.get("enable_clarification", False):
        return False
    
    # 检查2: 获取澄清状态
    clarification_rounds = state.get("clarification_rounds", 0)
    is_clarification_complete = state.get("is_clarification_complete", False)
    max_clarification_rounds = state.get("max_clarification_rounds", 3)
    
    # 检查3: 综合判断是否需要继续澄清
    return (
        clarification_rounds > 0                        # 已开始澄清
        and not is_clarification_complete               # 澄清未完成
        and clarification_rounds <= max_clarification_rounds  # 未超过最大轮次
    )

1.3 Command对象路由控制

1.3.1 规划器节点路由

源码文件路径: src/graph/nodes.py

def planner_node(state: State, config: RunnableConfig):
    """规划器节点 - Command路由控制"""
    plan_iterations = state.get("plan_iterations", 0)
    
    # 路由条件1: 超过最大迭代次数
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")
    
    # 路由条件2: 计划包含足够上下文
    if isinstance(curr_plan, dict) and curr_plan.get("has_enough_context"):
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",  # 直接生成报告
        )
    
    # 路由条件3: 需要人工反馈
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",  # 等待人工审核
    )
1.3.2 人工反馈路由

源码文件路径: src/graph/nodes.py

def human_feedback_node(state):
    """人工反馈节点 - 基于反馈内容的路由"""
    auto_accepted_plan = state.get("auto_accepted_plan", False)
    
    if not auto_accepted_plan:
        feedback = interrupt("请审核计划。")
        
        # 路由分支1: 编辑计划
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={"messages": [HumanMessage(content=feedback, name="feedback")]},
                goto="planner",  # 返回规划器修改
            )
        
        # 路由分支2: 接受计划
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            goto = "research_team"  # 开始执行研究
    
    # 计划验证和路由
    try:
        new_plan = json.loads(repair_json_output(current_plan))
        return Command(
            update={
                "current_plan": Plan.model_validate(new_plan),
                "plan_iterations": plan_iterations + 1,
            },
            goto=goto,
        )
    except json.JSONDecodeError:
        # 错误处理路由
        if plan_iterations > 1:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")

2. 中断机制

2.1 中断机制架构

中断机制允许工作流在特定点暂停,等待外部输入或处理,是实现人机交互的关键。

2.1.1 基础中断实现

源码文件路径: src/graph/nodes.py

from langgraph.types import interrupt

def human_feedback_node(state):
    """基础中断机制 - 等待人工反馈"""
    auto_accepted_plan = state.get("auto_accepted_plan", False)
    
    if not auto_accepted_plan:
        # 核心中断调用
        feedback = interrupt("请审核计划。")
        
        # 中断后的处理逻辑
        if feedback:
            # 根据反馈内容决定后续流程
            if str(feedback).upper().startswith("[EDIT_PLAN]"):
                # 处理编辑请求
                return Command(
                    update={"messages": [HumanMessage(content=feedback, name="feedback")]},
                    goto="planner",
                )
            elif str(feedback).upper().startswith("[ACCEPTED]"):
                # 处理接受请求
                logger.info("用户接受了计划。")
2.1.2 澄清功能中断

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):
    """澄清功能中断 - 复杂中断处理"""
    enable_clarification = state.get("enable_clarification", False)
    
    if enable_clarification:
        clarification_rounds = state.get("clarification_rounds", 0)
        max_clarification_rounds = state.get("max_clarification_rounds", 3)
        
        # 中断条件: LLM提出澄清问题
        if not response.tool_calls and response.content:
            if clarification_rounds < max_clarification_rounds:
                # 使用__interrupt__键实现中断
                return Command(
                    update={
                        "messages": state_messages,
                        "clarification_rounds": clarification_rounds + 1,
                        "is_clarification_complete": False,
                        "__interrupt__": [("coordinator", response.content)],  # 中断标记
                    },
                    goto="__end__",  # 结束当前流程等待输入
                )
            else:
                # 达到最大轮次,不再中断
                logger.warning(f"达到最大澄清轮次({max_clarification_rounds})。移交给规划器。")
                goto = "planner"

2.2 中断恢复机制

2.2.1 澄清对话恢复

源码文件路径: src/workflow.py

async def run_agent_workflow_async(user_input: str, **kwargs):
    """中断恢复 - 澄清对话继续"""
    # 执行工作流
    async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):
        final_state = s
        # ... 处理流式输出
    
    # 检查是否需要澄清 - 中断恢复逻辑
    if final_state and isinstance(final_state, dict):
        from src.graph.nodes import needs_clarification
        
        if needs_clarification(final_state):
            # 等待用户输入 - 中断点
            clarification_rounds = final_state.get("clarification_rounds", 0)
            max_clarification_rounds = final_state.get("max_clarification_rounds", 3)
            user_response = input(
                f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): "
            ).strip()
            
            if not user_response:
                logger.warning("空回复,结束澄清")
                return final_state
            
            # 恢复工作流 - 递归调用
            current_state = final_state.copy()
            current_state["messages"] = final_state["messages"] + [
                {"role": "user", "content": user_response}
            ]
            
            # 递归恢复执行
            return await run_agent_workflow_async(
                user_input=user_response,
                initial_state=current_state,  # 传递中断状态
                **kwargs
            )
2.2.2 中断状态管理

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):
    """中断状态管理"""
    # 澄清历史管理
    clarification_history = state.get("clarification_history", [])
    
    # 处理用户响应 - 中断恢复后的状态更新
    if clarification_rounds > 0:
        last_message = state.get("messages", [])[-1] if state.get("messages") else None
        
        # 提取用户响应内容
        if last_message:
            if isinstance(last_message, dict) and last_message.get("role") == "user":
                clarification_history.append(last_message["content"])
            elif hasattr(last_message, "content"):
                clarification_history.append(last_message.content)
        
        # 构建澄清上下文
        clarification_context = f"""继续澄清(第{clarification_rounds}/{max_clarification_rounds}轮):
        用户最新回复: {current_response}
        询问剩余缺失的维度。不要重复问题或开始新话题。"""
        
        messages.append({"role": "system", "content": clarification_context})

2.3 工具调用中断

2.3.1 工具定义

源码文件路径: src/graph/nodes.py

from langchain_core.tools import tool
from typing import Annotated

@tool
def handoff_to_planner(
    research_topic: Annotated[str, "要移交的研究任务主题"],
    locale: Annotated[str, "用户检测到的语言区域设置"],
):
    """移交给规划器智能体进行计划制定 - 工具中断"""
    # 工具不返回实际内容,仅用于信号传递
    return

@tool  
def handoff_after_clarification(
    locale: Annotated[str, "用户检测到的语言区域设置"],
):
    """澄清轮次完成后移交给规划器 - 澄清结束信号"""
    return
2.3.2 工具调用处理

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):
    """工具调用中断处理"""
    # 绑定工具
    if enable_clarification:
        tools = [handoff_to_planner, handoff_after_clarification]
    else:
        tools = [handoff_to_planner]
    
    # 调用LLM with工具
    response = (
        get_llm_by_type(AGENT_LLM_MAP["coordinator"])
        .bind_tools(tools)
        .invoke(messages)
    )
    
    # 处理工具调用 - 中断信号检测
    if response.tool_calls:
        try:
            for tool_call in response.tool_calls:
                tool_name = tool_call.get("name", "")
                tool_args = tool_call.get("args", {})
                
                if tool_name in ["handoff_to_planner", "handoff_after_clarification"]:
                    logger.info("移交给规划器")
                    goto = "planner"
                    
                    # 提取工具参数
                    if tool_args.get("locale") and tool_args.get("research_topic"):
                        locale = tool_args.get("locale")
                        research_topic = tool_args.get("research_topic")
                    break
        except Exception as e:
            logger.error(f"处理工具调用时出错: {e}")
            goto = "planner"

3. 路由和中断的协同工作

3.1 状态驱动的协同

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):
    """路由和中断的协同工作示例"""
    # 状态检查
    enable_clarification = state.get("enable_clarification", False)
    clarification_rounds = state.get("clarification_rounds", 0)
    
    # 协同逻辑1: 中断条件检查影响路由
    if (enable_clarification and 
        not response.tool_calls and 
        response.content and 
        clarification_rounds < max_clarification_rounds):
        
        # 中断: 暂停工作流
        return Command(
            update={
                "clarification_rounds": clarification_rounds + 1,
                "__interrupt__": [("coordinator", response.content)],
            },
            goto="__end__",  # 路由: 结束等待输入
        )
    
    # 协同逻辑2: 工具调用决定路由
    if response.tool_calls:
        goto = "planner"  # 路由: 移交给规划器
    else:
        goto = "__end__"  # 路由: 结束流程
    
    # 协同逻辑3: 背景调查路由修正
    if goto == "planner" and state.get("enable_background_investigation"):
        goto = "background_investigator"  # 路由: 先进行背景调查
    
    return Command(
        update={
            "goto": goto,
            "is_clarification_complete": goto != "coordinator",
        },
        goto=goto,
    )

3.2 错误处理中的路由

源码文件路径: src/graph/nodes.py

def human_feedback_node(state):
    """错误处理中的路由决策"""
    try:
        # 尝试解析计划
        current_plan = repair_json_output(current_plan)
        new_plan = json.loads(current_plan)
        
        # 成功路由
        return Command(
            update={"current_plan": Plan.model_validate(new_plan)},
            goto="research_team",
        )
        
    except json.JSONDecodeError:
        logger.warning("规划器响应不是有效的JSON")
        
        # 错误路由决策
        if plan_iterations > 1:
            return Command(goto="reporter")    # 路由: 生成报告
        else:
            return Command(goto="__end__")     # 路由: 结束流程

4. 核心实现总结

4.1 条件路由核心要素

  1. 状态检查: 基于State对象的字段进行条件判断
  2. 业务逻辑: 根据计划完成度、步骤类型等业务状态路由
  3. Command控制: 使用Command对象精确控制路由和状态更新
  4. 条件边配置: 在图构建时定义条件边和路由函数

4.2 中断机制核心要素

  1. interrupt函数: LangGraph提供的中断原语
  2. __interrupt__键: 在Command更新中标记中断点
  3. 状态恢复: 通过递归调用和状态传递实现中断恢复
  4. 工具信号: 使用工具调用作为中断和恢复的信号

4.3 协同工作模式

  • 状态驱动: 路由决策基于状态,中断影响状态
  • 信号传递: 工具调用、LLM响应作为路由和中断信号
  • 递归恢复: 中断后通过递归调用恢复工作流
  • 错误处理: 异常情况下的路由降级和恢复策略

这种设计使得DeerFlow能够实现复杂的人机交互和智能工作流控制,同时保持系统的稳定性和可维护性。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐