DeepSeek-R1 + LangGraph :搭建多智能体协作系统
首先,我们要定义 Agent 可以调用的 Python 函数。import os# 1. 写文件工具@tool"""将代码写入指定路径。如果目录不存在会自动创建。"""import subprocess import os from langchain_core . tools import tool # 1. 写文件工具 @tool def write_file(filepath : str ,
DeepSeek-R1 + LangGraph :搭建多智能体协作系统
前言
在前面,我们使用了 DeepSeek来 写 SQL、写测试,它们都有一个共同点:线性流程。
如果任务太复杂,比如“帮我写一个贪吃蛇游戏”,单一的 Prompt 往往会导致代码太长被截断,或者逻辑前后矛盾。
今天,我们将构建一个多智能体协作系统 (Multi-Agent System)。在这个系统中,我们有多个智能体共同搭建起来一个协作小组:
产品经理 (PM): 由 DeepSeek-R1 扮演,负责逻辑拆解和需求分析。
程序员 (Coder): 由 DeepSeek-V3 扮演,负责具体代码实现。
测试员 (Reviewer): 由 DeepSeek-V3 扮演,负责找 Bug,如果发现错误,直接把任务打回给程序员重写。。
一、 系统架构设计
构建一个带“显式记忆”和“真实反馈”的有环图。
- Brain (大脑): DeepSeek-R1 (负责架构) + DeepSeek-V3 (负责编码)。
- Tools (手脚):
write_to_file(写文件),run_command(运行测试)。 - Memory (记忆): Checkpointer (保存每一步的状态,允许人类随时介入)。
二、 赋予 AI “手脚”:自定义工具
首先,我们要定义 Agent 可以调用的 Python 函数。
import subprocess
import os
from langchain_core.tools import tool
# 1. 写文件工具
@tool
def write_file(filepath: str, content: str):
"""将代码写入指定路径。如果目录不存在会自动创建。"""
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
return f"File {filepath} written successfully."
# 2. 运行测试工具
@tool
def run_pytest(test_file: str):
"""运行 Pytest 并返回真实的终端输出(包含报错信息)。"""
try:
result = subprocess.run(
["pytest", test_file],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return "PASS"
else:
return f"FAIL:\n{result.stdout}\n{result.stderr}"
except Exception as e:
return f"Execution Error: {str(e)}"
# 工具列表
tools = [write_file, run_pytest]
三、 定义状态与节点逻辑
这部分是 LangGraph 的核心。我们需要一个能够承载**“项目文件结构”**的状态。
1. 状态定义
from typing import TypedDict, List, Annotated
import operator
class ProjectState(TypedDict):
requirement: str # 用户需求
architecture_plan: str # R1 生成的设计方案
file_paths: List[str] # 生成的文件列表
test_output: str # 最近一次测试的输出
messages: Annotated[List[dict], operator.add] # 对话历史 (用于自动修正)
iteration: int # 迭代次数
2. 架构师节点 (DeepSeek-R1)
利用 R1 的推理能力,生成不仅仅是代码,而是文件结构。
def architect_node(state: ProjectState):
print("🧠 [Architect] DeepSeek-R1 正在规划项目结构...")
req = state['requirement']
# R1 擅长思维链,让它思考文件分层
prompt = f"""
需求:{req}
请设计一个 Python 项目结构。
输出格式要求:详细列出需要创建的文件名及其核心逻辑描述。
"""
response = llm_reasoner.invoke(prompt)
return {"architecture_plan": response.content}
3. 工程师节点 (DeepSeek-V3 + Tools)
这是最复杂的节点。它需要根据架构师的方案,调用 write_file 工具真的去写文件。
from langchain_core.prompts import ChatPromptTemplate
def coder_node(state: ProjectState):
print("👨💻 [Coder] DeepSeek-V3 正在编写代码并写入磁盘...")
plan = state['architecture_plan']
feedback = state.get('test_output', "")
# 如果有报错反馈,说明是 Fix 阶段
if "FAIL" in feedback:
instruction = f"上次运行测试失败,报错信息如下:\n{feedback}\n请调用 write_file 修改对应文件以修复 Bug。"
else:
instruction = f"请根据设计方案实现代码,并创建一个 test_main.py 测试文件。\n设计方案:{plan}"
# 绑定工具的 V3 模型
llm_with_tools = llm_coder.bind_tools(tools)
response = llm_with_tools.invoke(instruction)
# 这里的 response 包含工具调用指令 (Function Call)
return {"messages": [response]}
4. 工具执行节点
LangGraph 内置了 ToolNode,它会自动解析 V3 的 Function Call 并执行 Python 函数(即真的写文件)。
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools)
5. 测试节点 (执行真实环境)
def tester_node(state: ProjectState):
print("🧪 [Tester] 正在运行 Pytest...")
# 假设我们约定好测试文件叫 test_main.py
result = run_pytest("test_main.py")
print(f" 测试结果: {'✅ 通过' if 'PASS' in result else '❌ 失败'}")
return {"test_output": result, "iteration": state["iteration"] + 1}
四、 编排:加入人工审核(Human-in-the-loop)
企业级应用必须有 Safety Check。在代码通过测试准备交付前,我们增加一个中断 (Interrupt)。
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
# 1. 构建图
workflow = StateGraph(ProjectState)
workflow.add_node("architect", architect_node)
workflow.add_node("coder", coder_node)
workflow.add_node("tools", tool_node) # 执行写文件操作
workflow.add_node("tester", tester_node)
# 2. 编排连线
workflow.set_entry_point("architect")
workflow.add_edge("architect", "coder")
workflow.add_edge("coder", "tools")
workflow.add_edge("tools", "tester")
# 3. 关键:条件路由 (自动修复逻辑)
def should_continue(state: ProjectState):
if state["iteration"] > 5:
return "human_approval" # 防止死循环,强制转人工
if "PASS" in state["test_output"]:
return "human_approval" # 测试通过,申请人类审批
else:
return "coder" # 测试失败,回炉重造
workflow.add_conditional_edges("tester", should_continue)
# 4. 人类审批节点 (空节点,仅作暂停点)
workflow.add_node("human_approval", lambda x: x)
workflow.add_edge("human_approval", END)
# 5. 编译 (开启记忆功能)
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer, interrupt_before=["human_approval"])
五、 DeepSeek 修复 Bug 的全过程
假设我们让它写一个“计算斐波那契数列”的接口。
第一轮运行:
- Architect (R1): 规划
fib.py和test_fib.py。 - Coder (V3): 调用
write_file写入代码。(故意埋坑:它可能写了个递归没加缓存,或者边界条件错了) - Tester: 运行
pytest。
- 真实报错:
RecursionError: maximum recursion depth exceeded(因为没写终止条件)。
自动修复(关键时刻):
- 路由逻辑: 发现
FAIL,指回coder节点。 - Coder (V3): 接收到
RecursionError的 Traceback。
- DeepSeek 思考: “啊,我忘记加
if n <= 1 return n了。” - 行动: 再次调用
write_file覆盖fib.py。
- Tester: 再次运行。
- 结果:
PASS。
人工介入:
程序在 human_approval 节点暂停。
开发者看到控制台提示:“测试已通过,等待审批。”
开发者检查生成的代码,输入 app.invoke(..., command="resume"),流程结束。
六、总结
由于现在AI能力还没有想象中的那么强大,文章只是介绍流程作为参考。
更多推荐



所有评论(0)