General-Purpose Multi-Agent Problem-Solving System - Complete Deliverable (Part 1)
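[Figure: overall architecture diagram. Layers: user layer, orchestration layer (LangGraph StateGraph), execution layer (Worker Agents), tool layer, infrastructure layer. Nodes: user input, final output, Coordinator, Planner, Agent Router, Researcher, Coder, Executor, Critic, Synthesizer, Tool Router, Calculator, File Manager, Web. Edge labels: "passed", "needs improvement".]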
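Contents
- General-Purpose Multi-Agent Problem-Solving System - Complete Deliverable
- 1. System Architecture Design Document
- 2. Module and Class Design
- 3. Project Directory Tree
- 4. Complete Python Source Code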
- `pyproject.toml`
- `requirements.txt`
- `.env.example`
- `src/__init__.py`
- `src/main.py`
- `src/config/__init__.py`
- `src/config/settings.py`
- `src/config/prompts.py`
- `src/agents/__init__.py`
- `src/agents/base.py`
- `src/agents/coordinator.py`
- `src/agents/planner.py`
- `src/agents/researcher.py`
- `src/agents/coder.py`
- `src/agents/coder.py` (continued)
- `src/agents/executor.py`
- `src/agents/critic.py`
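1. System Architecture Design Document
1.1 Overall Architecture Diagram
1.2 Component Responsibilities
| Component | Responsibility | Implementation notes |
|---|---|---|
| Coordinator | Task understanding, global decisions, progress monitoring | Parses user intent and decides where the workflow goes next |
| Planner | Task decomposition, dependency analysis, execution plan generation | Splits a complex task into executable subtasks |
| Researcher | Information retrieval, knowledge integration | Calls search tools and organizes the relevant material |
| Coder | Code writing, technical implementation | Generates high-quality code and handles technical details |
| Executor | Tool calls, code execution | Performs concrete operations and returns the results |
| Critic | Quality review, error detection | Checks output quality and suggests improvements |
| Synthesizer | Result aggregation, final output | Integrates all outputs into the final answer |
1.3 Data Flow
1. User input → InputParser → structured task
2. Structured task → Coordinator → task classification and routing decision
3. Routing decision → Planner → subtask list + execution order
4. Subtask → AgentRouter → dispatched to the matching Worker
5. Worker runs → calls tools → produces intermediate results
6. Intermediate results → Critic → quality evaluation
7. Evaluation passes → Synthesizer → final output
8. Evaluation fails → feedback and revision → re-execution (at most N times)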
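Steps 6 to 8 amount to a small routing decision taken after the Critic runs. The sketch below shows one way to express it as a plain function of the kind that can serve as a conditional-edge router. The state keys `critic_passed` and `retry_count` and the constant `MAX_RETRIES` are assumptions made for illustration; the node names `synthesize` and `route_agent` are the ones the agents in this article write into `next`.

```python
from typing import Any, Dict

MAX_RETRIES = 3  # assumed value for the "N" in step 8


def route_after_critic(state: Dict[str, Any]) -> str:
    """Return the name of the next node once the Critic has evaluated a result."""
    if state.get("critic_passed", False):
        return "synthesize"       # step 7: evaluation passed
    if state.get("retry_count", 0) < MAX_RETRIES:
        return "route_agent"      # step 8: feed the critique back and try again
    return "synthesize"           # retries exhausted: finish with partial results


print(route_after_critic({"critic_passed": False, "retry_count": 1}))  # route_agent
```

Keeping the decision in a pure function makes the retry bound easy to unit-test without building the graph.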
1.4 LangGraph Node/Edge Design
1.5 State Structure Definition
from typing import Any, Dict, List, Literal, Optional, TypedDict

from pydantic import BaseModel, Field
from langchain_core.messages import BaseMessage


class SubTask(BaseModel):
    """Definition of a subtask."""

    id: str = Field(description="Unique subtask identifier")
    description: str = Field(description="Subtask description")
    assigned_agent: str = Field(description="Agent the subtask is assigned to")
    dependencies: List[str] = Field(default_factory=list, description="IDs of subtasks this one depends on")
    status: Literal["pending", "in_progress", "completed", "failed"] = "pending"
    result: Optional[str] = None
    retry_count: int = 0


class ToolCallLog(BaseModel):
    """Log entry for one tool call."""

    tool_name: str
    input_args: Dict[str, Any]
    output: Any
    success: bool
    error_message: Optional[str] = None
    execution_time: float


class AgentState(TypedDict):
    """Global state definition: the core state of the LangGraph StateGraph."""

    messages: List[BaseMessage]        # full conversation history
    original_task: str                 # original user input
    task_type: str                     # task type classification
    subtasks: List[SubTask]            # list of subtasks
    current_subtask_index: int         # index of the subtask being executed
    agent_outputs: Dict[str, Any]      # output per agent: {agent_name: output}
    tool_call_logs: List[ToolCallLog]  # tool call records
    current_agent: str                 # agent currently executing
    iteration_count: int               # global iteration count
    max_iterations: int                # upper limit on iterations
    reflection_notes: List[str]        # reflection notes
    final_answer: Optional[str]        # final answer
    error_log: List[str]               # error log
    next: str                          # name of the next node to route to
    require_human_review: bool         # whether human review is required
    human_feedback: Optional[str]      # human feedback
    execution_stats: Dict[str, Any]    # execution statistics
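The `dependencies` and `status` fields are enough to decide which subtasks may run next. The following is a minimal sketch of that selection; it uses a plain dataclass `Task` as a stand-in for `SubTask` so that it runs without pydantic, and the helper `ready_subtasks` is hypothetical rather than part of the source.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Task:
    """Stand-in for SubTask with only the fields this example needs."""
    id: str
    dependencies: List[str] = field(default_factory=list)
    status: str = "pending"


def ready_subtasks(tasks: List[Task]) -> List[Task]:
    """Return pending tasks whose dependencies have all completed."""
    done = {t.id for t in tasks if t.status == "completed"}
    return [
        t for t in tasks
        if t.status == "pending" and all(dep in done for dep in t.dependencies)
    ]


tasks = [
    Task("task_1", status="completed"),
    Task("task_2", dependencies=["task_1"]),
    Task("task_3", dependencies=["task_2"]),
]
print([t.id for t in ready_subtasks(tasks)])  # ['task_2']
```

A router built this way can also hand several ready subtasks to workers at once, which is what the planner's "use parallel execution where reasonable" principle asks for.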
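1.6 Failure Retry and Termination Conditions
Retry policy:
- Per-task retry: each subtask is retried at most 3 times
- Global iteration limit: at most 10 iterations overall
- Exponential backoff: the retry interval doubles each time (1s, 2s, 4s)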
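The retry policy above can be sketched as a small async helper. This is an illustration under assumptions, not the article's implementation: the name `retry_with_backoff` and its parameters are hypothetical, and the `sleep` argument exists only so the delays can be replaced in tests.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, retrying up to `max_retries` times with doubling delays."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            await sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise RuntimeError("max_retries must be non-negative")
```

With the defaults, a permanently failing operation is attempted four times in total (one initial call plus three retries), with waits of 1s, 2s and 4s in between.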
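Termination conditions:
- All subtasks complete successfully → normal exit
- Maximum iteration count reached → return partial results plus a warning
- Critical error (for example, the API is unavailable) → terminate immediately and report the error
- User cancels → save the current state and exit
Degradation strategy:
- Tool call fails → try an alternative tool or fall back to pure LLM reasoning
- Agent fails → return to the Coordinator and re-plan
- LLM times out → switch to a backup model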
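The degradation strategy is essentially an ordered fallback chain. A minimal sketch follows, assuming each strategy is a callable that either returns a result or raises; `run_with_fallbacks`, `web_search` and `llm_only` are hypothetical names used only for this example.

```python
from typing import Callable, List, Tuple


def run_with_fallbacks(
    strategies: List[Tuple[str, Callable[[str], str]]],
    query: str,
) -> Tuple[str, str, List[str]]:
    """Try each (name, strategy) in order; return (name used, result, errors so far)."""
    errors: List[str] = []
    for name, strategy in strategies:
        try:
            return name, strategy(query), errors
        except Exception as exc:
            errors.append(f"{name}: {exc}")  # record the failure, then degrade
    raise RuntimeError("all strategies failed: " + "; ".join(errors))


def web_search(query: str) -> str:  # hypothetical primary tool
    raise ConnectionError("search API unavailable")


def llm_only(query: str) -> str:  # hypothetical pure-LLM fallback
    return f"answer from model knowledge for: {query}"


used, result, errors = run_with_fallbacks(
    [("web_search", web_search), ("llm_only", llm_only)], "LangGraph basics"
)
print(used)    # llm_only
print(errors)  # ['web_search: search API unavailable']
```

The collected error strings can be appended to `error_log` in the state so that the final report shows which degradations happened.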
2. Module and Class Design
2.1 Core Class Structure
BaseAgent (abstract base class)
├── CoordinatorAgent # coordinator
├── PlannerAgent # planner
├── ResearcherAgent # researcher
├── CoderAgent # coder
├── ExecutorAgent # executor
├── CriticAgent # critic
└── SynthesizerAgent # synthesizer
BaseTool (abstract base class)
├── CalculatorTool # calculation tool
├── FileManagerTool # file management
├── CodeExecutorTool # code execution
└── WebSearchTool # web search
GraphBuilder # graph builder
├── build_graph() # build the complete workflow graph
├── add_node() # add a node
└── add_edge() # add an edge
StateManager # state manager
├── get_state() # get the current state
├── update_state() # update the state
└── save_checkpoint() # save a checkpoint
MemoryManager # memory management
├── ShortTermMemory # short-term memory
└── LongTermMemory # long-term memory
LLMFactory # LLM factory
├── create_llm() # create an LLM instance
└── get_available_models() # list available models
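2.2 Interface Definitions
from abc import ABC, abstractmethod
from typing import Any

from src.graph.state import AgentState


# Agent base-class interface
class BaseAgent(ABC):
    @abstractmethod
    async def process(self, state: AgentState) -> AgentState:
        """Process the state and return the updated state."""

    @abstractmethod
    def get_system_prompt(self) -> str:
        """Return the system prompt."""


# Tool base-class interface
class BaseTool(ABC):
    @abstractmethod
    def execute(self, **kwargs) -> Any:
        """Execute the tool."""

    @abstractmethod
    def validate_input(self, **kwargs) -> bool:
        """Validate the input arguments."""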
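To show how the `BaseTool` contract might be satisfied, here is a sketch of a concrete tool. `SimpleCalculator` is hypothetical and is not the article's `CalculatorTool`; it redeclares `BaseTool` so the example is self-contained, and it evaluates arithmetic by walking the AST instead of calling `eval()`.

```python
import ast
import operator
from abc import ABC, abstractmethod
from typing import Any


class BaseTool(ABC):
    @abstractmethod
    def execute(self, **kwargs) -> Any: ...

    @abstractmethod
    def validate_input(self, **kwargs) -> bool: ...


class SimpleCalculator(BaseTool):
    """Hypothetical tool: evaluates +, -, *, / on numeric literals without eval()."""

    _OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
            ast.Mult: operator.mul, ast.Div: operator.truediv}

    def validate_input(self, **kwargs) -> bool:
        return isinstance(kwargs.get("expression"), str)

    def execute(self, **kwargs) -> Any:
        if not self.validate_input(**kwargs):
            raise ValueError("expected a string argument 'expression'")
        return self._eval(ast.parse(kwargs["expression"], mode="eval").body)

    def _eval(self, node: ast.AST) -> float:
        # Only numeric constants and the four binary operators are accepted.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in self._OPS:
            return self._OPS[type(node.op)](self._eval(node.left), self._eval(node.right))
        raise ValueError("unsupported expression")


print(SimpleCalculator().execute(expression="2 * (3 + 4)"))  # 14
```

Calling `validate_input` at the top of `execute` keeps the two abstract methods in step, so a tool router can rely on either entry point.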
3. Project Directory Tree
multi_agent_system/
├── pyproject.toml
├── requirements.txt
├── README.md
├── .env.example
├── src/
│ ├── __init__.py
│ ├── main.py
│ ├── config/
│ │ ├── __init__.py
│ │ ├── settings.py
│ │ └── prompts.py
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── coordinator.py
│ │ ├── planner.py
│ │ ├── researcher.py
│ │ ├── coder.py
│ │ ├── executor.py
│ │ ├── critic.py
│ │ └── synthesizer.py
│ ├── graph/
│ │ ├── __init__.py
│ │ ├── state.py
│ │ ├── nodes.py
│ │ ├── edges.py
│ │ └── builder.py
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── calculator.py
│ │ ├── file_manager.py
│ │ ├── code_executor.py
│ │ └── search.py
│ ├── memory/
│ │ ├── __init__.py
│ │ ├── short_term.py
│ │ └── long_term.py
│ ├── llm/
│ │ ├── __init__.py
│ │ └── factory.py
│ └── utils/
│ ├── __init__.py
│ ├── logger.py
│ └── visualizer.py
├── examples/
│ ├── example_planning.py
│ ├── example_tool_execution.py
│ └── example_code_generation.py
├── tests/
│ ├── __init__.py
│ ├── test_graph.py
│ ├── test_flow.py
│ └── test_tools.py
├── workspace/
│ └── .gitkeep
└── logs/
└── .gitkeep
4. Complete Python Source Code
pyproject.toml
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "multi-agent-system"
version = "1.0.0"
description = "General-purpose multi-agent collaborative problem-solving system"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
{name = "Multi-Agent Team", email = "team@example.com"}
]
keywords = ["multi-agent", "langgraph", "langchain", "ai", "llm"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"langgraph>=0.2.0",
"langchain>=0.2.0",
"langchain-openai>=0.1.0",
"langchain-anthropic>=0.1.0",
"langchain-community>=0.2.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",  # src/config/settings.py imports pydantic_settings
"python-dotenv>=1.0.0",
"rich>=13.0.0",
"aiohttp>=3.9.0",
"tiktoken>=0.5.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.0.0",
"black>=23.0.0",
"isort>=5.12.0",
"mypy>=1.0.0",
]
[project.scripts]
multi-agent = "src.main:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["src*"]
[tool.black]
line-length = 100
target-version = ['py310', 'py311']
include = '\.pyi?$'
[tool.isort]
profile = "black"
line_length = 100
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
requirements.txt
langgraph>=0.2.0
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-anthropic>=0.1.0
langchain-community>=0.2.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
python-dotenv>=1.0.0
rich>=13.0.0
aiohttp>=3.9.0
tiktoken>=0.5.0
pytest>=7.0.0
pytest-asyncio>=0.21.0
.env.example
# LLM configuration
OPENAI_API_KEY=your_openai_api_key_here
ANTHROPIC_API_KEY=your_anthropic_api_key_here
# Default model provider: openai, anthropic, local
LLM_PROVIDER=openai
# Model names
OPENAI_MODEL=gpt-4o
ANTHROPIC_MODEL=claude-3-5-sonnet-20241022
# Local model configuration (optional)
LOCAL_MODEL_URL=http://localhost:11434
LOCAL_MODEL_NAME=llama3
# System configuration
MAX_ITERATIONS=10
MAX_RETRIES=3
DEBUG_MODE=true
LOG_LEVEL=INFO
# Working directory (tool operations are confined to it)
WORKSPACE_DIR=workspace
# Memory system
ENABLE_LONG_TERM_MEMORY=false
MEMORY_STORAGE_PATH=./memory_store
src/__init__.py
"""
Multi-Agent Problem Solving System
A general-purpose multi-agent collaborative problem-solving system built on LangGraph
"""
__version__ = "1.0.0"
__author__ = "Multi-Agent Team"
from src.graph.builder import GraphBuilder
from src.graph.state import AgentState, SubTask, ToolCallLog
__all__ = [
"GraphBuilder",
"AgentState",
"SubTask",
"ToolCallLog",
]
src/main.py
#!/usr/bin/env python3
"""
Multi-Agent System 主入口
提供命令行接口和交互式运行模式
"""
import asyncio
import sys
import os
from pathlib import Path
from typing import Optional
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from rich.prompt import Prompt, Confirm
from src.config.settings import Settings
from src.graph.builder import GraphBuilder
from src.graph.state import AgentState
from src.utils.logger import setup_logger, get_logger
from src.utils.visualizer import Visualizer
# 加载环境变量
load_dotenv()
console = Console()
logger = get_logger(__name__)
class MultiAgentSystem:
"""多智能体系统主类"""
def __init__(self, settings: Optional[Settings] = None):
"""
初始化多智能体系统
Args:
settings: 系统配置,如果为None则使用默认配置
"""
self.settings = settings or Settings()
self.graph_builder = GraphBuilder(self.settings)
self.graph = None
self.visualizer = Visualizer()
self._compiled = False
def compile(self) -> None:
"""
编译 LangGraph 工作流
这是 LangGraph 的核心步骤:
1. 构建状态图(StateGraph)
2. 添加所有节点(nodes)
3. 添加边和条件路由(edges)
4. 设置入口点和终止条件
5. 调用 compile() 生成可执行图
"""
if not self._compiled:
console.print("[bold blue]正在编译工作流图...[/bold blue]")
self.graph = self.graph_builder.build()
self._compiled = True
console.print("[bold green]✓ 工作流图编译完成[/bold green]")
# 如果启用调试模式,显示图结构
if self.settings.debug_mode:
self._show_graph_structure()
def _show_graph_structure(self) -> None:
"""显示图结构(调试用)"""
try:
mermaid_code = self.visualizer.generate_mermaid(self.graph)
console.print(Panel(mermaid_code, title="工作流图结构 (Mermaid)", border_style="blue"))
except Exception as e:
logger.warning(f"无法生成图结构可视化: {e}")
async def run(self, task: str, human_in_loop: bool = False) -> dict:
"""
执行任务
Args:
task: 用户输入的任务描述
human_in_loop: 是否启用人工审核环节
Returns:
执行结果字典,包含最终答案和执行统计
"""
if not self._compiled:
self.compile()
# 初始化状态
initial_state: AgentState = {
"messages": [],
"original_task": task,
"task_type": "",
"subtasks": [],
"current_subtask_index": 0,
"agent_outputs": {},
"tool_call_logs": [],
"current_agent": "coordinator",
"iteration_count": 0,
"max_iterations": self.settings.max_iterations,
"reflection_notes": [],
"final_answer": None,
"error_log": [],
"next": "parse_input",
"require_human_review": human_in_loop,
"human_feedback": None,
"execution_stats": {
"start_time": None,
"end_time": None,
"total_tokens": 0,
"agent_calls": {},
"tool_calls": 0,
}
}
console.print(Panel(
f"[bold]任务:[/bold] {task}",
title="🚀 开始执行",
border_style="green"
))
# 执行工作流
import time
start_time = time.time()
initial_state["execution_stats"]["start_time"] = start_time
try:
# LangGraph 的核心执行:通过 ainvoke 异步执行整个图
# 图会自动按照定义的节点和边进行状态流转
# 直到达到 END 状态或满足终止条件
final_state = await self.graph.ainvoke(
initial_state,
config={"recursion_limit": self.settings.max_iterations * 10}
)
end_time = time.time()
final_state["execution_stats"]["end_time"] = end_time
final_state["execution_stats"]["total_time"] = end_time - start_time
# 显示结果
self._display_result(final_state)
return final_state
except Exception as e:
logger.error(f"执行失败: {e}", exc_info=True)
console.print(f"[bold red]执行失败: {e}[/bold red]")
raise
def _display_result(self, state: dict) -> None:
"""显示执行结果"""
console.print("\n")
# 最终答案
if state.get("final_answer"):
console.print(Panel(
Markdown(state["final_answer"]),
title="📋 最终结果",
border_style="green"
))
# 执行统计
stats = state.get("execution_stats", {})
stats_text = f"""
**执行时间**: {stats.get('total_time', 0):.2f} 秒
**迭代次数**: {state.get('iteration_count', 0)}
**子任务数**: {len(state.get('subtasks', []))}
**工具调用**: {len(state.get('tool_call_logs', []))}
**错误数量**: {len(state.get('error_log', []))}
"""
console.print(Panel(
Markdown(stats_text),
title="📊 执行统计",
border_style="blue"
))
# 如果有错误,显示错误日志
if state.get("error_log"):
error_text = "\n".join(f"- {err}" for err in state["error_log"][-5:])
console.print(Panel(
error_text,
title="⚠️ 错误日志(最近5条)",
border_style="yellow"
))
def interactive_mode(system: MultiAgentSystem) -> None:
"""交互式模式"""
console.print(Panel(
"[bold]多智能体问题求解系统[/bold]\n\n"
"输入您的任务,系统将自动分解并协调多个智能体完成。\n"
"输入 'quit' 或 'exit' 退出,输入 'help' 查看帮助。",
title="欢迎",
border_style="cyan"
))
while True:
try:
task = Prompt.ask("\n[bold cyan]请输入任务[/bold cyan]")
if task.lower() in ['quit', 'exit', 'q']:
console.print("[yellow]再见![/yellow]")
break
if task.lower() == 'help':
show_help()
continue
if not task.strip():
console.print("[yellow]请输入有效的任务描述[/yellow]")
continue
# 询问是否需要人工审核
human_review = Confirm.ask("是否启用人工审核环节?", default=False)
# 执行任务
asyncio.run(system.run(task, human_in_loop=human_review))
except KeyboardInterrupt:
console.print("\n[yellow]已取消当前任务[/yellow]")
continue
except Exception as e:
console.print(f"[bold red]错误: {e}[/bold red]")
logger.exception("交互模式错误")
def show_help() -> None:
"""显示帮助信息"""
help_text = """
## 使用帮助
### 基本命令
- `quit` / `exit` / `q` - 退出程序
- `help` - 显示此帮助信息
### 任务示例
1. **规划类任务**
- "制定一个学习Python的30天计划"
- "帮我规划一次北京三日游"
2. **代码生成类任务**
- "编写一个Python爬虫抓取Hacker News首页"
- "创建一个简单的REST API"
3. **分析类任务**
- "分析电商网站的核心功能需求"
- "评估这个技术方案的优缺点"
### 环境变量
- `OPENAI_API_KEY` - OpenAI API密钥
- `LLM_PROVIDER` - 模型提供商 (openai/anthropic)
- `DEBUG_MODE` - 调试模式 (true/false)
"""
console.print(Markdown(help_text))
def main():
"""主函数入口"""
import argparse
parser = argparse.ArgumentParser(
description="通用多智能体问题求解系统",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"-t", "--task",
type=str,
help="直接执行指定任务"
)
parser.add_argument(
"-i", "--interactive",
action="store_true",
help="进入交互式模式"
)
parser.add_argument(
"--human-review",
action="store_true",
help="启用人工审核环节"
)
parser.add_argument(
"--debug",
action="store_true",
help="启用调试模式"
)
parser.add_argument(
"--log-level",
type=str,
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="日志级别"
)
args = parser.parse_args()
# 设置日志
setup_logger(args.log_level)
# 创建设置
settings = Settings()
if args.debug:
settings.debug_mode = True
# 创建系统实例
system = MultiAgentSystem(settings)
    if args.task:
        # Direct mode: run the task given on the command line
        asyncio.run(system.run(args.task, human_in_loop=args.human_review))
    else:
        # Interactive mode; also the default when no task is given
        interactive_mode(system)
if __name__ == "__main__":
main()
src/config/__init__.py
"""配置模块"""
from src.config.settings import Settings
from src.config.prompts import PromptTemplates
__all__ = ["Settings", "PromptTemplates"]
src/config/settings.py
"""
系统配置管理
使用 Pydantic 进行配置验证和环境变量加载
"""
import os
from pathlib import Path
from typing import Optional, Literal
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings
class LLMSettings(BaseModel):
"""LLM 配置"""
provider: Literal["openai", "anthropic", "local"] = "openai"
openai_model: str = "gpt-4o"
anthropic_model: str = "claude-3-5-sonnet-20241022"
local_model_url: str = "http://localhost:11434"
local_model_name: str = "llama3"
temperature: float = 0.7
max_tokens: int = 4096
@field_validator("temperature")
@classmethod
def validate_temperature(cls, v: float) -> float:
if not 0 <= v <= 2:
raise ValueError("temperature 必须在 0-2 之间")
return v
class AgentSettings(BaseModel):
"""智能体配置"""
enabled_agents: list[str] = Field(
default=[
"coordinator",
"planner",
"researcher",
"coder",
"executor",
"critic",
"synthesizer"
],
description="启用的智能体列表"
)
max_retries_per_agent: int = 3
agent_timeout: int = 120 # 秒
class MemorySettings(BaseModel):
"""记忆系统配置"""
enable_short_term: bool = True
enable_long_term: bool = False
short_term_max_messages: int = 50
long_term_storage_path: str = "./memory_store"
class ToolSettings(BaseModel):
"""工具配置"""
workspace_dir: str = "workspace"
enable_code_execution: bool = True
enable_file_operations: bool = True
enable_web_search: bool = True
code_execution_timeout: int = 30 # 秒
@field_validator("workspace_dir")
@classmethod
def validate_workspace(cls, v: str) -> str:
"""确保工作目录存在"""
path = Path(v)
path.mkdir(parents=True, exist_ok=True)
return str(path.absolute())
class Settings(BaseSettings):
"""
主配置类
支持从环境变量和 .env 文件加载配置
"""
# API Keys
openai_api_key: Optional[str] = Field(default=None, alias="OPENAI_API_KEY")
anthropic_api_key: Optional[str] = Field(default=None, alias="ANTHROPIC_API_KEY")
# LLM 设置
llm_provider: str = Field(default="openai", alias="LLM_PROVIDER")
openai_model: str = Field(default="gpt-4o", alias="OPENAI_MODEL")
anthropic_model: str = Field(default="claude-3-5-sonnet-20241022", alias="ANTHROPIC_MODEL")
# 系统设置
max_iterations: int = Field(default=10, alias="MAX_ITERATIONS")
max_retries: int = Field(default=3, alias="MAX_RETRIES")
debug_mode: bool = Field(default=False, alias="DEBUG_MODE")
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
# 工作目录
workspace_dir: str = Field(default="workspace", alias="WORKSPACE_DIR")
# 记忆系统
enable_long_term_memory: bool = Field(default=False, alias="ENABLE_LONG_TERM_MEMORY")
memory_storage_path: str = Field(default="./memory_store", alias="MEMORY_STORAGE_PATH")
# 子配置
llm: LLMSettings = Field(default_factory=LLMSettings)
agents: AgentSettings = Field(default_factory=AgentSettings)
memory: MemorySettings = Field(default_factory=MemorySettings)
tools: ToolSettings = Field(default_factory=ToolSettings)
model_config = {
"env_file": ".env",
"env_file_encoding": "utf-8",
"extra": "ignore"
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 同步顶层配置到子配置
self.llm.provider = self.llm_provider
self.llm.openai_model = self.openai_model
self.llm.anthropic_model = self.anthropic_model
self.memory.enable_long_term = self.enable_long_term_memory
self.memory.long_term_storage_path = self.memory_storage_path
self.tools.workspace_dir = self.workspace_dir
def get_workspace_path(self) -> Path:
"""获取工作目录路径"""
path = Path(self.workspace_dir)
path.mkdir(parents=True, exist_ok=True)
return path.absolute()
def validate_api_keys(self) -> bool:
"""验证必要的 API Keys"""
if self.llm_provider == "openai" and not self.openai_api_key:
return False
if self.llm_provider == "anthropic" and not self.anthropic_api_key:
return False
return True
# 创建全局默认配置实例
default_settings = Settings()
src/config/prompts.py
"""
提示词模板管理
所有智能体的系统提示词和任务提示词模板
支持动态变量替换
"""
from typing import Dict, Any
from string import Template
class PromptTemplates:
"""提示词模板集合"""
# ==================== 协调者 Coordinator ====================
COORDINATOR_SYSTEM = """你是一个高效的任务协调者(Coordinator),负责:
1. 理解用户的原始任务需求
2. 判断任务类型和复杂度
3. 决定是否需要进行任务分解
4. 监控整体执行进度
5. 在需要时进行干预和调整
你必须始终保持清晰的推理过程,并在回复中展示你的思考链。
## 任务类型分类
- planning: 需要制定计划或方案
- coding: 需要编写代码
- research: 需要信息检索和分析
- analysis: 需要深度分析
- execution: 需要执行具体操作
- simple: 简单问答,可直接回答
## 输出格式
你必须以JSON格式输出决策:
{
"reasoning": "你的推理过程",
"task_type": "任务类型",
"complexity": "simple/medium/complex",
"needs_planning": true/false,
"direct_answer": "如果是简单任务,直接给出答案,否则为null"
}
"""
COORDINATOR_TASK = Template("""
## 当前任务
$task
## 已有上下文
$context
## 请分析这个任务并给出你的决策。
""")
# ==================== 规划者 Planner ====================
PLANNER_SYSTEM = """你是一个专业的任务规划者(Planner),负责:
1. 将复杂任务分解为可执行的子任务
2. 分析子任务之间的依赖关系
3. 确定每个子任务应该分配给哪个智能体
4. 制定合理的执行顺序
## 可用的智能体
- researcher: 信息检索和研究分析
- coder: 代码编写和技术实现
- executor: 工具调用和操作执行
- critic: 质量审核和改进建议
- synthesizer: 结果汇总和最终输出
## 输出格式
你必须以JSON格式输出任务分解结果:
{
"reasoning": "任务分解的推理过程",
"subtasks": [
{
"id": "task_1",
"description": "子任务描述",
"assigned_agent": "智能体名称",
"dependencies": [],
"expected_output": "预期输出描述"
}
],
"execution_order": ["task_1", "task_2", ...]
}
## 规划原则
1. 子任务应该具体、可执行
2. 依赖关系要清晰,避免循环依赖
3. 合理利用并行执行提高效率
4. 每个子任务都要有明确的完成标准"""
PLANNER_TASK = Template("""
## 需要规划的任务
$task
## 任务类型
$task_type
## 请制定详细的执行计划。
""")
# ==================== 研究员 Researcher ====================
RESEARCHER_SYSTEM = """你是一个专业的研究员(Researcher),负责:
1. 信息检索和收集
2. 资料整理和分析
3. 知识提取和总结
4. 提供研究报告
## 工作方法
1. 首先明确研究目标
2. 确定信息来源和检索策略
3. 收集和验证信息
4. 整理成结构化的研究报告
## 输出格式
{
"reasoning": "研究思路",
"findings": [
{
"topic": "主题",
"content": "内容",
"source": "来源",
"reliability": "high/medium/low"
}
],
"summary": "研究总结",
"recommendations": ["建议1", "建议2"]
}
"""
RESEARCHER_TASK = Template("""
## 研究任务
$task
## 上下文信息
$context
## 请进行研究并提供报告。
""")
# ==================== 编码者 Coder ====================
CODER_SYSTEM = """你是一个专业的程序员(Coder),负责:
1. 代码设计和实现
2. 代码优化和重构
3. Bug修复和调试
4. 技术文档编写
## 编码规范
1. 遵循 PEP 8 代码风格
2. 添加完整的类型注解
3. 编写清晰的文档字符串
4. 处理异常情况
5. 考虑代码可维护性
## 输出格式
{
"reasoning": "设计思路和实现方案",
"code": "完整的代码实现",
"language": "编程语言",
"dependencies": ["依赖库列表"],
"usage": "使用说明",
"notes": "注意事项"
}
## 代码质量要求
- 代码必须可以直接运行
- 包含必要的错误处理
- 变量命名清晰有意义
- 函数职责单一"""
CODER_TASK = Template("""
## 编码任务
$task
## 技术要求
$requirements
## 上下文信息
$context
## 请编写代码实现。
""")
# ==================== 执行者 Executor ====================
EXECUTOR_SYSTEM = """你是一个可靠的任务执行者(Executor),负责:
1. 调用各种工具完成具体操作
2. 执行代码和脚本
3. 处理文件操作
4. 返回执行结果
## 可用工具
- calculator: 数学计算和表达式求值
- file_manager: 文件读写操作(限定workspace目录)
- code_executor: Python代码执行
- web_search: 网络信息检索
## 工作原则
1. 仔细验证输入参数
2. 选择合适的工具
3. 处理执行错误
4. 详细记录执行过程
## 输出格式
{
"reasoning": "执行思路",
"tool_used": "使用的工具",
"input": "工具输入",
"output": "执行结果",
"success": true/false,
"error": "错误信息(如有)"
}
"""
EXECUTOR_TASK = Template("""
## 执行任务
$task
## 可用工具
$available_tools
## 上下文信息
$context
## 请执行任务并返回结果。
""")
# ==================== 审核者 Critic ====================
CRITIC_SYSTEM = """你是一个严格的质量审核者(Critic),负责:
1. 检查输出质量
2. 发现问题和错误
3. 提出改进建议
4. 决定是否需要修改
## 审核标准
1. 正确性:结果是否正确
2. 完整性:是否覆盖所有要求
3. 可用性:结果是否可以直接使用
4. 代码质量:代码是否符合规范
## 输出格式
{
"reasoning": "审核过程和发现",
"quality_score": 1-10,
"issues": [
{
"type": "error/warning/suggestion",
"description": "问题描述",
"location": "问题位置",
"fix_suggestion": "修复建议"
}
],
"passed": true/false,
"overall_feedback": "总体评价"
}
## 审核原则
- 客观公正,基于事实
- 批评要具体,建议要可行
- 平衡质量和效率"""
CRITIC_TASK = Template("""
## 审核目标
$task
## 待审核内容
$content
## 审核标准
$criteria
## 请进行审核并给出评价。
""")
# ==================== 综合者 Synthesizer ====================
SYNTHESIZER_SYSTEM = """你是一个专业的结果综合者(Synthesizer),负责:
1. 汇总所有智能体的产出
2. 整合成连贯的最终结果
3. 确保输出格式规范
4. 生成清晰的总结
## 输出原则
1. 结构清晰,层次分明
2. 突出重点,避免冗余
3. 使用用户友好的格式
4. 包含必要的说明
## 输出格式
{
"reasoning": "综合过程",
"final_answer": "最终答案(Markdown格式)",
"key_points": ["要点1", "要点2"],
"deliverables": [
{
"type": "code/document/plan/...",
"content": "内容"
}
],
"next_steps": ["后续建议"]
}
"""
SYNTHESIZER_TASK = Template("""
## 原始任务
$original_task
## 各智能体产出
$agent_outputs
## 请综合所有内容,生成最终结果。
""")
# ==================== 工具方法 ====================
@classmethod
def render(cls, template_name: str, **kwargs) -> str:
"""
渲染模板
Args:
template_name: 模板名称,如 "COORDINATOR_TASK"
**kwargs: 模板变量
Returns:
渲染后的字符串
"""
template = getattr(cls, template_name, None)
if template is None:
raise ValueError(f"模板 {template_name} 不存在")
if isinstance(template, Template):
# 设置默认值避免 KeyError
defaults = {
"task": "",
"context": "无",
"requirements": "无特殊要求",
"content": "",
"criteria": "标准质量检查",
"original_task": "",
"agent_outputs": "{}",
"available_tools": "calculator, file_manager, code_executor, web_search",
"task_type": "unknown"
}
defaults.update(kwargs)
return template.safe_substitute(defaults)
else:
return template
@classmethod
def get_system_prompt(cls, agent_name: str) -> str:
"""获取智能体的系统提示词"""
prompt_map = {
"coordinator": cls.COORDINATOR_SYSTEM,
"planner": cls.PLANNER_SYSTEM,
"researcher": cls.RESEARCHER_SYSTEM,
"coder": cls.CODER_SYSTEM,
"executor": cls.EXECUTOR_SYSTEM,
"critic": cls.CRITIC_SYSTEM,
"synthesizer": cls.SYNTHESIZER_SYSTEM,
}
return prompt_map.get(agent_name, "你是一个AI助手。")
@classmethod
def get_task_template(cls, agent_name: str) -> Template:
"""获取智能体的任务模板"""
template_map = {
"coordinator": cls.COORDINATOR_TASK,
"planner": cls.PLANNER_TASK,
"researcher": cls.RESEARCHER_TASK,
"coder": cls.CODER_TASK,
"executor": cls.EXECUTOR_TASK,
"critic": cls.CRITIC_TASK,
"synthesizer": cls.SYNTHESIZER_TASK,
}
return template_map.get(agent_name, Template("$task"))
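The `render` method relies on `string.Template.safe_substitute` plus a dictionary of defaults. The short sketch below illustrates that behaviour in isolation; the template text and the default values are made up for the example. It is worth noting because `BaseAgent.get_task_prompt` in `src/agents/base.py` calls `safe_substitute` directly, so the defaults defined in `render` do not apply on that path.

```python
from string import Template

template = Template("## Task\n$task\n## Context\n$context\n")
defaults = {"task": "", "context": "none"}

# The caller supplies only `task`; `context` falls back to its default.
values = {**defaults, "task": "Plan a 30-day Python course"}
rendered = template.safe_substitute(values)
print(rendered)

# Without a default, safe_substitute leaves the placeholder in place
# instead of raising KeyError as substitute() would.
partial = Template("$task / $context").safe_substitute(task="demo")
print(partial)  # demo / $context
```

If a literal `$context` reaching the model is undesirable, routing prompt construction through `render` (or merging the same defaults before substituting) avoids it.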
src/agents/__init__.py
"""智能体模块"""
from src.agents.base import BaseAgent
from src.agents.coordinator import CoordinatorAgent
from src.agents.planner import PlannerAgent
from src.agents.researcher import ResearcherAgent
from src.agents.coder import CoderAgent
from src.agents.executor import ExecutorAgent
from src.agents.critic import CriticAgent
from src.agents.synthesizer import SynthesizerAgent
# 智能体注册表
AGENT_REGISTRY = {
"coordinator": CoordinatorAgent,
"planner": PlannerAgent,
"researcher": ResearcherAgent,
"coder": CoderAgent,
"executor": ExecutorAgent,
"critic": CriticAgent,
"synthesizer": SynthesizerAgent,
}
def get_agent(name: str, **kwargs) -> BaseAgent:
"""
工厂方法:根据名称获取智能体实例
Args:
name: 智能体名称
**kwargs: 初始化参数
Returns:
智能体实例
"""
agent_class = AGENT_REGISTRY.get(name)
if agent_class is None:
raise ValueError(f"未知的智能体: {name}")
return agent_class(**kwargs)
__all__ = [
"BaseAgent",
"CoordinatorAgent",
"PlannerAgent",
"ResearcherAgent",
"CoderAgent",
"ExecutorAgent",
"CriticAgent",
"SynthesizerAgent",
"AGENT_REGISTRY",
"get_agent",
]
src/agents/base.py
"""
智能体基类
定义所有智能体的通用接口和基础实现
"""
import json
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage
from langchain_core.language_models import BaseChatModel
from src.config.settings import Settings
from src.config.prompts import PromptTemplates
from src.graph.state import AgentState
from src.llm.factory import LLMFactory
from src.utils.logger import get_logger
logger = get_logger(__name__)
class AgentOutput(BaseModel):
"""智能体输出的基础结构"""
reasoning: str
success: bool = True
error: Optional[str] = None
data: Dict[str, Any] = {}
class BaseAgent(ABC):
"""
智能体抽象基类
所有具体智能体都需要继承此类并实现抽象方法。
提供了通用的 LLM 调用、状态管理、日志记录等功能。
"""
# 类变量:智能体名称,子类必须覆盖
name: str = "base"
description: str = "基础智能体"
def __init__(
self,
settings: Optional[Settings] = None,
llm: Optional[BaseChatModel] = None
):
"""
初始化智能体
Args:
settings: 系统配置
llm: LLM实例,如果为None则使用工厂创建
"""
self.settings = settings or Settings()
self.llm = llm or LLMFactory.create(self.settings)
self._call_count = 0
self._total_tokens = 0
def get_system_prompt(self) -> str:
"""获取系统提示词"""
return PromptTemplates.get_system_prompt(self.name)
def get_task_prompt(self, **kwargs) -> str:
"""获取任务提示词"""
template = PromptTemplates.get_task_template(self.name)
return template.safe_substitute(**kwargs)
async def invoke_llm(
self,
messages: List[BaseMessage],
parse_json: bool = True
) -> Dict[str, Any]:
"""
调用 LLM 并解析响应
Args:
messages: 消息列表
parse_json: 是否尝试解析JSON
Returns:
解析后的响应字典
"""
start_time = time.time()
self._call_count += 1
try:
# 调用 LLM
response = await self.llm.ainvoke(messages)
content = response.content
elapsed = time.time() - start_time
logger.debug(f"[{self.name}] LLM调用耗时: {elapsed:.2f}s")
# 尝试提取并解析JSON
if parse_json:
try:
# 尝试从代码块中提取JSON
json_content = self._extract_json(content)
return json.loads(json_content)
except (json.JSONDecodeError, ValueError) as e:
logger.warning(f"[{self.name}] JSON解析失败: {e}")
return {"raw_content": content, "parse_error": str(e)}
return {"content": content}
except Exception as e:
logger.error(f"[{self.name}] LLM调用失败: {e}")
return {"error": str(e), "success": False}
def _extract_json(self, text: str) -> str:
"""从文本中提取JSON内容"""
# 尝试从 ```json ```代码块提取
import re
json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if json_match:
return json_match.group(1).strip()
# 尝试找到 { } 包裹的内容
brace_match = re.search(r'\{[\s\S]*\}', text)
if brace_match:
return brace_match.group(0)
raise ValueError("未找到有效的JSON内容")
def build_messages(
self,
state: AgentState,
task_prompt: str
) -> List[BaseMessage]:
"""
构建消息列表
Args:
state: 当前状态
task_prompt: 任务提示词
Returns:
消息列表
"""
messages = [
SystemMessage(content=self.get_system_prompt()),
]
# 添加历史消息(最近N条)
history = state.get("messages", [])[-10:]
messages.extend(history)
# 添加当前任务
messages.append(HumanMessage(content=task_prompt))
return messages
@abstractmethod
async def process(self, state: AgentState) -> AgentState:
"""
处理状态的核心方法(子类必须实现)
Args:
state: 当前状态
Returns:
更新后的状态
"""
pass
def update_state(
self,
state: AgentState,
updates: Dict[str, Any]
) -> AgentState:
"""
更新状态(创建新字典而非修改原状态)
Args:
state: 原状态
updates: 更新内容
Returns:
新状态
"""
new_state = dict(state)
new_state.update(updates)
return new_state
def add_message(
self,
state: AgentState,
message: BaseMessage
) -> List[BaseMessage]:
"""添加消息到历史"""
messages = list(state.get("messages", []))
messages.append(message)
return messages
def log_output(self, output: Dict[str, Any]) -> None:
"""记录智能体输出"""
if self.settings.debug_mode:
logger.info(f"[{self.name}] 输出: {json.dumps(output, ensure_ascii=False, indent=2)[:500]}...")
def get_stats(self) -> Dict[str, Any]:
"""获取智能体统计信息"""
return {
"name": self.name,
"call_count": self._call_count,
"total_tokens": self._total_tokens,
}
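The JSON handling in `invoke_llm` depends on `_extract_json`, which looks for a fenced block first and then for the outermost braces. The standalone sketch below mirrors that logic so it can be tried without an LLM; the function name `extract_json` and the sample replies are assumptions for illustration, and the fence marker is built with string repetition only to keep backticks out of the listing.

```python
import json
import re

FENCE = "`" * 3  # three backticks, the Markdown code-fence marker
FENCED_RE = re.compile(FENCE + r"(?:json)?\s*([\s\S]*?)\s*" + FENCE)
BRACES_RE = re.compile(r"\{[\s\S]*\}")


def extract_json(text: str) -> str:
    """Mirror of the extraction logic: fenced block first, then outermost braces."""
    fenced = FENCED_RE.search(text)
    if fenced:
        return fenced.group(1).strip()
    braces = BRACES_RE.search(text)
    if braces:
        return braces.group(0)
    raise ValueError("no JSON content found")


fenced_reply = "Here is my decision:\n" + FENCE + 'json\n{"task_type": "coding"}\n' + FENCE
bare_reply = 'Decision: {"task_type": "simple", "needs_planning": false} Done.'

print(json.loads(extract_json(fenced_reply)))  # {'task_type': 'coding'}
print(json.loads(extract_json(bare_reply)))    # {'task_type': 'simple', 'needs_planning': False}
```

The brace pattern is greedy, so a reply containing two separate JSON objects is captured from the first `{` to the last `}` and will then fail in `json.loads`; in `invoke_llm` that case ends up in the `parse_error` branch.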
src/agents/coordinator.py
"""
协调者智能体(Coordinator)
负责任务理解、分类和整体流程控制
"""
import json
from typing import Any, Dict
from langchain_core.messages import AIMessage
from src.agents.base import BaseAgent
from src.graph.state import AgentState
from src.utils.logger import get_logger
logger = get_logger(__name__)
class CoordinatorAgent(BaseAgent):
"""
协调者智能体
职责:
1. 理解用户的原始任务
2. 判断任务类型和复杂度
3. 决定下一步路由
4. 监控整体执行进度
"""
name = "coordinator"
description = "任务协调与路由决策"
async def process(self, state: AgentState) -> AgentState:
"""
处理任务协调
Args:
state: 当前状态
Returns:
更新后的状态,包含任务分类和路由决策
"""
logger.info(f"[Coordinator] 开始处理任务: {state['original_task'][:100]}...")
# 构建上下文
context = self._build_context(state)
# 构建提示词
task_prompt = self.get_task_prompt(
task=state["original_task"],
context=context
)
# 构建消息并调用LLM
messages = self.build_messages(state, task_prompt)
result = await self.invoke_llm(messages, parse_json=True)
# 处理结果
if "error" in result:
logger.error(f"[Coordinator] LLM调用失败: {result['error']}")
return self._handle_error(state, result["error"])
# 解析协调决策
reasoning = result.get("reasoning", "")
task_type = result.get("task_type", "unknown")
complexity = result.get("complexity", "medium")
needs_planning = result.get("needs_planning", True)
direct_answer = result.get("direct_answer")
logger.info(f"[Coordinator] 任务类型: {task_type}, 复杂度: {complexity}, 需要规划: {needs_planning}")
# 记录推理过程
self.log_output({
"reasoning": reasoning,
"task_type": task_type,
"complexity": complexity,
"needs_planning": needs_planning
})
# 更新状态
new_messages = self.add_message(
state,
AIMessage(content=f"[Coordinator] 推理过程:\n{reasoning}")
)
# 决定下一步
if direct_answer and not needs_planning:
# 简单任务,直接给出答案
next_node = "synthesize"
agent_outputs = dict(state.get("agent_outputs", {}))
agent_outputs["coordinator"] = {
"decision": "direct_answer",
"answer": direct_answer,
"reasoning": reasoning
}
else:
# 复杂任务,需要规划
next_node = "plan"
agent_outputs = dict(state.get("agent_outputs", {}))
agent_outputs["coordinator"] = {
"decision": "needs_planning",
"task_type": task_type,
"complexity": complexity,
"reasoning": reasoning
}
return self.update_state(state, {
"messages": new_messages,
"task_type": task_type,
"current_agent": self.name,
"agent_outputs": agent_outputs,
"next": next_node,
"iteration_count": state.get("iteration_count", 0) + 1,
})
def _build_context(self, state: AgentState) -> str:
"""构建上下文信息"""
parts = []
# 已有的智能体输出
if state.get("agent_outputs"):
parts.append("### 已完成的工作")
for agent, output in state["agent_outputs"].items():
parts.append(f"- {agent}: {str(output)[:200]}...")
# 错误日志
if state.get("error_log"):
parts.append("\n### 历史错误")
for err in state["error_log"][-3:]:
parts.append(f"- {err}")
# 迭代信息
parts.append(f"\n### 状态信息")
parts.append(f"- 当前迭代: {state.get('iteration_count', 0)}/{state.get('max_iterations', 10)}")
parts.append(f"- 子任务数: {len(state.get('subtasks', []))}")
return "\n".join(parts) if parts else "无历史上下文"
def _handle_error(self, state: AgentState, error: str) -> AgentState:
"""处理错误"""
error_log = list(state.get("error_log", []))
error_log.append(f"[Coordinator] {error}")
return self.update_state(state, {
"error_log": error_log,
"next": "synthesize", # 错误时直接到综合阶段
"current_agent": self.name,
})
src/agents/planner.py
"""
Planner agent.
Breaks a task into subtasks and builds the execution plan.
"""
from typing import Dict, List

from langchain_core.messages import AIMessage

from src.agents.base import BaseAgent
from src.graph.state import AgentState, SubTask
from src.utils.logger import get_logger

logger = get_logger(__name__)


class PlannerAgent(BaseAgent):
    """
    Planner agent.

    Responsibilities:
    1. Break a complex task into subtasks
    2. Analyse the dependencies between subtasks
    3. Assign an agent to each subtask
    4. Determine the execution order
    """

    name = "planner"
    description = "Task decomposition and execution planning"

    async def process(self, state: AgentState) -> AgentState:
        """
        Plan the task.

        Args:
            state: Current state.

        Returns:
            New state containing the subtask list.
        """
        logger.info("[Planner] Planning task...")
        task_type = state.get("task_type", "unknown")

        # Build the prompt
        task_prompt = self.get_task_prompt(
            task=state["original_task"],
            task_type=task_type
        )

        # Call the LLM
        messages = self.build_messages(state, task_prompt)
        result = await self.invoke_llm(messages, parse_json=True)
        if "error" in result:
            logger.error(f"[Planner] Planning failed: {result['error']}")
            return self._handle_error(state, result["error"])

        # Parse the plan
        reasoning = result.get("reasoning", "")
        subtasks_data = result.get("subtasks") or []
        execution_order = result.get("execution_order") or []
        if not subtasks_data:
            # An empty plan would leave the router with nothing to dispatch,
            # so fall back to the single default subtask.
            return self._handle_error(state, "LLM returned no subtasks")
        logger.info(f"[Planner] Generated {len(subtasks_data)} subtasks")

        # Convert to SubTask objects
        subtasks = self._create_subtasks(subtasks_data, execution_order)
        self.log_output({
            "reasoning": reasoning,
            "subtask_count": len(subtasks),
            "execution_order": execution_order
        })

        # Update the state
        new_messages = self.add_message(
            state,
            AIMessage(content=f"[Planner] Planning complete:\n{reasoning}\n\nSubtasks: {len(subtasks)}")
        )
        agent_outputs = dict(state.get("agent_outputs", {}))
        agent_outputs["planner"] = {
            "reasoning": reasoning,
            "subtask_count": len(subtasks),
            "execution_order": execution_order
        }
        return self.update_state(state, {
            "messages": new_messages,
            "subtasks": subtasks,
            "current_subtask_index": 0,
            "current_agent": self.name,
            "agent_outputs": agent_outputs,
            "next": "route_agent",
            "iteration_count": state.get("iteration_count", 0) + 1,
        })

    def _create_subtasks(
        self,
        subtasks_data: List[Dict],
        execution_order: List[str]
    ) -> List[SubTask]:
        """
        Build the list of SubTask objects.

        Args:
            subtasks_data: Raw subtask data returned by the LLM.
            execution_order: Subtask ids in execution order.

        Returns:
            List of SubTask objects.
        """
        subtasks = []
        for i, task_data in enumerate(subtasks_data):
            subtask = SubTask(
                id=task_data.get("id", f"task_{i+1}"),
                description=task_data.get("description", f"Subtask {i+1}"),
                assigned_agent=self._validate_agent(task_data.get("assigned_agent", "executor")),
                dependencies=task_data.get("dependencies", []),
                status="pending",
                result=None,
                retry_count=0
            )
            subtasks.append(subtask)

        # Sort by execution order, if one was provided
        if execution_order:
            order_map = {task_id: idx for idx, task_id in enumerate(execution_order)}
            # Subtasks missing from execution_order go last; the sort is stable,
            # so they keep their original relative order.
            subtasks.sort(key=lambda t: order_map.get(t.id, len(order_map)))
        return subtasks

    def _validate_agent(self, agent_name: str) -> str:
        """Validate and normalise the agent name."""
        valid_agents = ["researcher", "coder", "executor", "critic"]
        # The LLM may return null or a non-string value, so coerce before normalising.
        agent_name = str(agent_name or "").lower().strip()
        if agent_name in valid_agents:
            return agent_name
        # Map common variants
        mappings = {
            "research": "researcher",
            "code": "coder",
            "coding": "coder",
            "execute": "executor",
            "review": "critic",
            "reviewer": "critic",
        }
        # Unknown names fall back to the executor
        return mappings.get(agent_name, "executor")

    def _handle_error(self, state: AgentState, error: str) -> AgentState:
        """Handle an error by falling back to a single default subtask."""
        error_log = list(state.get("error_log", []))
        error_log.append(f"[Planner] {error}")
        # Default subtask: pass the original task straight to the executor
        default_subtask = SubTask(
            id="default_task",
            description=state["original_task"],
            assigned_agent="executor",
            dependencies=[],
            status="pending"
        )
        return self.update_state(state, {
            "subtasks": [default_subtask],
            "current_subtask_index": 0,
            "error_log": error_log,
            "next": "route_agent",
            "current_agent": self.name,
        })
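
`PlannerAgent._create_subtasks` trusts the `execution_order` returned by the LLM and never checks it against each subtask's `dependencies`. One way to derive a safe order from the dependencies themselves is a topological sort. The sketch below uses the standard library's `graphlib` (Python 3.9+) on plain dicts; `order_by_dependencies` is a hypothetical helper that is not part of the project, and dropping unknown dependency ids and falling back to the planner's order on a cycle are assumptions of this example.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def order_by_dependencies(subtasks: List[Dict]) -> List[str]:
    """Return subtask ids so that every task comes after its dependencies."""
    known_ids = {task["id"] for task in subtasks}
    # Map each id to the set of ids it depends on, ignoring unknown ids.
    graph = {
        task["id"]: {dep for dep in task.get("dependencies", []) if dep in known_ids}
        for task in subtasks
    }
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError:
        # Cyclic plan: keep the order in which the planner listed the subtasks.
        return [task["id"] for task in subtasks]


if __name__ == "__main__":
    plan = [
        {"id": "t3", "dependencies": ["t2"]},
        {"id": "t1"},
        {"id": "t2", "dependencies": ["t1", "ghost"]},
    ]
    print(order_by_dependencies(plan))
```

The resulting id list could be passed as `execution_order`, so that a dependent subtask never runs before the subtask it depends on.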
src/agents/researcher.py
"""
Researcher agent.
Handles information retrieval, material collection and analysis.
"""
from typing import Any

from langchain_core.messages import AIMessage

from src.agents.base import BaseAgent
from src.graph.state import AgentState
from src.tools import get_tool
from src.utils.logger import get_logger

logger = get_logger(__name__)


class ResearcherAgent(BaseAgent):
    """
    Researcher agent.

    Responsibilities:
    1. Retrieve and collect information
    2. Organise and analyse material
    3. Extract and summarise knowledge
    """

    name = "researcher"
    description = "Information retrieval and research analysis"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Search tool available to this agent; process() below does not call it yet
        self.search_tool = get_tool("web_search", self.settings)

    async def process(self, state: AgentState) -> AgentState:
        """
        Run the research task.

        Args:
            state: Current state.

        Returns:
            New state containing the research results.
        """
        # Get the current subtask
        subtasks = state.get("subtasks", [])
        current_idx = state.get("current_subtask_index", 0)
        if current_idx >= len(subtasks):
            logger.warning("[Researcher] No pending subtask")
            return self.update_state(state, {"next": "review"})
        current_task = subtasks[current_idx]
        logger.info(f"[Researcher] Processing task: {current_task.description[:100]}...")

        # Build the context
        context = self._build_context(state)

        # Build the prompt
        task_prompt = self.get_task_prompt(
            task=current_task.description,
            context=context
        )

        # Call the LLM
        messages = self.build_messages(state, task_prompt)
        result = await self.invoke_llm(messages, parse_json=True)
        if "error" in result:
            return self._handle_error(state, current_task, result["error"])

        # Unpack the research result; "or" also covers explicit nulls from the LLM
        reasoning = result.get("reasoning", "")
        findings = result.get("findings") or []
        summary = result.get("summary") or ""
        recommendations = result.get("recommendations", [])
        logger.info(f"[Researcher] Found {len(findings)} findings")

        # Update the subtask status
        updated_subtasks = list(subtasks)
        current_task.status = "completed"
        current_task.result = summary
        updated_subtasks[current_idx] = current_task

        # Log the output
        self.log_output({
            "task_id": current_task.id,
            "findings_count": len(findings),
            "summary": summary[:200]
        })

        # Update the messages
        new_messages = self.add_message(
            state,
            AIMessage(content=f"[Researcher] Research complete:\n{summary}")
        )

        # Append to this agent's output history
        agent_outputs = dict(state.get("agent_outputs", {}))
        researcher_outputs = agent_outputs.get("researcher", [])
        if not isinstance(researcher_outputs, list):
            researcher_outputs = []
        researcher_outputs.append({
            "task_id": current_task.id,
            "reasoning": reasoning,
            "findings": findings,
            "summary": summary,
            "recommendations": recommendations
        })
        agent_outputs["researcher"] = researcher_outputs

        return self.update_state(state, {
            "messages": new_messages,
            "subtasks": updated_subtasks,
            "current_agent": self.name,
            "agent_outputs": agent_outputs,
            "next": "review",
            "iteration_count": state.get("iteration_count", 0) + 1,
        })

    def _build_context(self, state: AgentState) -> str:
        """Build the research context."""
        parts = []
        # Original task
        parts.append(f"### Original task\n{state['original_task']}")
        # Up to three most recent research summaries
        agent_outputs = state.get("agent_outputs", {})
        if "researcher" in agent_outputs:
            prev_research = agent_outputs["researcher"]
            if isinstance(prev_research, list) and prev_research:
                parts.append("\n### Completed research")
                for r in prev_research[-3:]:
                    parts.append(f"- {r.get('summary', '')[:200]}")
        return "\n".join(parts)

    def _handle_error(
        self,
        state: AgentState,
        task: Any,
        error: str
    ) -> AgentState:
        """Record the error and mark the current subtask as failed."""
        error_log = list(state.get("error_log", []))
        error_log.append(f"[Researcher] {task.id}: {error}")
        # Mark the task as failed and count the attempt
        subtasks = list(state.get("subtasks", []))
        current_idx = state.get("current_subtask_index", 0)
        if current_idx < len(subtasks):
            subtasks[current_idx].status = "failed"
            subtasks[current_idx].retry_count += 1
        return self.update_state(state, {
            "subtasks": subtasks,
            "error_log": error_log,
            "next": "review",
            "current_agent": self.name,
        })
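
`ResearcherAgent.process` marks the subtask as completed by mutating the object it read from `state`. Because `list(subtasks)` is only a shallow copy, the incoming state changes as well. If that matters (for example when states are checkpointed or compared), a copy-on-write update is one option. The sketch below uses a frozen stdlib dataclass, `SubTaskSketch`, as a stand-in for the Pydantic `SubTask`; both `SubTaskSketch` and `complete_subtask` are hypothetical names introduced for this example. In Pydantic v2, `model_copy(update=...)` plays the same role as `dataclasses.replace` here.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class SubTaskSketch:
    """Stand-in for the article's SubTask model (hypothetical, stdlib only)."""
    id: str
    status: str = "pending"
    result: Optional[str] = None
    retry_count: int = 0


def complete_subtask(
    subtasks: List[SubTaskSketch], index: int, result: str
) -> List[SubTaskSketch]:
    """Return a new list in which subtasks[index] is completed; inputs stay untouched."""
    updated = list(subtasks)
    updated[index] = replace(subtasks[index], status="completed", result=result)
    return updated


if __name__ == "__main__":
    before = [SubTaskSketch("t1"), SubTaskSketch("t2")]
    after = complete_subtask(before, 0, "summary text")
    print(before[0].status, after[0].status)
```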
src/agents/coder.py
"""
Coder agent.
Handles code design, code writing and technical implementation.
"""
from typing import Any

from langchain_core.messages import AIMessage

from src.agents.base import BaseAgent
from src.graph.state import AgentState
from src.utils.logger import get_logger

logger = get_logger(__name__)


class CoderAgent(BaseAgent):
    """
    Coder agent.

    Responsibilities:
    1. Design and implement code
    2. Optimise and refactor code
    3. Write technical documentation
    """

    name = "coder"
    description = "Code writing and technical implementation"

    async def process(self, state: AgentState) -> AgentState:
        """
        Run the coding task.

        Args:
            state: Current state.

        Returns:
            New state containing the code implementation.
        """
        # Get the current subtask
        subtasks = state.get("subtasks", [])
        current_idx = state.get("current_subtask_index", 0)
        if current_idx >= len(subtasks):
            logger.warning("[Coder] No pending subtask")
            return self.update_state(state, {"next": "review"})
        current_task = subtasks[current_idx]
        logger.info(f"[Coder] Processing coding task: {current_task.description[:100]}...")

        # Build the context
        context = self._build_context(state)
        requirements = self._extract_requirements(state)

        # Build the prompt
        task_prompt = self.get_task_prompt(
            task=current_task.description,
            requirements=requirements,
            context=context
        )

        # Call the LLM
        messages = self.build_messages(state, task_prompt)
        result = await self.invoke_llm(messages, parse_json=True)
        if "error" in result:
            return self._handle_error(state, current_task, result["error"])

        # Unpack the coding result; "or" also covers explicit nulls from the LLM
        reasoning = result.get("reasoning", "")
        code = result.get("code") or ""
        language = result.get("language") or "python"
        dependencies = result.get("dependencies", [])
        usage = result.get("usage", "")
        notes = result.get("notes", "")
        logger.info(f"[Coder] Generated code: {len(code)} characters, language: {language}")

        # Update the subtask status
        updated_subtasks = list(subtasks)
        current_task.status = "completed"
        current_task.result = code
        updated_subtasks[current_idx] = current_task

        # Log the output
        self.log_output({
            "task_id": current_task.id,
            "language": language,
            "code_length": len(code),
            "dependencies": dependencies
        })

        # Format the code as a fenced block
        code_output = f"```{language}\n{code}\n```"

        # Update the messages
        new_messages = self.add_message(
            state,
            AIMessage(content=f"[Coder] Implementation complete:\n\n{reasoning}\n\n{code_output}\n\nUsage: {usage}")
        )

        # Append to this agent's output history
        agent_outputs = dict(state.get("agent_outputs", {}))
        coder_outputs = agent_outputs.get("coder", [])
        if not isinstance(coder_outputs, list):
            coder_outputs = []
        coder_outputs.append({
            "task_id": current_task.id,
            "reasoning": reasoning,
            "code": code,
            "language": language,
            "dependencies": dependencies,
            "usage": usage,
            "notes": notes
        })
        agent_outputs["coder"] = coder_outputs

        return self.update_state(state, {
            "messages": new_messages,
            "subtasks": updated_subtasks,
            "current_agent": self.name,
            "agent_outputs": agent_outputs,
            "next": "review",
            "iteration_count": state.get("iteration_count", 0) + 1,
        })

    def _build_context(self, state: AgentState) -> str:
        """Build the coding context."""
        parts = []
        # Original task
        parts.append(f"### Original task\n{state['original_task']}")
        # Up to two most recent research summaries, if any
        agent_outputs = state.get("agent_outputs", {})
        if "researcher" in agent_outputs:
            research_data = agent_outputs["researcher"]
            if isinstance(research_data, list) and research_data:
                parts.append("\n### Related research")
                for r in research_data[-2:]:
                    summary = r.get("summary", "")
                    if summary:
                        parts.append(f"- {summary[:300]}")
        # Up to two most recent code outputs, if any, truncated to 500 characters
        if "coder" in agent_outputs:
            prev_code = agent_outputs["coder"]
            if isinstance(prev_code, list) and prev_code:
                parts.append("\n### Existing code")
                for c in prev_code[-2:]:
                    code_snippet = c.get("code", "")[:500]
                    parts.append(f"```\n{code_snippet}\n```")
        return "\n".join(parts)

    def _extract_requirements(self, state: AgentState) -> str:
        """Derive the technical requirements from the state."""
        requirements = []
        # Inferred from the task type
        task_type = state.get("task_type", "")
        if task_type == "coding":
            requirements.append("- Code must be complete and runnable")
            requirements.append("- Include error handling")
            requirements.append("- Follow PEP 8")
        # Taken from the coordinator's output
        agent_outputs = state.get("agent_outputs", {})
        if "coordinator" in agent_outputs:
            coord_output = agent_outputs["coordinator"]
            if isinstance(coord_output, dict):
                complexity = coord_output.get("complexity", "medium")
                if complexity == "complex":
                    requirements.append("- Use a modular design")
                    requirements.append("- Add detailed comments")
        return "\n".join(requirements) if requirements else "No special requirements"

    def _handle_error(
        self,
        state: AgentState,
        task: Any,
        error: str
    ) -> AgentState:
        """Record the error and mark the current subtask as failed."""
        error_log = list(state.get("error_log", []))
        error_log.append(f"[Coder] {task.id}: {error}")
        # Mark the task as failed and count the attempt
        subtasks = list(state.get("subtasks", []))
        current_idx = state.get("current_subtask_index", 0)
        if current_idx < len(subtasks):
            subtasks[current_idx].status = "failed"
            subtasks[current_idx].retry_count += 1
        return self.update_state(state, {
            "subtasks": subtasks,
            "error_log": error_log,
            "next": "review",
            "current_agent": self.name,
        })
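
The researcher, coder, executor and critic all repeat the same few lines to append a record to their own list inside `agent_outputs`. That bookkeeping could be factored into one helper. The sketch below is a minimal version on plain dicts; `append_agent_output` is a hypothetical name that is not part of the project, and returning new containers instead of mutating the inputs is a design choice of this example.

```python
from typing import Any, Dict


def append_agent_output(
    agent_outputs: Dict[str, Any], agent: str, record: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a copy of agent_outputs with record appended to the agent's history."""
    outputs = dict(agent_outputs)
    history = outputs.get(agent, [])
    if not isinstance(history, list):
        # Same rule as in the agents: a non-list entry is discarded.
        history = []
    # Build a new list so the caller's history list is not modified.
    outputs[agent] = [*history, record]
    return outputs


if __name__ == "__main__":
    state_outputs = {"coder": [{"task_id": "t1", "code": "print(1)"}]}
    updated = append_agent_output(state_outputs, "coder", {"task_id": "t2", "code": "print(2)"})
    print(len(state_outputs["coder"]), len(updated["coder"]))
```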
src/agents/executor.py
"""
Executor agent.
Handles tool calls and concrete operations.
"""
import time
from typing import Any, Dict

from langchain_core.messages import AIMessage

from src.agents.base import BaseAgent
from src.graph.state import AgentState, ToolCallLog
from src.tools import get_all_tools
from src.utils.logger import get_logger

logger = get_logger(__name__)


class ExecutorAgent(BaseAgent):
    """
    Executor agent.

    Responsibilities:
    1. Call tools to carry out concrete operations
    2. Run code and scripts
    3. Handle file operations
    4. Return the execution result
    """

    name = "executor"
    description = "Tool calls and operation execution"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Load every available tool
        self.tools = get_all_tools(self.settings)
        self.tool_names = list(self.tools.keys())

    async def process(self, state: AgentState) -> AgentState:
        """
        Execute the task.

        Args:
            state: Current state.

        Returns:
            New state containing the execution result.
        """
        # Get the current subtask
        subtasks = state.get("subtasks", [])
        current_idx = state.get("current_subtask_index", 0)
        if current_idx >= len(subtasks):
            logger.warning("[Executor] No pending subtask")
            return self.update_state(state, {"next": "review"})
        current_task = subtasks[current_idx]
        logger.info(f"[Executor] Executing task: {current_task.description[:100]}...")

        # Build the context
        context = self._build_context(state)
        available_tools = ", ".join(self.tool_names)

        # Build the prompt
        task_prompt = self.get_task_prompt(
            task=current_task.description,
            available_tools=available_tools,
            context=context
        )

        # Let the LLM choose the tool and its input
        messages = self.build_messages(state, task_prompt)
        result = await self.invoke_llm(messages, parse_json=True)
        if "error" in result:
            return self._handle_error(state, current_task, result["error"])

        # Parse the execution plan
        reasoning = result.get("reasoning", "")
        tool_name = result.get("tool_used", "")
        tool_input = result.get("input", {})

        # Run the tool call
        execution_result = await self._execute_tool(
            tool_name=tool_name,
            tool_input=tool_input,
            state=state
        )

        # Append to the tool call log
        tool_logs = list(state.get("tool_call_logs", []))
        tool_logs.append(execution_result["log"])

        # Update the subtask status
        updated_subtasks = list(subtasks)
        if execution_result["success"]:
            current_task.status = "completed"
            current_task.result = str(execution_result["output"])
        else:
            current_task.status = "failed"
            current_task.retry_count += 1
        updated_subtasks[current_idx] = current_task

        # Log the output
        self.log_output({
            "task_id": current_task.id,
            "tool_used": tool_name,
            "success": execution_result["success"],
            "output": str(execution_result["output"])[:200]
        })

        # Update the messages
        status_text = "succeeded" if execution_result["success"] else "failed"
        output_text = str(execution_result["output"])[:500]
        new_messages = self.add_message(
            state,
            AIMessage(content=f"[Executor] Execution {status_text}:\nTool: {tool_name}\nResult: {output_text}")
        )

        # Append to this agent's output history
        agent_outputs = dict(state.get("agent_outputs", {}))
        executor_outputs = agent_outputs.get("executor", [])
        if not isinstance(executor_outputs, list):
            executor_outputs = []
        executor_outputs.append({
            "task_id": current_task.id,
            "reasoning": reasoning,
            "tool_used": tool_name,
            "input": tool_input,
            "output": execution_result["output"],
            "success": execution_result["success"],
            "error": execution_result.get("error")
        })
        agent_outputs["executor"] = executor_outputs

        return self.update_state(state, {
            "messages": new_messages,
            "subtasks": updated_subtasks,
            "tool_call_logs": tool_logs,
            "current_agent": self.name,
            "agent_outputs": agent_outputs,
            "next": "review",
            "iteration_count": state.get("iteration_count", 0) + 1,
        })

    async def _execute_tool(
        self,
        tool_name: str,
        tool_input: Any,
        state: AgentState
    ) -> Dict[str, Any]:
        """
        Run a tool call.

        Args:
            tool_name: Tool name.
            tool_input: Tool input.
            state: Current state.

        Returns:
            Dict with the keys "success", "output", "log" and, on failure, "error".
        """
        # perf_counter is monotonic, which suits measuring elapsed time
        start_time = time.perf_counter()
        # Tools are called with keyword arguments, so wrap non-dict input
        input_args = tool_input if isinstance(tool_input, dict) else {"input": tool_input}
        try:
            if tool_name not in self.tools:
                # Unknown or empty tool name: treat the step as needing no tool call
                logger.warning(f"[Executor] Tool {tool_name} not found, using default handling")
                return {
                    "success": True,
                    "output": f"Task understood, no tool call needed: {tool_input}",
                    "log": ToolCallLog(
                        tool_name=tool_name or "none",
                        input_args=input_args,
                        output="No tool call",
                        success=True,
                        execution_time=time.perf_counter() - start_time
                    )
                }
            tool = self.tools[tool_name]

            # Run the tool
            output = await tool.aexecute(**input_args)
            execution_time = time.perf_counter() - start_time
            return {
                "success": True,
                "output": output,
                "log": ToolCallLog(
                    tool_name=tool_name,
                    input_args=input_args,
                    output=output,
                    success=True,
                    execution_time=execution_time
                )
            }
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            error_msg = str(e)
            logger.error(f"[Executor] Tool execution failed: {error_msg}")
            return {
                "success": False,
                "output": None,
                "error": error_msg,
                "log": ToolCallLog(
                    tool_name=tool_name or "unknown",
                    input_args=input_args,
                    output=None,
                    success=False,
                    error_message=error_msg,
                    execution_time=execution_time
                )
            }

    def _build_context(self, state: AgentState) -> str:
        """Build the execution context."""
        parts = []
        # Original task
        parts.append(f"### Original task\n{state['original_task']}")
        # Most recent code output, if any, truncated to 1000 characters
        agent_outputs = state.get("agent_outputs", {})
        if "coder" in agent_outputs:
            code_data = agent_outputs["coder"]
            if isinstance(code_data, list) and code_data:
                parts.append("\n### Available code")
                latest = code_data[-1]
                code = latest.get("code", "")
                if code:
                    # Fence with the language recorded by the coder
                    language = latest.get("language", "python")
                    parts.append(f"```{language}\n{code[:1000]}\n```")
        # Up to three most recent executions
        if "executor" in agent_outputs:
            prev_exec = agent_outputs["executor"]
            if isinstance(prev_exec, list) and prev_exec:
                parts.append("\n### Previous executions")
                for e in prev_exec[-3:]:
                    tool = e.get("tool_used", "unknown")
                    success = "✓" if e.get("success") else "✗"
                    parts.append(f"- {tool}: {success}")
        return "\n".join(parts)

    def _handle_error(
        self,
        state: AgentState,
        task: Any,
        error: str
    ) -> AgentState:
        """Record the error and mark the current subtask as failed."""
        error_log = list(state.get("error_log", []))
        error_log.append(f"[Executor] {task.id}: {error}")
        # Mark the task as failed and count the attempt
        subtasks = list(state.get("subtasks", []))
        current_idx = state.get("current_subtask_index", 0)
        if current_idx < len(subtasks):
            subtasks[current_idx].status = "failed"
            subtasks[current_idx].retry_count += 1
        return self.update_state(state, {
            "subtasks": subtasks,
            "error_log": error_log,
            "next": "review",
            "current_agent": self.name,
        })
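
The core of `ExecutorAgent._execute_tool` is a timed, exception-safe call to an async tool. The sketch below reproduces that pattern with the standard library only. It assumes, as the listing does, that a tool exposes an async `aexecute(**kwargs)` method; `EchoTool` and `run_tool` are hypothetical names introduced for this example. Unlike the listing, this sketch reports an unknown tool name as a failure rather than as a successful no-op.

```python
import asyncio
import time
from typing import Any, Dict


class EchoTool:
    """Hypothetical tool used only for this example: returns its keyword arguments."""

    async def aexecute(self, **kwargs: Any) -> Dict[str, Any]:
        return kwargs


async def run_tool(tools: Dict[str, Any], name: str, tool_input: Any) -> Dict[str, Any]:
    """Call tools[name].aexecute and report success, output, error and elapsed seconds."""
    # Non-dict input is wrapped so it can be passed as keyword arguments.
    kwargs = tool_input if isinstance(tool_input, dict) else {"input": tool_input}
    start = time.perf_counter()
    try:
        output = await tools[name].aexecute(**kwargs)
        return {"success": True, "output": output, "error": None,
                "elapsed": time.perf_counter() - start}
    except Exception as exc:  # includes KeyError for an unknown tool name
        return {"success": False, "output": None, "error": str(exc),
                "elapsed": time.perf_counter() - start}


if __name__ == "__main__":
    registry = {"echo": EchoTool()}
    print(asyncio.run(run_tool(registry, "echo", "hello"))["output"])
    print(asyncio.run(run_tool(registry, "missing", {}))["success"])
```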
src/agents/critic.py
"""
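Critic agent.
Handles quality checks and improvement suggestions.
"""
from typing import Any

from langchain_core.messages import AIMessage

from src.agents.base import BaseAgent
from src.graph.state import AgentState
from src.utils.logger import get_logger

logger = get_logger(__name__)


class CriticAgent(BaseAgent):
    """
    Critic agent.

    Responsibilities:
    1. Check output quality
    2. Find problems and errors
    3. Suggest improvements
    4. Decide whether a revision is needed
    """

    name = "critic"
    description = "Quality review and improvement suggestions"

    # Minimum score (out of 10) that counts as a pass
    QUALITY_THRESHOLD = 7
    # Retries allowed per subtask before a forced pass
    MAX_RETRIES = 3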

    async def process(self, state: AgentState) -> AgentState:
        """
        Run the quality review.

        Args:
            state: Current state.

        Returns:
            New state containing the review result.
        """
        logger.info("[Critic] Starting quality review...")

        # Get the current subtask
        subtasks = state.get("subtasks", [])
        current_idx = state.get("current_subtask_index", 0)
        if current_idx >= len(subtasks):
            logger.info("[Critic] No subtask to review, skipping")
            return self.update_state(state, {"next": "check_complete"})
        current_task = subtasks[current_idx]

        # Collect the content to review
        content = self._get_review_content(state, current_task)
        criteria = self._get_review_criteria(state)

        # Build the prompt
        task_prompt = self.get_task_prompt(
            task=current_task.description,
            content=content,
            criteria=criteria
        )

        # Call the LLM
        messages = self.build_messages(state, task_prompt)
        result = await self.invoke_llm(messages, parse_json=True)
        if "error" in result:
            return self._handle_error(state, result["error"])

        # Parse the review result
        reasoning = result.get("reasoning", "")
        # The LLM may return the score as a string, so coerce it before comparing
        try:
            quality_score = float(result.get("quality_score", 5))
        except (TypeError, ValueError):
            quality_score = 5.0
        issues = result.get("issues") or []
        passed = result.get("passed", False)
        overall_feedback = result.get("overall_feedback", "")
        logger.info(f"[Critic] Quality score: {quality_score:g}/10, passed: {passed}")

        # Record reflection notes; an issue may be a dict or a plain string
        reflection_notes = list(state.get("reflection_notes", []))
        for issue in issues:
            if isinstance(issue, dict):
                note = f"[{current_task.id}] {issue.get('type', 'issue')}: {issue.get('description', '')}"
            else:
                note = f"[{current_task.id}] issue: {issue}"
            reflection_notes.append(note)

        # Decide the next step
        if passed or quality_score >= self.QUALITY_THRESHOLD:
            next_node = "check_complete"
            decision = "passed"
        elif current_task.retry_count >= self.MAX_RETRIES:
            # Retry budget exhausted: force a pass
            next_node = "check_complete"
            decision = "forced_pass"
            reflection_notes.append(f"[{current_task.id}] Reached the maximum number of retries, forcing a pass")
        else:
            # Needs improvement
            next_node = "route_agent"
            decision = "needs_improvement"
            # Count this rejection so MAX_RETRIES can eventually trigger.
            # Failed runs were already counted by the worker agent.
            if current_task.status != "failed":
                current_task.retry_count += 1
            # Mark the task as pending so it is executed again
            current_task.status = "pending"

        # Update the subtasks
        updated_subtasks = list(subtasks)
        updated_subtasks[current_idx] = current_task

        # Log the output
        self.log_output({
            "task_id": current_task.id,
            "quality_score": quality_score,
            "passed": passed,
            "decision": decision,
            "issues_count": len(issues)
        })

        # Update the messages
        new_messages = self.add_message(
            state,
            AIMessage(content=f"[Critic] Review complete:\nScore: {quality_score:g}/10\nDecision: {decision}\nFeedback: {overall_feedback}")
        )

        # Append to this agent's output history
        agent_outputs = dict(state.get("agent_outputs", {}))
        critic_outputs = agent_outputs.get("critic", [])
        if not isinstance(critic_outputs, list):
            critic_outputs = []
        critic_outputs.append({
            "task_id": current_task.id,
            "reasoning": reasoning,
            "quality_score": quality_score,
            "issues": issues,
            "passed": passed,
            "decision": decision,
            "overall_feedback": overall_feedback
        })
        agent_outputs["critic"] = critic_outputs

        return self.update_state(state, {
            "messages": new_messages,
            "subtasks": updated_subtasks,
            "reflection_notes": reflection_notes,
            "current_agent": self.name,
            "agent_outputs": agent_outputs,
            "next": next_node,
            "iteration_count": state.get("iteration_count", 0) + 1,
        })

    def _get_review_content(self, state: AgentState, task: Any) -> str:
        """Collect the content to review."""
        parts = []
        # Task result
        if task.result:
            parts.append(f"### Task result\n{task.result}")
        # Outputs from related agents
        agent_outputs = state.get("agent_outputs", {})
        # Coder output for this task
        if "coder" in agent_outputs:
            coder_data = agent_outputs["coder"]
            if isinstance(coder_data, list):
                for c in coder_data:
                    if c.get("task_id") == task.id:
                        code = c.get("code", "")
                        if code:
                            # Fence with the language recorded by the coder
                            language = c.get("language", "python")
                            parts.append(f"\n### Code\n```{language}\n{code}\n```")
        # Executor output for this task
        if "executor" in agent_outputs:
            exec_data = agent_outputs["executor"]
            if isinstance(exec_data, list):
                for e in exec_data:
                    if e.get("task_id") == task.id:
                        output = e.get("output", "")
                        parts.append(f"\n### Execution result\n{output}")
        return "\n".join(parts) if parts else "No content to review"
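
    def _get_review_criteria(self, state: AgentState) -> str:
        """Return the review criteria for the task type."""
        criteria = []
        task_type = state.get("task_type", "")
        if task_type == "coding":
            criteria.extend([
                "1. Is the code complete and runnable?",
                "2. Is there appropriate error handling?",
                "3. Does the code follow a consistent style?",
                "4. Does it meet the functional requirements?"
            ])
        elif task_type == "research":
            criteria.extend([
                "1. Is the information accurate and reliable?",
                "2. Are the main aspects covered?",
                "3. Are the conclusions supported by evidence?",
                "4. Is the result of practical use?"
            ])
        else:
            criteria.extend([
                "1. Does the output meet the task requirements?",
                "2. Is the output complete?",
                "3. Is the quality acceptable?",
                "4. Are there any obvious errors?"
            ])
        return "\n".join(criteria)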

    def _handle_error(self, state: AgentState, error: str) -> AgentState:
        """Record the error and skip the review."""
        error_log = list(state.get("error_log", []))
        error_log.append(f"[Critic] {error}")
        # On error, skip the review and continue with the next step
        return self.update_state(state, {
            "error_log": error_log,
            "next": "check_complete",
            "current_agent": self.name,
        })
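
The three-way decision in `CriticAgent.process` depends only on the review verdict, the score and the retry count, so it can be written as a pure function and tested in isolation. The sketch below is a minimal illustration; `decide_review` is a hypothetical helper that is not part of the project, and its defaults mirror `QUALITY_THRESHOLD = 7` and `MAX_RETRIES = 3`.

```python
def decide_review(
    passed: bool,
    quality_score: float,
    retry_count: int,
    threshold: float = 7,
    max_retries: int = 3,
) -> str:
    """Return "passed", "forced_pass" or "needs_improvement" for one review."""
    if passed or quality_score >= threshold:
        return "passed"
    if retry_count >= max_retries:
        # Retry budget exhausted: accept the result so the workflow can finish.
        return "forced_pass"
    return "needs_improvement"


if __name__ == "__main__":
    print(decide_review(False, 8, 0))
    print(decide_review(False, 4, 1))
    print(decide_review(False, 4, 3))
```

Only `"needs_improvement"` sends the subtask back to `route_agent`. The loop terminates only if `retry_count` grows on every rejected attempt, so that the `"forced_pass"` branch is eventually reached.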