目录

一、什么是多智能体系统(MAS)?

1.1 核心概念

1.2 单智能体 vs 多智能体

二、MAS核心架构解析

2.1 三大核心模块

三、快速上手:简单使用实例

3.1 环境准备

3.2 构建一个简单的双智能体系统

3.3 使用CrewAI快速构建团队

四、深度了解:企业级MAS架构

4.1 主流框架对比

4.2 高级架构模式:智能体间通信

4.3 记忆与状态管理

五、MAS vs 单体智能体

六、总结:2026年的分水岭


一、什么是多智能体系统(MAS)?

1.1 核心概念

多智能体系统(Multi-Agent System, MAS) 是由多个自主智能体(Agent)组成的分布式协作网络。与单智能体相比,MAS通过功能模块化动态协作解决复杂问题,每个Agent拥有独立的角色、感知能力、决策能力和通信能力。

💡 类比理解:想象一个医院手术室团队——麻醉师、主刀医生、护士各自专业,通过实时沟通协作完成复杂手术。MAS中的Agent就像这些专家,既有独立能力,又能协同工作。

1.2 单智能体 vs 多智能体

维度 单智能体 多智能体系统(MAS)
知识覆盖 有限,难以跨领域 能力互补,专业化分工
复杂任务处理 线性处理,易出错 并行处理,任务分解
容错性 单点故障风险高 去中心化,可动态调整
可扩展性 受限于模型上下文 模块化扩展,易于维护

根据谷歌2026年发布的指南,MAS相当于AI领域的微服务架构,通过去中心化和专业化提升系统可靠性。

二、MAS核心架构解析

2.1 三大核心模块

典型的Multi-Agent系统由以下模块构成

  1. 规划模块(Planning)

    • 任务理解与分解

    • 制定执行计划(DAG结构)

    • 资源分配与工具选择

    • 异常处理与流程优化

  2. 记忆模块(Memory)

    • 用户画像:记录偏好习惯

    • 历史交互:对话上下文持久化

    • 中间结果:支持复杂任务分步执行

  3. 通信协作模块

    • 智能体间消息传递

    • 共享状态与环境感知

    • 冲突解决与共识达成

三、快速上手:简单使用实例

3.1 环境准备

pip install langgraph langchain-openai

3.2 构建一个简单的双智能体系统

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import operator

# 定义状态
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str

# 初始化模型
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

# 定义研究员Agent
def researcher(state: AgentState):
    """研究Agent:负责信息收集与分析"""
    messages = state["messages"]
    response = llm.invoke(
        messages + [("system", "你是一位专业研究员,负责深度调研并提供详细报告。请基于用户问题收集关键信息。")]
    )
    return {
        "messages": [("researcher", response.content)],
        "next_agent": "writer"
    }

# 定义写手Agent  
def writer(state: AgentState):
    """写作Agent:负责内容创作"""
    messages = state["messages"]
    response = llm.invoke(
        messages + [("system", "你是一位资深作家,基于研究员提供的信息撰写流畅、有深度的文章。保持客观准确。")]
    )
    return {
        "messages": [("writer", response.content)],
        "next_agent": "end"
    }

# 构建状态图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 定义边和路由逻辑
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 设置入口
workflow.set_entry_point("researcher")

# 编译执行
app = workflow.compile()

# 运行
result = app.invoke({
    "messages": [("user", "请写一篇关于量子计算在药物研发中应用的文章")],
    "next_agent": "researcher"
})

print(result["messages"][-1][1])  # 输出最终结果

执行流程可视化:

3.3 使用CrewAI快速构建团队

CrewAI是另一个流行的MAS框架,更适合角色扮演场景:

from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI

# 创建专业Agent
researcher = Agent(
    role='高级研究员',
    goal='收集并分析最新AI技术趋势',
    backstory='你是一位在科技领域有10年经验的研究专家,擅长深度分析',
    verbose=True,
    allow_delegation=False,
    llm=ChatOpenAI(model="gpt-4", temperature=0.5)
)

writer = Agent(
    role='技术作家',
    goal='将技术概念转化为易懂的文章',
    backstory='你是一位擅长技术写作的资深编辑,曾任职于顶级科技媒体',
    verbose=True,
    allow_delegation=True,
    llm=ChatOpenAI(model="gpt-4", temperature=0.7)
)

# 定义任务
research_task = Task(
    description='调研2025年多智能体系统的最新发展趋势,列出3个关键方向',
    agent=researcher,
    expected_output='一份包含3个关键趋势的研究报告'
)

writing_task = Task(
    description='基于研究报告撰写一篇面向开发者的技术博客,包含代码示例',
    agent=writer,
    expected_output='一篇完整的CSDN风格技术博客文章',
    context=[research_task]  # 依赖研究任务的结果
)

# 组建团队
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=2,
    process='sequential'  # 顺序执行,也可改为'parallel'
)

# 启动
result = crew.kickoff()
print(result)

四、深度了解:企业级MAS架构

4.1 主流框架对比

表格

框架 核心特点 适用场景 学习曲线
LangGraph 图结构工作流,状态管理强大 复杂业务流程、需要循环/条件分支 中等
CrewAI 角色扮演导向,配置简单 内容创作、研究团队模拟
AutoGen 对话式协作,代码生成强 软件开发、多轮对话系统 中等
Microsoft Semantic Kernel 企业级集成,支持DI和过滤器 .NET生态、企业级应用 中高
Google ADK 云原生,支持8种设计模式 GCP生态、云端部署 中等

4.2 高级架构模式:智能体间通信

A2A(Agent2Agent)协议示例

# 模拟智能体间通信协议
class AgentMessage:
    def __init__(self, sender, receiver, message_type, content, metadata=None):
        self.sender = sender
        self.receiver = receiver
        self.type = message_type  # 'task', 'query', 'response', 'broadcast'
        self.content = content
        self.timestamp = time.time()
        self.metadata = metadata or {}

class CommunicationChannel:
    """智能体通信总线"""
    def __init__(self):
        self.subscribers = {}
        self.message_history = []
    
    def subscribe(self, agent_id, callback):
        """智能体订阅消息"""
        self.subscribers[agent_id] = callback
    
    def publish(self, message: AgentMessage):
        """发布消息到总线"""
        self.message_history.append(message)
        
        if message.receiver in self.subscribers:
            # 定向消息
            self.subscribers[message.receiver](message)
        elif message.type == 'broadcast':
            # 广播消息
            for agent_id, callback in self.subscribers.items():
                if agent_id != message.sender:
                    callback(message)
    
    def get_history(self, agent_id=None, limit=10):
        """获取通信历史"""
        if agent_id:
            return [m for m in self.message_history 
                   if m.sender == agent_id or m.receiver == agent_id][-limit:]
        return self.message_history[-limit:]

# 实战:多智能体协作解决复杂问题
class CoordinatorAgent:
    """协调者Agent:负责任务分配和结果整合"""
    def __init__(self, channel: CommunicationChannel):
        self.id = "coordinator"
        self.channel = channel
        channel.subscribe(self.id, self.handle_message)
        self.results = {}
    
    def handle_message(self, message: AgentMessage):
        if message.type == 'response':
            self.results[message.sender] = message.content
            print(f"[协调者] 收到来自 {message.sender} 的结果")
            
            # 检查是否所有子任务完成
            if len(self.results) == 3:  # 假设有3个子任务
                self.integrate_results()
    
    def integrate_results(self):
        """整合所有子任务结果"""
        final_result = f"""
        === 综合报告 ===
        数据分析:{self.results.get('analyst', '无')}
        技术方案:{self.results.get('architect', '无')}
        风险评估:{self.results.get('risk_manager', '无')}
        """
        print(final_result)
        return final_result
    
    def delegate_task(self, task_description):
        """分解并委派任务"""
        subtasks = [
            ("analyst", "分析需求数据并提取关键指标"),
            ("architect", "设计技术架构方案"),
            ("risk_manager", "评估潜在风险并提出缓解措施")
        ]
        
        for agent_id, subtask in subtasks:
            msg = AgentMessage(
                sender=self.id,
                receiver=agent_id,
                message_type='task',
                content=f"{task_description} | 子任务:{subtask}"
            )
            self.channel.publish(msg)

class SpecialistAgent:
    """专业Agent:执行特定子任务"""
    def __init__(self, agent_id, specialty, llm, channel: CommunicationChannel):
        self.id = agent_id
        self.specialty = specialty
        self.llm = llm
        self.channel = channel
        channel.subscribe(self.id, self.handle_message)
    
    def handle_message(self, message: AgentMessage):
        if message.type == 'task':
            print(f"[{self.id}] 收到任务:{message.content[:50]}...")
            result = self.execute_task(message.content)
            
            # 发送结果回协调者
            response = AgentMessage(
                sender=self.id,
                receiver="coordinator",
                message_type='response',
                content=result,
                metadata={'specialty': self.specialty}
            )
            self.channel.publish(response)
    
    def execute_task(self, task):
        """使用LLM执行任务"""
        prompt = f"作为{self.specialty}专家,请完成以下任务:\n{task}"
        return self.llm.invoke(prompt).content

# 运行示例
channel = CommunicationChannel()
coordinator = CoordinatorAgent(channel)

# 创建专业团队
analyst = SpecialistAgent("analyst", "数据分析师", llm, channel)
architect = SpecialistAgent("architect", "系统架构师", llm, channel)
risk_manager = SpecialistAgent("risk_manager", "风险管理专家", llm, channel)

# 启动任务
coordinator.delegate_task("设计一个高并发电商系统")

4.3 记忆与状态管理

在复杂MAS中,共享记忆私有记忆的分离至关重要:

from typing import Dict, Any, List
import json

class MemoryStore:
    """分层记忆存储"""
    def __init__(self):
        self.short_term: Dict[str, List[Dict]] = {}      # 短期记忆:当前会话
        self.long_term: Dict[str, List[Dict]] = {}       # 长期记忆:跨会话持久化
        self.shared: Dict[str, Any] = {}                  # 共享记忆:所有Agent可访问
    
    def add_short_term(self, agent_id: str, observation: Dict):
        """添加短期记忆"""
        if agent_id not in self.short_term:
            self.short_term[agent_id] = []
        self.short_term[agent_id].append({
            **observation,
            'timestamp': time.time()
        })
        # 保持最近20条
        self.short_term[agent_id] = self.short_term[agent_id][-20:]
    
    def add_long_term(self, agent_id: str, insight: Dict):
        """提取长期记忆(重要洞察)"""
        if agent_id not in self.long_term:
            self.long_term[agent_id] = []
        self.long_term[agent_id].append(insight)
    
    def update_shared(self, key: str, value: Any, agent_id: str):
        """更新共享状态(需权限控制)"""
        self.shared[key] = {
            'value': value,
            'updated_by': agent_id,
            'timestamp': time.time()
        }
    
    def get_context(self, agent_id: str, query: str = None) -> str:
        """为Agent检索相关上下文"""
        context_parts = []
        
        # 1. 共享环境状态
        context_parts.append(f"【共享环境】\n{json.dumps(self.shared, indent=2)}")
        
        # 2. 该Agent的短期记忆
        stm = self.short_term.get(agent_id, [])
        context_parts.append(f"【近期操作】\n{json.dumps(stm[-5:], indent=2)}")
        
        # 3. 相关长期记忆(简化实现,实际可用向量检索)
        ltm = self.long_term.get(agent_id, [])
        relevant = [m for m in ltm if query and query in str(m)]
        context_parts.append(f"【历史经验】\n{json.dumps(relevant[:3], indent=2)}")
        
        return "\n\n".join(context_parts)

# 在Agent中使用记忆
class MemoryAugmentedAgent:
    def __init__(self, agent_id, role, memory: MemoryStore, llm):
        self.id = agent_id
        self.role = role
        self.memory = memory
        self.llm = llm
    
    def act(self, task: str):
        # 1. 检索上下文
        context = self.memory.get_context(self.id, task)
        
        # 2. 构建增强提示
        prompt = f"""基于以下上下文完成任务:

{context}

【当前任务】
{task}

【你的角色】
{self.role}

请提供详细执行方案:"""
        
        # 3. 执行并记录
        result = self.llm.invoke(prompt).content
        
        # 4. 更新记忆
        self.memory.add_short_term(self.id, {
            'task': task,
            'action': 'executed',
            'result_summary': result[:100]
        })
        
        return result

五、MAS vs 单体智能体

维度 单体智能体 多智能体系统
解决范围 局部效率问题 复杂业务系统的长期自治
认知负载 单个智能体承载所有知识 分散到多个专业智能体
调试难度 黑箱,难以定位问题 每个智能体输出可追溯
扩展性 修改影响全局 新增智能体即可扩展能力
可靠性 单点故障风险 分布式架构,容错性强
知识更新 需整体重新训练 单个智能体独立更新

六、总结:2026年的分水岭

多智能体系统的本质,不是“更炫的AI”,而是企业级基础设施能力的全面升级。它从“对话Agent + Demo展示”迈向“能跑业务的智能系统”,代表了从“单点智能”到“系统智能”的范式跃迁。

核心洞察

  • 跑通MAS的企业,将获得结构性效率红利

  • 仍停留在单体智能体的企业,很可能错过这一轮生产力跃迁窗口

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐