DeerFlow多智能体项目分析-通过LangGraph实现工作流的源码解析
摘要:深度研究工作流的状态类 `State` 扩展自 `MessagesState`,核心运行时变量包括 `locale: str = "en-US"`(语言区域)、`research_topic: str = ""`(研究主题)、`observations: list[str]`(观察结果列表)、`resources: list[Resource]`(资源列表)、`plan_iterations`(计划迭代次数)等,完整定义见 1.2 节。
DeerFlow中LangGraph工作流控制的完整实现逻辑分析
概述
DeerFlow基于LangGraph构建了多个专业化的工作流:通过组合节点和边构建业务流程图,再按图中定义的流程驱动完成整个业务。项目通过几个综合案例演示了工作流的配置、构建与执行,包括深度研究、播客生成、PPT制作、散文写作和提示词增强等。本文档深入分析这些工作流控制的实现逻辑与核心代码。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
1. 主工作流 - 深度研究工作流
1.1 核心状态图构建
源码文件路径: src/graph/builder.py
from langgraph.graph import END, START, StateGraph
def _build_base_graph():
    """Assemble (but do not compile) the deep-research StateGraph.

    Registers every agent node, wires the fixed edges, and attaches the two
    conditional routers (coordinator and research_team). Compilation is left
    to the caller so that a checkpointer can optionally be supplied.
    """
    graph_builder = StateGraph(State)

    # Node name -> node callable, in the same registration order as before.
    # Nodes without an edge below presumably choose their successor themselves
    # (e.g. via Command) -- not visible in this builder.
    node_table = {
        "coordinator": coordinator_node,  # entry point
        "background_investigator": background_investigation_node,
        "planner": planner_node,
        "reporter": reporter_node,
        "research_team": research_team_node,
        "researcher": researcher_node,
        "coder": coder_node,
        "human_feedback": human_feedback_node,
    }
    for node_name, node_fn in node_table.items():
        graph_builder.add_node(node_name, node_fn)

    # Fixed (unconditional) transitions.
    graph_builder.add_edge(START, "coordinator")
    graph_builder.add_edge("background_investigator", "planner")
    graph_builder.add_edge("reporter", END)

    # research_team hands work to the agent that owns the next unfinished step.
    graph_builder.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "coder"],
    )

    # coordinator routes on the "goto" key in state (default "planner");
    # "coordinator" itself is a legal target so clarification can loop.
    graph_builder.add_conditional_edges(
        "coordinator",
        lambda state: state.get("goto", "planner"),
        ["planner", "background_investigator", "coordinator", END],
    )

    return graph_builder
def build_graph():
    """Compile the deep-research graph without a checkpointer (no persisted memory)."""
    return _build_base_graph().compile()
def build_graph_with_memory():
    """Compile the deep-research graph with an in-memory checkpointer.

    A new MemorySaver is created on every call, so checkpoints are not shared
    between graphs returned by separate calls.
    """
    from langgraph.checkpoint.memory import MemorySaver

    return _build_base_graph().compile(checkpointer=MemorySaver())
1.2 状态定义和管理
源码文件路径: src/graph/types.py
from langgraph.graph import MessagesState
from src.prompts.planner_model import Plan
from src.rag import Resource
class State(MessagesState):
    """State for the deep-research workflow; extends MessagesState.

    Code in this workflow reads it with dict-style access (``state.get(...)``),
    e.g. the coordinator router and ``continue_to_running_research_team``.

    NOTE(review): MessagesState is presumably a TypedDict, in which case the
    class-level values below are not applied as runtime defaults and the
    ``[]`` literals are plain class attributes rather than per-run lists.
    Readers should keep using ``state.get(key, default)`` -- confirm against
    the LangGraph version in use.
    """

    # Core runtime variables
    locale: str = "en-US"  # language/locale
    research_topic: str = ""  # research topic
    observations: list[str] = []  # collected observations
    resources: list[Resource] = []  # resources list
    plan_iterations: int = 0  # number of planning iterations so far
    current_plan: Plan | str | None = None  # current plan: structured Plan, raw string, or None before planning
    final_report: str = ""  # final report text
    auto_accepted_plan: bool = False  # whether the plan is accepted automatically
    # Background investigation
    enable_background_investigation: bool = True
    background_investigation_results: str | None = None
    # Clarification tracking (disabled by default)
    enable_clarification: bool = False  # enable/disable clarification
    clarification_rounds: int = 0  # clarification rounds completed
    clarification_history: list[str] = []  # clarification history
    is_clarification_complete: bool = False  # clarification-complete flag
    clarified_question: str = ""  # question after clarification
    max_clarification_rounds: int = 3  # maximum clarification rounds
    # Workflow control
    goto: str = "planner"  # next node chosen by the coordinator's conditional edge
1.3 核心路由控制逻辑
源码文件路径: src/graph/builder.py
def continue_to_running_research_team(state: State):
    """Route out of the research_team node to the agent for the next plan step.

    Steps are processed in order, so only the FIRST step without an
    ``execution_res`` is considered.

    Returns:
        "researcher" if that step is a RESEARCH step, "coder" if it is a
        PROCESSING step, otherwise "planner". "planner" is also returned when
        there is no usable plan (missing, a raw string without steps, or an
        empty step list) and when every step already has a result.
    """
    current_plan = state.get("current_plan")

    # State allows current_plan to be a raw string; only structured plans carry
    # steps, so read them defensively instead of raising AttributeError.
    steps = getattr(current_plan, "steps", None)
    if not steps:
        return "planner"

    # Dispatch on the first unfinished step only; never skip ahead to a later
    # step just because the first one has a type this router does not handle.
    next_step = next((step for step in steps if not step.execution_res), None)
    if next_step is None:
        # Every step is done; hand control back to the planner.
        return "planner"

    if next_step.step_type == StepType.RESEARCH:
        return "researcher"
    if next_step.step_type == StepType.PROCESSING:
        return "coder"
    return "planner"
1.4 工作流执行引擎
源码文件路径: src/workflow.py
from src.graph import build_graph
# Global compiled deep-research graph, built once at import time via build_graph().
# langgraph.json exposes this object as "./src/workflow.py:graph".
graph = build_graph()
async def run_agent_workflow_async(
user_input: str,
debug: bool = False,
max_plan_iterations: int = 1,
max_step_num: int = 3,
enable_background_investigation: bool = True,
enable_clarification: bool | None = None,
max_clarification_rounds: int | None = None,
initial_state: dict | None = None,
):
"""异步运行深度研究工作流"""
# 输入验证
if not user_input:
raise ValueError("输入不能为空")
# 调试模式配置
if debug:
enable_debug_logging()
# 初始状态构建
if initial_state is None:
initial_state = {
"messages": [{"role": "user", "content": user_input}],
"auto_accepted_plan": True,
"enable_background_investigation": enable_background_investigation,
}
# 澄清功能配置
if enable_clarification is not None:
initial_state["enable_clarification"] = enable_clarification
if max_clarification_rounds is not None:
initial_state["max_clarification_rounds"] = max_clarification_rounds
# 工作流配置
config = {
"configurable": {
"thread_id": "default",
"max_plan_iterations": max_plan_iterations,
"max_step_num": max_step_num,
"mcp_settings": { # MCP服务器配置
"servers": {
"mcp-github-trending": {
"transport": "stdio",
"command": "uvx",
"args": ["mcp-github-trending"],
"enabled_tools": ["get_github_trending_repositories"],
"add_to_agents": ["researcher"],
}
}
},
},
"recursion_limit": get_recursion_limit(default=100),
}
# 流式执行工作流
last_message_cnt = 0
final_state = None
async for s in graph.astream(
input=initial_state,
config=config,
stream_mode="values"
):
try:
final_state = s
# 处理流式输出
if isinstance(s, dict) and "messages" in s:
if len(s["messages"]) > last_message_cnt:
last_message_cnt = len(s["messages"])
message = s["messages"][-1]
if isinstance(message, tuple):
print(message)
else:
message.pretty_print()
except Exception as e:
logger.error(f"处理流式输出时出错: {e}")
# 澄清功能处理 - 中断恢复逻辑
if final_state and isinstance(final_state, dict):
from src.graph.nodes import needs_clarification
if needs_clarification(final_state):
# 等待用户输入
clarification_rounds = final_state.get("clarification_rounds", 0)
max_clarification_rounds = final_state.get("max_clarification_rounds", 3)
user_response = input(
f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): "
).strip()
if not user_response:
logger.warning("空回复,结束澄清")
return final_state
# 递归继续工作流
current_state = final_state.copy()
current_state["messages"] = final_state["messages"] + [
{"role": "user", "content": user_response}
]
return await run_agent_workflow_async(
user_input=user_response,
initial_state=current_state,
**kwargs
)
return final_state
2. 播客生成工作流
2.1 播客工作流图构建
源码文件路径: src/podcast/graph/builder.py
from langgraph.graph import END, START, StateGraph
from src.podcast.graph.state import PodcastState
from src.podcast.graph.script_writer_node import script_writer_node
from src.podcast.graph.tts_node import tts_node
from src.podcast.graph.audio_mixer_node import audio_mixer_node
def build_graph():
    """Compile the podcast pipeline: START -> script_writer -> tts -> audio_mixer -> END.

    Strictly linear: no branches or loops, each stage runs after the previous one.
    """
    builder = StateGraph(PodcastState)

    stages = [
        ("script_writer", script_writer_node),  # script writing
        ("tts", tts_node),  # text-to-speech
        ("audio_mixer", audio_mixer_node),  # audio mixing
    ]
    for stage_name, stage_fn in stages:
        builder.add_node(stage_name, stage_fn)

    # Chain consecutive stages, bracketed by START and END.
    route = [START, *(name for name, _ in stages), END]
    for src, dst in zip(route, route[1:]):
        builder.add_edge(src, dst)

    return builder.compile()
# Module-level compiled podcast workflow, built once at import time.
workflow = build_graph()

# Manual smoke test: turn a sample report into a podcast script and an mp3 file.
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    # Close the file promptly and decode explicitly as UTF-8; the sample may
    # contain non-ASCII text, so the platform default encoding is not safe.
    with open("examples/nanjing_tangbao.md", encoding="utf-8") as report_file:
        report_content = report_file.read()

    final_state = workflow.invoke({"input": report_content})

    # Script lines carry their text in `paragraph` (the field tts_node reads).
    for line in final_state["script"].lines:
        print("<M>" if line.speaker == "male" else "<F>", line.paragraph)

    with open("final.mp3", "wb") as f:
        f.write(final_state["output"])
2.2 播客状态定义
源码文件路径: src/podcast/graph/state.py
from typing import Optional
from langgraph.graph import MessagesState
from ..types import Script
class PodcastState(MessagesState):
    """State for the podcast-generation workflow (script -> TTS -> audio mix)."""

    # Input
    input: str = ""  # raw source content to turn into a podcast
    # Output
    output: Optional[bytes] = None  # final audio output (the demo writes it to final.mp3)
    # Intermediate assets
    script: Optional[Script] = None  # generated script; tts_node iterates its `lines`
    # Per-line audio chunks produced by tts_node.
    # NOTE(review): if MessagesState is a TypedDict (as presumed), this `[]` is only a
    # class attribute, not a per-run default -- nodes should not assume the key exists.
    audio_chunks: list[bytes] = []
2.3 TTS节点实现
源码文件路径: src/podcast/graph/tts_node.py
import base64
import logging
import os
from src.podcast.graph.state import PodcastState
from src.tools.tts import VolcengineTTS
logger = logging.getLogger(__name__)
def tts_node(state: PodcastState):
    """Synthesize one audio chunk per script line with the Volcengine TTS client.

    Lines whose speaker is "male" use voice ``BV002_streaming``; all others use
    ``BV001_streaming``. Each line is spoken at ``speed_ratio=1.05``. Lines whose
    synthesis fails are logged and skipped.

    Returns:
        A partial state update ``{"audio_chunks": [...]}`` holding any chunks
        already in state followed by the newly decoded ones.
    """
    logger.info("为播客生成音频片段...")
    tts_client = _create_tts_client()

    # Build a fresh list rather than appending to the list stored in state:
    # nodes should return updates instead of mutating their input, and the key
    # may not be present at all.
    audio_chunks = list(state.get("audio_chunks") or [])
    for line in state["script"].lines:
        tts_client.voice_type = (
            "BV002_streaming" if line.speaker == "male" else "BV001_streaming"
        )
        result = tts_client.text_to_speech(line.paragraph, speed_ratio=1.05)
        if result["success"]:
            # The client returns base64-encoded audio; store raw bytes.
            audio_chunks.append(base64.b64decode(result["audio_data"]))
        else:
            logger.error(result["error"])

    return {"audio_chunks": audio_chunks}
def _create_tts_client():
"""创建TTS客户端 - 配置管理"""
app_id = os.getenv("VOLCENGINE_TTS_APPID", "")
if not app_id:
raise Exception("VOLCENGINE_TTS_APPID未设置")
access_token = os.getenv("VOLCENGINE_TTS_ACCESS_TOKEN", "")
if not access_token:
raise Exception("VOLCENGINE_TTS_ACCESS_TOKEN未设置")
cluster = os.getenv("VOLCENGINE_TTS_CLUSTER", "volcano_tts")
voice_type = "BV001_streaming"
return VolcengineTTS(
appid=app_id,
access_token=access_token,
cluster=cluster,
voice_type=voice_type,
)
3. PPT生成工作流
3.1 PPT工作流图构建
源码文件路径: src/ppt/graph/builder.py
from langgraph.graph import END, START, StateGraph
from src.ppt.graph.state import PPTState
from src.ppt.graph.ppt_composer_node import ppt_composer_node
from src.ppt.graph.ppt_generator_node import ppt_generator_node
def build_graph():
    """Compile the PPT pipeline: START -> ppt_composer -> ppt_generator -> END.

    Two sequential stages: organize the slide content, then produce the file.
    """
    builder = StateGraph(PPTState)

    stages = [
        ("ppt_composer", ppt_composer_node),  # organize PPT content
        ("ppt_generator", ppt_generator_node),  # generate the PPT file
    ]
    for stage_name, stage_fn in stages:
        builder.add_node(stage_name, stage_fn)

    # Chain consecutive stages, bracketed by START and END.
    route = [START, *(name for name, _ in stages), END]
    for src, dst in zip(route, route[1:]):
        builder.add_edge(src, dst)

    return builder.compile()
# Module-level compiled PPT workflow, built once at import time.
workflow = build_graph()

# Manual smoke test: generate a PPT from a sample report.
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    # Close the file promptly and decode explicitly as UTF-8; the sample may
    # contain non-ASCII text, so the platform default encoding is not safe.
    with open("examples/nanjing_tangbao.md", encoding="utf-8") as report_file:
        report_content = report_file.read()

    final_state = workflow.invoke({"input": report_content})
3.2 PPT状态定义
源码文件路径: src/ppt/graph/state.py
from langgraph.graph import MessagesState
class PPTState(MessagesState):
    """State for the PPT-generation workflow (ppt_composer -> ppt_generator)."""

    # Input
    input: str = ""  # raw source content
    # Output
    generated_file_path: str = ""  # path of the generated file
    # Intermediate assets
    ppt_content: str = ""  # PPT content
    ppt_file_path: str = ""  # PPT file path
4. 散文写作工作流
4.1 散文工作流图构建
源码文件路径: src/prose/graph/builder.py
from langgraph.graph import END, START, StateGraph
from src.prose.graph.state import ProseState
from src.prose.graph.prose_continue_node import prose_continue_node
from src.prose.graph.prose_fix_node import prose_fix_node
from src.prose.graph.prose_improve_node import prose_improve_node
from src.prose.graph.prose_longer_node import prose_longer_node
from src.prose.graph.prose_shorter_node import prose_shorter_node
from src.prose.graph.prose_zap_node import prose_zap_node
def optional_node(state: ProseState):
    """Return the user-selected operation (``state["option"]``) as the routing key."""
    selected_option = state["option"]
    return selected_option
def build_graph():
    """Compile the prose-writing graph.

    START routes on ``state["option"]`` (via ``optional_node``) to exactly one
    of six prose nodes, and each of those then goes straight to END. There is
    no default branch for an option missing from the routing map.
    """
    builder = StateGraph(ProseState)

    # Routing key -> (node name, node callable), in the original registration order.
    prose_nodes = {
        "continue": ("prose_continue", prose_continue_node),  # continue writing
        "improve": ("prose_improve", prose_improve_node),  # improve text
        "shorter": ("prose_shorter", prose_shorter_node),  # shorten text
        "longer": ("prose_longer", prose_longer_node),  # lengthen text
        "fix": ("prose_fix", prose_fix_node),  # fix text
        "zap": ("prose_zap", prose_zap_node),  # quick custom edit
    }
    for node_name, node_fn in prose_nodes.values():
        builder.add_node(node_name, node_fn)
        # Explicit terminal edge. END used to be passed as the 4th positional
        # argument of add_conditional_edges, which (where that parameter exists)
        # is the `then` hook, not a default branch; explicit edges state the
        # intended "each option runs once, then ends" without relying on it.
        builder.add_edge(node_name, END)

    builder.add_conditional_edges(
        START,
        optional_node,
        {option: node_name for option, (node_name, _) in prose_nodes.items()},
    )
    return builder.compile()
# 异步测试工作流
async def _test_workflow():
    """Stream a sample "continue" prose request and print each message chunk.

    Streams with stream_mode="messages" and subgraphs=True. Each streamed item
    is a pair whose second element is indexed to get the message chunk; the
    chunk is printed as a chat.completion.chunk-style dict.
    """
    prose_graph = build_graph()
    request = {
        "content": "北京的天气很晴朗",
        "option": "continue",
    }
    stream = prose_graph.astream(request, stream_mode="messages", subgraphs=True)

    async for _origin, event in stream:
        chunk = event[0]
        payload = {
            "id": chunk.id,
            "object": "chat.completion.chunk",
            "content": chunk.content,
        }
        print(payload)
# 主执行入口
if __name__ == "__main__":
    # Manual entry point: load .env, enable INFO logging, then run the streaming demo.
    import asyncio
    import logging

    from dotenv import load_dotenv

    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    asyncio.run(_test_workflow())
5. 提示词增强工作流
5.1 提示词增强工作流图构建
源码文件路径: src/prompt_enhancer/graph/builder.py
from langgraph.graph import StateGraph
from src.prompt_enhancer.graph.state import PromptEnhancerState
from src.prompt_enhancer.graph.enhancer_node import prompt_enhancer_node
def build_graph():
    """Compile the prompt-enhancer graph.

    It has a single "enhancer" node that is both the entry point and the
    finish point.
    """
    graph_builder = StateGraph(PromptEnhancerState)
    enhancer_name = "enhancer"

    graph_builder.add_node(enhancer_name, prompt_enhancer_node)
    graph_builder.set_entry_point(enhancer_name)
    graph_builder.set_finish_point(enhancer_name)

    return graph_builder.compile()
6. 工作流控制模式分析
6.1 线性流水线模式
应用场景: 播客生成、PPT生成
特点:
- 严格的执行顺序
- 每个阶段依赖前一阶段的输出
- 无分支和循环
# Linear pipeline example: chain START -> stage1 -> stage2 -> stage3 -> END.
pipeline = [START, "stage1", "stage2", "stage3", END]
for upstream, downstream in zip(pipeline, pipeline[1:]):
    builder.add_edge(upstream, downstream)
6.2 条件分支模式
应用场景: 散文写作工作流
特点:
- 基于输入参数的动态路由
- 多个互斥的处理选项(每次执行只进入其中一个分支)
- 单次执行,无循环
# Conditional-branch example: START routes to exactly one node chosen by route_function.
builder.add_conditional_edges(
    START,
    route_function,  # routing decision function; returns a key of the map below
    {  # route key -> target node
        "option1": "node1",
        "option2": "node2",
    },
)
# Each branch then ends explicitly. END is not a "default branch" argument of
# add_conditional_edges; its 4th positional parameter, where present, is `then`.
for branch_node in ("node1", "node2"):
    builder.add_edge(branch_node, END)
6.3 复杂状态机模式
应用场景: 深度研究工作流
特点:
- 多层条件路由
- 循环执行能力
- 中断和恢复机制
- 人机交互支持
# Complex state-machine example (same wiring as the deep-research builder).
# The coordinator routes on state["goto"] (default "planner") and may loop back to itself.
builder.add_conditional_edges(
    "coordinator",
    lambda state: state.get("goto", "planner"),
    ["planner", "background_investigator", "coordinator", END],
)
# research_team routes to the agent that owns the next unfinished plan step.
builder.add_conditional_edges(
    "research_team",
    continue_to_running_research_team,
    ["planner", "researcher", "coder"],
)
6.4 单节点处理模式
应用场景: 提示词增强工作流
特点:
- 最简单的工作流结构
- 单一功能处理
- 直接输入输出
# Single-node example: the same node is both the entry point and the finish point.
builder.add_node("processor", process_node)
builder.set_entry_point("processor")
builder.set_finish_point("processor")
7. 工作流配置和管理
7.1 LangGraph Studio配置
源码文件路径: langgraph.json
{
"dockerfile_lines": [],
"graphs": {
"deep_research": "./src/workflow.py:graph",
"podcast_generation": "./src/podcast/graph/builder.py:workflow",
"ppt_generation": "./src/ppt/graph/builder.py:workflow"
},
"python_version": "3.12",
"env": "./.env",
"dependencies": ["."]
}
7.2 工作流执行配置
源码文件路径: src/workflow.py
def get_recursion_limit(default: int = 100) -> int:
    """Return the graph recursion limit taken from AGENT_RECURSION_LIMIT.

    Falls back to ``default`` when the variable is not a valid integer or is
    not positive; when it is unset, ``default`` itself is parsed.
    """
    raw_value = os.getenv("AGENT_RECURSION_LIMIT", str(default))
    try:
        parsed = int(raw_value)
    except ValueError:
        return default
    return default if parsed <= 0 else parsed
# Example run config passed to the compiled graph. "configurable" holds per-run
# settings; "recursion_limit" comes from get_recursion_limit (env var
# AGENT_RECURSION_LIMIT, default 100).
config = {
    "configurable": {
        "thread_id": "default",
        "max_plan_iterations": max_plan_iterations,
        "max_step_num": max_step_num,
        "mcp_settings": mcp_settings,
    },
    "recursion_limit": get_recursion_limit(default=100),
}
8. 总结
8.1 工作流控制特点
- 多样化架构: 支持线性、分支、状态机、单节点等多种模式
- 状态驱动: 所有工作流都基于状态对象进行数据传递
- 模块化设计: 每个工作流独立,可单独部署和测试
- 配置灵活: 支持环境变量和配置文件的灵活配置
- 调试友好: 集成LangGraph Studio,支持可视化调试
8.2 核心技术要素
- StateGraph: LangGraph的核心图构建类
- MessagesState: 基础状态类,支持消息传递
- 条件边: 实现复杂的路由逻辑
- Command对象: 精确控制工作流转换
- 流式执行: 支持实时输出和交互
- 检查点机制: 支持工作流的暂停和恢复
这种多层次的工作流控制架构使得DeerFlow能够处理从简单的文本处理到复杂的多智能体协作等各种场景,展现了LangGraph在工作流编排方面的强大能力。
更多推荐
所有评论(0)