Agent执行链路中的"自我反思"机制设计:如何在提升智能的同时不浪费Token

副标题:从理论到实践,构建高效、经济的智能体自我反思系统


摘要/引言

在大语言模型(LLM)驱动的智能体(Agent)系统中,“自我反思”(Self-Reflection)机制被视为提升Agent智能水平、任务完成质量和自适应能力的关键技术。然而,传统的反思实现往往伴随着高昂的Token消耗,这不仅增加了系统运行成本,还可能导致响应延迟,影响用户体验。

本文将深入探讨Agent执行链路中自我反思机制的设计原则与实践方案。我们将从问题背景出发,分析现有反思机制的局限性,然后系统性地介绍如何设计一个既智能又经济的自我反思系统。文章将涵盖核心概念解析、数学模型建立、算法设计、代码实现、性能优化等多个方面,并提供详细的最佳实践指导。

读完本文,你将:

  • 深刻理解Agent自我反思的核心概念与理论基础
  • 掌握设计高效反思机制的关键技术与方法
  • 学会如何在实际项目中平衡智能性与Token消耗
  • 获得一套可直接应用的反思系统设计框架与代码示例

让我们开始这段探索之旅。


目标读者与前置知识

目标读者:

  • 有一定LLM应用开发经验的软件工程师
  • 对Agent系统设计感兴趣的AI研究者
  • 希望优化LLM应用成本与性能的技术负责人
  • 具备Python编程基础和基本机器学习概念的开发者

前置知识:

  • 基本的Python编程能力
  • 对大语言模型(如GPT-4、Claude等)及其API使用有一定了解
  • 理解Agent系统的基本概念和工作原理
  • 熟悉Token计算与LLM成本模型

文章目录

  1. 第一部分:引言与基础

    • 引人注目的标题
    • 摘要/引言
    • 目标读者与前置知识
    • 文章目录
  2. 第二部分:核心内容

    • 问题背景与动机
    • 核心概念与理论基础
    • 环境准备
    • 分步实现
    • 关键代码解析与深度剖析
  3. 第三部分:验证与扩展

    • 结果展示与验证
    • 性能优化与最佳实践
    • 常见问题与解决方案
    • 未来展望与扩展方向
  4. 第四部分:总结与附录

    • 总结
    • 参考资料
    • 附录

问题背景与动机

Agent系统的兴起与自我反思的重要性

近年来,随着大语言模型(LLMs)的快速发展,基于LLM的智能体(Agent)系统成为了AI领域的研究热点和应用焦点。从简单的问答助手到复杂的任务执行系统,Agent正在逐步改变我们与AI交互的方式。

一个完整的Agent系统通常包含以下核心组件:

  • 感知模块:获取和理解环境信息
  • 决策模块:基于感知结果和目标制定行动计划
  • 执行模块:按照计划执行具体操作
  • 记忆模块:存储历史信息和经验知识
  • 反思模块:回顾、评估和改进整个过程

在这些组件中,自我反思机制扮演着至关重要的角色。它使得Agent能够:

  • 评估任务执行结果的质量
  • 识别执行过程中的错误和不足
  • 从成功和失败中学习经验
  • 调整策略以适应变化的环境
  • 优化未来的决策和行动

然而,传统的反思实现方式往往存在一个严重的问题:Token消耗过高

Token消耗问题的严重性

在当前的LLM API计费模式下,Token是衡量计算资源使用的基本单位。无论是输入(Prompt)还是输出(Completion),都会消耗Token,而Token消耗直接对应着成本。

对于包含自我反思机制的Agent系统,Token消耗通常来自以下几个方面:

  1. 原始任务处理:理解用户需求、生成初始响应
  2. 反思Prompt构建:收集相关信息,构建反思所需的上下文
  3. 反思过程执行:调用LLM进行反思思考
  4. 反思结果应用:基于反思结果调整后续行为

在一些简单的实现中,反思过程可能会消耗比原始任务处理更多的Token。例如,一个系统可能会将完整的对话历史、任务执行记录、中间结果等全部发送给LLM进行反思,这会导致Token消耗呈指数级增长。

高Token消耗带来的问题是多方面的:

  • 经济成本高:直接增加了系统运营成本
  • 响应延迟大:更多的Token意味着更长的处理时间
  • 上下文限制:可能导致超过模型的上下文窗口限制
  • 资源浪费:很多信息可能对反思并无实际帮助

因此,如何设计一个既有效又经济的自我反思机制,成为了Agent系统开发中一个亟待解决的关键问题。

现有解决方案的局限性

目前,业界已经提出了一些减少Token消耗的方法,包括:

  • 上下文截断:只保留最近的部分对话历史
  • 信息摘要:对历史信息进行摘要处理
  • 反思触发条件化:只在必要时触发反思
  • 使用更小的模型:在反思环节使用轻量级模型

然而,这些方法往往存在各自的局限性:

  • 上下文截断可能导致重要信息丢失
  • 信息摘要可能引入信息失真
  • 触发条件设计不当可能导致反思不足或过度
  • 小模型可能无法提供高质量的反思结果

我们需要一个更加系统化、智能化的解决方案,能够在保证反思质量的前提下,最大限度地减少Token消耗。


核心概念与理论基础

核心概念解析

在深入探讨设计方案之前,让我们先明确一些核心概念:

1. Agent执行链路

Agent执行链路是指Agent从接收任务到完成任务的完整过程,通常包括以下阶段:

任务接收

任务理解

计划制定

执行行动

结果评估

任务完成?

任务结束

反思调整

在这个链路中,自我反思通常发生在"结果评估"和"反思调整"阶段,但也可能贯穿整个执行过程。

2. 自我反思(Self-Reflection)

在Agent系统中,自我反思是指Agent对自己的认知过程、决策逻辑、行为表现和结果质量进行主动审视、分析和评价的过程。

从功能角度,自我反思可以分为以下几种类型:

反思类型 描述 主要目的
过程反思 对任务执行过程的审视 优化执行流程,提高效率
结果反思 对任务结果质量的评估 确保输出质量,修正错误
策略反思 对决策策略的分析 改进决策逻辑,提升智能性
元认知反思 对自身认知能力的认知 了解自身局限,设定合理预期
3. Token效率(Token Efficiency)

Token效率是指在完成特定任务时,所获得的价值与消耗的Token数量之间的比率。提高Token效率意味着用更少的Token实现相同或更好的效果。

我们可以用以下公式来量化Token效率:

E=VT E = \frac{V}{T} E=TV

其中:

  • EEE 表示Token效率
  • VVV 表示任务完成的价值(可以是质量分、准确度等)
  • TTT 表示消耗的Token数量

在反思机制设计中,我们的目标是最大化反思过程的Token效率,即在保证反思质量的前提下,尽可能减少Token消耗。

反思机制的理论模型

为了更好地设计高效的反思机制,我们需要建立一个理论模型来描述反思过程。

1. 反思信息价值模型

并非所有信息对反思都有同等价值。我们可以用信息价值(Information Value, IV)来衡量某条信息对反思过程的重要性:

IV(i,c)=R(i,c)×U(i,c) IV(i, c) = R(i, c) \times U(i, c) IV(i,c)=R(i,c)×U(i,c)

其中:

  • IV(i,c)IV(i, c)IV(i,c) 表示在上下文 ccc 中信息 iii 的价值
  • R(i,c)R(i, c)R(i,c) 表示信息 iii 与反思目标的相关性
  • U(i,c)U(i, c)U(i,c) 表示信息 iii 的独特性(即不被其他信息包含的程度)

在选择反思所需的信息时,我们应该优先选择IV值高的信息。

2. 反思触发决策模型

并非每个执行步骤都需要进行反思。我们可以建立一个决策模型来确定何时触发反思:

D(t)={1if C(t)>θ0otherwise D(t) = \begin{cases} 1 & \text{if } C(t) > \theta \\ 0 & \text{otherwise} \end{cases} D(t)={10if C(t)>θotherwise

其中:

  • D(t)D(t)D(t) 表示在时间 ttt 是否触发反思(1表示触发,0表示不触发)
  • C(t)C(t)C(t) 表示在时间 ttt 的反思必要性分数
  • θ\thetaθ 表示反思触发阈值

反思必要性分数 C(t)C(t)C(t) 可以考虑以下因素:

  • 任务的复杂性和重要性
  • 执行结果的不确定性
  • 上次反思以来的时间或步骤数
  • 系统的信心水平变化
3. 反思粒度分层模型

反思不应该只有一种模式,而应该根据需要采用不同的粒度。我们可以将反思分为以下几个层次:

全局反思
Global Reflection

阶段反思
Stage Reflection

步骤反思
Step Reflection

即时反思
Instant Reflection

每个层次的特点:

反思层次 覆盖范围 触发频率 Token消耗 反思深度
全局反思 整个任务
阶段反思 任务阶段
步骤反思 单个步骤 较高 较低 较浅
即时反思 即时反馈

通过灵活组合不同层次的反思,我们可以在Token效率和反思质量之间取得更好的平衡。

概念结构与核心要素组成

一个高效的自我反思系统应该包含以下核心要素:

反思控制器
Reflection Controller

信息选择器
Information Selector

反思触发器
Reflection Trigger

反思执行器
Reflection Executor

结果整合器
Result Integrator

信息价值评估
IV Evaluation

触发条件判断
Trigger Condition

反思策略选择
Strategy Selection

反思结果应用
Result Application

各核心要素的功能:

  1. 反思控制器:协调整个反思过程,决定何时、如何进行反思
  2. 信息选择器:从大量信息中筛选出对反思有价值的部分
  3. 反思触发器:根据预设条件判断是否需要触发反思
  4. 反思执行器:执行具体的反思过程,生成反思结果
  5. 结果整合器:将反思结果应用到Agent的后续行为中

这些要素相互配合,共同构成一个完整的反思系统。在接下来的章节中,我们将详细讨论如何设计和实现这些要素,以实现高效的Token利用。


环境准备

在开始实现之前,让我们先准备好开发环境。本文将使用Python作为主要编程语言,并结合一些常用的库来构建我们的反思系统。

软件与库要求

软件/库 版本要求 用途
Python >= 3.8 主要编程语言
OpenAI API 最新版 调用大语言模型
LangChain >= 0.1.0 Agent开发框架
Pydantic >= 2.0 数据验证与设置管理
python-dotenv >= 1.0 环境变量管理
tiktoken 最新版 Token计数

安装依赖

创建一个新的Python虚拟环境,并安装所需的库:

# 创建虚拟环境
python -m venv efficient-reflection-env

# 激活虚拟环境
# Windows
efficient-reflection-env\Scripts\activate
# Linux/Mac
source efficient-reflection-env/bin/activate

# 安装依赖
pip install openai langchain pydantic python-dotenv tiktoken

项目结构

我们将采用以下项目结构:

efficient-reflection/
├── .env.example
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── base_agent.py
│   │   └── reflective_agent.py
│   ├── reflection/
│   │   ├── __init__.py
│   │   ├── controller.py
│   │   ├── selector.py
│   │   ├── trigger.py
│   │   ├── executor.py
│   │   └── integrator.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── token_counter.py
│   │   └── config.py
│   └── examples/
│       ├── __init__.py
│       └── simple_task.py
└── README.md

配置环境变量

复制.env.example.env,并填入你的API密钥:

OPENAI_API_KEY=your_api_key_here
OPENAI_MODEL_NAME=gpt-4-turbo-preview  # 或其他模型

分步实现

现在,让我们开始逐步实现我们的高效反思系统。我们将从基础组件开始,逐步构建完整的系统。

步骤1:实现Token计数工具

首先,我们需要一个准确的Token计数工具,以便我们能够测量和优化Token消耗。

# src/utils/token_counter.py
import tiktoken
from typing import List, Any


class TokenCounter:
    """Token计数器,用于计算文本和消息的Token数量"""
    
    def __init__(self, model_name: str = "gpt-4"):
        """
        初始化Token计数器
        
        Args:
            model_name: 使用的模型名称,影响编码方式
        """
        try:
            self.encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            # 如果模型不被tiktoken直接支持,使用cl100k_base作为默认
            self.encoding = tiktoken.get_encoding("cl100k_base")
    
    def count_text_tokens(self, text: str) -> int:
        """
        计算单个文本的Token数量
        
        Args:
            text: 待计算的文本
            
        Returns:
            Token数量
        """
        return len(self.encoding.encode(text))
    
    def count_messages_tokens(self, messages: List[dict]) -> int:
        """
        计算消息列表的Token数量
        
        Args:
            messages: 消息列表,格式为OpenAI API要求的格式
            
        Returns:
            Token数量
        """
        # 参考OpenAI的Token计算方法
        num_tokens = 0
        for message in messages:
            # 每个消息的格式开销
            num_tokens += 4
            for key, value in message.items():
                num_tokens += len(self.encoding.encode(str(value)))
                if key == "name":
                    num_tokens += 1  # 名称的额外开销
        
        # 每次回复的基础开销
        num_tokens += 3
        return num_tokens
    
    def count_chain_tokens(self, inputs: Any, outputs: Any) -> int:
        """
        计算LangChain链的Token消耗
        
        Args:
            inputs: 链的输入
            outputs: 链的输出
            
        Returns:
            Token数量
        """
        # 简化实现,实际应用中可能需要更复杂的处理
        input_text = str(inputs)
        output_text = str(outputs)
        return self.count_text_tokens(input_text) + self.count_text_tokens(output_text)

这个Token计数器将帮助我们在后续的实现中准确测量不同反思策略的Token消耗。

步骤2:实现信息选择器

信息选择器是高效反思的关键组件之一,它负责从大量信息中筛选出对反思最有价值的部分。

# src/reflection/selector.py
from typing import List, Dict, Any, Tuple
from abc import ABC, abstractmethod
from ..utils.token_counter import TokenCounter


class BaseInformationSelector(ABC):
    """信息选择器基类"""
    
    @abstractmethod
    def select(self, information: List[Dict[str, Any]], context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        从信息列表中选择最有价值的信息
        
        Args:
            information: 待选择的信息列表
            context: 当前上下文信息
            
        Returns:
            选择后的信息列表
        """
        pass


class IVBasedSelector(BaseInformationSelector):
    """基于信息价值(IV)的选择器"""
    
    def __init__(self, token_counter: TokenCounter, max_tokens: int = 2000):
        """
        初始化IV选择器
        
        Args:
            token_counter: Token计数器
            max_tokens: 最大允许的Token数量
        """
        self.token_counter = token_counter
        self.max_tokens = max_tokens
    
    def _calculate_relevance(self, info: Dict[str, Any], context: Dict[str, Any]) -> float:
        """
        计算信息与上下文的相关性
        
        Args:
            info: 待评估的信息
            context: 当前上下文
            
        Returns:
            相关性分数 (0-1)
        """
        # 简化的相关性计算,实际应用中可以使用更复杂的方法
        # 如语义相似度计算、关键词匹配等
        info_text = info.get("content", "")
        context_text = context.get("current_task", "") + " " + context.get("reflection_goal", "")
        
        # 简单的关键词匹配
        info_keywords = set(info_text.lower().split())
        context_keywords = set(context_text.lower().split())
        
        if not context_keywords:
            return 0.5  # 默认相关性
        
        intersection = info_keywords.intersection(context_keywords)
        return len(intersection) / len(context_keywords)
    
    def _calculate_uniqueness(self, info: Dict[str, Any], selected_infos: List[Dict[str, Any]]) -> float:
        """
        计算信息相对于已选信息的独特性
        
        Args:
            info: 待评估的信息
            selected_infos: 已选择的信息列表
            
        Returns:
            独特性分数 (0-1)
        """
        if not selected_infos:
            return 1.0  # 如果还没有选择任何信息,独特性为1
        
        info_text = info.get("content", "")
        info_keywords = set(info_text.lower().split())
        
        max_overlap = 0
        for selected_info in selected_infos:
            selected_text = selected_info.get("content", "")
            selected_keywords = set(selected_text.lower().split())
            
            if not info_keywords:
                overlap = 0
            else:
                intersection = info_keywords.intersection(selected_keywords)
                overlap = len(intersection) / len(info_keywords)
            
            max_overlap = max(max_overlap, overlap)
        
        return 1.0 - max_overlap  # 重叠越少,独特性越高
    
    def _calculate_iv(self, info: Dict[str, Any], context: Dict[str, Any], 
                     selected_infos: List[Dict[str, Any]]) -> float:
        """
        计算信息的价值(IV)
        
        Args:
            info: 待评估的信息
            context: 当前上下文
            selected_infos: 已选择的信息列表
            
        Returns:
            信息价值分数
        """
        relevance = self._calculate_relevance(info, context)
        uniqueness = self._calculate_uniqueness(info, selected_infos)
        
        # 可以根据需要调整权重
        return relevance * 0.7 + uniqueness * 0.3
    
    def select(self, information: List[Dict[str, Any]], context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        使用贪心算法选择IV最高的信息,同时不超过Token限制
        
        Args:
            information: 待选择的信息列表
            context: 当前上下文信息
            
        Returns:
            选择后的信息列表
        """
        # 复制列表以避免修改原始数据
        remaining_infos = information.copy()
        selected_infos = []
        total_tokens = 0
        
        while remaining_infos and total_tokens < self.max_tokens:
            # 计算剩余信息的IV
            iv_scores = []
            for info in remaining_infos:
                iv = self._calculate_iv(info, context, selected_infos)
                iv_scores.append((info, iv))
            
            # 按IV排序
            iv_scores.sort(key=lambda x: x[1], reverse=True)
            
            # 选择IV最高的信息
            best_info, best_iv = iv_scores[0]
            
            # 计算该信息的Token数量
            info_text = best_info.get("content", "")
            info_tokens = self.token_counter.count_text_tokens(info_text)
            
            # 检查是否超过Token限制
            if total_tokens + info_tokens > self.max_tokens:
                # 如果加入这个信息会超过限制,尝试下一个
                remaining_infos.remove(best_info)
                continue
            
            # 选择这个信息
            selected_infos.append(best_info)
            remaining_infos.remove(best_info)
            total_tokens += info_tokens
        
        return selected_infos

这个选择器使用了贪心算法,基于信息价值(IV)来选择最有价值的信息,同时确保不会超过Token限制。在实际应用中,你可以根据需要优化_calculate_relevance_calculate_uniqueness方法,例如使用语义相似度模型来更准确地评估信息价值。

步骤3:实现反思触发器

反思触发器负责决定何时应该进行反思,避免不必要的Token消耗。

# src/reflection/trigger.py
from typing import Dict, Any
from abc import ABC, abstractmethod
from enum import Enum


class ReflectionLevel(Enum):
    """反思级别枚举"""
    INSTANT = "instant"  # 即时反思
    STEP = "step"        # 步骤反思
    STAGE = "stage"      # 阶段反思
    GLOBAL = "global"    # 全局反思


class BaseReflectionTrigger(ABC):
    """反思触发器基类"""
    
    @abstractmethod
    def should_reflect(self, context: Dict[str, Any]) -> tuple[bool, ReflectionLevel]:
        """
        判断是否应该进行反思,以及应该进行什么级别的反思
        
        Args:
            context: 当前上下文信息
            
        Returns:
            (是否应该反思, 反思级别)
        """
        pass


class MultiConditionTrigger(BaseReflectionTrigger):
    """基于多条件的反思触发器"""
    
    def __init__(self):
        """初始化多条件触发器"""
        # 各条件的权重
        self.condition_weights = {
            "confidence_drop": 0.3,
            "task_complexity": 0.2,
            "steps_since_last": 0.2,
            "result_uncertainty": 0.2,
            "error_occurred": 0.1
        }
        
        # 各反思级别的阈值
        self.level_thresholds = {
            ReflectionLevel.INSTANT: 0.3,
            ReflectionLevel.STEP: 0.5,
            ReflectionLevel.STAGE: 0.7,
            ReflectionLevel.GLOBAL: 0.9
        }
    
    def _calculate_confidence_drop(self, context: Dict[str, Any]) -> float:
        """
        计算信心下降程度
        
        Args:
            context: 当前上下文
            
        Returns:
            信心下降分数 (0-1)
        """
        current_confidence = context.get("current_confidence", 1.0)
        previous_confidence = context.get("previous_confidence", 1.0)
        
        if previous_confidence == 0:
            return 0
        
        drop = (previous_confidence - current_confidence) / previous_confidence
        return max(0, drop)
    
    def _calculate_task_complexity(self, context: Dict[str, Any]) -> float:
        """
        计算任务复杂度
        
        Args:
            context: 当前上下文
            
        Returns:
            任务复杂度分数 (0-1)
        """
        # 简化实现,可以根据任务类型、步骤数、不确定性等因素计算
        task_type = context.get("task_type", "simple")
        complexity_map = {
            "simple": 0.2,
            "medium": 0.5,
            "complex": 0.8,
            "very_complex": 1.0
        }
        return complexity_map.get(task_type, 0.5)
    
    def _calculate_steps_since_last(self, context: Dict[str, Any]) -> float:
        """
        计算距离上次反思的步骤数
        
        Args:
            context: 当前上下文
            
        Returns:
            步骤相关分数 (0-1)
        """
        steps_since_last = context.get("steps_since_last_reflection", 0)
        max_steps = context.get("max_steps_without_reflection", 10)
        
        return min(1.0, steps_since_last / max_steps)
    
    def _calculate_result_uncertainty(self, context: Dict[str, Any]) -> float:
        """
        计算结果不确定性
        
        Args:
            context: 当前上下文
            
        Returns:
            不确定性分数 (0-1)
        """
        return context.get("result_uncertainty", 0.5)
    
    def _check_error_occurred(self, context: Dict[str, Any]) -> float:
        """
        检查是否发生了错误
        
        Args:
            context: 当前上下文
            
        Returns:
            错误分数 (0-1)
        """
        return 1.0 if context.get("error_occurred", False) else 0.0
    
    def should_reflect(self, context: Dict[str, Any]) -> tuple[bool, ReflectionLevel]:
        """
        判断是否应该进行反思,以及应该进行什么级别的反思
        
        Args:
            context: 当前上下文信息
            
        Returns:
            (是否应该反思, 反思级别)
        """
        # 计算各条件的分数
        condition_scores = {
            "confidence_drop": self._calculate_confidence_drop(context),
            "task_complexity": self._calculate_task_complexity(context),
            "steps_since_last": self._calculate_steps_since_last(context),
            "result_uncertainty": self._calculate_result_uncertainty(context),
            "error_occurred": self._check_error_occurred(context)
        }
        
        # 计算加权总分
        total_score = 0.0
        for condition, weight in self.condition_weights.items():
            total_score += condition_scores[condition] * weight
        
        # 确定反思级别
        reflection_level = None
        should_trigger = False
        
        # 按阈值从高到低检查
        for level in sorted(self.level_thresholds.keys(), 
                           key=lambda x: self.level_thresholds[x], 
                           reverse=True):
            if total_score >= self.level_thresholds[level]:
                reflection_level = level
                should_trigger = True
                break
        
        return should_trigger, reflection_level

这个触发器考虑了多个因素来决定是否进行反思以及反思的级别,避免了过度反思或反思不足的问题。

步骤4:实现反思执行器

反思执行器负责执行具体的反思过程,根据不同的反思级别采用不同的策略。

# src/reflection/executor.py
from typing import Dict, Any, List
from abc import ABC, abstractmethod
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from .trigger import ReflectionLevel


class BaseReflectionExecutor(ABC):
    """反思执行器基类"""
    
    @abstractmethod
    def execute(self, level: ReflectionLevel, selected_info: List[Dict[str, Any]], 
                context: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行反思过程
        
        Args:
            level: 反思级别
            selected_info: 选择的信息
            context: 当前上下文
            
        Returns:
            反思结果
        """
        pass


class LevelBasedExecutor(BaseReflectionExecutor):
    """基于反思级别的执行器"""
    
    def __init__(self, llm: ChatOpenAI):
        """
        初始化基于级别的执行器
        
        Args:
            llm: 语言模型实例
        """
        self.llm = llm
        self.chains = self._build_chains()
    
    def _build_chains(self) -> Dict[ReflectionLevel, LLMChain]:
        """
        构建不同反思级别的链
        
        Returns:
            反射级别到链的映射
        """
        # 即时反思 - 快速检查当前步骤
        instant_prompt = PromptTemplate(
            input_variables=["task", "current_step", "result"],
            template="""你是一个AI助手的即时反思模块。请快速检查当前执行步骤是否有明显问题。

当前任务: {task}
当前步骤: {current_step}
执行结果: {result}

请用简短的语言回答:
1. 这一步是否成功执行?
2. 是否有任何需要立即注意的问题?
3. 如果有问题,建议如何修正?

请保持回答简洁,不要超过3句话。"""
        )
        
        # 步骤反思 - 评估单个步骤并规划下一步
        step_prompt = PromptTemplate(
            input_variables=["task", "step_history", "current_result", "next_steps_plan"],
            template="""你是一个AI助手的步骤反思模块。请评估刚刚完成的步骤,并优化后续计划。

任务目标: {task}
已完成的步骤:
{step_history}

当前步骤结果:
{current_result}

原定后续步骤:
{next_steps_plan}

请提供:
1. 当前步骤的执行质量评估(1-10分)
2. 当前步骤的关键成功/失败因素
3. 对后续步骤的调整建议
4. 更新后的步骤计划

请保持回答结构化,但避免不必要的冗长。"""
        )
        
        # 阶段反思 - 评估整个阶段的进展
        stage_prompt = PromptTemplate(
            input_variables=["task", "stage_goal", "completed_work", "remaining_work", "issues_encountered"],
            template="""你是一个AI助手的阶段反思模块。请评估当前阶段的工作进展。

整体任务: {task}
当前阶段目标: {stage_goal}

已完成的工作:
{completed_work}

剩余工作:
{remaining_work}

遇到的主要问题:
{issues_encountered}

请提供:
1. 阶段目标完成度评估(百分比)
2. 工作方法的有效性分析
3. 遇到的问题的根本原因分析
4. 下一阶段的工作策略调整建议

请确保分析有深度,但保持语言简洁。"""
        )
        
        # 全局反思 - 全面评估整个任务
        global_prompt = PromptTemplate(
            input_variables=["task", "full_history", "final_result", "success_criteria"],
            template="""你是一个AI助手的全局反思模块。请对整个任务执行过程进行全面评估。

任务描述: {task}
成功标准: {success_criteria}

完整执行历史:
{full_history}

最终结果:
{final_result}

请提供全面的反思报告,包括:
1. 任务完成情况评估(与成功标准对比)
2. 整体执行策略的优点和缺点
3. 关键决策点的分析和改进建议
4. 从这次任务中获得的经验教训
5. 未来类似任务的执行建议

请确保报告全面、有深度,但避免冗余。"""
        )
        
        return {
            ReflectionLevel.INSTANT: LLMChain(llm=self.llm, prompt=instant_prompt),
            ReflectionLevel.STEP: LLMChain(llm=self.llm, prompt=step_prompt),
            ReflectionLevel.STAGE: LLMChain(llm=self.llm, prompt=stage_prompt),
            ReflectionLevel.GLOBAL: LLMChain(llm=self.llm, prompt=global_prompt)
        }
    
    def _prepare_inputs(self, level: ReflectionLevel, selected_info: List[Dict[str, Any]], 
                       context: Dict[str, Any]) -> Dict[str, str]:
        """
        为不同级别的反思准备输入
        
        Args:
            level: 反思级别
            selected_info: 选择的信息
            context: 当前上下文
            
        Returns:
            格式化的输入
        """
        # 将选择的信息整理成文本
        info_texts = [info.get("content", "") for info in selected_info]
        
        if level == ReflectionLevel.INSTANT:
            return {
                "task": context.get("current_task", ""),
                "current_step": context.get("current_step", ""),
                "result": "\n".join(info_texts)
            }
        
        elif level == ReflectionLevel.STEP:
            return {
                "task": context.get("current_task", ""),
                "step_history": context.get("step_history", ""),
                "current_result": "\n".join(info_texts),
                "next_steps_plan": context.get("next_steps_plan", "")
            }
        
        elif level == ReflectionLevel.STAGE:
            return {
                "task": context.get("current_task", ""),
                "stage_goal": context.get("stage_goal", ""),
                "completed_work": context.get("completed_work", ""),
                "remaining_work": context.get("remaining_work", ""),
                "issues_encountered": "\n".join(info_texts)
            }
        
        elif level == ReflectionLevel.GLOBAL:
            return {
                "task": context.get("current_task", ""),
                "full_history": context.get("full_history", ""),
                "final_result": "\n".join(info_texts),
                "success_criteria": context.get("success_criteria", "")
            }
        
        return {}
    
    def execute(self, level: ReflectionLevel, selected_info: List[Dict[str, Any]], 
                context: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行反思过程
        
        Args:
            level: 反思级别
            selected_info: 选择的信息
            context: 当前上下文
            
        Returns:
            反思结果
        """
        if level not in self.chains:
            return {"error": f"Unsupported reflection level: {level}"}
        
        # 准备输入
        inputs = self._prepare_inputs(level, selected_info, context)
        
        # 执行反思
        chain = self.chains[level]
        result = chain.run(**inputs)
        
        return {
            "level": level.value,
            "result": result,
            "timestamp": context.get("timestamp", "")
        }

这个执行器为不同级别的反思提供了不同的Prompt模板,确保反思的深度和详细程度与反思级别相匹配,从而在保证反思质量的同时控制Token消耗。

步骤5:实现结果整合器

结果整合器负责将反思结果应用到Agent的后续行为中,完成反思的闭环。

# src/reflection/integrator.py
from typing import Dict, Any, List
from abc import ABC, abstractmethod
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from .trigger import ReflectionLevel


class BaseResultIntegrator(ABC):
    """结果整合器基类"""
    
    @abstractmethod
    def integrate(self, reflection_result: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """
        整合反思结果,更新上下文和Agent状态
        
        Args:
            reflection_result: 反思结果
            context: 当前上下文
            
        Returns:
            更新后的上下文
        """
        pass


class SmartResultIntegrator(BaseResultIntegrator):
    """智能结果整合器"""
    
    def __init__(self, llm: ChatOpenAI):
        """
        初始化智能结果整合器
        
        Args:
            llm: 语言模型实例
        """
        self.llm = llm
        self.update_chain = self._build_update_chain()
    
    def _build_update_chain(self) -> LLMChain:
        """
        构建更新链
        
        Returns:
            更新链
        """
        update_prompt = PromptTemplate(
            input_variables=["current_context", "reflection_result", "reflection_level"],
            template="""你是一个AI助手的结果整合模块。请根据反思结果更新Agent的上下文。

当前上下文:
{current_context}

反思级别: {reflection_level}
反思结果:
{reflection_result}

请分析反思结果,并提供对上下文的更新建议。请以JSON格式返回,包含以下字段:
1. "updated_plan": 更新后的执行计划(如果需要修改)
2. "key_insights": 从反思中获得的关键洞见(列表形式)
3. "adjusted_confidence": 调整后的信心水平(0-1之间的小数)
4. "next_action": 建议的下一步行动
5. "context_updates": 其他需要更新的上下文字段(对象形式)

请只返回JSON,不要包含其他解释性文字。"""
        )
        
        return LLMChain(llm=self.llm, prompt=update_prompt)
    
    def _extract_structured_updates(self, reflection_result: Dict[str, Any], 
                                   context: Dict[str, Any]) -> Dict[str, Any]:
        """
        从反思结果中提取结构化更新
        
        Args:
            reflection_result: 反思结果
            context: 当前上下文
            
        Returns:
            结构化更新
        """
        # 首先尝试直接从反思结果中提取结构化信息
        level = reflection_result.get("level", "")
        result_text = reflection_result.get("result", "")
        
        # 准备当前上下文的文本表示
        context_text = "\n".join([f"{k}: {v}" for k, v in context.items() 
                                if not isinstance(v, (dict, list))])
        
        # 使用LLM提取结构化更新
        try:
            update_input = {
                "current_context": context_text,
                "reflection_result": result_text,
                "reflection_level": level
            }
            
            update_output = self.update_chain.run(**update_input)
            
            # 尝试解析JSON输出
            import json
            return json.loads(update_output)
        
        except Exception as e:
            # 如果解析失败,返回一个基本的更新
            return {
                "updated_plan": context.get("next_steps_plan", ""),
                "key_insights": ["反思执行完成,但未能提取结构化更新"],
                "adjusted_confidence": context.get("current_confidence", 0.5),
                "next_action": "继续执行原定计划",
                "context_updates": {}
            }
    
    def _apply_updates(self, context: Dict[str, Any], structured_updates: Dict[str, Any]) -> Dict[str, Any]:
        """
        应用结构化更新到上下文
        
        Args:
            context: 原始上下文
            structured_updates: 结构化更新
            
        Returns:
            更新后的上下文
        """
        # 创建上下文的副本以避免修改原始数据
        updated_context = context.copy()
        
        # 更新计划
        if "updated_plan" in structured_updates:
            updated_context["next_steps_plan"] = structured_updates["updated_plan"]
        
        # 添加洞见到记忆
        if "key_insights" in structured_updates and structured_updates["key_insights"]:
            if "reflection_insights" not in updated_context:
                updated_context["reflection_insights"] = []
            
            for insight in structured_updates["key_insights"]:
                updated_context["reflection_insights"].append({
                    "insight": insight,
                    "timestamp": updated_context.get("timestamp", ""),
                    "source": "reflection"
                })
        
        # 更新信心水平
        if "adjusted_confidence" in structured_updates:
            updated_context["previous_confidence"] = updated_context.get("current_confidence", 0.5)
            updated_context["current_confidence"] = structured_updates["adjusted_confidence"]
        
        # 更新下一步行动
        if "next_action" in structured_updates:
            updated_context["next_action"] = structured_updates["next_action"]
        
        # 应用其他上下文更新
        if "context_updates" in structured_updates:
            for key, value in structured_updates["context_updates"].items():
                updated_context[key] = value
        
        # 重置反思相关计数器
        updated_context["steps_since_last_reflection"] = 0
        updated_context["last_reflection_timestamp"] = updated_context.get("timestamp", "")
        
        return updated_context
    
    def integrate(self, reflection_result: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """
        整合反思结果,更新上下文和Agent状态
        
        Args:
            reflection_result: 反思结果
            context: 当前上下文
            
        Returns:
            更新后的上下文
        """
        # 提取结构化更新
        structured_updates = self._extract_structured_updates(reflection_result, context)
        
        # 应用更新
        updated_context = self._apply_updates(context, structured_updates)
        
        # 添加反思记录
        if "reflection_history" not in updated_context:
            updated_context["reflection_history"] = []
        
        updated_context["reflection_history"].append({
            "level": reflection_result.get("level", ""),
            "result": reflection_result.get("result", ""),
            "structured_updates": structured_updates,
            "timestamp": updated_context.get("timestamp", "")
        })
        
        return updated_context

这个整合器不仅将反思结果应用到Agent的当前行为中,还会将有价值的洞见保存到记忆中,为未来的决策提供参考。

步骤6:实现反思控制器

最后,我们将所有组件整合到反思控制器中,形成完整的反思系统。

# src/reflection/controller.py
from typing import Dict, Any, List
from datetime import datetime
from .selector import BaseInformationSelector, IVBasedSelector
from .trigger import BaseReflectionTrigger, MultiConditionTrigger, ReflectionLevel
from .executor import BaseReflectionExecutor, LevelBasedExecutor
from .integrator import BaseResultIntegrator, SmartResultIntegrator
from ..utils.token_counter import TokenCounter
from langchain.chat_models import ChatOpenAI


class ReflectionController:
    """反思控制器,协调整个反思过程"""
    
    def __init__(self, llm: ChatOpenAI, config: Dict[str, Any] = None):
        """
        初始化反思控制器
        
        Args:
            llm: 语言模型实例
            config: 配置参数
        """
        self.config = config or {}
        
        # 初始化Token计数器
        token_model = self.config.get("token_model", "gpt-4")
        self.token_counter = TokenCounter(token_model)
        
        # 初始化各个组件
        max_reflection_tokens = self.config.get("max_reflection_tokens", 2000)
        self.selector = IVBasedSelector(self.token_counter, max_reflection_tokens)
        self.trigger = MultiConditionTrigger()
        self.executor = LevelBasedExecutor(llm)
        self.integrator = SmartResultIntegrator(llm)
        
        # 统计信息
        self.stats = {
            "total_reflections": 0,
            "reflections_by_level": {level.value: 0 for level in ReflectionLevel},
            "total_tokens_used": 0,
            "tokens_by_level": {level.value: 0 for level in ReflectionLevel}
        }
    
    def _collect_information(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        收集可能用于反思的信息
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐