1. 背景与需求分析

在自主智能体(Autonomous Agent)领域,React(Reasoning+Action)模式因其清晰的"思考-行动"循环而备受关注。本文基于改进版的BabyAGI代码,详细介绍如何使用LangChain框架实现一个完整的React模型,该模型能够:

  1. 通过思考分解任务并制定计划
  2. 使用工具执行具体操作(如搜索、计算)
  3. 基于结果反思并调整后续行动

特别适合需要自主决策的场景,如数据分析、报告生成、智能客服等。

2. 系统架构设计

2.1 核心组件

目标输入
任务分解
任务优先级排序
任务执行
结果存储
新任务生成
目标达成?
输出结果

2.2 技术选型

组件 技术方案 备选方案
框架核心 LangChain AutoGPT, LangFlow
大语言模型 DeepSeek/Qianfan GPT-4, Claude
工具系统 LangChain Tool 自定义API调用
记忆存储 FAISS向量数据库 Pinecone, Chroma
任务管理 优先队列(deque) 数据库存储

3. 核心实现代码

3.1 基础组件定义

from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from collections import deque
import faiss
from langchain_community.vectorstores import FAISS
from langchain_community.docstore import InMemoryDocstore
from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.agents import ZeroShotAgent, AgentExecutor, Tool
 
class TaskCreationChain(LLMChain):
    """任务生成模块:根据执行结果创建新任务"""
    
    @classmethod
    def from_llm(cls, llm: BaseChatModel) -> LLMChain:
        prompt = PromptTemplate(
            template="""
            你是一个专业项目经理,需要根据当前任务结果创建后续任务。
            
            当前目标: {objective}
            已完成任务: {task_description}
            执行结果: {result}
            待完成列表: {incomplete_tasks}
            
            请基于以上信息生成3个以内的新任务(JSON数组格式):
            """,
            input_variables=["objective", "task_description", "result", "incomplete_tasks"]
        )
        return cls(prompt=prompt, llm=llm)
 
class TaskPrioritizationChain(LLMChain):
    """任务排序模块:优化任务执行顺序"""
    
    @classmethod
    def from_llm(cls, llm: BaseChatModel) -> LLMChain:
        prompt = PromptTemplate(
            template="""
            你是一个高效的任务规划师,需要优化任务执行顺序。
            
            当前目标: {objective}
            待处理任务: {task_names}
            
            请按优先级重新排序(从{next_task_id}开始编号):
            """,
            input_variables=["objective", "task_names", "next_task_id"]
        )
        return cls(prompt=prompt, llm=llm)

3.2 工具系统集成

def get_tools(llm: BaseChatModel) -> List[Tool]:
    """配置智能体可用工具集"""
    
    # 搜索工具(支持网页检索)
    search = MySearchWrapper()
    
    # 待办事项生成工具
    todo_prompt = PromptTemplate.from_template(
        "为以下目标生成详细执行计划:{objective}"
    )
    todo_chain = LLMChain(llm=llm, prompt=todo_prompt)
    
    return [
        Tool(
            name="Search",
            func=search.run,
            description="用于获取实时信息(如股价、新闻等)"
        ),
        Tool(
            name="TODO",
            func=todo_chain.invoke,
            description="生成任务执行计划。输入:目标描述。输出:详细步骤"
        ),
        Tool(
            name="Calculator",
            func=lambda x: str(eval(x)),
            description="执行数学计算(如跌幅计算:(100-90)/100)"
        )
    ]
 
def get_react_prompt(tools: List[Tool]) -> PromptTemplate:
    """创建React风格提示模板"""
    
    tool_strings = "\n".join([
        f"- {tool.name}: {tool.description}" for tool in tools
    ])
    
    template = """
    你是一个自主智能体,需要完成以下目标:{objective}
    
    **可用工具**:
    {tool_descriptions}
    
    **执行规范**:
    1. 先思考(Thought)如何完成任务
    2. 选择合适工具(Action)
    3. 提供工具输入(Action Input)
    
    **响应格式**:
    Thought: [详细思考过程]
    Action: [工具名称]
    Action Input: [JSON或文本输入]
    
    **历史上下文**:
    {context}
    
    当前任务:{task}
    
    {agent_scratchpad}
    """
    
    return PromptTemplate.from_template(template)

搜索工具

from bs4 import BeautifulSoup
import json
from pipes import quote

class MySearchWrapper:
    """建议搜索请求"""

    def run(self, query: str) -> str:
        print("正在使用搜索...")
        keyword = json.loads(query)["query"]
        query = quote(keyword)
        # 输出 {"query": "GoPro Hero 12 Black 最新价格 京东 天猫 淘宝 拼多多 2024"}
        url = f"https://www.bing.com/search?q={query}"
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers)
        soup = BeautifulSoup(response.text, "html.parser")
        results = soup.find_all("li", class_="b_algo")[:3]  # 取前3条结果
        list_hrefs = "\n".join([result.find("a")['href'] for result in results if result.find("a") is not None])
        #print("查询结果",list_hrefs)
        return list_hrefs
        # results = self.search_with_engine("baidu", query)
        # return "\n".join([result.find("a").text for result in results])

3.3 核心执行引擎

class ReactAgent(BaseModel):
    """React模式智能体实现"""
    
    task_queue: deque = Field(default_factory=deque)
    task_creator: TaskCreationChain = Field(...)
    task_prioritizer: TaskPrioritizationChain = Field(...)
    executor: AgentExecutor = Field(...)
    vectorstore: Any = Field(...)  # 用于记忆存储
    max_iterations: int = 10
    current_iteration: int = 0
    
    def add_task(self, task: Dict):
        """添加任务到队列"""
        self.task_queue.append(task)
    
    def execute_task(self, objective: str, task: str) -> Dict:
        """执行单个任务"""
        # 获取相关记忆
        context = self._get_relevant_memories(objective)
        
        # 执行任务
        result = self.executor.invoke({
            "objective": objective,
            "context": context,
            "task": task
        })
        
        # 存储结果
        self._store_memory(task, result["output"])
        return result
    
    def _get_relevant_memories(self, query: str, k: int = 3) -> List[str]:
        """获取相关记忆"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]
    
    def _store_memory(self, task: str, content: str):
        """存储任务结果"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[{"task": task}]
        )
    
    def run(self, objective: str) -> Dict:
        """主运行循环"""
        self.current_iteration = 0
        
        # 添加初始任务
        self.add_task({"task_id": 1, "task_name": "制定初始执行计划"})
        
        while self.task_queue and self.current_iteration < self.max_iterations:
            # 获取并打印当前任务
            task = self.task_queue.popleft()
            print(f"\n=== 执行任务 #{task['task_id']}: {task['task_name']} ===")
            
            # 执行任务
            result = self.execute_task(objective, task["task_name"])
            print(f"执行结果:\n{result['output']}")
            
            # 生成新任务
            new_tasks = self._generate_new_tasks(
                objective, task["task_name"], result["output"]
            )
            
            # 添加新任务并重新排序
            for new_task in new_tasks:
                self.add_task({"task_id": len(self.task_queue)+1, "task_name": new_task})
            self._prioritize_tasks(objective)
            
            self.current_iteration += 1
        
        return {"status": "completed", "iterations": self.current_iteration}
    
    def _generate_new_tasks(self, objective: str, completed_task: str, result: str) -> List[str]:
        """基于执行结果生成新任务"""
        incomplete_tasks = [t["task_name"] for t in self.task_queue]
        response = self.task_creator.invoke({
            "objective": objective,
            "task_description": completed_task,
            "result": result,
            "incomplete_tasks": incomplete_tasks
        })
        return RobustJsonParser.parseToJson(response["text"])
    
    def _prioritize_tasks(self, objective: str):
        """重新排序任务队列"""
        task_names = [t["task_name"] for t in self.task_queue]
        if not task_names:
            return
            
        next_id = max([t["task_id"] for t in self.task_queue]) + 1
        response = self.task_prioritizer.invoke({
            "objective": objective,
            "task_names": task_names,
            "next_task_id": next_id
        })
        
        # 解析并更新任务顺序
        new_order = []
        for line in response["text"].split("\n"):
            if "." in line:
                task_id, task_name = line.split(".", 1)
                new_order.append({
                    "task_id": int(task_id.strip()),
                    "task_name": task_name.strip()
                })
        
        # 保持deque顺序
        self.task_queue = deque(new_order)

4. 系统初始化与运行

4.1 初始化配置

def init_vectorstore(embedding_model_name: str = "zhipu") -> FAISS:
    """初始化向量存储"""
    from lang_chain_qa.pdf_qa_system import getEmbeeding
    
    embeddings = getEmbeeding(embedding_model_name)
    dimension = len(embeddings.embed_query("测试文本"))
    
    # 创建FAISS索引
    index = faiss.IndexFlatL2(dimension)
    return FAISS(
        embeddings.embed_query,
        index,
        InMemoryDocstore({}),
        {}
    )
 
def create_react_agent(llm: BaseChatModel, vectorstore: FAISS) -> ReactAgent:
    """创建React智能体实例"""
    
    # 初始化核心组件
    task_creator = TaskCreationChain.from_llm(llm)
    task_prioritizer = TaskPrioritizationChain.from_llm(llm)
    
    # 配置工具系统
    tools = get_tools(llm)
    prompt = get_react_prompt(tools)
    
    # 创建执行器
    llm_chain = LLMChain(llm=llm, prompt=prompt)
    agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=[t.name for t in tools])
    executor = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=3
    )
    
    return ReactAgent(
        task_creator=task_creator,
        task_prioritizer=task_prioritizer,
        executor=executor,
        vectorstore=vectorstore,
        max_iterations=10
    )

4.2 完整运行示例

if __name__ == "__main__":
    # 1. 初始化组件
    llm = getLLM("deepseek")  # 使用DeepSeek模型
    vectorstore = init_vectorstore()
    agent = create_react_agent(llm, vectorstore)
    
    # 2. 设置运行目标
    objective = "分析特斯拉最近一年的股价波动,并计算最大跌幅"
    
    # 3. 添加初始任务
    initial_tasks = [
        "收集特斯拉过去一年股价数据",
        "计算每日涨跌幅",
        "找出最大跌幅及发生日期"
    ]
    for idx, task in enumerate(initial_tasks, 1):
        agent.add_task({"task_id": idx, "task_name": task})
    
    # 4. 运行智能体
    print(f"\n=== 开始执行目标: {objective} ===")
    result = agent.run(objective)
    
    # 5. 输出最终结果
    print("\n=== 执行完成 ===")
    print(f"总迭代次数: {result['iterations']}")
    print("最终记忆存储:")
    for doc in vectorstore.docstore._dict.values():
        print(f"- {doc.metadata['task']}: {doc.page_content[:50]}...")

5. 关键特性实现

5.1 增强的错误处理

def robust_task_execution(agent: ReactAgent, objective: str, task: str, max_retries: int = 2) -> Dict:
    """带重试机制的任务执行"""
    last_error = None
    
    for attempt in range(max_retries):
        try:
            print(f"尝试 {attempt + 1}/{max_retries} 执行任务: {task}")
            result = agent.execute_task(objective, task)
            
            # 简单验证结果有效性
            if "output" in result and result["output"]:
                return result
                
            raise ValueError("空结果返回")
            
        except Exception as e:
            last_error = str(e)
            print(f"执行失败: {e}")
            
            # 如果是最后一次尝试,创建错误处理任务
            if attempt == max_retries - 1:
                error_task = f"处理执行错误: {last_error}"
                agent.add_task({"task_id": len(agent.task_queue)+1, "task_name": error_task})
                return {"status": "failed", "error": last_error}
    
    return {"status": "unknown_error"}

5.2 动态工具加载

class DynamicToolManager:
    """动态工具加载器"""
    
    def __init__(self, llm: BaseChatModel):
        self.llm = llm
        self.tools = {}
        
    def register_tool(self, name: str, description: str, func: callable):
        """注册新工具"""
        self.tools[name] = {
            "description": description,
            "func": func
        }
        
    def get_tools(self) -> List[Tool]:
        """获取当前所有工具"""
        return [
            Tool(
                name=name,
                func=info["func"],
                description=info["description"]
            )
            for name, info in self.tools.items()
        ]
        
    def suggest_tools(self, objective: str) -> List[str]:
        """基于目标推荐工具"""
        prompt = PromptTemplate.from_template(
            "分析以下目标,推荐最合适的3个工具(从{tool_list}中选择):\n"
            "目标: {objective}\n"
            "推荐工具(逗号分隔):"
        )
        
        tool_list = ", ".join(self.tools.keys())
        chain = LLMChain(llm=self.llm, prompt=prompt)
        result = chain.invoke({"objective": objective, "tool_list": tool_list})
        
        return [t.strip() for t in result["text"].split(",")]

6. 性能优化与最佳实践

6.1 提示词优化策略

OPTIMIZED_REACT_PROMPT = """
你是一个专业的金融分析师,需要完成以下目标:{objective}
 
**严格遵循的执行规范**:
1. 每个响应必须包含Thought、Action、Action Input三部分
2. Action必须从以下工具中选择:{tool_names}
3. Action Input必须是有效的JSON或纯文本
4. 思考过程要详细(至少3个考虑点)
 
**可用工具**:
{tool_descriptions}
 
**记忆增强**:
{context}
 
当前任务:{task}
 
{agent_scratchpad}
"""
 
def get_optimized_prompt(tools: List[Tool]) -> PromptTemplate:
    """获取优化后的提示模板"""
    return PromptTemplate.from_template(OPTIMIZED_REACT_PROMPT.format(
        tool_names=", ".join([f"'{t.name}'" for t in tools]),
        tool_descriptions="\n".join([f"- {t.name}: {t.description}" for t in tools])
    ))

6.2 记忆管理优化

class EnhancedMemoryManager:
    """增强的记忆管理系统"""
    
    def __init__(self, vectorstore: FAISS):
        self.vectorstore = vectorstore
        self.max_memory_size = 100
        
    def add_memory(self, task: str, content: str, relevance_score: float = 1.0):
        """添加记忆并控制大小"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[{
                "task": task,
                "timestamp": time.time(),
                "relevance": relevance_score
            }]
        )
        
        # 如果超过最大容量,移除最不相关的记忆
        if len(self.vectorstore.docstore) > self.max_memory_size:
            self._compact_memories()
            
    def _compact_memories(self):
        """压缩记忆存储"""
        docs = list(self.vectorstore.docstore._dict.values())
        # 按相关性和时间排序
        docs.sort(key=lambda x: (x.metadata["relevance"], x.metadata["timestamp"]))
        # 保留前50%
        to_keep = docs[:self.max_memory_size//2]
        # 重置docstore
        self.vectorstore.docstore._dict = {doc.id: doc for doc in to_keep}
        
    def get_relevant_memories(self, query: str, k: int = 3) -> List[str]:
        """获取相关记忆(带相关性加权)"""
        results = self.vectorstore.similarity_search_with_score(query, k=k*2)  # 多取一些
        # 按分数和任务类型排序
        results.sort(key=lambda x: (x[1], x[0].metadata.get("relevance", 0)))
        return [doc.page_content for doc, _ in results[:k]]

7. 总结与展望

7.1 实现要点回顾

  1. React模式核心:
    • 清晰的"思考-行动"循环
    • 严格的工具使用规范
    • 动态的任务管理
  2. LangChain集成关键:
    • 使用Chain组织处理流程
    • 通过Tool抽象外部能力
    • 利用VectorStore管理记忆
  3. 性能优化点:
    • 提示词工程确保输出格式
    • 记忆管理控制存储大小
    • 错误处理保障系统健壮性

7.2 实际应用建议

  1. 领域适配:
    • 金融领域:添加财务数据API工具
    • 医疗领域:集成专业数据库查询
    • 教育领域:增加知识点解释工具
  2. 模型选择:
    • 高精度场景:GPT-4或Claude
    • 成本敏感:国产模型(如DeepSeek)
    • 私有部署:Llama2或ChatGLM
  3. 扩展方向:
    • 增加多模态输入(PDF解析、图像识别)
    • 实现多人协作模式
    • 添加情感分析调整沟通方式

7.3 未来工作展望

  1. 自主学习能力:
    • 根据执行结果自动优化提示词
    • 动态加载新工具而无需重启
  2. 复杂决策支持:
    • 多智能体协作
    • 长期规划与短期执行结合
  3. 可解释性增强:
    • 决策过程可视化
    • 关键步骤人工审核机制

通过本文实现的React模型,开发者可以快速构建具有自主决策能力的智能系统。完整代码已开源,建议根据实际需求调整工具集和提示词模板,以获得最佳效果。

链接: 基于LangChain框架实现React模型

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐