LLM Agent 基本架构与核心功能详解:从理论到实践
引言近年来,大语言模型(LLM)的快速发展催生了一个全新的研究方向——LLM Agent(大型[语言模型]智能体)。LLM Agent 不仅仅是简单的对话系统,而是具备能力的智能实体,能够理解复杂指令、使用工具、规划任务,并在真实环境中完成目标。本文将深入探讨 LLM Agent 的基本架构、核心组件和常见功能,通过详细的代码实现和[架构图]帮助开发者全面理解这一前沿技术。一、LLM Agent
引言
近年来,大语言模型(LLM)的快速发展催生了一个全新的研究方向——LLM Agent(大型[语言模型]智能体)。LLM Agent 不仅仅是简单的对话系统,而是具备感知、决策、执行能力的智能实体,能够理解复杂指令、使用工具、规划任务,并在真实环境中完成目标。本文将深入探讨 LLM Agent 的基本架构、核心组件和常见功能,通过详细的代码实现和[架构图]帮助开发者全面理解这一前沿技术。
一、LLM Agent 概述与核心价值
1.1 什么是 LLM Agent?
LLM Agent 是基于大语言模型构建的智能系统,具备以下核心特性:
- 自主性:能够独立制定计划和执行任务
- 工具使用:可以调用外部工具和 API 扩展能力
- 记忆机制:维护对话历史和任务上下文
- 推理能力:进行逻辑推理和问题分解
- 多轮交互:支持复杂的多轮对话和任务执行
1.2 LLM Agent 与传统聊天机器人的区别
| 特性 | 传统聊天机器人 | LLM Agent |
|---|---|---|
| 任务处理 | 单一回合问答 | 复杂多步任务 |
| 工具使用 | 有限或固定 | 动态工具调用 |
| 记忆能力 | 短期对话记忆 | 长期记忆和知识库 |
| 决策过程 | 基于规则或检索 | 自主规划和推理 |
| 适应性 | 需要大量标注数据 | 少量示例即可适应 |
二、LLM Agent 基本架构
LLM Agent 的完整架构包含多个核心组件,让我们通过下面的架构图来整体了解:
用户输入
感知模块
推理与决策引擎
记忆系统
工具调用系统
短期记忆
长期记忆
向量知识库
工具注册中心
工具执行器
API 管理器
行动输出
环境交互
结果处理
用户输出
2.1 核心架构组件详解
2.1.1 感知模块 (Perception Module)
from typing import Dict, Any, List
import re
import json
class PerceptionModule:
"""感知模块:处理原始输入,提取结构化信息"""
def __init__(self):
self.supported_modalities = ['text', 'image', 'audio']
def process_input(self, raw_input: Any, modality: str = 'text') -> Dict[str, Any]:
"""处理多模态输入"""
processed_data = {
'raw_input': raw_input,
'modality': modality,
'timestamp': self._get_timestamp(),
'processed_content': None,
'intent': None,
'entities': []
}
if modality == 'text':
processed_data.update(self._process_text_input(raw_input))
elif modality == 'image':
processed_data.update(self._process_image_input(raw_input))
elif modality == 'audio':
processed_data.update(self._process_audio_input(raw_input))
return processed_data
def _process_text_input(self, text: str) -> Dict[str, Any]:
"""处理文本输入"""
# 意图识别
intent = self._detect_intent(text)
# 实体提取
entities = self._extract_entities(text)
# 情感分析
sentiment = self._analyze_sentiment(text)
return {
'processed_content': text,
'intent': intent,
'entities': entities,
'sentiment': sentiment,
'urgency': self._detect_urgency(text)
}
def _detect_intent(self, text: str) -> str:
"""简单意图识别"""
text_lower = text.lower()
if any(word in text_lower for word in ['计算', '算一下', '等于多少']):
return 'calculation'
elif any(word in text_lower for word in ['搜索', '查找', '查询']):
return 'search'
elif any(word in text_lower for word in ['天气', '温度', '气象']):
return 'weather'
elif any(word in text_lower for word in ['翻译', '英文', '中文']):
return 'translation'
else:
return 'conversation'
def _extract_entities(self, text: str) -> List[Dict]:
"""简单实体提取"""
entities = []
# 提取数字
numbers = re.findall(r'\d+.?\d*', text)
for num in numbers:
entities.append({
'type': 'number',
'value': float(num) if '.' in num else int(num),
'text': num
})
# 提取货币金额
currency_matches = re.findall(r'[¥$€]?\s*(\d+.?\d*)', text)
for match in currency_matches:
entities.append({
'type': 'currency',
'value': float(match),
'text': match
})
return entities
def _analyze_sentiment(self, text: str) -> str:
"""简单情感分析"""
positive_words = ['好', '棒', '优秀', '满意', '高兴', '喜欢']
negative_words = ['差', '坏', '糟糕', '不满', '生气', '讨厌']
pos_count = sum(1 for word in positive_words if word in text)
neg_count = sum(1 for word in negative_words if word in text)
if pos_count > neg_count:
return 'positive'
elif neg_count > pos_count:
return 'negative'
else:
return 'neutral'
def _detect_urgency(self, text: str) -> int:
"""检测紧急程度"""
urgent_words = ['紧急', '立刻', '马上', '尽快', '急']
return sum(3 for word in urgent_words if word in text)
def _process_image_input(self, image_data: Any) -> Dict[str, Any]:
"""处理图像输入(简化实现)"""
return {
'processed_content': '图像内容描述',
'image_size': '未知',
'contains_text': False
}
def _process_audio_input(self, audio_data: Any) -> Dict[str, Any]:
"""处理音频输入(简化实现)"""
return {
'processed_content': '转写文本',
'audio_duration': 0,
'speaker_count': 1
}
def _get_timestamp(self) -> str:
"""获取时间戳"""
from datetime import datetime
return datetime.now().isoformat()
# 使用示例
perception = PerceptionModule()
user_input = "请计算一下 125 加上 38 等于多少,这个很紧急!"
processed = perception.process_input(user_input)
print(json.dumps(processed, ensure_ascii=False, indent=2))
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
2.1.2 推理与决策引擎 (Reasoning and Decision Engine)
class ReasoningEngine:
"""推理与决策引擎:核心的思考和处理模块"""
def __init__(self, llm_client=None):
self.llm_client = llm_client
self.planning_depth = 3 # 规划深度
def analyze_and_plan(self, processed_input: Dict,
memory_context: Dict = None) -> Dict[str, Any]:
"""分析输入并制定行动计划"""
# 构建推理上下文
context = self._build_reasoning_context(processed_input, memory_context)
# 进行多步推理
reasoning_steps = self._multi_step_reasoning(context)
# 生成行动计划
action_plan = self._generate_action_plan(reasoning_steps, context)
return {
'reasoning_steps': reasoning_steps,
'action_plan': action_plan,
'confidence': self._calculate_confidence(reasoning_steps),
'required_tools': self._extract_required_tools(action_plan)
}
def _build_reasoning_context(self, processed_input: Dict,
memory_context: Dict) -> Dict[str, Any]:
"""构建推理上下文"""
context = {
'current_input': processed_input,
'user_intent': processed_input.get('intent', 'unknown'),
'extracted_entities': processed_input.get('entities', []),
'timestamp': processed_input.get('timestamp'),
'sentiment': processed_input.get('sentiment', 'neutral'),
'urgency': processed_input.get('urgency', 0)
}
if memory_context:
context.update({
'conversation_history': memory_context.get('recent_conversation', []),
'user_preferences': memory_context.get('user_preferences', {}),
'past_actions': memory_context.get('past_actions', [])
})
return context
def _multi_step_reasoning(self, context: Dict) -> List[Dict]:
"""多步推理过程"""
reasoning_steps = []
# 步骤1:理解用户意图
step1 = {
'step': 1,
'type': 'intent_understanding',
'question': '用户的核心需求是什么?',
'answer': self._understand_intent(context),
'confidence': 0.9
}
reasoning_steps.append(step1)
# 步骤2:分析任务复杂度
step2 = {
'step': 2,
'type': 'task_analysis',
'question': '这个任务需要哪些步骤?',
'answer': self._analyze_task_complexity(context),
'confidence': 0.8
}
reasoning_steps.append(step2)
# 步骤3:确定所需资源
step3 = {
'step': 3,
'type': 'resource_planning',
'question': '完成这个任务需要什么工具或信息?',
'answer': self._identify_required_resources(context),
'confidence': 0.85
}
reasoning_steps.append(step3)
return reasoning_steps
def _understand_intent(self, context: Dict) -> str:
"""理解用户意图"""
intent = context['user_intent']
entities = context['extracted_entities']
if intent == 'calculation' and entities:
numbers = [e['value'] for e in entities if e['type'] == 'number']
if len(numbers) >= 2:
return f"用户需要进行数学计算,涉及数字: {numbers}"
return f"用户意图是 {intent},需要进一步处理"
def _analyze_task_complexity(self, context: Dict) -> str:
"""分析任务复杂度"""
intent = context['user_intent']
complexity_map = {
'calculation': '简单任务,可直接执行',
'search': '中等复杂度,需要外部工具',
'weather': '中等复杂度,需要API调用',
'translation': '简单任务,可使用内置能力',
'conversation': '复杂度可变,需要上下文理解'
}
return complexity_map.get(intent, '未知复杂度')
def _identify_required_resources(self, context: Dict) -> List[str]:
"""确定所需资源"""
intent = context['user_intent']
resources = []
if intent == 'calculation':
resources.append('calculator_tool')
elif intent == 'search':
resources.append('web_search_tool')
resources.append('information_processor')
elif intent == 'weather':
resources.append('weather_api')
resources.append('location_service')
elif intent == 'translation':
resources.append('translation_service')
return resources
def _generate_action_plan(self, reasoning_steps: List[Dict],
context: Dict) -> List[Dict]:
"""生成行动计划"""
action_plan = []
# 基于推理结果生成具体行动
intent = context['user_intent']
entities = context['extracted_entities']
if intent == 'calculation':
numbers = [e['value'] for e in entities if e['type'] == 'number']
if len(numbers) >= 2:
action_plan.append({
'action_id': 'calc_1',
'type': 'tool_call',
'tool': 'calculator',
'operation': 'add',
'parameters': {'numbers': numbers},
'description': f'计算 {numbers} 的和'
})
elif intent == 'search':
action_plan.append({
'action_id': 'search_1',
'type': 'tool_call',
'tool': 'web_search',
'parameters': {'query': context['current_input']['raw_input']},
'description': '执行网络搜索'
})
# 总是包含响应生成行动
action_plan.append({
'action_id': 'response_1',
'type': 'generate_response',
'parameters': {'context': context},
'description': '生成最终响应'
})
return action_plan
def _calculate_confidence(self, reasoning_steps: List[Dict]) -> float:
"""计算整体置信度"""
if not reasoning_steps:
return 0.0
total_confidence = sum(step.get('confidence', 0) for step in reasoning_steps)
return total_confidence / len(reasoning_steps)
def _extract_required_tools(self, action_plan: List[Dict]) -> List[str]:
"""提取需要的工具列表"""
tools = set()
for action in action_plan:
if action['type'] == 'tool_call':
tools.add(action['tool'])
return list(tools)
# 使用示例
reasoning_engine = ReasoningEngine()
processed_input = {
'raw_input': '计算 125 加上 38 等于多少',
'intent': 'calculation',
'entities': [
{'type': 'number', 'value': 125, 'text': '125'},
{'type': 'number', 'value': 38, 'text': '38'}
],
'timestamp': '2024-01-01T10:00:00'
}
plan = reasoning_engine.analyze_and_plan(processed_input)
print(json.dumps(plan, ensure_ascii=False, indent=2))
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
2.1.3 记忆系统 (Memory System)
import sqlite3
from datetime import datetime, timedelta
import numpy as np
from typing import List, Dict, Any
class MemorySystem:
"""记忆系统:管理短期和长期记忆"""
def __init__(self, db_path: str = ":memory:"):
self.db_path = db_path
self.init_database()
# 记忆缓存
self.short_term_memory = []
self.long_term_memory = []
self.max_short_term = 10 # 短期记忆容量
def init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建对话历史表
cursor.execute('''
CREATE TABLE IF NOT EXISTS conversation_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_input TEXT NOT NULL,
agent_response TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
intent TEXT,
sentiment TEXT
)
''')
# 创建用户偏好表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_preferences (
user_id TEXT,
preference_type TEXT,
preference_value TEXT,
confidence REAL,
last_updated DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, preference_type)
)
''')
# 创建知识记忆表
cursor.execute('''
CREATE TABLE IF NOT EXISTS knowledge_memory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fact_text TEXT NOT NULL,
category TEXT,
source TEXT,
confidence REAL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_accessed DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def add_conversation(self, user_input: str, agent_response: str,
intent: str = None, sentiment: str = None):
"""添加对话记录"""
# 更新短期记忆
self.short_term_memory.append({
'user_input': user_input,
'agent_response': agent_response,
'timestamp': datetime.now(),
'intent': intent,
'sentiment': sentiment
})
# 维护短期记忆大小
if len(self.short_term_memory) > self.max_short_term:
self.short_term_memory.pop(0)
# 保存到长期记忆(数据库)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO conversation_history
(user_input, agent_response, intent, sentiment)
VALUES (?, ?, ?, ?)
''', (user_input, agent_response, intent, sentiment))
conn.commit()
conn.close()
def get_recent_conversation(self, limit: int = 5) -> List[Dict]:
"""获取最近的对话记录"""
if len(self.short_term_memory) >= limit:
return self.short_term_memory[-limit:]
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT user_input, agent_response, timestamp, intent, sentiment
FROM conversation_history
ORDER BY timestamp DESC
LIMIT ?
''', (limit,))
results = []
for row in cursor.fetchall():
results.append({
'user_input': row[0],
'agent_response': row[1],
'timestamp': row[2],
'intent': row[3],
'sentiment': row[4]
})
conn.close()
return list(reversed(results)) # 按时间顺序返回
def update_user_preference(self, user_id: str, preference_type: str,
preference_value: str, confidence: float = 1.0):
"""更新用户偏好"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO user_preferences
(user_id, preference_type, preference_value, confidence)
VALUES (?, ?, ?, ?)
''', (user_id, preference_type, preference_value, confidence))
conn.commit()
conn.close()
def get_user_preferences(self, user_id: str) -> Dict[str, Any]:
"""获取用户偏好"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT preference_type, preference_value, confidence
FROM user_preferences
WHERE user_id = ?
''', (user_id,))
preferences = {}
for row in cursor.fetchall():
preferences[row[0]] = {
'value': row[1],
'confidence': row[2]
}
conn.close()
return preferences
def add_knowledge(self, fact_text: str, category: str = None,
source: str = None, confidence: float = 1.0):
"""添加知识记忆"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO knowledge_memory
(fact_text, category, source, confidence)
VALUES (?, ?, ?, ?)
''', (fact_text, category, source, confidence))
conn.commit()
conn.close()
def search_knowledge(self, query: str, category: str = None,
min_confidence: float = 0.5) -> List[Dict]:
"""搜索知识记忆"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if category:
cursor.execute('''
SELECT fact_text, category, source, confidence
FROM knowledge_memory
WHERE fact_text LIKE ? AND category = ? AND confidence >= ?
ORDER BY last_accessed DESC
''', (f'%{query}%', category, min_confidence))
else:
cursor.execute('''
SELECT fact_text, category, source, confidence
FROM knowledge_memory
WHERE fact_text LIKE ? AND confidence >= ?
ORDER BY last_accessed DESC
''', (f'%{query}%', min_confidence))
results = []
for row in cursor.fetchall():
results.append({
'fact_text': row[0],
'category': row[1],
'source': row[2],
'confidence': row[3]
})
# 更新访问时间
cursor.execute('''
UPDATE knowledge_memory
SET last_accessed = CURRENT_TIMESTAMP
WHERE fact_text = ? AND category = ?
''', (row[0], row[1]))
conn.commit()
conn.close()
return results
def get_memory_context(self, user_id: str = "default") -> Dict[str, Any]:
"""获取记忆上下文用于推理"""
return {
'recent_conversation': self.get_recent_conversation(3),
'user_preferences': self.get_user_preferences(user_id),
'past_actions': self.get_recent_actions(5)
}
def get_recent_actions(self, limit: int) -> List[Dict]:
"""获取最近的操作记录(简化实现)"""
# 实际实现应该从行动日志中获取
return []
# 使用示例
memory_system = MemorySystem()
# 添加对话记录
memory_system.add_conversation(
"今天天气怎么样?",
"目前无法获取实时天气,请提供具体位置。",
intent="weather",
sentiment="neutral"
)
# 更新用户偏好
memory_system.update_user_preference("user123", "language", "中文", 0.9)
# 获取记忆上下文
context = memory_system.get_memory_context("user123")
print(json.dumps(context, ensure_ascii=False, indent=2))
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
2.1.4 工具调用系统 (Tool Calling System)
class ToolCallingSystem:
"""工具调用系统:管理和执行各种工具"""
def __init__(self):
self.registered_tools = {}
self.tool_descriptions = {}
self.init_default_tools()
def init_default_tools(self):
"""初始化默认工具集"""
self.register_tool("calculator", self.calculate,
"执行数学计算,支持加减乘除")
self.register_tool("web_search", self.web_search,
"执行网络搜索获取最新信息")
self.register_tool("time_provider", self.get_current_time,
"获取当前时间")
self.register_tool("file_reader", self.read_file,
"读取文件内容")
def register_tool(self, tool_name: str, tool_function: callable,
description: str):
"""注册新工具"""
self.registered_tools[tool_name] = tool_function
self.tool_descriptions[tool_name] = description
def execute_tool(self, tool_name: str, parameters: Dict) -> Dict[str, Any]:
"""执行工具调用"""
if tool_name not in self.registered_tools:
return {
'success': False,
'error': f"工具未注册: {tool_name}",
'result': None
}
try:
tool_function = self.registered_tools[tool_name]
result = tool_function(**parameters)
return {
'success': True,
'result': result,
'tool_used': tool_name,
'execution_time': '0.1s' # 模拟执行时间
}
except Exception as e:
return {
'success': False,
'error': str(e),
'result': None,
'tool_used': tool_name
}
def get_available_tools(self) -> List[Dict]:
"""获取可用工具列表"""
tools = []
for name, description in self.tool_descriptions.items():
tools.append({
'name': name,
'description': description,
'parameters': self._get_tool_parameters(name)
})
return tools
def _get_tool_parameters(self, tool_name: str) -> List[Dict]:
"""获取工具参数信息(简化实现)"""
parameter_templates = {
"calculator": [
{"name": "operation", "type": "str", "required": True},
{"name": "numbers", "type": "list", "required": True}
],
"web_search": [
{"name": "query", "type": "str", "required": True},
{"name": "limit", "type": "int", "required": False}
]
}
return parameter_templates.get(tool_name, [])
# 工具具体实现
def calculate(self, operation: str, numbers: List[float]) -> float:
"""计算器工具"""
if operation == 'add':
return sum(numbers)
elif operation == 'subtract':
result = numbers[0]
for num in numbers[1:]:
result -= num
return result
elif operation == 'multiply':
result = 1
for num in numbers:
result *= num
return result
elif operation == 'divide':
result = numbers[0]
for num in numbers[1:]:
if num == 0:
raise ValueError("除数不能为零")
result /= num
return result
else:
raise ValueError(f"不支持的操作: {operation}")
def web_search(self, query: str, limit: int = 5) -> List[Dict]:
"""网络搜索工具(模拟实现)"""
# 实际实现应该调用搜索API
return [
{
'title': f"关于 '{query}' 的搜索结果 1",
'url': 'https://example.com/result1',
'snippet': f'这是关于 {query} 的模拟搜索结果...'
},
{
'title': f"关于 '{query}' 的搜索结果 2",
'url': 'https://example.com/result2',
'snippet': f'更多关于 {query} 的信息...'
}
]
def get_current_time(self) -> str:
"""时间获取工具"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def read_file(self, file_path: str) -> str:
"""文件读取工具(模拟实现)"""
# 实际实现应该安全地读取文件
return f"模拟文件内容: {file_path}"
# 使用示例
tool_system = ToolCallingSystem()
# 执行计算工具
result = tool_system.execute_tool("calculator", {
"operation": "add",
"numbers": [125, 38]
})
print(json.dumps(result, ensure_ascii=False, indent=2))
# 获取可用工具
tools = tool_system.get_available_tools()
print("可用工具:", json.dumps(tools, ensure_ascii=False, indent=2))
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
三、LLM Agent 的完整工作流程
现在让我们将各个组件整合成一个完整的 LLM Agent:
class LLMAgent:
"""完整的 LLM Agent 实现"""
def __init__(self, name: str = "智能助手"):
self.name = name
self.perception = PerceptionModule()
self.reasoning = ReasoningEngine()
self.memory = MemorySystem()
self.tools = ToolCallingSystem()
# Agent 状态
self.is_active = True
self.conversation_count = 0
def process_message(self, user_input: str, user_id: str = "default") -> str:
"""处理用户消息的完整流程"""
self.conversation_count += 1
print(f"\n=== Agent 处理流程 (对话 #{self.conversation_count}) ===")
# 步骤1: 感知输入
print("1. 感知模块处理中...")
processed_input = self.perception.process_input(user_input)
print(f" - 识别意图: {processed_input['intent']}")
print(f" - 提取实体: {len(processed_input['entities'])} 个")
# 步骤2: 获取记忆上下文
print("2. 检索记忆上下文...")
memory_context = self.memory.get_memory_context(user_id)
# 步骤3: 推理和规划
print("3. 推理引擎工作中...")
reasoning_result = self.reasoning.analyze_and_plan(
processed_input, memory_context)
print(f" - 生成 {len(reasoning_result['action_plan'])} 个行动计划")
print(f" - 置信度: {reasoning_result['confidence']:.2f}")
# 步骤4: 执行行动计划
print("4. 执行行动计划...")
execution_results = self._execute_action_plan(
reasoning_result['action_plan'])
# 步骤5: 生成响应
print("5. 生成最终响应...")
final_response = self._generate_final_response(
processed_input, reasoning_result, execution_results)
# 步骤6: 更新记忆
print("6. 更新记忆系统...")
self.memory.add_conversation(
user_input, final_response,
processed_input['intent'],
processed_input['sentiment']
)
print("=== 处理完成 ===\n")
return final_response
def _execute_action_plan(self, action_plan: List[Dict]) -> List[Dict]:
"""执行行动计划"""
results = []
for action in action_plan:
action_id = action['action_id']
action_type = action['type']
print(f" 执行行动 {action_id}: {action['description']}")
if action_type == 'tool_call':
# 执行工具调用
tool_result = self.tools.execute_tool(
action['tool'], action['parameters'])
results.append({
'action_id': action_id,
'type': 'tool_call',
'result': tool_result
})
elif action_type == 'generate_response':
# 标记响应生成行动,稍后处理
results.append({
'action_id': action_id,
'type': 'generate_response',
'parameters': action['parameters']
})
elif action_type == 'memory_operation':
# 记忆操作
results.append({
'action_id': action_id,
'type': 'memory_operation',
'result': '记忆操作完成'
})
return results
def _generate_final_response(self, processed_input: Dict,
reasoning_result: Dict,
execution_results: List[Dict]) -> str:
"""生成最终响应"""
# 收集工具执行结果
tool_results = {}
for result in execution_results:
if result['type'] == 'tool_call' and result['result']['success']:
tool_name = result['result']['tool_used']
tool_results[tool_name] = result['result']['result']
# 基于意图和结果生成响应
intent = processed_input['intent']
if intent == 'calculation' and 'calculator' in tool_results:
calculation_result = tool_results['calculator']
return f"计算结果为: {calculation_result}"
elif intent == 'search' and 'web_search' in tool_results:
search_results = tool_results['web_search']
response = "搜索到以下信息:\n"
for i, result in enumerate(search_results[:3], 1):
response += f"{i}. {result['title']}: {result['snippet']}\n"
return response
elif intent == 'weather':
return "目前天气服务暂不可用,请稍后再试或提供具体位置信息。"
else:
# 默认响应
reasoning_steps = reasoning_result['reasoning_steps']
last_reasoning = reasoning_steps[-1]['answer'] if reasoning_steps else ""
return f"我理解您的需求是: {last_reasoning}。我已经处理完成。"
def get_agent_status(self) -> Dict[str, Any]:
"""获取 Agent 状态"""
return {
'name': self.name,
'conversation_count': self.conversation_count,
'is_active': self.is_active,
'available_tools': len(self.tools.get_available_tools()),
'memory_stats': {
'short_term': len(self.memory.short_term_memory),
'database_tables': 3 # 简化统计
}
}
# 使用示例
def demo_llm_agent():
"""演示 LLM Agent 的完整工作流程"""
agent = LLMAgent("智能计算助手")
# 测试对话
test_messages = [
"请计算 125 加上 38 等于多少?",
"搜索人工智能的最新发展",
"今天天气怎么样?",
"帮我翻译一下这句话"
]
for message in test_messages:
print(f"用户: {message}")
response = agent.process_message(message)
print(f"助手: {response}")
print("-" * 50)
# 显示 Agent 状态
status = agent.get_agent_status()
print("\nAgent 状态:", json.dumps(status, ensure_ascii=False, indent=2))
if __name__ == "__main__":
demo_llm_agent()
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
四、LLM Agent 的常见功能
4.1 工具使用与扩展
LLM Agent 的核心能力之一就是工具使用,让我们看看如何扩展这一功能:
class AdvancedToolSystem(ToolCallingSystem):
"""高级工具系统,支持工具组合和条件执行"""
def __init__(self):
super().__init__()
self.tool_dependencies = {}
self.execution_history = []
def register_tool_with_dependencies(self, tool_name: str,
tool_function: callable,
description: str,
dependencies: List[str] = None):
"""注册工具并指定依赖关系"""
self.register_tool(tool_name, tool_function, description)
self.tool_dependencies[tool_name] = dependencies or []
def execute_tool_chain(self, tool_chain: List[Dict]) -> List[Dict]:
"""执行工具链,支持顺序执行和数据传递"""
results = []
previous_output = None
for i, tool_call in enumerate(tool_chain):
tool_name = tool_call['tool']
parameters = tool_call.get('parameters', {})
# 如果前一个工具的输出可用,传递给下一个工具
if previous_output is not None and 'use_previous_output' in tool_call:
parameters['previous_result'] = previous_output
print(f"执行工具链步骤 {i+1}: {tool_name}")
result = self.execute_tool(tool_name, parameters)
if result['success']:
previous_output = result['result']
results.append(result)
else:
# 工具执行失败,终止链条
results.append(result)
break
return results
def conditional_tool_execution(self, condition: callable,
tool_calls: Dict) -> Dict:
"""条件工具执行"""
if condition():
return self.execute_tool(tool_calls['tool'], tool_calls['parameters'])
else:
return {
'success': True,
'result': None,
'skipped': True,
'reason': '条件未满足'
}
# 扩展工具示例
class ExtendedTools(AdvancedToolSystem):
"""扩展工具集"""
def __init__(self):
super().__init__()
self.register_advanced_tools()
def register_advanced_tools(self):
"""注册高级工具"""
self.register_tool_with_dependencies(
"data_analyzer",
self.analyze_data,
"分析数据集,生成统计信息",
dependencies=["calculator"]
)
self.register_tool_with_dependencies(
"report_generator",
self.generate_report,
"生成分析报告",
dependencies=["data_analyzer"]
)
def analyze_data(self, dataset: List[float]) -> Dict[str, float]:
"""数据分析工具"""
if not dataset:
return {}
return {
'mean': sum(dataset) / len(dataset),
'max': max(dataset),
'min': min(dataset),
'count': len(dataset)
}
def generate_report(self, analysis_result: Dict,
previous_result: Any = None) -> str:
"""报告生成工具"""
report = "数据分析报告:\n"
for key, value in analysis_result.items():
report += f"- {key}: {value}\n"
if previous_result:
report += f"\n基于前一步结果: {previous_result}"
return report
# 使用高级工具系统
advanced_tools = ExtendedTools()
# 执行工具链
tool_chain = [
{
'tool': 'calculator',
'parameters': {
'operation': 'add',
'numbers': [10, 20, 30]
}
},
{
'tool': 'data_analyzer',
'parameters': {
'dataset': [1, 2, 3, 4, 5]
},
'use_previous_output': False
},
{
'tool': 'report_generator',
'parameters': {},
'use_previous_output': True
}
]
results = advanced_tools.execute_tool_chain(tool_chain)
for i, result in enumerate(results):
print(f"步骤 {i+1} 结果: {result['success']}")
if result['success'] and not result.get('skipped'):
print(f" 输出: {result['result']}")
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
4.2 多轮对话管理
class ConversationManager:
"""多轮对话管理器"""
def __init__(self, memory_system: MemorySystem):
self.memory = memory_system
self.conversation_states = {}
self.pending_actions = {}
def get_conversation_state(self, user_id: str) -> Dict:
"""获取对话状态"""
return self.conversation_states.get(user_id, {
'state': 'idle',
'current_task': None,
'missing_information': [],
'confirmed_facts': [],
'retry_count': 0
})
def update_conversation_state(self, user_id: str, updates: Dict):
"""更新对话状态"""
current_state = self.get_conversation_state(user_id)
current_state.update(updates)
self.conversation_states[user_id] = current_state
def handle_follow_up(self, user_input: str, user_id: str) -> Dict:
"""处理后续对话"""
state = self.get_conversation_state(user_id)
if state['state'] == 'awaiting_information':
return self._handle_missing_information(user_input, user_id, state)
elif state['state'] == 'confirmation_needed':
return self._handle_confirmation(user_input, user_id, state)
else:
return {'action': 'new_conversation', 'reason': '新对话开始'}
def _handle_missing_information(self, user_input: str, user_id: str,
state: Dict) -> Dict:
"""处理缺失信息补充"""
missing_info = state['missing_information']
if missing_info:
info_type = missing_info[0]
extracted_value = self._extract_information(user_input, info_type)
if extracted_value:
# 成功提取信息
state['confirmed_facts'].append({
'type': info_type,
'value': extracted_value
})
state['missing_information'] = missing_info[1:]
if not state['missing_information']:
state['state'] = 'ready_to_proceed'
self.update_conversation_state(user_id, state)
return {
'action': 'information_received',
'extracted_value': extracted_value,
'remaining_missing': state['missing_information']
}
return {
'action': 'still_missing',
'missing_info': missing_info
}
def _extract_information(self, text: str, info_type: str) -> Any:
"""从文本中提取特定类型信息"""
if info_type == 'location':
# 简单的位置提取
location_keywords = ['在北京', '在上海', '在广州', '在深圳']
for keyword in location_keywords:
if keyword in text:
return keyword[1:] # 去除"在"
elif info_type == 'time':
# 简单的时间提取
time_patterns = [r'(\d+)点', r'(\d+):(\d+)']
for pattern in time_patterns:
match = re.search(pattern, text)
if match:
return match.group()
elif info_type == 'number':
numbers = re.findall(r'\d+', text)
if numbers:
return int(numbers[0])
return None
def _handle_confirmation(self, user_input: str, user_id: str,
state: Dict) -> Dict:
"""处理确认请求"""
confirmation_keywords = {
'是的': True, '对的': True, '正确': True, '好': True,
'不是': False, '不对': False, '错误': False, '不': False
}
user_input_lower = user_input.lower()
confirmed = None
for keyword, value in confirmation_keywords.items():
if keyword in user_input_lower:
confirmed = value
break
if confirmed is not None:
if confirmed:
state['state'] = 'confirmed'
else:
state['state'] = 'rejected'
state['retry_count'] += 1
self.update_conversation_state(user_id, state)
return {
'action': 'confirmation_received',
'confirmed': confirmed,
'new_state': state['state']
}
return {
'action': 'confirmation_unclear',
'request_repeat': True
}
# 使用示例
def demo_conversation_management():
"""演示多轮对话管理"""
memory = MemorySystem()
conv_manager = ConversationManager(memory)
# 模拟多轮对话
test_cases = [
("我想查询天气", "user123"),
("在北京", "user123"), # 补充位置信息
("是的", "user123"), # 确认信息
("下午3点", "user123") # 补充时间信息
]
for user_input, user_id in test_cases:
print(f"用户({user_id}): {user_input}")
state_before = conv_manager.get_conversation_state(user_id)
print(f"对话状态: {state_before['state']}")
result = conv_manager.handle_follow_up(user_input, user_id)
print(f"处理结果: {result['action']}")
state_after = conv_manager.get_conversation_state(user_id)
print(f"新状态: {state_after['state']}")
print("-" * 30)
# demo_conversation_management()
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
4.3 任务分解与规划
class TaskPlanner:
"""任务分解与规划器"""
def __init__(self):
self.task_templates = self._load_task_templates()
def _load_task_templates(self) -> Dict[str, Any]:
"""加载任务模板"""
return {
'research_topic': {
'description': '研究某个主题',
'steps': [
{'action': 'web_search', 'purpose': '收集基本信息'},
{'action': 'analyze_information', 'purpose': '整理关键点'},
{'action': 'generate_summary', 'purpose': '生成总结报告'}
]
},
'data_analysis': {
'description': '分析数据集',
'steps': [
{'action': 'load_data', 'purpose': '加载数据'},
{'action': 'calculate_statistics', 'purpose': '计算统计量'},
{'action': 'generate_visualization', 'purpose': '创建可视化'},
{'action': 'write_report', 'purpose': '撰写分析报告'}
]
},
'travel_planning': {
'description': '制定旅行计划',
'steps': [
{'action': 'search_flights', 'purpose': '查询航班'},
{'action': 'find_hotels', 'purpose': '寻找酒店'},
{'action': 'plan_itinerary', 'purpose': '安排行程'},
{'action': 'calculate_budget', 'purpose': '计算预算'}
]
}
}
def decompose_task(self, task_description: str,
available_tools: List[str]) -> Dict[str, Any]:
"""分解复杂任务"""
# 识别任务类型
task_type = self._identify_task_type(task_description)
if task_type in self.task_templates:
template = self.task_templates[task_type]
steps = self._adapt_steps_to_tools(template['steps'], available_tools)
return {
'task_type': task_type,
'original_description': task_description,
'decomposed_steps': steps,
'estimated_complexity': len(steps),
'prerequisites': self._identify_prerequisites(task_description)
}
else:
# 通用任务分解
return self._generic_task_decomposition(task_description, available_tools)
def _identify_task_type(self, task_description: str) -> str:
"""识别任务类型"""
description_lower = task_description.lower()
for task_type, template in self.task_templates.items():
keywords = task_type.split('_')
if any(keyword in description_lower for keyword in keywords):
return task_type
return 'generic'
def _adapt_steps_to_tools(self, steps: List[Dict],
available_tools: List[str]) -> List[Dict]:
"""根据可用工具调整任务步骤"""
adapted_steps = []
tool_mapping = {
'web_search': 'web_search',
'analyze_information': 'data_analyzer',
'generate_summary': 'report_generator',
'calculate_statistics': 'calculator',
'search_flights': 'web_search',
'find_hotels': 'web_search'
}
for step in steps:
action = step['action']
if action in tool_mapping and tool_mapping[action] in available_tools:
adapted_step = step.copy()
adapted_step['tool'] = tool_mapping[action]
adapted_steps.append(adapted_step)
else:
# 如果没有对应工具,标记为需要手动处理
adapted_step = step.copy()
adapted_step['status'] = 'requires_manual_action'
adapted_steps.append(adapted_step)
return adapted_steps
def _identify_prerequisites(self, task_description: str) -> List[str]:
"""识别任务前提条件"""
prerequisites = []
if '天气' in task_description:
prerequisites.append('需要位置信息')
if '计算' in task_description:
prerequisites.append('需要数值数据')
if '搜索' in task_description:
prerequisites.append('需要网络连接')
return prerequisites
def _generic_task_decomposition(self, task_description: str,
available_tools: List[str]) -> Dict[str, Any]:
"""通用任务分解"""
# 基于任务描述的关键词进行简单分解
words = task_description.lower().split()
steps = []
if any(word in words for word in ['查找', '搜索', '查询']):
steps.append({
'action': 'search_information',
'purpose': '搜索相关信息',
'tool': 'web_search' if 'web_search' in available_tools else None
})
if any(word in words for word in ['计算', '算', '等于']):
steps.append({
'action': 'perform_calculation',
'purpose': '执行数学计算',
'tool': 'calculator' if 'calculator' in available_tools else None
})
if any(word in words for word in ['总结', '汇总', '报告']):
steps.append({
'action': 'generate_summary',
'purpose': '生成总结报告',
'tool': 'report_generator' if 'report_generator' in available_tools else None
})
# 如果没有匹配到具体步骤,添加通用步骤
if not steps:
steps = [
{'action': 'understand_requirements', 'purpose': '理解任务需求'},
{'action': 'gather_information', 'purpose': '收集相关信息'},
{'action': 'process_data', 'purpose': '处理数据'},
{'action': 'deliver_results', 'purpose': '交付结果'}
]
return {
'task_type': 'generic',
'original_description': task_description,
'decomposed_steps': steps,
'estimated_complexity': len(steps),
'prerequisites': ['需要明确需求']
}
# 使用示例
def demo_task_planning():
"""演示任务规划功能"""
planner = TaskPlanner()
test_tasks = [
"研究人工智能的伦理问题",
"分析销售数据并生成报告",
"规划去北京的旅行行程",
"计算公司季度财务报表"
]
available_tools = ['web_search', 'calculator', 'data_analyzer', 'report_generator']
for task in test_tasks:
print(f"任务: {task}")
plan = planner.decompose_task(task, available_tools)
print(f"任务类型: {plan['task_type']}")
print(f"分解步骤:")
for i, step in enumerate(plan['decomposed_steps'], 1):
tool_info = f" -> 使用工具: {step.get('tool', '无')}"
print(f" {i}. {step['purpose']}{tool_info}")
print(f"预估复杂度: {plan['estimated_complexity']} 步骤")
print(f"前提条件: {', '.join(plan['prerequisites'])}")
print("-" * 50)
# demo_task_planning()
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
五、LLM Agent 的高级特性
5.1 自我反思与错误修复
class SelfReflectionModule:
"""自我反思模块:监控和改进 Agent 表现"""
def __init__(self, memory_system: MemorySystem):
self.memory = memory_system
self.performance_metrics = {
'successful_actions': 0,
'failed_actions': 0,
'user_satisfaction': [],
'common_errors': {}
}
def analyze_conversation(self, user_input: str, agent_response: str,
execution_results: List[Dict]) -> Dict[str, Any]:
"""分析对话质量"""
analysis = {
'response_quality': self._assess_response_quality(agent_response),
'task_success': self._assess_task_success(user_input, execution_results),
'user_engagement': self._assess_user_engagement(agent_response),
'improvement_suggestions': []
}
# 生成改进建议
if analysis['response_quality'] < 0.7:
analysis['improvement_suggestions'].append(
"响应质量有待提高,建议提供更详细的信息")
if analysis['task_success'] < 0.5:
analysis['improvement_suggestions'].append(
"任务完成度较低,建议检查工具配置或提供更清晰的指引")
# 更新性能指标
self._update_performance_metrics(analysis)
return analysis
def _assess_response_quality(self, response: str) -> float:
"""评估响应质量"""
quality_indicators = {
'length_appropriate': 50 <= len(response) <= 500,
'has_clear_structure': any(marker in response for marker in ['\n', '。', '!', '?']),
'provides_value': len(response.strip()) > 10,
'polite_tone': not any(word in response for word in ['愚蠢', '错误', '不对'])
}
positive_indicators = sum(quality_indicators.values())
return positive_indicators / len(quality_indicators)
def _assess_task_success(self, user_input: str,
execution_results: List[Dict]) -> float:
"""评估任务成功度"""
if not execution_results:
return 0.0
successful_tools = sum(1 for result in execution_results
if result.get('success', False))
total_tools = len(execution_results)
return successful_tools / total_tools if total_tools > 0 else 0.0
def _assess_user_engagement(self, response: str) -> float:
"""评估用户参与度(预测)"""
engagement_indicators = {
'question_count': response.count('?'),
'suggestion_count': response.count('建议') + response.count('可以'),
'personalized': any(word in response for word in ['您', '你', '我们']),
'action_oriented': any(word in response for word in ['请', '尝试', '下一步'])
}
score = sum(engagement_indicators.values()) / 10.0 # 归一化
return min(score, 1.0)
def _update_performance_metrics(self, analysis: Dict):
"""更新性能指标"""
if analysis['task_success'] > 0.7:
self.performance_metrics['successful_actions'] += 1
else:
self.performance_metrics['failed_actions'] += 1
self.performance_metrics['user_satisfaction'].append(
analysis['response_quality'])
def get_performance_report(self) -> Dict[str, Any]:
"""获取性能报告"""
total_actions = (self.performance_metrics['successful_actions'] +
self.performance_metrics['failed_actions'])
success_rate = (self.performance_metrics['successful_actions'] /
total_actions if total_actions > 0 else 0)
avg_satisfaction = (sum(self.performance_metrics['user_satisfaction']) /
len(self.performance_metrics['user_satisfaction'])
if self.performance_metrics['user_satisfaction'] else 0)
return {
'success_rate': success_rate,
'average_satisfaction': avg_satisfaction,
'total_conversations': total_actions,
'improvement_areas': self._identify_improvement_areas()
}
def _identify_improvement_areas(self) -> List[str]:
"""识别需要改进的领域"""
areas = []
success_rate = self.get_performance_report()['success_rate']
if success_rate < 0.6:
areas.append("任务执行成功率需要提升")
if self.performance_metrics['failed_actions'] > 10:
areas.append("工具调用失败次数较多,需要检查工具配置")
return areas
# 使用示例
def demo_self_reflection():
"""演示自我反思功能"""
memory = MemorySystem()
reflection = SelfReflectionModule(memory)
# 模拟对话分析
test_conversations = [
("计算1+1", "1+1=2", [{'success': True}]),
("搜索未知内容", "抱歉,我找不到相关信息", [{'success': False}]),
("复杂问题", "这是一个复杂问题,需要分步骤解决...", [{'success': True}, {'success': False}])
]
for user_input, agent_response, results in test_conversations:
analysis = reflection.analyze_conversation(user_input, agent_response, results)
print(f"用户输入: {user_input}")
print(f"响应质量: {analysis['response_quality']:.2f}")
print(f"任务成功度: {analysis['task_success']:.2f}")
print(f"改进建议: {analysis['improvement_suggestions']}")
print("-" * 40)
# 生成性能报告
report = reflection.get_performance_report()
print("性能报告:")
print(json.dumps(report, ensure_ascii=False, indent=2))
# demo_self_reflection()
AI写代码python
运行
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
六、总结与展望
6.1 LLM Agent 架构核心要点
通过本文的详细探讨,我们可以总结出 LLM Agent 架构的几个核心要点:
- 模块化设计:感知、推理、记忆、工具调用等组件各司其职
- 状态管理:维护对话状态、用户偏好、任务进度等信息
- 工具扩展性:支持灵活的工具注册和组合调用
- 记忆持久化:短期记忆与长期记忆相结合
- 错误处理:完善的异常处理和自我反思机制
6.2 未来发展方向
LLM Agent 技术仍在快速发展中,未来的重要方向包括:
- 多模态能力:整合文本、图像、音频、视频处理
- 自主学习:从交互中持续学习和改进
- 多Agent协作:多个 Agent 协同完成复杂任务
- 安全与对齐:确保 Agent 行为符合人类价值观
- 现实世界交互:与物理世界和真实系统集成
6.3 实践建议
对于想要开发 LLM Agent 的开发者,建议:
- 从简单开始:先实现核心功能,再逐步扩展
- 重视测试:建立完善的测试和评估体系
- 关注用户体验:设计自然的交互流程和反馈机制
- 保证可靠性:实现错误处理和降级方案
- 持续迭代:基于用户反馈和数据不断优化
LLM Agent 技术正在重新定义人机交互的方式,通过本文提供的架构设计和代码实现,开发者可以快速入门并构建自己的智能 Agent 系统。随着技术的不断进步,LLM Agent 将在各个领域发挥越来越重要的作用。
更多推荐



所有评论(0)