Agent Loop:AI Agent 的核心引擎【nanobot架构分析系列】
·
关于作者
- 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
- 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
- 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案
「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!
Agent Loop:AI Agent 的核心引擎
Agent Loop 是 nanobot 的心脏,负责消息处理、上下文构建、LLM 交互和工具调用的完整循环。
概述
Agent Loop 是 nanobot 的核心处理引擎,实现了"感知-思考-行动"的完整循环。它从消息总线接收用户消息,构建包含历史、记忆、技能的上下文,调用 LLM 进行推理,执行工具调用,最后将响应返回给用户。本文将深入剖析 Agent Loop 的设计与实现。
问题背景
Agent Loop 的核心挑战
构建一个 AI Agent 的核心循环需要解决以下问题:
| 挑战 | 描述 | nanobot 的解决方案 |
|---|---|---|
| 消息处理 | 如何高效处理异步消息 | 消息总线 + 异步队列 |
| 上下文构建 | 如何组装完整的对话上下文 | ContextBuilder 模块化构建 |
| 工具调用 | 如何处理多轮工具调用 | 迭代循环 + 结果反馈 |
| 会话管理 | 如何维护对话历史 | SessionManager 持久化 |
| 错误处理 | 如何优雅处理异常 | try-catch + 错误响应 |
Agent Loop 的设计目标
核心架构
AgentLoop 类结构
AgentLoop 是整个系统的核心协调者:
class AgentLoop:
"""
The agent loop is the core processing engine.
It:
1. Receives messages from the bus
2. Builds context with history, memory, skills
3. Calls the LLM
4. Executes tool calls
5. Sends responses back
"""
def __init__(
self,
bus: MessageBus,
provider: LLMProvider,
workspace: Path,
model: str | None = None,
max_iterations: int = 20,
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
cron_service: "CronService | None" = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
):
# ... 初始化代码
依赖组件
消息处理流程
主循环实现
Agent Loop 的核心是一个异步消息处理循环:
async def run(self) -> None:
"""Run the agent loop, processing messages from the bus."""
self._running = True
logger.info("Agent loop started")
while self._running:
try:
# 等待下一条消息
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
# 处理消息
try:
response = await self._process_message(msg)
if response:
await self.bus.publish_outbound(response)
except Exception as e:
logger.error(f"Error processing message: {e}")
# 发送错误响应
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=f"Sorry, I encountered an error: {str(e)}"
))
except asyncio.TimeoutError:
continue
消息处理详解
上下文构建
ContextBuilder 的职责
ContextBuilder 负责组装完整的对话上下文:
class ContextBuilder:
"""
Builds the context (system prompt + messages) for the agent.
Assembles bootstrap files, memory, skills, and conversation history
into a coherent prompt for the LLM.
"""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
上下文构建流程
System Prompt 构建
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
"""Build the system prompt from bootstrap files, memory, and skills."""
parts = []
# 核心身份
parts.append(self._get_identity())
# Bootstrap 文件
bootstrap = self._load_bootstrap_files()
if bootstrap:
parts.append(bootstrap)
# 记忆上下文
memory = self.memory.get_memory_context()
if memory:
parts.append(f"# Memory\n\n{memory}")
# 技能 - 渐进式加载
# 1. Always-loaded 技能: 包含完整内容
always_skills = self.skills.get_always_skills()
if always_skills:
always_content = self.skills.load_skills_for_context(always_skills)
if always_content:
parts.append(f"# Active Skills\n\n{always_content}")
# 2. 可用技能: 只显示摘要
skills_summary = self.skills.build_skills_summary()
if skills_summary:
parts.append(f"""# Skills
The following skills extend your capabilities. To use a skill, read its SKILL.md file using the read_file tool.
Skills with available="false" need dependencies installed first.
{skills_summary}""")
return "\n\n---\n\n".join(parts)
核心身份信息
def _get_identity(self) -> str:
"""Get the core identity section."""
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = _time.strftime("%Z") or "UTC"
workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
return f"""# nanobot
You are nanobot, a helpful AI assistant. You have access to tools that allow you to:
- Read, write, and edit files
- Execute shell commands
- Search the web and fetch web pages
- Send messages to users on chat channels
- Spawn subagents for complex background tasks
## Current Time
{now} ({tz})
## Runtime
{runtime}
## Workspace
Your workspace is at: {workspace_path}
- Memory files: {workspace_path}/memory/MEMORY.md
- Daily notes: {workspace_path}/memory/YYYY-MM-DD.md
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
"""
工具调用循环
迭代处理机制
Agent Loop 使用迭代循环处理多轮工具调用:
async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""Process a single inbound message."""
# 获取或创建会话
session = self.sessions.get_or_create(msg.session_key)
# 构建初始消息
messages = self.context.build_messages(
history=session.get_history(),
current_message=msg.content,
media=msg.media if msg.media else None,
channel=msg.channel,
chat_id=msg.chat_id,
)
# Agent 循环
iteration = 0
final_content = None
while iteration < self.max_iterations:
iteration += 1
# 调用 LLM
response = await self.provider.chat(
messages=messages,
tools=self.tools.get_definitions(),
model=self.model
)
# 处理工具调用
if response.has_tool_calls:
# 添加助手消息(包含工具调用)
tool_call_dicts = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments)
}
}
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
)
# 执行工具
for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
# 交错思维链: 在下一步行动前反思
messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
else:
# 无工具调用,完成处理
final_content = response.content
break
if final_content is None:
final_content = "I've completed processing but have no response to give."
# 保存到会话
session.add_message("user", msg.content)
session.add_message("assistant", final_content)
self.sessions.save(session)
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content,
metadata=msg.metadata or {},
)
工具调用流程图
会话管理
Session 数据结构
@dataclass
class Session:
"""
A conversation session.
Stores messages in JSONL format for easy reading and persistence.
"""
key: str # channel:chat_id
messages: list[dict[str, Any]] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
metadata: dict[str, Any] = field(default_factory=dict)
def add_message(self, role: str, content: str, **kwargs: Any) -> None:
"""Add a message to the session."""
msg = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
**kwargs
}
self.messages.append(msg)
self.updated_at = datetime.now()
def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
"""Get message history for LLM context."""
recent = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
return [{"role": m["role"], "content": m["content"]} for m in recent]
SessionManager 实现
class SessionManager:
"""Manages conversation sessions."""
def __init__(self, workspace: Path):
self.workspace = workspace
self.sessions_dir = ensure_dir(Path.home() / ".nanobot" / "sessions")
self._cache: dict[str, Session] = {}
def get_or_create(self, key: str) -> Session:
"""Get an existing session or create a new one."""
# 检查缓存
if key in self._cache:
return self._cache[key]
# 尝试从磁盘加载
session = self._load(key)
if session is None:
session = Session(key=key)
self._cache[key] = session
return session
系统消息处理
子代理通知机制
Agent Loop 还处理系统消息,如子代理完成通知:
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
# 解析来源
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
else:
origin_channel = "cli"
origin_chat_id = msg.chat_id
# 使用原始会话的上下文
session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key)
# ... 处理逻辑与普通消息类似
关键设计决策
1. 最大迭代限制
max_iterations: int = 20
防止无限循环,确保系统稳定性。
2. 交错思维链
messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
在工具执行后添加反思提示,帮助 LLM 更好地规划下一步行动。
3. 错误恢复
except Exception as e:
logger.error(f"Error processing message: {e}")
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=f"Sorry, I encountered an error: {str(e)}"
))
确保错误不会导致系统崩溃,并向用户提供有意义的反馈。
性能优化
1. 会话缓存
self._cache: dict[str, Session] = {}
避免频繁的磁盘 I/O,提高响应速度。
2. 历史消息限制
def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
限制上下文长度,避免超出 LLM 的 token 限制。
3. 异步处理
async def run(self) -> None:
while self._running:
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
使用异步 I/O 提高并发处理能力。
总结
Agent Loop 是 nanobot 的核心引擎,其设计体现了以下原则:
| 设计原则 | 实现方式 |
|---|---|
| 模块化 | ContextBuilder、SessionManager、ToolRegistry 各司其职 |
| 可扩展 | 通过注册机制动态添加工具和技能 |
| 可靠性 | 迭代限制、错误恢复、会话持久化 |
| 可观测 | 完善的日志记录和状态追踪 |
Agent Loop 的核心是一个简洁但强大的迭代循环,通过上下文构建、LLM 调用和工具执行的组合,实现了完整的 Agent 功能。
更多推荐




所有评论(0)