穿透AI Agent五大范式:原理、源码与工程实践
AI Agent技术正在重塑人机交互的边界。与传统聊天机器人不同,Agent具备自主决策、工具调用和任务执行的闭环能力。本文将深入剖析五大核心范式,从原理到源码,从理论到实践,为读者提供系统化的技术指南。本文深入解析了AI Agent的五大核心范式,从理论原理到源代码实现,再到工程实践,为读者提供了完整的技术指南。关键要点总结范式选择原则:根据任务复杂度、准确度要求和成本预算动态选择工程实践建议:
·
穿透AI Agent五大范式:原理、源码与工程实践
本文深入解析AI Agent的五大核心范式,包含原理分析、核心源码实现和工程实践经验,帮助读者建立从理论到实践的完整认知体系。
目录
概述
AI Agent技术正在重塑人机交互的边界。与传统聊天机器人不同,Agent具备自主决策、工具调用和任务执行的闭环能力。本文将深入剖析五大核心范式,从原理到源码,从理论到实践,为读者提供系统化的技术指南。
核心原理深度解析
1.1 六步通用闭环原理
所有Agent架构都基于一个核心认知:六步通用闭环。
class AgentLoop:
def __init__(self):
self.max_iterations = 8 # 防止无限循环
self.current_iteration = 0
def run(self, task):
"""六步通用闭环"""
# 1. 理解任务
parsed_task = self.parse_task(task)
# 2. 判断策略
strategy = self.determine_strategy(parsed_task)
while self.current_iteration < self.max_iterations:
# 3. 调用工具
tool_result = self.call_tool(strategy)
# 4. 读取反馈
feedback = self.process_feedback(tool_result)
# 5. 持续推进
if self.is_task_complete(feedback):
break
strategy = self.adjust_strategy(strategy, feedback)
self.current_iteration += 1
# 6. 输出结果
return self.format_output(feedback)
1.2 范式本质对比
| 维度 | ReAct | Plan-Exec | Reflect | Harness | 混合模式 |
|---|---|---|---|---|---|
| 核心定位 | 推理与行动循环 | 任务编排与解耦 | 质量后置校验 | 全流程工程治理 | 场景自适应 |
| 决策主体 | LLM逐轮决策 | Planner+Executor分离 | 叠加于主范式 | 中间件责任链 | 路由判断+动态选择 |
| 规划方式 | 无全局规划 | 前置全局规划 | 不独立规划 | 中间件pipeline | 按复杂度动态切换 |
| 质量保障 | 靠人工检查 | 靠步骤校验 | LLM自检+迭代修正 | Middleware约束 | Reflect层统一兜底 |
ReAct范式源码实现
2.1 核心架构
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class Action:
name: str
args: Dict[str, Any]
@dataclass
class Observation:
content: Any
status: str # success, error, timeout
class ReActAgent:
def __init__(self, llm, tools: List[Any], max_iterations: int = 6):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = max_iterations
self.history = []
def think(self, query: str, context: List[Dict]) -> str:
"""思考下一步行动"""
prompt = self._build_think_prompt(query, context)
return self.llm.generate(prompt)
def act(self, action: Action) -> Observation:
"""执行行动"""
if action.name not in self.tools:
return Observation(
content=f"Tool {action.name} not found",
status="error"
)
try:
tool = self.tools[action.name]
result = tool.run(**action.args)
return Observation(content=result, status="success")
except Exception as e:
return Observation(content=str(e), status="error")
def run(self, query: str) -> str:
"""ReAct主循环"""
context = []
for i in range(self.max_iterations):
# 思考
thought = self.think(query, context)
context.append({"role": "thought", "content": thought})
# 解析行动
action = self._parse_action(thought)
if not action:
break
# 执行行动
observation = self.act(action)
context.append({
"role": "observation",
"content": observation.content,
"status": observation.status
})
# 检查是否完成
if self._is_complete(thought, observation):
break
return self._generate_final_answer(query, context)
2.2 工具调用解析器
import json
import re
class ActionParser:
@staticmethod
def parse(text: str) -> Optional[Action]:
"""从LLM输出中解析行动"""
# 匹配格式: Action: tool_name{"arg1": "value1", "arg2": "value2"}
pattern = r'Action:\s*(\w+)\s*\{([^}]*)\}'
match = re.search(pattern, text)
if match:
tool_name = match.group(1)
args_str = match.group(2)
try:
args = json.loads(f'{{{args_str}}}') if args_str else {}
return Action(name=tool_name, args=args)
except:
return None
return None
Plan-Exec范式源码实现
3.1 规划器设计
from typing import List, Dict
from dataclasses import dataclass, field
@dataclass
class PlanStep:
id: str
description: str
dependencies: List[str] = field(default_factory=list)
tool: str = ""
args: Dict = field(default_factory=dict)
@dataclass
class ExecutionPlan:
steps: List[PlanStep]
context: Dict = field(default_factory=dict)
class Planner:
def __init__(self, llm):
self.llm = llm
def plan(self, task: str) -> ExecutionPlan:
"""生成执行计划"""
prompt = f"""
请为以下任务生成详细的执行计划:
任务:{task}
要求:
1. 将任务分解为多个步骤
2. 每个步骤明确依赖关系
3. 指定每个步骤使用的工具和参数
4. 以JSON格式返回
返回格式:
{{
"steps": [
{{
"id": "步骤ID",
"description": "步骤描述",
"dependencies": ["依赖步骤ID"],
"tool": "工具名称",
"args": {{"参数": "值"}}
}}
]
}}
"""
response = self.llm.generate(prompt)
return self._parse_plan(response)
class Executor:
def __init__(self, tools: List[Any]):
self.tools = {tool.name: tool for tool in tools}
def execute_step(self, step: PlanStep, context: Dict) -> Any:
"""执行单个步骤"""
if step.tool not in self.tools:
raise ValueError(f"Tool {step.tool} not found")
tool = self.tools[step.tool]
# 渲染参数(支持模板变量)
rendered_args = self._render_args(step.args, context)
return tool.run(**rendered_args)
def _render_args(self, args: Dict, context: Dict) -> Dict:
"""渲染参数模板"""
import jinja2
template = jinja2.Template(json.dumps(args))
rendered = template.render(**context)
return json.loads(rendered)
class PlanExecAgent:
def __init__(self, planner: Planner, executor: Executor):
self.planner = planner
self.executor = executor
def run(self, task: str) -> str:
"""执行Plan-Exec主流程"""
# 1. 规划阶段
plan = self.planner.plan(task)
# 2. 执行阶段
context = {}
completed_steps = set()
while len(completed_steps) < len(plan.steps):
# 找到可执行的步骤(依赖已满足)
ready_steps = [
step for step in plan.steps
if step.id not in completed_steps
and all(dep in completed_steps for dep in step.dependencies)
]
for step in ready_steps:
try:
result = self.executor.execute_step(step, context)
context[step.id] = result
completed_steps.add(step.id)
except Exception as e:
# 错误处理:重试或重新规划
return self._handle_execution_error(step, str(e))
# 3. 整合结果
return self._consolidate_results(context, plan)
Reflect范式源码实现
4.1 反思机制设计
@dataclass
class ReflectionResult:
is_correct: bool
issues: List[str]
suggestions: List[str]
confidence: float
class Reflector:
def __init__(self, llm):
self.llm = llm
def reflect(self, task: str, result: str) -> ReflectionResult:
"""对结果进行反思"""
prompt = f"""
请对以下任务执行结果进行质量检查和反思:
任务:{task}
结果:{result}
请从以下维度进行评估:
1. 事实准确性
2. 逻辑完整性
3. 格式规范性
4. 任务完成度
返回JSON格式:
{{
"is_correct": 布尔值,
"issues": ["问题列表"],
"suggestions": ["改进建议"],
"confidence": 0.0-1.0
}}
"""
response = self.llm.generate(prompt)
return self._parse_reflection(response)
class ReflectiveAgent:
def __init__(self, base_agent, reflector: Reflector, max_reflections: int = 3):
self.base_agent = base_agent
self.reflector = reflector
self.max_reflections = max_reflections
def run(self, task: str) -> str:
"""带反思的Agent执行"""
result = self.base_agent.run(task)
for i in range(self.max_reflections):
reflection = self.reflector.reflect(task, result)
if reflection.is_correct or reflection.confidence > 0.9:
break
# 基于反思结果改进
result = self._improve_result(task, result, reflection)
return result
def _improve_result(self, task: str, current_result: str, reflection: ReflectionResult) -> str:
"""基于反思改进结果"""
prompt = f"""
请基于以下反思意见改进执行结果:
原始任务:{task}
当前结果:{current_result}
发现问题:{reflection.issues}
改进建议:{reflection.suggestions}
请提供改进后的结果:
"""
return self.reflector.llm.generate(prompt)
Harness范式源码实现
5.1 中间件责任链设计
from abc import ABC, abstractmethod
from typing import Any, Dict, Callable
import time
class Middleware(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def process(self, context: Dict, next_fn: Callable) -> Any:
pass
class LoggingMiddleware(Middleware):
"""日志中间件"""
def process(self, context: Dict, next_fn: Callable) -> Any:
print(f"[{self.name}] 开始处理: {context.get('task', 'unknown')}")
start_time = time.time()
try:
result = next_fn(context)
print(f"[{self.name}] 处理成功,耗时: {time.time() - start_time:.2f}s")
return result
except Exception as e:
print(f"[{self.name}] 处理失败: {str(e)}")
raise
class BudgetMiddleware(Middleware):
"""预算控制中间件"""
def __init__(self, name: str, max_tokens: int = 10000):
super().__init__(name)
self.max_tokens = max_tokens
def process(self, context: Dict, next_fn: Callable) -> Any:
# 检查Token消耗
token_used = context.get('token_used', 0)
if token_used > self.max_tokens:
raise Exception(f"Token预算超出限制: {token_used}/{self.max_tokens}")
result = next_fn(context)
# 更新Token消耗
if isinstance(result, dict) and 'token_used' in result:
context['token_used'] = token_used + result['token_used']
return result
class QualityCheckMiddleware(Middleware):
"""质量检查中间件"""
def __init__(self, name: str, min_score: float = 0.8):
super().__init__(name)
self.min_score = min_score
def process(self, context: Dict, next_fn: Callable) -> Any:
result = next_fn(context)
# 质量评分
score = self._evaluate_quality(result)
if score < self.min_score:
# 触发重试机制
context['retry_count'] = context.get('retry_count', 0) + 1
if context['retry_count'] <= 3:
print(f"[{self.name}] 质量不达标({score:.2f}),第{context['retry_count']}次重试")
return self.process(context, next_fn)
else:
raise Exception("质量检查失败,已达到最大重试次数")
return result
def _evaluate_quality(self, result: Any) -> float:
"""质量评估逻辑"""
# 这里可以实现具体的质量评估算法
return 0.9 # 示例
class MiddlewarePipeline:
"""中间件流水线"""
def __init__(self):
self.middlewares = []
def add(self, middleware: Middleware):
"""添加中间件"""
self.middlewares.append(middleware)
def execute(self, context: Dict, final_fn: Callable) -> Any:
"""执行流水线"""
def build_chain(index: int) -> Callable:
if index >= len(self.middlewares):
return final_fn
middleware = self.middlewares[index]
next_fn = build_chain(index + 1)
return lambda ctx: middleware.process(ctx, next_fn)
chain = build_chain(0)
return chain(context)
# 使用示例
class HarnessAgent:
def __init__(self, base_agent):
self.base_agent = base_agent
self.pipeline = MiddlewarePipeline()
# 添加中间件
self.pipeline.add(LoggingMiddleware("logger"))
self.pipeline.add(BudgetMiddleware("budget", max_tokens=5000))
self.pipeline.add(QualityCheckMiddleware("quality", min_score=0.85))
def run(self, task: str) -> str:
"""通过Harness执行Agent"""
context = {"task": task}
def final_execution(ctx: Dict) -> str:
return self.base_agent.run(ctx["task"])
return self.pipeline.execute(context, final_execution)
混合范式架构设计
6.1 智能路由设计
from enum import Enum
from dataclasses import dataclass
class TaskComplexity(Enum):
SIMPLE = "simple"
COMPLEX = "complex"
CRITICAL = "critical"
@dataclass
class TaskProfile:
complexity: TaskComplexity
required_accuracy: float
time_constraint: int # 秒
token_budget: int
class ComplexityAnalyzer:
"""任务复杂度分析器"""
def analyze(self, task: str) -> TaskProfile:
"""分析任务特征"""
# 基于关键词、长度、领域等特征判断复杂度
complexity_score = self._calculate_complexity(task)
if complexity_score < 3:
complexity = TaskComplexity.SIMPLE
elif complexity_score < 7:
complexity = TaskComplexity.COMPLEX
else:
complexity = TaskComplexity.CRITICAL
return TaskProfile(
complexity=complexity,
required_accuracy=0.95 if complexity == TaskComplexity.CRITICAL else 0.8,
time_constraint=30 if complexity == TaskComplexity.SIMPLE else 300,
token_budget=1000 if complexity == TaskComplexity.SIMPLE else 10000
)
def _calculate_complexity(self, task: str) -> float:
"""计算复杂度分数"""
# 示例实现
factors = [
len(task) / 100, # 长度因子
task.count("和") + task.count("与"), # 并列关系因子
task.count("分析") + task.count("报告") + task.count("研究"), # 专业度因子
]
return sum(factors)
class HybridAgent:
"""混合范式Agent"""
def __init__(self,
react_agent: ReActAgent,
plan_exec_agent: PlanExecAgent,
reflective_agent: ReflectiveAgent,
harness_agent: HarnessAgent):
self.analyzer = ComplexityAnalyzer()
self.agents = {
TaskComplexity.SIMPLE: react_agent,
TaskComplexity.COMPLEX: plan_exec_agent,
TaskComplexity.CRITICAL: reflective_agent
}
self.harness_agent = harness_agent
def run(self, task: str) -> str:
"""智能路由执行"""
# 1. 分析任务复杂度
profile = self.analyzer.analyze(task)
print(f"任务分析结果: 复杂度={profile.complexity.value}, "
f"准确度要求={profile.required_accuracy}")
# 2. 选择合适Agent
selected_agent = self.agents[profile.complexity]
# 3. 通过Harness执行(统一质量管控)
return self.harness_agent.run(task)
性能优化与最佳实践
7.1 Token优化策略
class TokenOptimizer:
"""Token使用优化器"""
def __init__(self, llm):
self.llm = llm
def optimize_prompt(self, prompt: str, max_tokens: int) -> str:
"""优化Prompt长度"""
# 摘要长prompt
if self._estimate_tokens(prompt) > max_tokens * 0.8:
summary = self._summarize_context(prompt, max_tokens // 2)
return summary
return prompt
def _estimate_tokens(self, text: str) -> int:
"""估算Token数量"""
# 简单估算:中文≈2字符/Token,英文≈4字符/Token
return len(text.encode('utf-8')) // 3
def _summarize_context(self, context: str, target_tokens: int) -> str:
"""摘要上下文"""
prompt = f"请将以下内容摘要为{target_tokens}个Token以内:\n\n{context}"
return self.llm.generate(prompt)
7.2 缓存机制
import hashlib
import time
from functools import wraps
class CacheManager:
"""结果缓存管理器"""
def __init__(self, ttl: int = 3600): # 1小时TTL
self.cache = {}
self.ttl = ttl
def get_key(self, *args, **kwargs) -> str:
"""生成缓存键"""
content = str(args) + str(sorted(kwargs.items()))
return hashlib.md5(content.encode()).hexdigest()
def get(self, key: str) -> Any:
"""获取缓存"""
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return value
else:
del self.cache[key]
return None
def set(self, key: str, value: Any):
"""设置缓存"""
self.cache[key] = (value, time.time())
def cached(cache_manager: CacheManager):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = cache_manager.get_key(*args, **kwargs)
cached_result = cache_manager.get(cache_key)
if cached_result is not None:
print(f"缓存命中: {cache_key[:8]}...")
return cached_result
result = func(*args, **kwargs)
cache_manager.set(cache_key, result)
return result
return wrapper
return decorator
7.3 监控与可观测性
import logging
from datetime import datetime
from dataclasses import dataclass
@dataclass
class AgentMetrics:
task_id: str
start_time: datetime
end_time: datetime
tokens_used: int
steps_taken: int
success: bool
error_message: str = ""
class AgentMonitor:
"""Agent监控器"""
def __init__(self):
self.logger = logging.getLogger("AgentMonitor")
self.metrics = []
def start_task(self, task_id: str) -> str:
"""开始任务监控"""
self.current_task = task_id
self.start_time = datetime.now()
return task_id
def record_step(self, step_name: str, tokens: int = 0):
"""记录步骤"""
self.logger.info(f"[{self.current_task}] 步骤: {step_name}, Tokens: {tokens}")
def end_task(self, success: bool = True, error: str = ""):
"""结束任务监控"""
end_time = datetime.now()
duration = (end_time - self.start_time).total_seconds()
metric = AgentMetrics(
task_id=self.current_task,
start_time=self.start_time,
end_time=end_time,
tokens_used=getattr(self, 'tokens_used', 0),
steps_taken=getattr(self, 'steps_taken', 0),
success=success,
error_message=error
)
self.metrics.append(metric)
self.logger.info(f"任务完成: {self.current_task}, "
f"耗时: {duration:.2f}s, "
f"成功: {success}")
总结
本文深入解析了AI Agent的五大核心范式,从理论原理到源代码实现,再到工程实践,为读者提供了完整的技术指南。
关键要点总结:
- 范式选择原则:根据任务复杂度、准确度要求和成本预算动态选择
- 工程实践建议:从ReAct入门,逐步过渡到混合架构,最后用Harness治理
- 性能优化方向:Token优化、缓存机制、监控可观测性
- 生产落地路径:LangChain快速原型 → LangGraph生产部署 → Harness质量管控
通过系统化的架构设计和工程化实践,我们可以构建出既灵活又可靠的AI Agent系统,真正发挥大语言模型的潜力。
更多推荐


所有评论(0)