nanobot 是一个超轻量级的 AI 助理框架,其核心处理引擎 AgentLoop 位于 nanobot/agent/loop.py 中。本文将从架构设计、关键流程、模块交互等方面,逐段解析这个仅有几百行代码却支撑起完整 Agent 能力的核心模块。


一、AgentLoop 概述

AgentLoop 是 nanobot 的消息处理中枢,负责:

  1. 从消息总线(MessageBus)接收入站消息;
  2. 结合会话历史、长期记忆、工具定义构建提示上下文;
  3. 调用 LLM 进行推理,解析工具调用并执行;
  4. 将最终响应发布回总线;
  5. 管理会话状态和记忆整合。

其设计遵循经典的“事件循环 + 工具调用”模式,但通过精巧的抽象实现了极简代码量和高度可扩展性。
在这里插入图片描述

二、初始化:参数与默认工具

def __init__(
    self,
    bus: MessageBus,
    provider: LLMProvider,
    workspace: Path,
    model: str | None = None,
    max_iterations: int = 20,
    temperature: float = 0.7,
    max_tokens: int = 4096,
    memory_window: int = 50,
    brave_api_key: str | None = None,
    exec_config: "ExecToolConfig | None" = None,
    cron_service: "CronService | None" = None,
    restrict_to_workspace: bool = False,
    session_manager: SessionManager | None = None,
    mcp_servers: dict | None = None,
):

构造函数接收大量配置参数,核心依赖包括:

  • bus:消息总线,用于消费入站消息和发布出站消息。
  • provider:LLM 提供者(如 OpenAI、Claude 等),通过统一接口进行对话。
  • workspace:工作目录,用于文件操作隔离。
  • model:使用的模型名称。
  • max_iterations:单次对话中允许的最大工具调用轮数,防止无限循环。
  • memory_window:保留在会话中的最新消息数量,超出部分将触发记忆整合。
  • restrict_to_workspace:是否限制文件操作在工作区内。
  • mcp_servers:MCP 服务器配置,用于扩展工具集。

初始化过程中,还会创建一系列辅助组件:

  • ContextBuilder:负责构建发送给 LLM 的提示消息。
  • SessionManager:管理会话状态(历史消息、最后整合索引等)。
  • ToolRegistry:工具注册表,管理所有可用工具。
  • SubagentManager:管理子 Agent(用于 spawn 工具)。
  • 最后调用 _register_default_tools() 注册内置工具。

默认工具注册

def _register_default_tools(self) -> None:
    # 文件工具(受限目录)
    self.tools.register(ReadFileTool(allowed_dir=allowed_dir))
    ...
    # Shell 工具
    self.tools.register(ExecTool(...))
    # 网络工具
    self.tools.register(WebSearchTool(api_key=self.brave_api_key))
    self.tools.register(WebFetchTool())
    # 消息工具(用于发送消息)
    message_tool = MessageTool(send_callback=self.bus.publish_outbound)
    self.tools.register(message_tool)
    # 衍生子 Agent 工具
    spawn_tool = SpawnTool(manager=self.subagents)
    self.tools.register(spawn_tool)
    # 定时任务工具
    if self.cron_service:
        self.tools.register(CronTool(self.cron_service))

这些工具覆盖了文件操作、Shell 执行、网络搜索、消息发送、子 Agent 衍生、定时任务等常见需求,构成了 Agent 的“手脚”。


三、MCP 连接:动态扩展工具集

async def _connect_mcp(self) -> None:
    if self._mcp_connected or not self._mcp_servers:
        return
    from nanobot.agent.tools.mcp import connect_mcp_servers
    self._mcp_stack = AsyncExitStack()
    await self._mcp_stack.__aenter__()
    await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)

MCP(Model Context Protocol)是 Anthropic 提出的标准化工具协议,nanobot 通过 connect_mcp_servers 将外部 MCP 服务器的工具动态注册到 ToolRegistry 中。AsyncExitStack 用于管理多个异步上下文,确保所有连接在退出时正确清理。

连接是懒加载的,只有在第一次处理消息前(run()process_direct() 中调用)才会真正连接。


四、主循环:run() 方法

async def run(self) -> None:
    self._running = True
    await self._connect_mcp()
    while self._running:
        try:
            msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
            try:
                response = await self._process_message(msg)
                if response:
                    await self.bus.publish_outbound(response)
            except Exception as e:
                await self.bus.publish_outbound(OutboundMessage(...))
        except asyncio.TimeoutError:
            continue
  • 使用 asyncio.wait_for 实现非阻塞消费,超时后继续检查 _running 标志。
  • 对每条消息调用 _process_message 处理,异常时返回错误信息。
  • 响应通过 bus.publish_outbound 发出。

这种设计使得 AgentLoop 可以独立运行,与消息总线解耦,易于集成到不同运行时(如 CLI、HTTP 服务器、IM 网关)。


五、核心处理:_process_message()

这是整个 Agent 的逻辑核心,负责将一条入站消息转化为响应。我们分步解析:

1. 系统消息的特殊处理

if msg.channel == "system":
    return await self._process_system_message(msg)

系统消息通常由子 Agent 或后台任务发起,其 chat_id 格式为 "origin_channel:origin_chat_id",用于将响应路由回原始会话。

2. 会话管理与命令处理

key = session_key or msg.session_key
session = self.sessions.get_or_create(key)

if cmd == "/new":
    # 清空会话,异步整合记忆
    ...
if cmd == "/help":
    return OutboundMessage(...)
  • 会话由 session_key 标识(默认格式 channel:chat_id)。
  • 支持 /new/help 两个内置命令:/new 清空当前会话并将旧消息异步整合到长期记忆;/help 显示帮助信息。

3. 触发记忆整合(后台)

if len(session.messages) > self.memory_window:
    asyncio.create_task(self._consolidate_memory(session))

当会话消息数超过 memory_window 时,触发后台记忆整合,避免阻塞主流程。

4. 设置工具上下文

self._set_tool_context(msg.channel, msg.chat_id)

某些工具(如 MessageToolSpawnTool)需要知道当前会话的 channel 和 chat_id,以便发送消息或衍生子 Agent。此方法将上下文注入这些工具实例。

5. 构建初始消息

initial_messages = self.context.build_messages(
    history=session.get_history(max_messages=self.memory_window),
    current_message=msg.content,
    media=msg.media if msg.media else None,
    channel=msg.channel,
    chat_id=msg.chat_id,
)

ContextBuilder 将历史消息、当前用户输入、可能的媒体信息组合成一个符合 LLM 格式的消息列表,并注入系统提示词。

6. 运行代理循环

final_content, tools_used = await self._run_agent_loop(initial_messages)

这是实际与 LLM 交互并执行工具的过程,稍后详述。

7. 保存会话并返回响应

session.add_message("user", msg.content)
session.add_message("assistant", final_content, tools_used=tools_used)
self.sessions.save(session)

return OutboundMessage(...)

将本次交互存入会话,并通过总线返回响应。

8. 系统消息处理

_process_system_message 逻辑类似,但会从 chat_id 解析原始 channel 和 chat_id,并创建对应的会话。

核心处理引擎

/new

/help

InboundMessage
入站消息

系统消息?

解析路由信息

处理系统任务

OutboundMessage
出站消息

获取/创建会话

斜杠命令?

清空当前会话

后台: 异步整合记忆

返回: 新会话已开始

返回: 帮助信息

超出记忆窗口?

触发后台整合任务

构建上下文 Context

运行 ReAct 循环

保存会话历史


六、代理循环:_run_agent_loop()

这是 Agent 的“大脑”,实现多轮工具调用:

messages = initial_messages
iteration = 0
final_content = None
tools_used = []

while iteration < self.max_iterations:
    response = await self.provider.chat(...)

    if response.has_tool_calls:
        # 添加 assistant 消息(包含工具调用请求)
        messages = self.context.add_assistant_message(...)

        for tool_call in response.tool_calls:
            tools_used.append(tool_call.name)
            result = await self.tools.execute(tool_call.name, tool_call.arguments)
            messages = self.context.add_tool_result(...)

        # 添加一条“反思”提示,让 LLM 决定下一步
        messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
    else:
        final_content = response.content
        break

关键点:

  • 每次迭代调用 LLM,如果返回工具调用,则依次执行所有工具,并将结果以 tool 角色消息追加。
  • 工具执行由 ToolRegistry 负责,根据名称查找并调用对应的工具实例。
  • 执行完工具后,添加一条固定用户消息 "Reflect on the results and decide next steps.",引导 LLM 继续推理(或结束)。
  • 当 LLM 返回纯文本(无工具调用)时,结束循环。
    在这里插入图片描述

这种设计模仿了 Anthropic 的 Claude 工具调用模式,通过显式“反思”步骤让模型决定是否需要更多工具。


七、记忆整合:_consolidate_memory()

当会话消息超过窗口或用户执行 /new 时,调用此方法将旧消息压缩为长期记忆。

memory = MemoryStore(self.workspace)
# 提取待整合的消息
old_messages = session.messages[session.last_consolidated:-keep_count]
# 构建提示,调用 LLM 生成 history_entry 和 memory_update
prompt = f"""..."""
response = await self.provider.chat(...)
result = json_repair.loads(response.content)
# 更新历史文件和长期记忆文件
memory.append_history(entry)
memory.write_long_term(update)
  • 使用 json_repair 解析 LLM 返回的 JSON,即使输出有轻微格式问题也能恢复。
  • 整合结果包含两条信息:
    • history_entry:一段摘要,追加到 HISTORY.md
    • memory_update:更新后的长期记忆内容,写入 MEMORY.md
  • 整合完成后更新 session.last_consolidated 索引。

此机制实现了会话的无限上下文:超出窗口的消息被压缩为摘要和记忆,既节省 token,又保留了重要信息。

🧠 记忆系统

短期记忆

存储载体: Session列表

容量限制: 最近50条

特点: 实时更新

作用: 提供即时上下文

长期记忆

📄 MEMORY.md

核心内容

用户偏好

项目背景

关键事实

特点: 原子化存储

作用: 跨会话保持一致性

历史摘要

📜 HISTORY.md

组织方式

按时间线索引

事件粒度记录

特点: 可搜索文本

作用: 回溯关键决策

整合机制

🔄 LLM后台任务

触发条件

窗口溢出时

/new 指令时

处理流程

提取关键信息

去重更新Memory

增量追加History


八、直接处理:process_direct()

async def process_direct(self, content: str, session_key: str = "cli:direct", ...) -> str:
    await self._connect_mcp()
    msg = InboundMessage(...)
    response = await self._process_message(msg, session_key=session_key)
    return response.content if response else ""

提供一种同步风格的接口,供 CLI、定时任务等场景直接调用。它内部创建 InboundMessage 并走相同的处理流程,但返回的是纯文本响应。


九、停止与清理

def stop(self):
    self._running = False

async def close_mcp(self):
    if self._mcp_stack:
        await self._mcp_stack.aclose()
  • stop() 设置标志位,使主循环退出。
  • close_mcp() 关闭所有 MCP 连接,应在应用关闭时调用。

十、设计亮点与思考

  1. 极简核心:主循环不到 50 行,却完整支持多轮工具调用、会话管理、记忆整合,体现了“少即是多”的设计哲学。

  2. 异步驱动:全程使用 asyncio,从消息消费到工具执行都是非阻塞,为高并发打下基础。

  3. 工具上下文注入:通过 _set_tool_context 在每次消息处理前更新工具的状态,避免了全局变量,保证了线程/协程安全。

  4. 记忆整合异步化:将耗时操作(如 /new 后的记忆整合)放入后台任务,避免阻塞用户响应。

  5. MCP 集成:通过 AsyncExitStack 优雅管理外部工具连接,使扩展工具集变得非常简单。

  6. 错误处理:在 run() 中捕获所有异常并返回友好错误,保证服务不崩溃。


十一、总结

AgentLoop 是 nanobot 的灵魂,它以不到 1000 行代码实现了生产级 Agent 所需的核心能力:消息驱动、多轮推理、工具调用、记忆管理、会话隔离、外部工具扩展。通过阅读其源码,我们可以学到如何用简洁的异步代码构建一个可扩展的 AI 助理引擎,也为开发者定制自己的 Agent 提供了绝佳的范本。

如果你也想打造自己的“贾维斯”,不妨从理解 AgentLoop 开始。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐