1. Introduction: A Fundamental Paradigm Shift in Large-Model Technology

In 2025-2026, the development path of large language models (LLMs) has shifted fundamentally: gains in intelligence no longer come only from scaling parameters at training time, but increasingly from the "thinking" process itself, that is, from optimizing computation at inference time.
This shift marks the industry's migration from training-time scaling to test-time scaling.
Models such as OpenAI's GPT-5, DeepSeek-R1, and Anthropic's Claude 3.5 Sonnet show that letting a smaller model "think longer" at inference time can beat a larger model's quick answer.

2. The Reasoning Revolution: From "Fast Thinking" to "Slow Thinking"

2.1 Technical Principles: RLVR and Process Reward Models

Core breakthrough: reinforcement learning has evolved from "aligning with human preferences" (RLHF) to "improving intrinsic reasoning ability" (RLVR, Reinforcement Learning with Verifiable Rewards).

Architecture:

Traditional RLHF: model output → human scoring → optimize for preference (subjective)
RLVR paradigm: model output → automatic verification (unit tests / mathematical proofs) → optimize for correctness (objective)
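
To make the contrast concrete, here is a minimal sketch of the two reward styles (illustrative only: reward_model and its score method are assumed stand-ins, not a specific library's API):

def rlhf_reward(output: str, reward_model) -> float:
    # Subjective: a model trained on human preference labels assigns a score
    return reward_model.score(output)  # assumed stand-in API

def rlvr_reward(output: str, expected: str) -> float:
    # Objective: a program checks correctness (here, exact string match)
    return 1.0 if output.strip() == expected.strip() else 0.0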

Key components:

Component | Function | Representative implementations
PRM (Process Reward Model) | Scores individual reasoning steps | OpenAI o1, DeepSeek-R1
GRPO (Group Relative Policy Optimization) | Group-relative policy optimization that stabilizes training | Core algorithm of DeepSeek-R1
Self-play RL | Generates synthetic data through self-play | Anthropic, AlphaProof

2.2 Domain Skill Matrix

  • Essential skills:
    • Math foundations: policy gradients, PPO, DPO, basics of game theory
    • Engineering: vLLM inference acceleration, Ray distributed training, DeepSpeed ZeRO optimization
    • Verifier design: code unit tests, mathematical theorem proving, balancing chemical equations (see the sketch after this list)
  • Advanced capabilities:
    • Designing process supervision signals: decompose complex tasks into verifiable sub-steps
    • Synthetic data generation: produce high-quality training data via self-play
    • Inference cost optimization: balance reasoning depth against compute budget
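As a concrete illustration of verifier design, here is a minimal sketch of a unit-test reward for code answers. It assumes each answer defines a Python function named solve; real systems would run this inside a sandbox rather than a bare exec:

def code_reward(answer: str, test_cases: list) -> float:
    """Fraction of unit tests passed by the candidate code (0.0 on any failure)."""
    namespace = {}
    try:
        exec(answer, namespace)  # UNSAFE outside a sandbox; for illustration only
        solve = namespace["solve"]
        passed = sum(1 for args, expected in test_cases if solve(*args) == expected)
        return passed / len(test_cases)
    except Exception:
        return 0.0

# Example: a correct addition function passes both hidden tests
print(code_reward("def solve(a, b):\n    return a + b", [((1, 2), 3), ((5, 7), 12)]))  # 1.0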

2.3 Code Examples

2.3.1 Core Logic of the GRPO Algorithm

answers = model.generate(prompt, n=8)                    # sample a group of 8 candidate answers
rewards = verify(answers)                                # automatic verification (exact match / unit tests)
advantages = (rewards - rewards.mean()) / rewards.std()  # group-relative advantage
loss = -(advantages * log_probs).mean()                  # optimize relative advantage (log_probs: per-answer log-likelihoods)

2.3.2 Complete GRPO Code

import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import List, Tuple

class GRPOTrainer:
    """
    Group Relative Policy Optimization (GRPO)
    核心思想:生成一组答案,用相对优势而非绝对奖励来优化策略
    """
    def __init__(
        self,
        model_name: str = "deepseek-ai/deepseek-llm-7b-base",
        group_size: int = 8,          # candidate answers generated per group
        epsilon: float = 0.2,          # PPO clipping coefficient
        beta: float = 0.01,            # KL penalty coefficient
    ):
        self.policy_model = AutoModelForCausalLM.from_pretrained(
            model_name, 
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        self.ref_model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        # Freeze the reference model
        for param in self.ref_model.parameters():
            param.requires_grad = False
            
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.group_size = group_size
        self.epsilon = epsilon
        self.beta = beta
        
    def generate_group_answers(self, prompt: str) -> List[str]:
        """Generate a group of candidate answers"""
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.policy_model.device)
        
        outputs = self.policy_model.generate(
            **inputs,
            max_new_tokens=512,
            do_sample=True,
            temperature=0.7,
            num_return_sequences=self.group_size,  # key: generate several sequences at once
            pad_token_id=self.tokenizer.eos_token_id
        )
        
        # Decode, stripping the prompt prefix
        answers = []
        for output in outputs:
            text = self.tokenizer.decode(output, skip_special_tokens=True)
            answer = text[len(prompt):].strip()
            answers.append(answer)
        return answers
    
    def verify_answer(self, answer: str, ground_truth: str) -> float:
        """
        Verifiable reward function:
        math problems use exact match; code problems use passing unit tests.
        """
        # Example: math-problem verification
        try:
            # Extract the numeric answer (simplified; real parsing must be more robust)
            pred = float(answer.split("####")[-1].strip())
            true = float(ground_truth)
            return 1.0 if abs(pred - true) < 1e-3 else 0.0
        except (ValueError, IndexError):
            return 0.0
    
    def compute_grpo_loss(
        self,
        prompts: List[str],
        ground_truths: List[str]
    ) -> torch.Tensor:
        """
        GRPO核心损失函数
        使用组内相对优势,而非绝对奖励
        """
        total_loss = 0.0
        
        for prompt, gt in zip(prompts, ground_truths):
            # 1. Generate G candidate answers
            answers = self.generate_group_answers(prompt)
            
            # 2. Compute the reward for each answer
            rewards = [self.verify_answer(ans, gt) for ans in answers]
            rewards_tensor = torch.tensor(rewards, dtype=torch.float32)
            
            # 3. Compute within-group relative advantages (the key innovation)
            mean_reward = rewards_tensor.mean()
            std_reward = rewards_tensor.std() + 1e-8
            advantages = (rewards_tensor - mean_reward) / std_reward  # normalized advantages
            
            # 4. Compute the log-probability of each answer
            log_probs = []
            ref_log_probs = []
            
            for answer in answers:
                full_text = prompt + answer
                inputs = self.tokenizer(full_text, return_tensors="pt").to(self.policy_model.device)
                # Prompt length in tokens, so that only answer tokens are scored
                prompt_len = self.tokenizer(prompt, return_tensors="pt").input_ids.shape[1]
                
                with torch.no_grad():
                    # Reference-model log-probs (used by the KL penalty)
                    ref_outputs = self.ref_model(**inputs)
                    ref_logits = ref_outputs.logits[:, :-1, :]
                    ref_tokens = inputs.input_ids[:, 1:]
                    ref_log_prob = torch.nn.functional.log_softmax(ref_logits, dim=-1)
                    ref_log_prob = torch.gather(ref_log_prob, -1, ref_tokens.unsqueeze(-1)).squeeze(-1)
                    ref_log_probs.append(ref_log_prob[:, prompt_len - 1:].mean().item())
                
                # Policy-model log-probs
                outputs = self.policy_model(**inputs)
                logits = outputs.logits[:, :-1, :]
                tokens = inputs.input_ids[:, 1:]
                log_prob = torch.nn.functional.log_softmax(logits, dim=-1)
                log_prob = torch.gather(log_prob, -1, tokens.unsqueeze(-1)).squeeze(-1)
                log_probs.append(log_prob[:, prompt_len - 1:].mean())
            
            # 5. PPO-clip loss
            log_probs = torch.stack(log_probs)
            ref_log_probs = torch.tensor(ref_log_probs)
            
            # Importance-sampling ratio (simplification: the frozen reference
            # model stands in for the old policy of standard PPO)
            ratios = torch.exp(log_probs - ref_log_probs.to(log_probs.device))
            
            # PPO clipped objective
            surr1 = ratios * advantages.to(log_probs.device)
            surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * advantages.to(log_probs.device)
            policy_loss = -torch.min(surr1, surr2).mean()
            
            # 6. KL-divergence penalty (keeps the policy close to the reference)
            kl_penalty = self.beta * (log_probs - ref_log_probs.to(log_probs.device)).mean()
            
            total_loss += policy_loss + kl_penalty
        
        return total_loss / len(prompts)

# Usage example
trainer = GRPOTrainer(group_size=8)

# Training data: math problems
train_data = [
    {
        "prompt": "问题:小明有15个苹果,给了小红6个,又买了8个,现在有几个?\n思考:",
        "answer": "#### 17"
    },
    # ... more data
]

# Training loop
optimizer = torch.optim.AdamW(trainer.policy_model.parameters(), lr=1e-5)

for epoch in range(3):
    loss = trainer.compute_grpo_loss(
        [d["prompt"] for d in train_data],
        [d["answer"] for d in train_data]
    )
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

3. Test-Time Compute: The Small Model's "Comeback"

3.1 Technical Principles: Compute-Optimal Inference Strategies

  • Core insight: at matched FLOPs, a smaller model scaled with test-time compute can outperform a model 14× its size.

  • Four core techniques (a Chain-of-Thought prompting sketch follows the table):

Technique | Direction | Mechanism | Typical scenarios
Chain-of-Thought | Depth | Generates detailed reasoning steps | Math, logical reasoning
Self-Consistency | Width | Samples multiple paths and takes a majority vote | Open-ended QA
Tree-of-Thoughts | Search | Maintains a reasoning tree with heuristic pruning | Planning, games
Reflexion/Self-Refine | Iteration | Self-corrects based on feedback | Code debugging, writing
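
Of the four, Chain-of-Thought is the simplest to try: the only change versus direct prompting is asking the model for intermediate steps before the final answer (a hedged sketch; the prompt wording is illustrative):

cot_prompt = (
    "Q: A store had 15 apples, sold 6, then received 8 more. How many are left?\n"
    "Let's think step by step, then give the final answer after '####'.\n"
)
# A typical completion: "15 - 6 = 9; 9 + 8 = 17. #### 17"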
  • Compute-optimal scaling strategy:
    • Easy problems: single-path fast answers (saves cost)
    • Medium problems: parallel sampling + voting (Self-Consistency)
    • Hard problems: tree search + process rewards (Tree-of-Thoughts + PRM)

3.2 Domain Skill Matrix

  • Systems architecture skills
    • Inference-engine optimization: vLLM's PagedAttention, continuous batching
    • Speculative decoding: draft with a small model, verify with a large one, for a 2-3× speedup
    • Dynamic batching: adjust batch size to sequence lengths
  • Algorithm tuning skills (see the early-stopping sketch after this list)
    • Early stopping: terminate inference once confidence is high enough
    • Adaptive computation: match thinking depth to input complexity
    • Caching: reuse earlier reasoning paths to avoid recomputation
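Here is a minimal sketch of confidence-based early stopping for Self-Consistency; sample_answer is an assumed callable that draws one sampled answer from the model:

from collections import Counter

def early_stopped_vote(sample_answer, max_samples: int = 16, confidence: float = 0.6) -> str:
    """Stop sampling as soon as one answer holds a clear majority."""
    votes = Counter()
    for n in range(1, max_samples + 1):
        votes[sample_answer()] += 1
        top_answer, top_count = votes.most_common(1)[0]
        if n >= 3 and top_count / n >= confidence:
            return top_answer          # confident majority reached: stop early
    return votes.most_common(1)[0][0]  # budget exhausted: return the leader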

3.3 Code Examples

3.3.1 Adaptive Computation Controller

from enum import Enum
from typing import Callable, List, Optional
import numpy as np

class ComplexityLevel(Enum):
    LOW = "low"       # answer directly
    MEDIUM = "medium" # multi-path sampling + voting
    HIGH = "high"     # tree search + PRM

class AdaptiveInferenceEngine:
    """
    Adaptive inference engine: choose a compute strategy based on problem difficulty.
    """
    def __init__(
        self,
        model,  # base LLM
        prm: Optional["ProcessRewardModel"] = None,  # process reward model (stub sketch below)
        complexity_threshold: Optional[dict] = None
    ):
        self.model = model
        self.prm = prm
        self.threshold = complexity_threshold or {
            "low": 0.3,   # perplexity-proxy threshold
            "medium": 0.7
        }
        
    def estimate_complexity(self, prompt: str) -> ComplexityLevel:
        """
        Estimate problem complexity.
        Idea: use the base model's perplexity as a proxy; the version here is a
        simplified keyword-and-length heuristic.
        """
        complexity_keywords = ["prove", "derive", "multi-step", "if...then", "suppose"]
        keyword_score = sum(1 for kw in complexity_keywords if kw in prompt) / len(complexity_keywords)
        
        # Length factor (longer questions tend to be harder)
        length_score = min(len(prompt) / 500, 1.0)
        
        # Combined score
        complexity_score = 0.6 * keyword_score + 0.4 * length_score
        
        if complexity_score < self.threshold["low"]:
            return ComplexityLevel.LOW
        elif complexity_score < self.threshold["medium"]:
            return ComplexityLevel.MEDIUM
        else:
            return ComplexityLevel.HIGH
    
    def fast_generate(self, prompt: str, max_tokens: int = 256) -> str:
        """快速生成:单路径,低温度"""
        return self.model.generate(
            prompt,
            temperature=0.3,  # low temperature for near-deterministic output
            max_tokens=max_tokens,
            num_samples=1
        )[0]
    
    def self_consistency(
        self, 
        prompt: str, 
        n_paths: int = 5,
        temperature: float = 0.7
    ) -> str:
        """
        Self-Consistency:多路采样,多数投票
        适用于中等复杂度问题
        """
        # 生成多条路径
        candidates = self.model.generate(
            prompt,
            temperature=temperature,
            max_tokens=512,
            num_samples=n_paths
        )
        
        # Extract the final answer from each path (simplified: the last line)
        answers = [self._extract_answer(c) for c in candidates]
        
        # Majority vote
        from collections import Counter
        vote_counts = Counter(answers)
        best_answer = vote_counts.most_common(1)[0][0]
        
        # Return the full completion that agrees with the winning answer
        for c in candidates:
            if self._extract_answer(c) == best_answer:
                return c
        
        return candidates[0]
    
    def _extract_answer(self, text: str) -> str:
        """Extract the final answer from a completion (simplified: the last non-empty line)."""
        lines = [ln.strip() for ln in text.strip().split("\n") if ln.strip()]
        return lines[-1] if lines else ""
    
    def tree_of_thoughts(
        self,
        prompt: str,
        max_depth: int = 5,
        branching_factor: int = 3,
        beam_width: int = 2
    ) -> str:
        """
        Tree-of-Thoughts:树搜索 + PRM评估
        适用于高复杂度问题
        """
        if not self.prm:
            raise ValueError("Tree-of-Thoughts requires PRM")
        
        # Initialize the root node
        root = ThoughtNode(content=prompt, parent=None)
        current_level = [root]
        
        for depth in range(max_depth):
            next_level = []
            
            for node in current_level:
                # Generate candidate next thoughts
                expansion_prompt = self._build_thought_prompt(node)
                thoughts = self.model.generate(
                    expansion_prompt,
                    temperature=0.8,
                    max_tokens=128,
                    num_samples=branching_factor
                )
                
                for thought in thoughts:
                    child = ThoughtNode(content=thought, parent=node)
                    node.children.append(child)
                    next_level.append(child)
            
            # Score all nodes with the PRM and keep the top-k
            if next_level:
                scores = []
                for node in next_level:
                    # Build the full path to this node
                    path = node.get_path()
                    score = self.prm.forward(prompt, path).mean().item()
                    scores.append(score)
                
                # Beam search: keep the top beam_width nodes
                top_indices = np.argsort(scores)[-beam_width:]
                current_level = [next_level[i] for i in top_indices]
        
        # Return the full answer along the best path
        best_leaf = max(current_level, key=lambda n: self.prm.forward(prompt, n.get_path()).mean())
        return "\n".join(best_leaf.get_path())
    
    def generate(self, prompt: str, max_budget: float = 1.0) -> dict:
        """
        统一入口:根据复杂度自动选择策略
        返回包含推理路径和成本的字典
        """
        complexity = self.estimate_complexity(prompt)
        
        result = {
            "complexity": complexity.value,
            "strategy": None,
            "answer": None,
            "cost": 0.0,
            "reasoning_path": None
        }
        
        if complexity == ComplexityLevel.LOW:
            result["strategy"] = "fast_generate"
            result["answer"] = self.fast_generate(prompt)
            result["cost"] = 0.1 * max_budget
            
        elif complexity == ComplexityLevel.MEDIUM:
            result["strategy"] = "self_consistency"
            result["answer"] = self.self_consistency(prompt, n_paths=5)
            result["cost"] = 0.5 * max_budget
            
        else:  # HIGH
            result["strategy"] = "tree_of_thoughts"
            result["answer"] = self.tree_of_thoughts(prompt)
            result["cost"] = 0.9 * max_budget
            
        return result

class ThoughtNode:
    """树搜索节点"""
    def __init__(self, content: str, parent: Optional['ThoughtNode'] = None):
        self.content = content
        self.parent = parent
        self.children = []
    
    def get_path(self) -> List[str]:
        """获取从根到当前节点的路径"""
        path = []
        current = self
        while current:
            path.append(current.content)
            current = current.parent
        return list(reversed(path))
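
# The engine above assumes a ProcessRewardModel exposing forward(prompt, steps).
# A minimal stub for local experimentation (an assumed interface sketch, not a
# real library class) could look like this:
import torch

class ProcessRewardModel:
    def __init__(self, scorer=None):
        self.scorer = scorer  # optional callable (prompt, step) -> float
    
    def forward(self, prompt: str, steps: List[str]) -> torch.Tensor:
        """Return one score per reasoning step."""
        if self.scorer is None:
            # Placeholder heuristic: longer, more concrete steps score slightly higher
            return torch.tensor([min(len(s) / 200, 1.0) for s in steps])
        return torch.tensor([self.scorer(prompt, s) for s in steps])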

# Usage example
engine = AdaptiveInferenceEngine(model=my_llm, prm=my_prm)

# Questions of varying complexity
questions = [
    "What is the capital of France?",  # LOW
    "Solve the equation: 3x + 7 = 22",  # MEDIUM
    "Prove that n^3 - n is divisible by 6 for every positive integer n"  # HIGH
]

for q in questions:
    result = engine.generate(q)
    print(f"问题: {q}")
    print(f"复杂度: {result['complexity']}, 策略: {result['strategy']}")
    print(f"答案: {result['answer'][:100]}...")
    print("---")

3.3.2 Speculative Decoding

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

class SpeculativeDecoder:
    """
    投机解码:用小模型(draft model)快速生成候选token,
    用大模型(target model)并行验证,加速推理
    """
    def __init__(
        self,
        target_model_name: str = "meta-llama/Llama-2-70b",
        draft_model_name: str = "meta-llama/Llama-2-7b",
        gamma: int = 4,  # tokens speculated per round
    ):
        # Target model (slow but accurate)
        self.target_model = AutoModelForCausalLM.from_pretrained(
            target_model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        # Draft model (fast but lower quality)
        self.draft_model = AutoModelForCausalLM.from_pretrained(
            draft_model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto"
        )
        self.tokenizer = AutoTokenizer.from_pretrained(target_model_name)
        self.gamma = gamma  # speculation window size
        
    def generate(
        self,
        prompt: str,
        max_new_tokens: int = 100,
        temperature: float = 0.7
    ) -> str:
        """
        投机解码生成
        """
        input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.target_model.device)
        accepted_tokens = 0
        draft_tokens = 0
        
        for _ in range(max_new_tokens // self.gamma):
            # 1. The draft model quickly proposes gamma candidate tokens
            draft_outputs = self.draft_model.generate(
                input_ids,
                max_new_tokens=self.gamma,
                do_sample=True,
                temperature=temperature,
                return_dict_in_generate=True,
                output_scores=True
            )
            draft_sequence = draft_outputs.sequences[0]  # full sequence, prompt included
            draft_new_tokens = draft_sequence[len(input_ids[0]):]  # newly generated tokens
            
            # 2. The target model verifies in parallel (a single forward pass)
            with torch.no_grad():
                target_outputs = self.target_model(draft_sequence.unsqueeze(0))
                target_logits = target_outputs.logits[0, len(input_ids[0])-1:-1]  # logits at the draft positions
                target_probs = torch.softmax(target_logits / temperature, dim=-1)
            
            # 3. Accept/reject sampling
            accepted = []
            for i, draft_token in enumerate(draft_new_tokens):
                draft_tokens += 1
                
                # Draft-model distribution at this position (generate() has already
                # applied the temperature warper to these scores)
                draft_prob = torch.softmax(draft_outputs.scores[i], dim=-1)
                
                # Acceptance probability: min(1, p_target / p_draft)
                token_accept_prob = torch.min(
                    torch.tensor(1.0),
                    target_probs[i, draft_token] / draft_prob[0, draft_token]
                )
                
                if torch.rand(1).item() < token_accept_prob:
                    # Accept this token
                    accepted.append(draft_token.item())
                    accepted_tokens += 1
                else:
                    # Reject: resample from the clipped residual distribution
                    adjusted_prob = target_probs[i] - draft_prob[0]
                    adjusted_prob = torch.clamp(adjusted_prob, min=0)
                    adjusted_prob = adjusted_prob / adjusted_prob.sum()
                    new_token = torch.multinomial(adjusted_prob, 1).item()
                    accepted.append(new_token)
                    break  # stop this speculation round after a rejection
            
            # 4. Append the accepted tokens to the input sequence
            new_tokens_tensor = torch.tensor([accepted]).to(input_ids.device)
            input_ids = torch.cat([input_ids, new_tokens_tensor], dim=1)
            
            # Stop if an end-of-sequence token was produced
            if self.tokenizer.eos_token_id in accepted:
                break
        
        # Acceptance-rate statistics
        accept_rate = accepted_tokens / draft_tokens if draft_tokens > 0 else 0
        print(f"Speculative decoding acceptance rate: {accept_rate:.2%}")
        
        return self.tokenizer.decode(input_ids[0], skip_special_tokens=True)

# Performance comparison
def benchmark():
    decoder = SpeculativeDecoder()
    prompt = "The history of artificial intelligence can be traced back to"
    
    import time
    
    # Standard decoding
    start = time.time()
    standard_output = decoder.target_model.generate(
        decoder.tokenizer.encode(prompt, return_tensors="pt").to(decoder.target_model.device),
        max_new_tokens=100,
        do_sample=True,
        temperature=0.7
    )
    standard_time = time.time() - start
    
    # Speculative decoding
    start = time.time()
    speculative_output = decoder.generate(prompt, max_new_tokens=100)
    speculative_time = time.time() - start
    
    print(f"标准解码时间: {standard_time:.2f}s")
    print(f"投机解码时间: {speculative_time:.2f}s")
    print(f"加速比: {standard_time / speculative_time:.2f}x")

# benchmark()

4. Agentic AI: From "Conversation" to "Action"

4.1 Technical Principles: The OODA Loop and Long-Term Memory

  • Core definition: an AI entity with environment perception, autonomous decision-making, long-term memory, and tool-use capabilities, operating on the OODA loop (Observe-Orient-Decide-Act).

  • Key 2026 breakthrough: long-horizon autonomy

    • Memory mechanisms: extended context windows + vector databases + memory-compression algorithms (a small episodic-memory sketch follows the diagram below)
    • Sustained operation: task coherence over weeks without drifting from the goal
    • Self-evolution: automatic improvement via reinforcement learning and user feedback, averaging 15% performance gains per month
  • Architecture components:
    ┌──────────────────────────────────────────────┐
    │ Planning engine                              │
    │ - Goal decomposition: split complex tasks    │
    │   into subtasks                              │
    │ - Dynamic replanning: adjust the strategy    │
    │   based on environment feedback              │
    ├──────────────────────────────────────────────┤
    │ Memory management                            │
    │ - Short-term: context window (128K+ tokens)  │
    │ - Long-term: vector DB (Milvus/Pinecone)     │
    │ - Episodic: timestamps + importance scores   │
    ├──────────────────────────────────────────────┤
    │ Tool use                                     │
    │ - MCP: standardized tool interface           │
    │ - Function calling: APIs, databases,         │
    │   code execution                             │
    ├──────────────────────────────────────────────┤
    │ Reflection                                   │
    │ - ReAct: reasoning-acting loop               │
    │ - Self-critique: evaluate action results     │
    │   and generate improvement strategies        │
    └──────────────────────────────────────────────┘
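
The episodic-memory idea above (timestamps plus importance scores) can be sketched in a few lines; this is a toy in-memory version, whereas a real agent would back it with a vector database such as Milvus or Pinecone:

import heapq
import time

class EpisodicMemory:
    """Toy episodic memory: entries carry an importance score and a timestamp."""
    def __init__(self, capacity: int = 1000):
        self.entries = []  # min-heap of (importance, timestamp, text)
        self.capacity = capacity

    def add(self, text: str, importance: float):
        heapq.heappush(self.entries, (importance, time.time(), text))
        if len(self.entries) > self.capacity:
            heapq.heappop(self.entries)  # evict the least important memory

    def top_k(self, k: int = 5):
        # The k most important memories; recency breaks ties
        return heapq.nlargest(k, self.entries)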

4.2 Domain Skill Matrix

  • Core tech stack
    • Frameworks: LangChain, LlamaIndex, AutoGPT, MetaGPT
    • Memory databases: Milvus, Pinecone, Weaviate, Redis
    • Tool protocols: MCP (Model Context Protocol), Function Calling (see the schema sketch after this list)
    • Multi-agent collaboration: message buses, role division, conflict-resolution mechanisms
  • System design skills:
    • Fault tolerance: fallback strategies for when an agent fails
    • Security sandboxing: isolated code execution, permission control, audit logs
    • Human-AI collaboration: human-in-the-loop confirmation at critical decision points
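
As a concrete example of the Function Calling bullet above, here is a tool definition in the JSON-schema style that most function-calling APIs accept; exact field names vary by provider, so treat this as illustrative:

weather_tool = {
    "name": "get_weather",
    "description": "Get the current weather for a city",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "City name, e.g. Paris"},
        },
        "required": ["city"],
    },
}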

4.3 Code Examples

4.3.1 A ReAct Agent Implementation

import json
import re
from typing import List, Dict, Callable, Optional
from dataclasses import dataclass

@dataclass
class Action:
    name: str
    input: str
    observation: Optional[str] = None

class ReActAgent:
    """
    ReAct (Reasoning + Acting) 智能体
    交替进行思考(Thought)和行动(Action),直到完成任务
    """
    def __init__(
        self,
        llm,  # base language model
        tools: Dict[str, Callable],
        max_iterations: int = 10
    ):
        self.llm = llm
        self.tools = tools  # available tools: {tool name: function}
        self.max_iterations = max_iterations
        self.scratchpad = []  # reasoning trace
        
    def _build_prompt(self, task: str) -> str:
        """Build a ReAct-format prompt"""
        tool_descriptions = "\n".join([
            f"{name}: {func.__doc__}" 
            for name, func in self.tools.items()
        ])
        
        prompt = f"""Solve the task below by alternating Thought and Action steps.

Available tools:
{tool_descriptions}

Task: {task}

Trace so far:
"""
        # Append the trace accumulated so far
        for item in self.scratchpad:
            prompt += f"\n{item}\n"
        
        prompt += "\nThought:"
        return prompt
    
    def _parse_action(self, text: str) -> Optional[Action]:
        """Parse an action from the model output"""
        # Expected format: Action[tool name, input]
        action_pattern = r"Action\[(.*?),\s*(.*?)\]"
        match = re.search(action_pattern, text)
        
        if match:
            tool_name = match.group(1).strip()
            tool_input = match.group(2).strip().strip('"\'')
            return Action(name=tool_name, input=tool_input)
        return None
    
    def _parse_thought(self, text: str) -> str:
        """Extract the thought portion"""
        lines = text.split("\n")
        thought_lines = []
        for line in lines:
            if line.startswith("Thought:"):
                thought_lines.append(line[len("Thought:"):].strip())
            elif "Action[" in line:
                break
            elif thought_lines:
                thought_lines.append(line.strip())
        return " ".join(thought_lines)
    
    def run(self, task: str) -> str:
        """Run the ReAct loop"""
        for i in range(self.max_iterations):
            # 1. Build the current prompt
            prompt = self._build_prompt(task)
            
            # 2. The LLM generates the next step
            response = self.llm.generate(prompt, max_tokens=200, temperature=0.7)
            text = response[0] if isinstance(response, list) else response
            
            # 3. Parse the thought
            thought = self._parse_thought(text)
            self.scratchpad.append(f"Thought: {thought}")
            
            # 4. Parse the action
            action = self._parse_action(text)
            
            if not action:
                # No action; this may be the final answer
                if "Answer:" in text or "done" in text.lower():
                    return text
                continue
            
            # 5. Execute the tool
            if action.name not in self.tools:
                observation = f"Error: unknown tool '{action.name}'"
            else:
                try:
                    tool_func = self.tools[action.name]
                    observation = tool_func(action.input)
                except Exception as e:
                    observation = f"Error: {str(e)}"
            
            action.observation = observation
            self.scratchpad.append(f"Action[{action.name}, {action.input}]")
            self.scratchpad.append(f"Observation: {observation}")
            
            # 6. Check for completion
            if "done" in observation.lower() or "success" in observation.lower():
                final_prompt = self._build_prompt(task) + "\nThought: based on the observations above, give the final answer.\nAnswer:"
                final_answer = self.llm.generate(final_prompt, max_tokens=200)
                return final_answer[0] if isinstance(final_answer, list) else final_answer
        
        return "Maximum iterations reached; task not completed."

# Tool definitions
def search(query: str) -> str:
    """Search the internet for information"""
    # A real implementation would call a search-engine API
    mock_results = {
        "capital of France": "The capital of France is Paris.",
        "Python": "Python is a high-level programming language.",
        "2026 AI trends": "2026 AI trends include test-time compute, agents, and multimodal models."
    }
    return mock_results.get(query, f"Search results about '{query}'")

def calculator(expression: str) -> str:
    """Evaluate a mathematical expression"""
    try:
        # Safer evaluation: allow only basic arithmetic characters
        allowed_chars = set("0123456789+-*/(). ")
        if all(c in allowed_chars for c in expression):
            result = eval(expression)
            return f"Result: {result}"
        return "Error: expression contains illegal characters"
    except Exception as e:
        return f"Calculation error: {str(e)}"

def get_weather(city: str) -> str:
    """Get the weather for a city"""
    mock_weather = {
        "Beijing": "sunny, 25°C",
        "Shanghai": "cloudy, 28°C",
        "Paris": "light rain, 18°C"
    }
    return f"Weather in {city}: {mock_weather.get(city, 'unknown')}"

# Usage example
tools = {
    "search": search,
    "calculator": calculator,
    "weather": get_weather
}

agent = ReActAgent(llm=my_llm, tools=tools)

# Run a task
task = "What is the temperature in the capital of France? If it is above 20 degrees, multiply 25 by that temperature."
result = agent.run(task)
print(result)

# Expected trace:
# Thought: I need the capital of France first, then its weather, then decide whether to calculate.
# Action[search, capital of France]
# Observation: The capital of France is Paris.
# Thought: The capital is Paris; now I need the weather in Paris.
# Action[weather, Paris]
# Observation: Weather in Paris: light rain, 18°C
# Thought: The temperature is 18 degrees, not above 20, so no calculation is needed. Task complete.
# Answer: The temperature in Paris, the capital of France, is 18°C; since it does not exceed 20 degrees, no calculation is required.

4.3.2 A Multi-Agent Collaboration System

from enum import Enum
from typing import List, Dict, Optional
import asyncio

class Role(Enum):
    PRODUCT_MANAGER = "Product Manager"
    ARCHITECT = "Architect"
    ENGINEER = "Engineer"
    QA = "QA Engineer"

class Message:
    """Agent间通信消息"""
    def __init__(
        self, 
        content: str, 
        sender: str, 
        receiver: Optional[str] = None,
        msg_type: str = "text"
    ):
        self.content = content
        self.sender = sender
        self.receiver = receiver  # None means broadcast
        self.msg_type = msg_type  # text / code / requirement / test

class Agent:
    """基础智能体类"""
    def __init__(self, name: str, role: Role, llm, tools: List = None):
        self.name = name
        self.role = role
        self.llm = llm
        self.tools = tools or []
        self.memory = []  # message history
        self.inbox = asyncio.Queue()  # message inbox
        
    async def receive(self, msg: Message):
        """接收消息"""
        await self.inbox.put(msg)
        self.memory.append(msg)
        
    async def think(self) -> str:
        """基于记忆生成思考"""
        # 构建上下文
        context = "\n".join([
            f"{m.sender}: {m.content}" 
            for m in self.memory[-10:]  # 最近10条
        ])
        
        system_prompt = f"""你是{self.role.value},名为{self.name}。
你的职责:{self._get_role_prompt()}

当前对话:
{context}

请根据你的角色和上下文,生成下一步行动或回复:"""
        
        response = self.llm.generate(system_prompt, max_tokens=500)
        return response[0] if isinstance(response, list) else response
    
    def _get_role_prompt(self) -> str:
        """Role-specific system prompt"""
        prompts = {
            Role.PRODUCT_MANAGER: "Analyze requirements, write the PRD, define the feature set",
            Role.ARCHITECT: "Design the system architecture, choose the tech stack, define interfaces",
            Role.ENGINEER: "Write code to implement features and fix bugs",
            Role.QA: "Write test cases, run tests, report issues"
        }
        return prompts.get(self.role, "Assist with the task")
    
    async def act(self, thought: str) -> List[Message]:
        """Turn a thought into actions (outgoing messages)"""
        messages = []
        
        # Simple heuristic: route by keywords; receivers are role names,
        # matching how agents are registered in the system below
        if "requirement" in thought or "PRD" in thought:
            messages.append(Message(thought, self.name, Role.ARCHITECT.value, "requirement"))
        elif "architecture" in thought or "design" in thought:
            messages.append(Message(thought, self.name, Role.ENGINEER.value, "design"))
        elif "code" in thought or "implement" in thought:
            messages.append(Message(thought, self.name, Role.QA.value, "code"))
        elif "test" in thought or "bug" in thought:
            messages.append(Message(thought, self.name, Role.ENGINEER.value, "test"))
        else:
            # Broadcast
            messages.append(Message(thought, self.name, None, "text"))
            
        return messages
    
    async def run(self):
        """Agent主循环"""
        while True:
            # 检查收件箱
            if not self.inbox.empty():
                msg = await self.inbox.get()
                print(f"[{self.name}] 收到来自 {msg.sender} 的消息: {msg.content[:50]}...")
                
                # 思考
                thought = await self.think()
                print(f"[{self.name}] 思考: {thought[:50]}...")
                
                # 行动
                messages = await self.act(thought)
                yield messages
            else:
                await asyncio.sleep(0.1)

class MultiAgentSystem:
    """多智能体协作系统"""
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.message_bus = asyncio.Queue()  # 全局消息总线
        
    def register_agent(self, agent: Agent):
        """注册Agent"""
        self.agents[agent.name] = agent
        
    async def dispatch(self, msg: Message):
        """消息分发"""
        if msg.receiver:
            # 点对点
            if msg.receiver in self.agents:
                await self.agents[msg.receiver].receive(msg)
        else:
            # 广播给所有Agent(除发送者)
            for name, agent in self.agents.items():
                if name != msg.sender:
                    await agent.receive(msg)
                    
    async def run_workflow(self, initial_task: str, max_rounds: int = 20):
        """运行工作流"""
        # 初始任务发给产品经理
        initial_msg = Message(initial_task, "用户", "产品经理", "requirement")
        await self.agents["产品经理"].receive(initial_msg)
        
        # Start all the agents
        agent_tasks = [
            asyncio.create_task(self._agent_loop(name)) 
            for name in self.agents.keys()
        ]
        
        # Pump the message bus
        for _ in range(max_rounds):
            if not self.message_bus.empty():
                msg = await self.message_bus.get()
                await self.dispatch(msg)
            await asyncio.sleep(0.1)
        
        # Cancel the agent tasks
        for task in agent_tasks:
            task.cancel()
            
    async def _agent_loop(self, agent_name: str):
        """单个Agent的消息循环"""
        agent = self.agents[agent_name]
        async for messages in agent.run():
            for msg in messages:
                await self.message_bus.put(msg)

# Usage example: a software development team
async def main():
    # Create the agents
    pm = Agent("Alice", Role.PRODUCT_MANAGER, llm=my_llm)
    architect = Agent("Bob", Role.ARCHITECT, llm=my_llm)
    engineer = Agent("Carol", Role.ENGINEER, llm=my_llm)
    qa = Agent("David", Role.QA, llm=my_llm)
    
    # Assemble the system
    system = MultiAgentSystem()
    system.register_agent(pm)
    system.register_agent(architect)
    system.register_agent(engineer)
    system.register_agent(qa)
    
    # Run a task
    task = "Build a user login system with email verification and third-party sign-in"
    await system.run_workflow(task, max_rounds=50)

# asyncio.run(main())

5. Summary

The yardstick for evaluating an agent is no longer how smart it is, but how many skills it commands and whether it can extend its own skill library.
Future agent developers will shift from writing prompts to writing skill definitions and workflow graphs. We are witnessing software engineering's deep shift from "object-oriented programming" to "intent-oriented programming".

In the next post, we will take a close look at Skills, which are taking off in 2026.

I'm Xiaoyu ("Little Fish"):

  • CSDN blog expert
  • AIGC MVP technical expert
  • Alibaba Cloud expert blogger
  • 51CTO blog expert
  • Company-certified gold-medal interviewer
  • Certified & invited lecturer at multiple well-known companies
  • Contracted interview coach and career planner for well-known companies
  • Certified expert blogger in several major Chinese tech communities
  • First-prize winner in reviews of several mainstream products (Alibaba Cloud and others)

Follow Xiaoyu for the latest and most complete coverage of machine vision & object detection and AI & large models.


With "AI" added, 1024 = 2048. Everyone is welcome to join the 2048 AI community.
