穿透AI Agent五大范式:原理、源码与工程实践

本文深入解析AI Agent的五大核心范式,包含原理分析、核心源码实现和工程实践经验,帮助读者建立从理论到实践的完整认知体系。

目录

  1. 概述
  2. 核心原理深度解析
  3. ReAct范式源码实现
  4. Plan-Exec范式源码实现
  5. Reflect范式源码实现
  6. Harness范式源码实现
  7. 混合范式架构设计
  8. 性能优化与最佳实践
  9. 总结

概述

AI Agent技术正在重塑人机交互的边界。与传统聊天机器人不同,Agent具备自主决策、工具调用和任务执行的闭环能力。本文将深入剖析五大核心范式,从原理到源码,从理论到实践,为读者提供系统化的技术指南。

核心原理深度解析

1.1 六步通用闭环原理

所有Agent架构都基于一个核心认知:六步通用闭环。

class AgentLoop:
    def __init__(self):
        self.max_iterations = 8  # 防止无限循环
        self.current_iteration = 0
        
    def run(self, task):
        """六步通用闭环"""
        # 1. 理解任务
        parsed_task = self.parse_task(task)
        
        # 2. 判断策略
        strategy = self.determine_strategy(parsed_task)
        
        while self.current_iteration < self.max_iterations:
            # 3. 调用工具
            tool_result = self.call_tool(strategy)
            
            # 4. 读取反馈
            feedback = self.process_feedback(tool_result)
            
            # 5. 持续推进
            if self.is_task_complete(feedback):
                break
                
            strategy = self.adjust_strategy(strategy, feedback)
            self.current_iteration += 1
        
        # 6. 输出结果
        return self.format_output(feedback)

1.2 范式本质对比

维度 ReAct Plan-Exec Reflect Harness 混合模式
核心定位 推理与行动循环 任务编排与解耦 质量后置校验 全流程工程治理 场景自适应
决策主体 LLM逐轮决策 Planner+Executor分离 叠加于主范式 中间件责任链 路由判断+动态选择
规划方式 无全局规划 前置全局规划 不独立规划 中间件pipeline 按复杂度动态切换
质量保障 靠人工检查 靠步骤校验 LLM自检+迭代修正 Middleware约束 Reflect层统一兜底

ReAct范式源码实现

2.1 核心架构

from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class Action:
    name: str
    args: Dict[str, Any]
    
@dataclass
class Observation:
    content: Any
    status: str  # success, error, timeout

class ReActAgent:
    def __init__(self, llm, tools: List[Any], max_iterations: int = 6):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.history = []
        
    def think(self, query: str, context: List[Dict]) -> str:
        """思考下一步行动"""
        prompt = self._build_think_prompt(query, context)
        return self.llm.generate(prompt)
        
    def act(self, action: Action) -> Observation:
        """执行行动"""
        if action.name not in self.tools:
            return Observation(
                content=f"Tool {action.name} not found", 
                status="error"
            )
        
        try:
            tool = self.tools[action.name]
            result = tool.run(**action.args)
            return Observation(content=result, status="success")
        except Exception as e:
            return Observation(content=str(e), status="error")
    
    def run(self, query: str) -> str:
        """ReAct主循环"""
        context = []
        
        for i in range(self.max_iterations):
            # 思考
            thought = self.think(query, context)
            context.append({"role": "thought", "content": thought})
            
            # 解析行动
            action = self._parse_action(thought)
            if not action:
                break
                
            # 执行行动
            observation = self.act(action)
            context.append({
                "role": "observation", 
                "content": observation.content,
                "status": observation.status
            })
            
            # 检查是否完成
            if self._is_complete(thought, observation):
                break
        
        return self._generate_final_answer(query, context)

2.2 工具调用解析器

import json
import re

class ActionParser:
    @staticmethod
    def parse(text: str) -> Optional[Action]:
        """从LLM输出中解析行动"""
        # 匹配格式: Action: tool_name{"arg1": "value1", "arg2": "value2"}
        pattern = r'Action:\s*(\w+)\s*\{([^}]*)\}'
        match = re.search(pattern, text)
        
        if match:
            tool_name = match.group(1)
            args_str = match.group(2)
            try:
                args = json.loads(f'{{{args_str}}}') if args_str else {}
                return Action(name=tool_name, args=args)
            except:
                return None
        return None

Plan-Exec范式源码实现

3.1 规划器设计

from typing import List, Dict
from dataclasses import dataclass, field

@dataclass
class PlanStep:
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)
    tool: str = ""
    args: Dict = field(default_factory=dict)
    
@dataclass
class ExecutionPlan:
    steps: List[PlanStep]
    context: Dict = field(default_factory=dict)

class Planner:
    def __init__(self, llm):
        self.llm = llm
        
    def plan(self, task: str) -> ExecutionPlan:
        """生成执行计划"""
        prompt = f"""
        请为以下任务生成详细的执行计划:
        任务:{task}
        
        要求:
        1. 将任务分解为多个步骤
        2. 每个步骤明确依赖关系
        3. 指定每个步骤使用的工具和参数
        4. 以JSON格式返回
        
        返回格式:
        {{
            "steps": [
                {{
                    "id": "步骤ID",
                    "description": "步骤描述",
                    "dependencies": ["依赖步骤ID"],
                    "tool": "工具名称",
                    "args": {{"参数": "值"}}
                }}
            ]
        }}
        """
        
        response = self.llm.generate(prompt)
        return self._parse_plan(response)

class Executor:
    def __init__(self, tools: List[Any]):
        self.tools = {tool.name: tool for tool in tools}
        
    def execute_step(self, step: PlanStep, context: Dict) -> Any:
        """执行单个步骤"""
        if step.tool not in self.tools:
            raise ValueError(f"Tool {step.tool} not found")
            
        tool = self.tools[step.tool]
        # 渲染参数(支持模板变量)
        rendered_args = self._render_args(step.args, context)
        return tool.run(**rendered_args)
        
    def _render_args(self, args: Dict, context: Dict) -> Dict:
        """渲染参数模板"""
        import jinja2
        template = jinja2.Template(json.dumps(args))
        rendered = template.render(**context)
        return json.loads(rendered)

class PlanExecAgent:
    def __init__(self, planner: Planner, executor: Executor):
        self.planner = planner
        self.executor = executor
        
    def run(self, task: str) -> str:
        """执行Plan-Exec主流程"""
        # 1. 规划阶段
        plan = self.planner.plan(task)
        
        # 2. 执行阶段
        context = {}
        completed_steps = set()
        
        while len(completed_steps) < len(plan.steps):
            # 找到可执行的步骤(依赖已满足)
            ready_steps = [
                step for step in plan.steps 
                if step.id not in completed_steps 
                and all(dep in completed_steps for dep in step.dependencies)
            ]
            
            for step in ready_steps:
                try:
                    result = self.executor.execute_step(step, context)
                    context[step.id] = result
                    completed_steps.add(step.id)
                except Exception as e:
                    # 错误处理:重试或重新规划
                    return self._handle_execution_error(step, str(e))
        
        # 3. 整合结果
        return self._consolidate_results(context, plan)

Reflect范式源码实现

4.1 反思机制设计

@dataclass
class ReflectionResult:
    is_correct: bool
    issues: List[str]
    suggestions: List[str]
    confidence: float

class Reflector:
    def __init__(self, llm):
        self.llm = llm
        
    def reflect(self, task: str, result: str) -> ReflectionResult:
        """对结果进行反思"""
        prompt = f"""
        请对以下任务执行结果进行质量检查和反思:
        
        任务:{task}
        结果:{result}
        
        请从以下维度进行评估:
        1. 事实准确性
        2. 逻辑完整性
        3. 格式规范性
        4. 任务完成度
        
        返回JSON格式:
        {{
            "is_correct": 布尔值,
            "issues": ["问题列表"],
            "suggestions": ["改进建议"],
            "confidence": 0.0-1.0
        }}
        """
        
        response = self.llm.generate(prompt)
        return self._parse_reflection(response)

class ReflectiveAgent:
    def __init__(self, base_agent, reflector: Reflector, max_reflections: int = 3):
        self.base_agent = base_agent
        self.reflector = reflector
        self.max_reflections = max_reflections
        
    def run(self, task: str) -> str:
        """带反思的Agent执行"""
        result = self.base_agent.run(task)
        
        for i in range(self.max_reflections):
            reflection = self.reflector.reflect(task, result)
            
            if reflection.is_correct or reflection.confidence > 0.9:
                break
                
            # 基于反思结果改进
            result = self._improve_result(task, result, reflection)
            
        return result
        
    def _improve_result(self, task: str, current_result: str, reflection: ReflectionResult) -> str:
        """基于反思改进结果"""
        prompt = f"""
        请基于以下反思意见改进执行结果:
        
        原始任务:{task}
        当前结果:{current_result}
        发现问题:{reflection.issues}
        改进建议:{reflection.suggestions}
        
        请提供改进后的结果:
        """
        
        return self.reflector.llm.generate(prompt)

Harness范式源码实现

5.1 中间件责任链设计

from abc import ABC, abstractmethod
from typing import Any, Dict, Callable
import time

class Middleware(ABC):
    def __init__(self, name: str):
        self.name = name
        
    @abstractmethod
    def process(self, context: Dict, next_fn: Callable) -> Any:
        pass

class LoggingMiddleware(Middleware):
    """日志中间件"""
    def process(self, context: Dict, next_fn: Callable) -> Any:
        print(f"[{self.name}] 开始处理: {context.get('task', 'unknown')}")
        start_time = time.time()
        
        try:
            result = next_fn(context)
            print(f"[{self.name}] 处理成功,耗时: {time.time() - start_time:.2f}s")
            return result
        except Exception as e:
            print(f"[{self.name}] 处理失败: {str(e)}")
            raise

class BudgetMiddleware(Middleware):
    """预算控制中间件"""
    def __init__(self, name: str, max_tokens: int = 10000):
        super().__init__(name)
        self.max_tokens = max_tokens
        
    def process(self, context: Dict, next_fn: Callable) -> Any:
        # 检查Token消耗
        token_used = context.get('token_used', 0)
        if token_used > self.max_tokens:
            raise Exception(f"Token预算超出限制: {token_used}/{self.max_tokens}")
            
        result = next_fn(context)
        
        # 更新Token消耗
        if isinstance(result, dict) and 'token_used' in result:
            context['token_used'] = token_used + result['token_used']
            
        return result

class QualityCheckMiddleware(Middleware):
    """质量检查中间件"""
    def __init__(self, name: str, min_score: float = 0.8):
        super().__init__(name)
        self.min_score = min_score
        
    def process(self, context: Dict, next_fn: Callable) -> Any:
        result = next_fn(context)
        
        # 质量评分
        score = self._evaluate_quality(result)
        if score < self.min_score:
            # 触发重试机制
            context['retry_count'] = context.get('retry_count', 0) + 1
            if context['retry_count'] <= 3:
                print(f"[{self.name}] 质量不达标({score:.2f}),第{context['retry_count']}次重试")
                return self.process(context, next_fn)
            else:
                raise Exception("质量检查失败,已达到最大重试次数")
                
        return result
        
    def _evaluate_quality(self, result: Any) -> float:
        """质量评估逻辑"""
        # 这里可以实现具体的质量评估算法
        return 0.9  # 示例

class MiddlewarePipeline:
    """中间件流水线"""
    def __init__(self):
        self.middlewares = []
        
    def add(self, middleware: Middleware):
        """添加中间件"""
        self.middlewares.append(middleware)
        
    def execute(self, context: Dict, final_fn: Callable) -> Any:
        """执行流水线"""
        def build_chain(index: int) -> Callable:
            if index >= len(self.middlewares):
                return final_fn
                
            middleware = self.middlewares[index]
            next_fn = build_chain(index + 1)
            
            return lambda ctx: middleware.process(ctx, next_fn)
            
        chain = build_chain(0)
        return chain(context)

# 使用示例
class HarnessAgent:
    def __init__(self, base_agent):
        self.base_agent = base_agent
        self.pipeline = MiddlewarePipeline()
        
        # 添加中间件
        self.pipeline.add(LoggingMiddleware("logger"))
        self.pipeline.add(BudgetMiddleware("budget", max_tokens=5000))
        self.pipeline.add(QualityCheckMiddleware("quality", min_score=0.85))
        
    def run(self, task: str) -> str:
        """通过Harness执行Agent"""
        context = {"task": task}
        
        def final_execution(ctx: Dict) -> str:
            return self.base_agent.run(ctx["task"])
            
        return self.pipeline.execute(context, final_execution)

混合范式架构设计

6.1 智能路由设计

from enum import Enum
from dataclasses import dataclass

class TaskComplexity(Enum):
    SIMPLE = "simple"
    COMPLEX = "complex"
    CRITICAL = "critical"

@dataclass
class TaskProfile:
    complexity: TaskComplexity
    required_accuracy: float
    time_constraint: int  # 秒
    token_budget: int

class ComplexityAnalyzer:
    """任务复杂度分析器"""
    def analyze(self, task: str) -> TaskProfile:
        """分析任务特征"""
        # 基于关键词、长度、领域等特征判断复杂度
        complexity_score = self._calculate_complexity(task)
        
        if complexity_score < 3:
            complexity = TaskComplexity.SIMPLE
        elif complexity_score < 7:
            complexity = TaskComplexity.COMPLEX
        else:
            complexity = TaskComplexity.CRITICAL
            
        return TaskProfile(
            complexity=complexity,
            required_accuracy=0.95 if complexity == TaskComplexity.CRITICAL else 0.8,
            time_constraint=30 if complexity == TaskComplexity.SIMPLE else 300,
            token_budget=1000 if complexity == TaskComplexity.SIMPLE else 10000
        )
        
    def _calculate_complexity(self, task: str) -> float:
        """计算复杂度分数"""
        # 示例实现
        factors = [
            len(task) / 100,  # 长度因子
            task.count("和") + task.count("与"),  # 并列关系因子
            task.count("分析") + task.count("报告") + task.count("研究"),  # 专业度因子
        ]
        return sum(factors)

class HybridAgent:
    """混合范式Agent"""
    def __init__(self, 
                 react_agent: ReActAgent,
                 plan_exec_agent: PlanExecAgent,
                 reflective_agent: ReflectiveAgent,
                 harness_agent: HarnessAgent):
        self.analyzer = ComplexityAnalyzer()
        self.agents = {
            TaskComplexity.SIMPLE: react_agent,
            TaskComplexity.COMPLEX: plan_exec_agent,
            TaskComplexity.CRITICAL: reflective_agent
        }
        self.harness_agent = harness_agent
        
    def run(self, task: str) -> str:
        """智能路由执行"""
        # 1. 分析任务复杂度
        profile = self.analyzer.analyze(task)
        print(f"任务分析结果: 复杂度={profile.complexity.value}, "
              f"准确度要求={profile.required_accuracy}")
        
        # 2. 选择合适Agent
        selected_agent = self.agents[profile.complexity]
        
        # 3. 通过Harness执行(统一质量管控)
        return self.harness_agent.run(task)

性能优化与最佳实践

7.1 Token优化策略

class TokenOptimizer:
    """Token使用优化器"""
    def __init__(self, llm):
        self.llm = llm
        
    def optimize_prompt(self, prompt: str, max_tokens: int) -> str:
        """优化Prompt长度"""
        # 摘要长prompt
        if self._estimate_tokens(prompt) > max_tokens * 0.8:
            summary = self._summarize_context(prompt, max_tokens // 2)
            return summary
            
        return prompt
        
    def _estimate_tokens(self, text: str) -> int:
        """估算Token数量"""
        # 简单估算:中文≈2字符/Token,英文≈4字符/Token
        return len(text.encode('utf-8')) // 3
        
    def _summarize_context(self, context: str, target_tokens: int) -> str:
        """摘要上下文"""
        prompt = f"请将以下内容摘要为{target_tokens}个Token以内:\n\n{context}"
        return self.llm.generate(prompt)

7.2 缓存机制

import hashlib
import time
from functools import wraps

class CacheManager:
    """结果缓存管理器"""
    def __init__(self, ttl: int = 3600):  # 1小时TTL
        self.cache = {}
        self.ttl = ttl
        
    def get_key(self, *args, **kwargs) -> str:
        """生成缓存键"""
        content = str(args) + str(sorted(kwargs.items()))
        return hashlib.md5(content.encode()).hexdigest()
        
    def get(self, key: str) -> Any:
        """获取缓存"""
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            else:
                del self.cache[key]
        return None
        
    def set(self, key: str, value: Any):
        """设置缓存"""
        self.cache[key] = (value, time.time())

def cached(cache_manager: CacheManager):
    """缓存装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            cache_key = cache_manager.get_key(*args, **kwargs)
            cached_result = cache_manager.get(cache_key)
            
            if cached_result is not None:
                print(f"缓存命中: {cache_key[:8]}...")
                return cached_result
                
            result = func(*args, **kwargs)
            cache_manager.set(cache_key, result)
            return result
        return wrapper
    return decorator

7.3 监控与可观测性

import logging
from datetime import datetime
from dataclasses import dataclass

@dataclass
class AgentMetrics:
    task_id: str
    start_time: datetime
    end_time: datetime
    tokens_used: int
    steps_taken: int
    success: bool
    error_message: str = ""
    
class AgentMonitor:
    """Agent监控器"""
    def __init__(self):
        self.logger = logging.getLogger("AgentMonitor")
        self.metrics = []
        
    def start_task(self, task_id: str) -> str:
        """开始任务监控"""
        self.current_task = task_id
        self.start_time = datetime.now()
        return task_id
        
    def record_step(self, step_name: str, tokens: int = 0):
        """记录步骤"""
        self.logger.info(f"[{self.current_task}] 步骤: {step_name}, Tokens: {tokens}")
        
    def end_task(self, success: bool = True, error: str = ""):
        """结束任务监控"""
        end_time = datetime.now()
        duration = (end_time - self.start_time).total_seconds()
        
        metric = AgentMetrics(
            task_id=self.current_task,
            start_time=self.start_time,
            end_time=end_time,
            tokens_used=getattr(self, 'tokens_used', 0),
            steps_taken=getattr(self, 'steps_taken', 0),
            success=success,
            error_message=error
        )
        
        self.metrics.append(metric)
        self.logger.info(f"任务完成: {self.current_task}, "
                        f"耗时: {duration:.2f}s, "
                        f"成功: {success}")

总结

本文深入解析了AI Agent的五大核心范式,从理论原理到源代码实现,再到工程实践,为读者提供了完整的技术指南。

关键要点总结

  1. 范式选择原则:根据任务复杂度、准确度要求和成本预算动态选择
  2. 工程实践建议:从ReAct入门,逐步过渡到混合架构,最后用Harness治理
  3. 性能优化方向:Token优化、缓存机制、监控可观测性
  4. 生产落地路径:LangChain快速原型 → LangGraph生产部署 → Harness质量管控

通过系统化的架构设计和工程化实践,我们可以构建出既灵活又可靠的AI Agent系统,真正发挥大语言模型的潜力。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐