DeepSeek-R1 + LangGraph :搭建多智能体协作系统

前言

在前面,我们使用了 DeepSeek来 写 SQL、写测试,它们都有一个共同点:线性流程。

如果任务太复杂,比如“帮我写一个贪吃蛇游戏”,单一的 Prompt 往往会导致代码太长被截断,或者逻辑前后矛盾。

今天,我们将构建一个多智能体协作系统 (Multi-Agent System)。在这个系统中,我们有多个智能体共同搭建起来一个协作小组:

产品经理 (PM): 由 DeepSeek-R1 扮演,负责逻辑拆解和需求分析。

程序员 (Coder): 由 DeepSeek-V3 扮演,负责具体代码实现。

测试员 (Reviewer): 由 DeepSeek-V3 扮演,负责找 Bug,如果发现错误,直接把任务打回给程序员重写。。


一、 系统架构设计

构建一个带“显式记忆”和“真实反馈”的有环图

  • Brain (大脑): DeepSeek-R1 (负责架构) + DeepSeek-V3 (负责编码)。
  • Tools (手脚): write_to_file (写文件), run_command (运行测试)。
  • Memory (记忆): Checkpointer (保存每一步的状态,允许人类随时介入)。

二、 赋予 AI “手脚”:自定义工具

首先,我们要定义 Agent 可以调用的 Python 函数。

import subprocess
import os
from langchain_core.tools import tool

# 1. 写文件工具
@tool
def write_file(filepath: str, content: str):
    """将代码写入指定路径。如果目录不存在会自动创建。"""
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(content)
    return f"File {filepath} written successfully."

# 2. 运行测试工具
@tool
def run_pytest(test_file: str):
    """运行 Pytest 并返回真实的终端输出(包含报错信息)。"""
    try:
        result = subprocess.run(
            ["pytest", test_file], 
            capture_output=True, 
            text=True, 
            timeout=10
        )
        if result.returncode == 0:
            return "PASS"
        else:
            return f"FAIL:\n{result.stdout}\n{result.stderr}"
    except Exception as e:
        return f"Execution Error: {str(e)}"

# 工具列表
tools = [write_file, run_pytest]


三、 定义状态与节点逻辑

这部分是 LangGraph 的核心。我们需要一个能够承载**“项目文件结构”**的状态。

1. 状态定义

from typing import TypedDict, List, Annotated
import operator

class ProjectState(TypedDict):
    requirement: str        # 用户需求
    architecture_plan: str  # R1 生成的设计方案
    file_paths: List[str]   # 生成的文件列表
    test_output: str        # 最近一次测试的输出
    messages: Annotated[List[dict], operator.add] # 对话历史 (用于自动修正)
    iteration: int          # 迭代次数

2. 架构师节点 (DeepSeek-R1)

利用 R1 的推理能力,生成不仅仅是代码,而是文件结构

def architect_node(state: ProjectState):
    print("🧠 [Architect] DeepSeek-R1 正在规划项目结构...")
    req = state['requirement']
    
    # R1 擅长思维链,让它思考文件分层
    prompt = f"""
    需求:{req}
    请设计一个 Python 项目结构。
    输出格式要求:详细列出需要创建的文件名及其核心逻辑描述。
    """
    response = llm_reasoner.invoke(prompt)
    return {"architecture_plan": response.content}

3. 工程师节点 (DeepSeek-V3 + Tools)

这是最复杂的节点。它需要根据架构师的方案,调用 write_file 工具真的去写文件。

from langchain_core.prompts import ChatPromptTemplate

def coder_node(state: ProjectState):
    print("👨‍💻 [Coder] DeepSeek-V3 正在编写代码并写入磁盘...")
    plan = state['architecture_plan']
    feedback = state.get('test_output', "")
    
    # 如果有报错反馈,说明是 Fix 阶段
    if "FAIL" in feedback:
        instruction = f"上次运行测试失败,报错信息如下:\n{feedback}\n请调用 write_file 修改对应文件以修复 Bug。"
    else:
        instruction = f"请根据设计方案实现代码,并创建一个 test_main.py 测试文件。\n设计方案:{plan}"

    # 绑定工具的 V3 模型
    llm_with_tools = llm_coder.bind_tools(tools)
    response = llm_with_tools.invoke(instruction)
    
    # 这里的 response 包含工具调用指令 (Function Call)
    return {"messages": [response]} 

4. 工具执行节点

LangGraph 内置了 ToolNode,它会自动解析 V3 的 Function Call 并执行 Python 函数(即真的写文件)。

from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools)

5. 测试节点 (执行真实环境)

def tester_node(state: ProjectState):
    print("🧪 [Tester] 正在运行 Pytest...")
    # 假设我们约定好测试文件叫 test_main.py
    result = run_pytest("test_main.py") 
    print(f"   测试结果: {'✅ 通过' if 'PASS' in result else '❌ 失败'}")
    return {"test_output": result, "iteration": state["iteration"] + 1}


四、 编排:加入人工审核(Human-in-the-loop)

企业级应用必须有 Safety Check。在代码通过测试准备交付前,我们增加一个中断 (Interrupt)

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# 1. 构建图
workflow = StateGraph(ProjectState)

workflow.add_node("architect", architect_node)
workflow.add_node("coder", coder_node)
workflow.add_node("tools", tool_node) # 执行写文件操作
workflow.add_node("tester", tester_node)

# 2. 编排连线
workflow.set_entry_point("architect")
workflow.add_edge("architect", "coder")
workflow.add_edge("coder", "tools")
workflow.add_edge("tools", "tester")

# 3. 关键:条件路由 (自动修复逻辑)
def should_continue(state: ProjectState):
    if state["iteration"] > 5:
        return "human_approval" # 防止死循环,强制转人工
    if "PASS" in state["test_output"]:
        return "human_approval" # 测试通过,申请人类审批
    else:
        return "coder" # 测试失败,回炉重造

workflow.add_conditional_edges("tester", should_continue)

# 4. 人类审批节点 (空节点,仅作暂停点)
workflow.add_node("human_approval", lambda x: x)
workflow.add_edge("human_approval", END)

# 5. 编译 (开启记忆功能)
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer, interrupt_before=["human_approval"])


五、 DeepSeek 修复 Bug 的全过程

假设我们让它写一个“计算斐波那契数列”的接口。

第一轮运行:

  1. Architect (R1): 规划 fib.pytest_fib.py
  2. Coder (V3): 调用 write_file 写入代码。(故意埋坑:它可能写了个递归没加缓存,或者边界条件错了)
  3. Tester: 运行 pytest
  • 真实报错: RecursionError: maximum recursion depth exceeded (因为没写终止条件)。

自动修复(关键时刻):

  1. 路由逻辑: 发现 FAIL,指回 coder 节点。
  2. Coder (V3): 接收到 RecursionError 的 Traceback。
  • DeepSeek 思考: “啊,我忘记加 if n <= 1 return n 了。”
  • 行动: 再次调用 write_file 覆盖 fib.py
  1. Tester: 再次运行。
  • 结果: PASS

人工介入:
程序在 human_approval 节点暂停。
开发者看到控制台提示:“测试已通过,等待审批。”
开发者检查生成的代码,输入 app.invoke(..., command="resume"),流程结束。

六、总结

由于现在AI能力还没有想象中的那么强大,文章只是介绍流程作为参考。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐