大模型开发必备:DeerFlow Agent多智能体协作系统详解,值得收藏的实战指南
DeerFlow的Agent模块是基于LangGraph构建的多智能体协作系统,采用状态驱动的工作流架构。系统包含协调器、规划器、研究员等专业化智能体,通过工厂模式创建管理。其优势在于模块化设计、智能路由、MCP协议支持工具集成,以及人机协作功能。这种架构使系统能处理复杂研究任务,同时保持灵活性和可维护性,为开发者提供了可扩展的AI协作框架。
前言
DeerFlow的Agent模块是基于LangGraph构建的多智能体协作系统,采用状态驱动的工作流架构。该模块实现了协调器、规划器、研究员、编程员、报告员等多个专业化智能体,通过统一的工厂模式创建和管理。
1.核心架构设计
1.1 Agent工厂模式
defcreate_agent( agent_name:str, agent_type:str, tools:list, prompt_template:str, pre_model_hook:callable=None,):"""工厂函数,创建具有一致配置的agents"""return create_react_agent( name=agent_name, model=get_llm_by_type(AGENT_LLM_MAP[agent_type]), tools=tools, prompt=lambda state: apply_prompt_template(prompt_template, state), pre_model_hook=pre_model_hook,)
1.2 Agent-LLM映射配置
AGENT_LLM_MAP:dict[str, LLMType]={"coordinator":"basic",# 协调器 - 用户交互管理"planner":"basic",# 规划器 - 研究计划生成"researcher":"basic",# 研究员 - 信息搜索与分析"coder":"basic",# 编程员 - 代码分析与执行"reporter":"basic",# 报告员 - 最终报告生成"podcast_script_writer":"basic",# 播客脚本编写"ppt_composer":"basic",# PPT制作"prose_writer":"basic",# 散文写作"prompt_enhancer":"basic",# 提示词增强}
2.核心节点实现
2.1 协调器节点 (Coordinator Node)
协调器是系统的入口点,负责用户交互管理和工作流路由:
defcoordinator_node(state: State, config: RunnableConfig):"""协调器节点,处理用户交互和澄清功能""" enable_clarification = state.get("enable_clarification",False)# 传统模式:澄清功能禁用ifnot enable_clarification: messages = apply_prompt_template("coordinator", state) messages.append({"role":"system","content":"澄清功能已禁用。必须立即调用handoff_to_planner工具。"}) tools =[handoff_to_planner]# 澄清模式:支持多轮对话else: clarification_rounds = state.get("clarification_rounds",0) max_clarification_rounds = state.get("max_clarification_rounds",3)if clarification_rounds ==0:# 首轮澄清 messages.append({"role":"system","content":"澄清模式已启用。遵循澄清过程指导原则。"})elif clarification_rounds >0:# 继续澄清对话 clarification_history = state.get("clarification_history",[])# 添加用户响应到澄清历史 last_message = state.get("messages",[])[-1]if state.get("messages")elseNoneif last_message andhasattr(last_message,"content"): clarification_history.append(last_message.content) tools =[handoff_to_planner, handoff_after_clarification]
澄清功能工具定义
@tooldefhandoff_to_planner( research_topic: Annotated[str,"要移交的研究任务主题"], locale: Annotated[str,"用户检测到的语言区域设置"],):"""移交给规划器智能体进行计划制定"""return@tooldefhandoff_after_clarification( locale: Annotated[str,"用户检测到的语言区域设置"],):"""澄清轮次完成后移交给规划器"""return
2.2 规划器节点 (Planner Node)
规划器负责生成和管理研究计划:
defplanner_node(state: State, config: RunnableConfig):"""规划器节点,生成完整的研究计划""" configurable = Configuration.from_runnable_config(config) plan_iterations = state.get("plan_iterations",0)# 澄清模式:仅使用澄清后的问题if state.get("enable_clarification")and state.get("clarified_question"): clean_state ={"messages":[{"role":"user","content": state["clarified_question"]}],"locale": state.get("locale","en-US"),"research_topic": state["clarified_question"],} messages = apply_prompt_template("planner", clean_state, configurable)else:# 正常模式:使用完整对话历史 messages = apply_prompt_template("planner", state, configurable)# 背景调查结果集成if state.get("enable_background_investigation")and state.get("background_investigation_results"): messages +=[{"role":"user","content":f"背景调查结果:\n{state['background_investigation_results']}\n"}]# LLM选择策略if configurable.enable_deep_thinking: llm = get_llm_by_type("reasoning")elif AGENT_LLM_MAP["planner"]=="basic": llm = get_llm_by_type("basic").with_structured_output(Plan, method="json_mode")else: llm = get_llm_by_type(AGENT_LLM_MAP["planner"])# 计划迭代限制检查if plan_iterations >= configurable.max_plan_iterations:return Command(goto="reporter")
2.3 背景调查节点
defbackground_investigation_node(state: State, config: RunnableConfig):"""背景调查节点,进行初步信息收集""" configurable = Configuration.from_runnable_config(config) query = state.get("research_topic")if SELECTED_SEARCH_ENGINE == SearchEngine.TAVILY.value: searched_content = LoggedTavilySearch( max_results=configurable.max_search_results).invoke(query)ifisinstance(searched_content,list): background_investigation_results =[f"## {elem['title']}\n\n{elem['content']}"for elem in searched_content]return{"background_investigation_results":"\n\n".join(background_investigation_results)}else: background_investigation_results = get_web_search_tool( configurable.max_search_results).invoke(query)return{"background_investigation_results": json.dumps( background_investigation_results, ensure_ascii=False)}
2.4 人机协作节点
defhuman_feedback_node(state):"""人机协作节点,处理计划审核和反馈""" current_plan = state.get("current_plan","") auto_accepted_plan = state.get("auto_accepted_plan",False)ifnot auto_accepted_plan: feedback = interrupt("请审核计划。")# 处理编辑计划反馈if feedback andstr(feedback).upper().startswith("[EDIT_PLAN]"):return Command( update={"messages":[HumanMessage(content=feedback, name="feedback")]}, goto="planner",)# 处理接受计划反馈elif feedback andstr(feedback).upper().startswith("[ACCEPTED]"): logger.info("用户接受了计划。")else:raise TypeError(f"不支持的中断值:{feedback}")# 计划解析和验证 plan_iterations = state.get("plan_iterations",0)+1try: current_plan = repair_json_output(current_plan) new_plan = json.loads(current_plan)except json.JSONDecodeError: logger.warning("规划器响应不是有效的JSON")if plan_iterations >1:return Command(goto="reporter")else:return Command(goto="__end__")return Command( update={"current_plan": Plan.model_validate(new_plan),"plan_iterations": plan_iterations,"locale": new_plan["locale"],}, goto="research_team",)
3.研究团队节点架构
3.1 动态工具配置系统
asyncdef_setup_and_execute_agent_step( state: State, config: RunnableConfig, agent_type:str, default_tools:list,):"""设置智能体工具并执行步骤的辅助函数""" configurable = Configuration.from_runnable_config(config) mcp_servers ={} enabled_tools ={}# 提取MCP服务器配置if configurable.mcp_settings:for server_name, server_config in configurable.mcp_settings["servers"].items():if(server_config["enabled_tools"]and agent_type in server_config["add_to_agents"]): mcp_servers[server_name]={ k: v for k, v in server_config.items()if k in("transport","command","args","url","env","headers")}for tool_name in server_config["enabled_tools"]: enabled_tools[tool_name]= server_name# 创建并执行带有MCP工具的智能体if mcp_servers: client = MultiServerMCPClient(mcp_servers) loaded_tools = default_tools[:] all_tools =await client.get_tools()for tool in all_tools:if tool.name in enabled_tools: tool.description =f"由'{enabled_tools[tool.name]}'提供支持。\n{tool.description}" loaded_tools.append(tool)# 上下文压缩钩子 llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP[agent_type]) pre_model_hook = partial(ContextManager(llm_token_limit,3).compress_messages) agent = create_agent(agent_type, agent_type, loaded_tools, agent_type, pre_model_hook)returnawait _execute_agent_step(state, agent, agent_type)
3.2 研究员节点
asyncdefresearcher_node(state: State, config: RunnableConfig):"""研究员节点,执行信息搜索和分析""" configurable = Configuration.from_runnable_config(config)# 默认工具配置 tools =[ get_web_search_tool(configurable.max_search_results), crawl_tool]# 检索工具集成 retriever_tool = get_retriever_tool(state.get("resources",[]))if retriever_tool: tools.insert(0, retriever_tool)returnawait _setup_and_execute_agent_step(state, config,"researcher", tools)
3.3 编程员节点
asyncdefcoder_node(state: State, config: RunnableConfig):"""编程员节点,执行代码分析和处理"""returnawait _setup_and_execute_agent_step( state, config,"coder",[python_repl_tool])
3.4 步骤执行引擎
asyncdef_execute_agent_step(state: State, agent, agent_name:str):"""执行智能体步骤的辅助函数""" current_plan = state.get("current_plan") observations = state.get("observations",[])# 查找第一个未执行的步骤 current_step =None completed_steps =[]for step in current_plan.steps:ifnot step.execution_res: current_step = stepbreakelse: completed_steps.append(step)ifnot current_step:return Command(goto="research_team")# 格式化已完成步骤信息 completed_steps_info =""if completed_steps: completed_steps_info ="# 已完成的研究步骤\n\n"for i, step inenumerate(completed_steps): completed_steps_info +=f"## 已完成步骤 {i +1}: {step.title}\n\n" completed_steps_info +=f"<finding>\n{step.execution_res}\n</finding>\n\n"# 准备智能体输入 agent_input ={"messages":[HumanMessage( content=f"# 研究主题\n\n{current_plan.title}\n\n"f"{completed_steps_info}"f"# 当前步骤\n\n## 标题\n\n{current_step.title}\n\n"f"## 描述\n\n{current_step.description}\n\n"f"## 语言区域\n\n{state.get('locale','en-US')}")]}# 递归限制配置 recursion_limit =int(os.getenv("AGENT_RECURSION_LIMIT","25"))# 执行智能体 result =await agent.ainvoke(input=agent_input, config={"recursion_limit": recursion_limit})# 处理结果 response_content = result["messages"][-1].content current_step.execution_res = response_contentreturn Command( update={"messages":[HumanMessage(content=response_content, name=agent_name)],"observations": observations +[response_content],}, goto="research_team",)
4.报告员节点
defreporter_node(state: State, config: RunnableConfig):"""报告员节点,生成最终研究报告""" configurable = Configuration.from_runnable_config(config) current_plan = state.get("current_plan")# 构建报告输入 input_ ={"messages":[HumanMessage(f"# 研究要求\n\n## 任务\n\n{current_plan.title}\n\n"f"## 描述\n\n{current_plan.thought}")],"locale": state.get("locale","en-US"),} invoke_messages = apply_prompt_template("reporter", input_, configurable) observations = state.get("observations",[])# 添加格式和引用提醒 invoke_messages.append(HumanMessage( content="重要提示:按照提示中的格式构建报告。记住包含:\n\n""1. 关键要点 - 最重要发现的要点列表\n""2. 概述 - 主题的简要介绍\n""3. 详细分析 - 组织成逻辑部分\n""4. 调查说明(可选)- 用于更全面的报告\n""5. 关键引用 - 在末尾列出所有参考文献\n\n""优先使用MARKDOWN表格进行数据展示和比较。", name="system",))# 观察消息处理 observation_messages =[ HumanMessage( content=f"以下是研究任务的一些观察结果:\n\n{observation}", name="observation",)for observation in observations]# 上下文压缩 llm_token_limit = get_llm_token_limit_by_type(AGENT_LLM_MAP["reporter"]) compressed_state = ContextManager(llm_token_limit).compress_messages({"messages": observation_messages}) invoke_messages += compressed_state.get("messages",[])# 生成报告 response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)return{"final_report": response.content}
5.工作流控制系统
5.1 状态驱动路由
defcontinue_to_running_research_team(state: State):"""决定研究团队的下一步行动""" current_plan = state.get("current_plan")ifnot current_plan ornot current_plan.steps:return"planner"# 检查是否所有步骤都已完成ifall(step.execution_res for step in current_plan.steps):return"planner"# 查找第一个未完成的步骤for step in current_plan.steps:ifnot step.execution_res:if step.step_type == StepType.RESEARCH:return"researcher"if step.step_type == StepType.PROCESSING:return"coder"return"planner"
5.2 澄清功能控制
defneeds_clarification(state:dict)->bool:"""检查是否需要澄清"""ifnot state.get("enable_clarification",False):returnFalse clarification_rounds = state.get("clarification_rounds",0) is_clarification_complete = state.get("is_clarification_complete",False) max_clarification_rounds = state.get("max_clarification_rounds",3)return( clarification_rounds >0andnot is_clarification_completeand clarification_rounds <= max_clarification_rounds)
6.图构建器
def_build_base_graph():"""构建并返回包含所有节点和边的基础状态图""" builder = StateGraph(State)# 添加节点 builder.add_edge(START,"coordinator") builder.add_node("coordinator", coordinator_node) builder.add_node("background_investigator", background_investigation_node) builder.add_node("planner", planner_node) builder.add_node("reporter", reporter_node) builder.add_node("research_team", research_team_node) builder.add_node("researcher", researcher_node) builder.add_node("coder", coder_node) builder.add_node("human_feedback", human_feedback_node)# 添加边 builder.add_edge("background_investigator","planner") builder.add_edge("reporter", END)# 条件边 builder.add_conditional_edges("research_team", continue_to_running_research_team,["planner","researcher","coder"],) builder.add_conditional_edges("coordinator",lambda state: state.get("goto","planner"),["planner","background_investigator","coordinator", END],)return builderdefbuild_graph():"""构建并返回不带内存的智能体工作流图""" builder = _build_base_graph()return builder.compile()
7.架构优势
7.1 模块化设计
- 职责分离 :每个智能体专注于特定任务
- 松耦合 :节点间通过状态通信,易于维护
- 可扩展性 :新增智能体只需添加节点和配置
7.2 状态驱动工作流
- 智能路由 :基于当前状态自动决定下一步
- 条件分支 :支持复杂的业务逻辑
- 错误恢复 :具备容错和重试机制
7.3 工具集成能力
- MCP协议支持 :无缝集成外部工具和服务
- 动态工具加载 :运行时配置工具集合
- 工具描述增强 :自动标注工具来源
7.4 人机协作
- 计划审核 :支持人工审核和修改研究计划
- 澄清对话 :多轮对话澄清模糊需求
- 中断恢复 :支持工作流中断和恢复
8.使用示例
# 创建基础智能体agent = create_agent( agent_name="researcher", agent_type="researcher", tools=[search_tool, crawl_tool], prompt_template="researcher", pre_model_hook=context_compressor)# 构建工作流图graph = build_graph()# 执行工作流result =await graph.ainvoke({"messages":[HumanMessage(content="研究人工智能的发展趋势")],"research_topic":"人工智能发展趋势","locale":"zh-CN"})
总结
DeerFlow的Agent模块通过精心设计的架构实现了:
- 专业化智能体系统 :每个智能体专注特定领域
- 灵活的工作流控制 :状态驱动的智能路由
- 强大的工具集成 :MCP协议和动态工具加载
- 优秀的人机协作 :澄清对话和计划审核
- 高度可扩展性 :模块化设计易于扩展
这种架构使得DeerFlow能够处理复杂的研究任务,同时保持系统的灵活性和可维护性。
最后
为什么要学AI大模型
当下,⼈⼯智能市场迎来了爆发期,并逐渐进⼊以⼈⼯通⽤智能(AGI)为主导的新时代。企业纷纷官宣“ AI+ ”战略,为新兴技术⼈才创造丰富的就业机会,⼈才缺⼝将达 400 万!
DeepSeek问世以来,生成式AI和大模型技术爆发式增长,让很多岗位重新成了炙手可热的新星,岗位薪资远超很多后端岗位,在程序员中稳居前列。

与此同时AI与各行各业深度融合,飞速发展,成为炙手可热的新风口,企业非常需要了解AI、懂AI、会用AI的员工,纷纷开出高薪招聘AI大模型相关岗位。
最近很多程序员朋友都已经学习或者准备学习 AI 大模型,后台也经常会有小伙伴咨询学习路线和学习资料,我特别拜托北京清华大学学士和美国加州理工学院博士学位的鲁为民老师给大家这里给大家准备了一份涵盖了AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频 全系列的学习资料,这些学习资料不仅深入浅出,而且非常实用,让大家系统而高效地掌握AI大模型的各个知识点。
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】
AI大模型系统学习路线
在面对AI大模型开发领域的复杂与深入,精准学习显得尤为重要。一份系统的技术路线图,不仅能够帮助开发者清晰地了解从入门到精通所需掌握的知识点,还能提供一条高效、有序的学习路径。

但知道是一回事,做又是另一回事,初学者最常遇到的问题主要是理论知识缺乏、资源和工具的限制、模型理解和调试的复杂性,在这基础上,找到高质量的学习资源,不浪费时间、不走弯路,又是重中之重。
AI大模型入门到实战的视频教程+项目包
看视频学习是一种高效、直观、灵活且富有吸引力的学习方式,可以更直观地展示过程,能有效提升学习兴趣和理解力,是现在获取知识的重要途径

光学理论是没用的,要学会跟着一起敲,要动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。
海量AI大模型必读的经典书籍(PDF)
阅读AI大模型经典书籍可以帮助读者提高技术水平,开拓视野,掌握核心技术,提高解决问题的能力,同时也可以借鉴他人的经验。对于想要深入学习AI大模型开发的读者来说,阅读经典书籍是非常有必要的。
600+AI大模型报告(实时更新)
这套包含640份报告的合集,涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师,还是对AI大模型感兴趣的爱好者,这套报告合集都将为您提供宝贵的信息和启示。
AI大模型面试真题+答案解析
我们学习AI大模型必然是想找到高薪的工作,下面这些面试题都是总结当前最新、最热、最高频的面试题,并且每道题都有详细的答案,面试前刷完这套面试题资料,小小offer,不在话下

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

更多推荐



所有评论(0)