DeerFlow中LangGraph工作流控制的完整实现逻辑分析

概述

DeerFlow基于LangGraph构建了多个专业化的工作流:用节点和边组合出业务流程图,再按图中的流程驱动完成整个业务。本项目通过几个综合案例演示工作流的配置、构建与执行,包括深度研究、播客生成、PPT制作、散文写作和提示词增强等。本文档深入分析这些工作流的控制实现逻辑和核心代码。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/

1. 主工作流 - 深度研究工作流

1.1 核心状态图构建

源码文件路径: src/graph/builder.py

from langgraph.graph import END, START, StateGraph

def _build_base_graph():
    """Assemble the (uncompiled) deep-research ``StateGraph``.

    Wiring:
      * START -> coordinator
      * coordinator -> state["goto"] (falls back to "planner"); the target may
        be planner, background_investigator, coordinator itself, or END
      * background_investigator -> planner
      * research_team -> planner / researcher / coder, chosen by
        ``continue_to_running_research_team``
      * reporter -> END

    planner, researcher, coder and human_feedback get no outgoing edge here.
    NOTE(review): presumably those nodes route themselves at runtime (e.g. by
    returning a goto target) — confirm in the node implementations.

    Returns:
        The builder; ``build_graph`` / ``build_graph_with_memory`` compile it
        without or with a checkpointer.
    """
    graph_builder = StateGraph(State)

    # Every run starts at the coordinator.
    graph_builder.add_edge(START, "coordinator")

    node_registry = (
        ("coordinator", coordinator_node),
        ("background_investigator", background_investigation_node),
        ("planner", planner_node),
        ("reporter", reporter_node),
        ("research_team", research_team_node),
        ("researcher", researcher_node),
        ("coder", coder_node),
        ("human_feedback", human_feedback_node),
    )
    for node_name, node_fn in node_registry:
        graph_builder.add_node(node_name, node_fn)

    # Static transitions.
    graph_builder.add_edge("background_investigator", "planner")
    graph_builder.add_edge("reporter", END)

    # The research team dispatches the next unfinished plan step.
    graph_builder.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "coder"],
    )

    # The coordinator jumps to the node stored in state["goto"]; listing
    # "coordinator" itself lets it loop (used for clarification rounds).
    graph_builder.add_conditional_edges(
        "coordinator",
        lambda current: current.get("goto", "planner"),
        ["planner", "background_investigator", "coordinator", END],
    )

    return graph_builder

def build_graph():
    """Compile the deep-research graph without a checkpointer (no memory)."""
    return _build_base_graph().compile()

def build_graph_with_memory():
    """Compile the deep-research graph with a ``MemorySaver`` checkpointer.

    A new ``MemorySaver`` is created on every call, so separately built
    graphs do not share checkpoints.
    """
    from langgraph.checkpoint.memory import MemorySaver

    checkpointer = MemorySaver()
    return _build_base_graph().compile(checkpointer=checkpointer)

1.2 状态定义和管理

源码文件路径: src/graph/types.py

from langgraph.graph import MessagesState
from src.prompts.planner_model import Plan
from src.rag import Resource

class State(MessagesState):
    """State of the deep-research workflow, extending ``MessagesState``.

    Adds research, planning, background-investigation and clarification
    fields on top of the inherited ``messages`` list.

    NOTE(review): if ``MessagesState`` is a ``TypedDict`` (as in LangGraph),
    the values assigned below are class attributes rather than per-run
    defaults. Code in this workflow reads optional keys with
    ``state.get(key, fallback)``, so treat every key as possibly absent.
    """

    # Core runtime variables
    locale: str = "en-US"                    # language / locale
    research_topic: str = ""                 # research topic
    observations: list[str] = []             # collected observations (class-level list object; do not mutate in place)
    resources: list[Resource] = []           # resources (class-level list object; do not mutate in place)
    plan_iterations: int = 0                 # number of planning iterations so far
    current_plan: Plan | str | None = None   # current plan: a Plan object, a plain string, or None when not set yet
    final_report: str = ""                   # final report
    auto_accepted_plan: bool = False         # accept the generated plan automatically

    # Background investigation
    enable_background_investigation: bool = True
    background_investigation_results: str | None = None

    # Clarification tracking (disabled by default)
    enable_clarification: bool = False       # turn the clarification feature on/off
    clarification_rounds: int = 0            # clarification rounds completed
    clarification_history: list[str] = []    # clarification history (class-level list object; do not mutate in place)
    is_clarification_complete: bool = False  # set once clarification is finished
    clarified_question: str = ""             # the question after clarification
    max_clarification_rounds: int = 3        # upper bound on clarification rounds

    # Workflow control
    goto: str = "planner"                    # next node for the coordinator's conditional edge (which also falls back to "planner")

1.3 核心路由控制逻辑

源码文件路径: src/graph/builder.py

def continue_to_running_research_team(state: "State"):
    """Choose the node that handles the next step of the current plan.

    Only the FIRST unfinished step (one without ``execution_res``) is
    considered, so plan steps always execute in order.

    Args:
        state: Workflow state; reads ``current_plan``, which may be missing,
            ``None``, a plain string, or an object with a ``steps`` list.

    Returns:
        "researcher" if the first unfinished step is a research step,
        "coder" if it is a processing step, otherwise "planner". "planner" is
        also returned when there is no usable plan or every step is done.
    """
    current_plan = state.get("current_plan")

    # State allows current_plan to be a plain string (no .steps); treat that
    # like a missing plan instead of crashing with AttributeError.
    steps = getattr(current_plan, "steps", None)
    if not steps:
        return "planner"

    next_step = next((step for step in steps if not step.execution_res), None)
    if next_step is None:
        # Every step has a result: hand control back to the planner.
        return "planner"

    if next_step.step_type == StepType.RESEARCH:
        return "researcher"
    if next_step.step_type == StepType.PROCESSING:
        return "coder"

    # Unknown step type: do not skip ahead to later steps (that would run
    # the plan out of order); let the planner decide.
    return "planner"

1.4 工作流执行引擎

源码文件路径: src/workflow.py

from src.graph import build_graph

# Module-level workflow graph, compiled once at import time and shared by
# every run_agent_workflow_async call.
graph = build_graph()

async def run_agent_workflow_async(
    user_input: str,
    debug: bool = False,
    max_plan_iterations: int = 1,
    max_step_num: int = 3,
    enable_background_investigation: bool = True,
    enable_clarification: bool | None = None,
    max_clarification_rounds: int | None = None,
    initial_state: dict | None = None,
):
    """异步运行深度研究工作流"""
    
    # 输入验证
    if not user_input:
        raise ValueError("输入不能为空")
    
    # 调试模式配置
    if debug:
        enable_debug_logging()
    
    # 初始状态构建
    if initial_state is None:
        initial_state = {
            "messages": [{"role": "user", "content": user_input}],
            "auto_accepted_plan": True,
            "enable_background_investigation": enable_background_investigation,
        }
        
        # 澄清功能配置
        if enable_clarification is not None:
            initial_state["enable_clarification"] = enable_clarification
        if max_clarification_rounds is not None:
            initial_state["max_clarification_rounds"] = max_clarification_rounds
    
    # 工作流配置
    config = {
        "configurable": {
            "thread_id": "default",
            "max_plan_iterations": max_plan_iterations,
            "max_step_num": max_step_num,
            "mcp_settings": {  # MCP服务器配置
                "servers": {
                    "mcp-github-trending": {
                        "transport": "stdio",
                        "command": "uvx",
                        "args": ["mcp-github-trending"],
                        "enabled_tools": ["get_github_trending_repositories"],
                        "add_to_agents": ["researcher"],
                    }
                }
            },
        },
        "recursion_limit": get_recursion_limit(default=100),
    }
    
    # 流式执行工作流
    last_message_cnt = 0
    final_state = None
    async for s in graph.astream(
        input=initial_state, 
        config=config, 
        stream_mode="values"
    ):
        try:
            final_state = s
            # 处理流式输出
            if isinstance(s, dict) and "messages" in s:
                if len(s["messages"]) > last_message_cnt:
                    last_message_cnt = len(s["messages"])
                    message = s["messages"][-1]
                    if isinstance(message, tuple):
                        print(message)
                    else:
                        message.pretty_print()
        except Exception as e:
            logger.error(f"处理流式输出时出错: {e}")
    
    # 澄清功能处理 - 中断恢复逻辑
    if final_state and isinstance(final_state, dict):
        from src.graph.nodes import needs_clarification
        
        if needs_clarification(final_state):
            # 等待用户输入
            clarification_rounds = final_state.get("clarification_rounds", 0)
            max_clarification_rounds = final_state.get("max_clarification_rounds", 3)
            user_response = input(
                f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): "
            ).strip()
            
            if not user_response:
                logger.warning("空回复,结束澄清")
                return final_state
            
            # 递归继续工作流
            current_state = final_state.copy()
            current_state["messages"] = final_state["messages"] + [
                {"role": "user", "content": user_response}
            ]
            
            return await run_agent_workflow_async(
                user_input=user_response,
                initial_state=current_state,
                **kwargs
            )
    
    return final_state

2. 播客生成工作流

2.1 播客工作流图构建

源码文件路径: src/podcast/graph/builder.py

from langgraph.graph import END, START, StateGraph
from src.podcast.graph.state import PodcastState
from src.podcast.graph.script_writer_node import script_writer_node
from src.podcast.graph.tts_node import tts_node
from src.podcast.graph.audio_mixer_node import audio_mixer_node

def build_graph():
    """Compile the podcast pipeline: script_writer -> tts -> audio_mixer.

    Each stage runs exactly once, in order, between START and END; there are
    no branches or loops.

    Returns:
        The compiled graph.
    """
    graph_builder = StateGraph(PodcastState)

    stages = (
        ("script_writer", script_writer_node),  # write the script
        ("tts", tts_node),                      # text-to-speech
        ("audio_mixer", audio_mixer_node),      # audio mixing
    )
    for stage_name, stage_fn in stages:
        graph_builder.add_node(stage_name, stage_fn)

    # Chain START -> each stage in declaration order -> END.
    sequence = [START, *(name for name, _ in stages), END]
    for upstream, downstream in zip(sequence, sequence[1:]):
        graph_builder.add_edge(upstream, downstream)

    return graph_builder.compile()

# Module-level compiled podcast workflow; langgraph.json exposes it to
# LangGraph Studio as "podcast_generation".
workflow = build_graph()

# Manual smoke test: turn the sample report into a podcast script and audio.
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    # `with` closes the file promptly. UTF-8 is explicit so decoding does not
    # depend on the platform's default encoding.
    with open("examples/nanjing_tangbao.md", encoding="utf-8") as report_file:
        report_content = report_file.read()

    final_state = workflow.invoke({"input": report_content})

    # Print the script with a speaker tag (<M> for male, <F> otherwise). The
    # line text is in `paragraph`, the same attribute tts_node reads.
    for line in final_state["script"].lines:
        print("<M>" if line.speaker == "male" else "<F>", line.paragraph)

    # Save the mixed audio.
    with open("final.mp3", "wb") as f:
        f.write(final_state["output"])

2.2 播客状态定义

源码文件路径: src/podcast/graph/state.py

from typing import Optional
from langgraph.graph import MessagesState
from ..types import Script

class PodcastState(MessagesState):
    """State for the podcast-generation workflow (script -> TTS -> mixing)."""

    # Input
    input: str = ""                          # source text passed to workflow.invoke

    # Output
    output: Optional[bytes] = None           # final audio bytes (the builder's __main__ writes them to final.mp3)

    # Intermediate assets
    script: Optional[Script] = None          # generated script; tts_node iterates its .lines
    audio_chunks: list[bytes] = []           # per-line audio returned by tts_node (class-level list object; do not mutate in place)

2.3 TTS节点实现

源码文件路径: src/podcast/graph/tts_node.py

import base64
import logging
import os
from src.podcast.graph.state import PodcastState
from src.tools.tts import VolcengineTTS

logger = logging.getLogger(__name__)

def tts_node(state: PodcastState):
    """Synthesize one audio chunk per script line with Volcengine TTS.

    Male lines use voice "BV002_streaming"; every other line uses
    "BV001_streaming". Every request uses ``speed_ratio=1.05``. Lines whose
    synthesis fails are logged and skipped, so there may be fewer chunks than
    lines.

    Args:
        state: Podcast state; reads ``script.lines`` (each with ``speaker``
            and ``paragraph``) and any existing ``audio_chunks``.

    Returns:
        ``{"audio_chunks": [...]}``: the chunks already in the state (if any)
        followed by the newly decoded ones.

    Raises:
        Exception: From ``_create_tts_client`` when credentials are missing.
    """
    logger.info("为播客生成音频片段...")

    tts_client = _create_tts_client()

    # Build a new list instead of appending to state["audio_chunks"]. The
    # node reports its change through the return value, and the key may be
    # absent if nothing has written it yet.
    audio_chunks = list(state.get("audio_chunks") or [])

    for line in state["script"].lines:
        # Pick the voice from the speaker.
        tts_client.voice_type = (
            "BV002_streaming" if line.speaker == "male" else "BV001_streaming"
        )

        result = tts_client.text_to_speech(line.paragraph, speed_ratio=1.05)

        if result["success"]:
            # The TTS client returns base64-encoded audio.
            audio_chunks.append(base64.b64decode(result["audio_data"]))
        else:
            logger.error(result["error"])

    return {
        "audio_chunks": audio_chunks,
    }

def _create_tts_client():
    """创建TTS客户端 - 配置管理"""
    app_id = os.getenv("VOLCENGINE_TTS_APPID", "")
    if not app_id:
        raise Exception("VOLCENGINE_TTS_APPID未设置")
    
    access_token = os.getenv("VOLCENGINE_TTS_ACCESS_TOKEN", "")
    if not access_token:
        raise Exception("VOLCENGINE_TTS_ACCESS_TOKEN未设置")
    
    cluster = os.getenv("VOLCENGINE_TTS_CLUSTER", "volcano_tts")
    voice_type = "BV001_streaming"
    
    return VolcengineTTS(
        appid=app_id,
        access_token=access_token,
        cluster=cluster,
        voice_type=voice_type,
    )

3. PPT生成工作流

3.1 PPT工作流图构建

源码文件路径: src/ppt/graph/builder.py

from langgraph.graph import END, START, StateGraph
from src.ppt.graph.state import PPTState
from src.ppt.graph.ppt_composer_node import ppt_composer_node
from src.ppt.graph.ppt_generator_node import ppt_generator_node

def build_graph():
    """Compile the two-stage PPT graph: ppt_composer -> ppt_generator.

    Returns:
        The compiled graph.
    """
    graph_builder = StateGraph(PPTState)
    graph_builder.add_node("ppt_composer", ppt_composer_node)    # organize PPT content
    graph_builder.add_node("ppt_generator", ppt_generator_node)  # generate the PPT file

    # Strictly sequential: START -> composer -> generator -> END.
    transitions = (
        (START, "ppt_composer"),
        ("ppt_composer", "ppt_generator"),
        ("ppt_generator", END),
    )
    for source, target in transitions:
        graph_builder.add_edge(source, target)

    return graph_builder.compile()

# Module-level compiled PPT workflow; langgraph.json exposes it to LangGraph
# Studio as "ppt_generation".
workflow = build_graph()

# Manual smoke test: build a PPT from the sample report.
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    # `with` closes the file promptly. UTF-8 is explicit so decoding does not
    # depend on the platform's default encoding.
    with open("examples/nanjing_tangbao.md", encoding="utf-8") as report_file:
        report_content = report_file.read()

    # Run the PPT generation workflow.
    final_state = workflow.invoke({"input": report_content})

3.2 PPT状态定义

源码文件路径: src/ppt/graph/state.py

from langgraph.graph import MessagesState

class PPTState(MessagesState):
    """State for the PPT-generation workflow (compose -> generate)."""

    # Input
    input: str = ""                    # source text passed to workflow.invoke

    # Output
    generated_file_path: str = ""      # path of the generated file

    # Intermediate assets
    ppt_content: str = ""              # PPT content (presumably written by ppt_composer — confirm)
    ppt_file_path: str = ""            # PPT file path
    # NOTE(review): which node writes ppt_file_path vs generated_file_path is
    # not shown here — confirm before relying on either.

4. 散文写作工作流

4.1 散文工作流图构建

源码文件路径: src/prose/graph/builder.py

from langgraph.graph import END, START, StateGraph
from src.prose.graph.state import ProseState
from src.prose.graph.prose_continue_node import prose_continue_node
from src.prose.graph.prose_fix_node import prose_fix_node
from src.prose.graph.prose_improve_node import prose_improve_node
from src.prose.graph.prose_longer_node import prose_longer_node
from src.prose.graph.prose_shorter_node import prose_shorter_node
from src.prose.graph.prose_zap_node import prose_zap_node

def optional_node(state: ProseState):
    """Return the branch key for the prose graph: the state's ``option``.

    Raises KeyError when ``option`` is missing from the state.
    """
    selected_option = state["option"]
    return selected_option

def build_graph():
    """Compile the prose-writing graph: one node per editing option.

    At START, ``optional_node`` reads ``state["option"]`` and routes to the
    matching node. That node runs once, then the graph ends.

    Returns:
        The compiled graph.
    """
    builder = StateGraph(ProseState)

    builder.add_node("prose_continue", prose_continue_node)  # continue writing
    builder.add_node("prose_improve", prose_improve_node)    # improve the text
    builder.add_node("prose_shorter", prose_shorter_node)    # shorten the text
    builder.add_node("prose_longer", prose_longer_node)      # lengthen the text
    builder.add_node("prose_fix", prose_fix_node)            # fix the text
    builder.add_node("prose_zap", prose_zap_node)            # quick processing ("zap")

    # option value -> node name. NOTE(review): an option outside this table
    # has no route; LangGraph is expected to reject it at runtime — confirm.
    option_routes = {
        "continue": "prose_continue",
        "improve": "prose_improve",
        "shorter": "prose_shorter",
        "longer": "prose_longer",
        "fix": "prose_fix",
        "zap": "prose_zap",
    }
    builder.add_conditional_edges(START, optional_node, option_routes)

    # End after whichever branch ran. Passing END as a fourth positional
    # argument does not declare a default branch: in LangGraph versions that
    # accept it, that slot is `then` (the node run after the chosen branch).
    # Explicit edges express the same intent without depending on it.
    for node_name in option_routes.values():
        builder.add_edge(node_name, END)

    return builder.compile()

# Async demo of the prose workflow.
async def _test_workflow():
    """Stream a "continue" run of the prose graph and print each chunk.

    Each streamed item is printed as a dict holding the chunk's ``id`` and
    ``content`` under an ``"object": "chat.completion.chunk"`` tag.
    """
    prose_graph = build_graph()
    request = {
        "content": "北京的天气很晴朗",
        "option": "continue",
    }

    # NOTE(review): with stream_mode="messages" and subgraphs=True each item
    # is presumably (namespace, (message_chunk, metadata)); only the chunk is
    # used here.
    stream = prose_graph.astream(request, stream_mode="messages", subgraphs=True)

    async for _namespace, payload in stream:
        chunk = payload[0]
        print({
            "id": chunk.id,
            "object": "chat.completion.chunk",
            "content": chunk.content,
        })

# Script entry point: load .env, log at INFO level, run the async demo.
if __name__ == "__main__":
    import asyncio
    import logging

    from dotenv import load_dotenv

    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    demo = _test_workflow()
    asyncio.run(demo)

5. 提示词增强工作流

5.1 提示词增强工作流图构建

源码文件路径: src/prompt_enhancer/graph/builder.py

from langgraph.graph import StateGraph
from src.prompt_enhancer.graph.state import PromptEnhancerState
from src.prompt_enhancer.graph.enhancer_node import prompt_enhancer_node

def build_graph():
    """Compile the prompt-enhancer graph around a single "enhancer" node.

    That node is both the entry point and the finish point.

    Returns:
        The compiled graph.
    """
    graph_builder = StateGraph(PromptEnhancerState)
    node_name = "enhancer"
    graph_builder.add_node(node_name, prompt_enhancer_node)

    # One node: the run enters here and finishes right after it.
    graph_builder.set_entry_point(node_name)
    graph_builder.set_finish_point(node_name)

    return graph_builder.compile()

6. 工作流控制模式分析

6.1 线性流水线模式

应用场景: 播客生成、PPT生成
特点:

  • 严格的执行顺序
  • 每个阶段依赖前一阶段的输出
  • 无分支和循环
# Linear pipeline example: one edge per consecutive pair, in order.
pipeline = [START, "stage1", "stage2", "stage3", END]
for upstream, downstream in zip(pipeline, pipeline[1:]):
    builder.add_edge(upstream, downstream)

6.2 条件分支模式

应用场景: 散文写作工作流
特点:

  • 基于输入参数的动态路由
  • 多个互斥的处理选项(每次只执行其中一个)
  • 单次执行,无循环
# Conditional-branch example: route_function's return value selects exactly
# one node, and every branch then ends the run.
route_map = {  # routing table: route_function return value -> node name
    "option1": "node1",
    "option2": "node2",
}
builder.add_conditional_edges(
    START,
    route_function,  # routing decision function
    route_map,
)
# End after whichever branch ran. (END as a fourth positional argument would
# not be a default branch; in LangGraph versions that accept it, that slot is
# `then`.)
for branch_node in route_map.values():
    builder.add_edge(branch_node, END)

6.3 复杂状态机模式

应用场景: 深度研究工作流
特点:

  • 多层条件路由
  • 循环执行能力
  • 中断和恢复机制
  • 人机交互支持
# Complex state-machine example with two routers. The coordinator follows
# state["goto"] (default "planner") and may loop on itself or end the run.
# The research team dispatches the next plan step.
coordinator_targets = ["planner", "background_investigator", "coordinator", END]
builder.add_conditional_edges(
    "coordinator",
    lambda state: state.get("goto", "planner"),
    coordinator_targets,
)

research_team_targets = ["planner", "researcher", "coder"]
builder.add_conditional_edges(
    "research_team",
    continue_to_running_research_team,
    research_team_targets,
)

6.4 单节点处理模式

应用场景: 提示词增强工作流
特点:

  • 最简单的工作流结构
  • 单一功能处理
  • 直接输入输出
# Single-node example: one node is both the entry point and the finish point.
processor_name = "processor"
builder.add_node(processor_name, process_node)
builder.set_entry_point(processor_name)
builder.set_finish_point(processor_name)

7. 工作流配置和管理

7.1 LangGraph Studio配置

源码文件路径: langgraph.json

{
  "dockerfile_lines": [],
  "graphs": {
    "deep_research": "./src/workflow.py:graph",
    "podcast_generation": "./src/podcast/graph/builder.py:workflow",
    "ppt_generation": "./src/ppt/graph/builder.py:workflow"
  },
  "python_version": "3.12",
  "env": "./.env",
  "dependencies": ["."]
}

7.2 工作流执行配置

源码文件路径: src/workflow.py

def get_recursion_limit(default: int = 100) -> int:
    """Return the graph recursion limit from AGENT_RECURSION_LIMIT.

    Falls back to ``default`` when the variable is unset, is not an integer,
    or is not positive.
    """
    raw_value = os.getenv("AGENT_RECURSION_LIMIT", str(default))
    try:
        parsed = int(raw_value)
    except ValueError:
        return default
    return parsed if parsed > 0 else default

# Example workflow configuration: per-run settings go under "configurable";
# recursion_limit comes from get_recursion_limit (AGENT_RECURSION_LIMIT, or 100).
configurable = {
    "thread_id": "default",
    "max_plan_iterations": max_plan_iterations,
    "max_step_num": max_step_num,
    "mcp_settings": mcp_settings,
}
config = {
    "configurable": configurable,
    "recursion_limit": get_recursion_limit(default=100),
}

8. 总结

8.1 工作流控制特点

  1. 多样化架构: 支持线性、分支、状态机、单节点等多种模式
  2. 状态驱动: 所有工作流都基于状态对象进行数据传递
  3. 模块化设计: 每个工作流独立,可单独部署和测试
  4. 配置灵活: 支持环境变量和配置文件的灵活配置
  5. 调试友好: 集成LangGraph Studio,支持可视化调试

8.2 核心技术要素

  • StateGraph: LangGraph的核心图构建类
  • MessagesState: 基础状态类,支持消息传递
  • 条件边: 实现复杂的路由逻辑
  • Command对象: 精确控制工作流转换
  • 流式执行: 支持实时输出和交互
  • 检查点机制: 支持工作流的暂停和恢复

这种多层次的工作流控制架构使得DeerFlow能够处理从简单的文本处理到复杂的多智能体协作等各种场景,展现了LangGraph在工作流编排方面的强大能力。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐