玄同 765

Large Language Model (LLM) Developer | Communication University of China · Digital Media Technology (Intelligent Interaction and Game Design)



About the Author

  • Focus areas: large language model development / RAG knowledge bases / AI Agent deployment / model fine-tuning
  • Tech stack: Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • Engineering: focused on productionizing models and on building and optimizing knowledge bases, with end-to-end solution experience

"Make AI interaction smarter, make technology adoption more efficient."
Technical discussion and project collaboration are welcome. Let's unlock the full potential of LLMs and intelligent interaction!


Agent Loop: The Core Engine of an AI Agent

The Agent Loop is the heart of nanobot, responsible for the full cycle of message handling, context building, LLM interaction, and tool invocation.

Overview

The Agent Loop is nanobot's core processing engine, implementing a complete perceive-think-act cycle. It receives user messages from the message bus, builds a context containing history, memory, and skills, calls the LLM for reasoning, executes tool calls, and finally returns the response to the user. This article dissects the design and implementation of the Agent Loop.

Background

Core Challenges

Building the core loop of an AI Agent means solving the following problems:

| Challenge | Description | nanobot's Solution |
| --- | --- | --- |
| Message handling | Processing asynchronous messages efficiently | Message bus + async queues |
| Context building | Assembling a complete conversation context | Modular assembly via ContextBuilder |
| Tool invocation | Handling multi-round tool calls | Iterative loop + result feedback |
| Session management | Maintaining conversation history | SessionManager persistence |
| Error handling | Failing gracefully | try/except + error responses |
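The challenges above pair message handling with a message bus and async queues. nanobot's actual `MessageBus` is not shown in this article; the following is a minimal sketch under that assumption (the dataclass fields mirror those used later in the article, but the queue-based body is illustrative, not nanobot's real code):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class InboundMessage:
    channel: str
    chat_id: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)

@dataclass
class OutboundMessage:
    channel: str
    chat_id: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)

class MessageBus:
    """Decouples chat channels from the agent loop via two async queues."""

    def __init__(self) -> None:
        self._inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self._outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

    async def publish_inbound(self, msg: InboundMessage) -> None:
        await self._inbound.put(msg)

    async def consume_inbound(self) -> InboundMessage:
        return await self._inbound.get()

    async def publish_outbound(self, msg: OutboundMessage) -> None:
        await self._outbound.put(msg)

    async def consume_outbound(self) -> OutboundMessage:
        return await self._outbound.get()
```

Channel adapters call `publish_inbound`, the agent loop calls `consume_inbound`, and responses travel back the same way in reverse.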

Design Goals

- Reliability: error recovery and iteration limits
- Extensibility: tool registration and context extension
- Observability: logging and state tracking

Core Architecture

The AgentLoop Class

AgentLoop is the central coordinator of the whole system:

class AgentLoop:
    """
    The agent loop is the core processing engine.
  
    It:
    1. Receives messages from the bus
    2. Builds context with history, memory, skills
    3. Calls the LLM
    4. Executes tool calls
    5. Sends responses back
    """
  
    def __init__(
        self,
        bus: MessageBus,
        provider: LLMProvider,
        workspace: Path,
        model: str | None = None,
        max_iterations: int = 20,
        brave_api_key: str | None = None,
        exec_config: "ExecToolConfig | None" = None,
        cron_service: "CronService | None" = None,
        restrict_to_workspace: bool = False,
        session_manager: SessionManager | None = None,
    ):
        # ... initialization code

Dependencies

An InboundMessage arrives via the MessageBus; the AgentLoop then coordinates the other core components and publishes an OutboundMessage back to the bus:

- MessageBus: inbound/outbound message transport
- ContextBuilder: assembles the conversation context
- LLMProvider: LLM API access
- ToolRegistry: tool definitions and execution
- SessionManager: conversation history and persistence
- SubagentManager: background subagent tasks

Message Processing Flow

The Main Loop

At the heart of the Agent Loop is an asynchronous message-processing loop:

async def run(self) -> None:
    """Run the agent loop, processing messages from the bus."""
    self._running = True
    logger.info("Agent loop started")
  
    while self._running:
        try:
            # wait for the next message
            msg = await asyncio.wait_for(
                self.bus.consume_inbound(),
                timeout=1.0
            )
          
            # process the message
            try:
                response = await self._process_message(msg)
                if response:
                    await self.bus.publish_outbound(response)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # send an error response
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel,
                    chat_id=msg.chat_id,
                    content=f"Sorry, I encountered an error: {str(e)}"
                ))
        except asyncio.TimeoutError:
            continue
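The `timeout=1.0` passed to `asyncio.wait_for` matters: it lets the loop wake up periodically to re-check `self._running` instead of blocking forever on an empty queue. The pattern in isolation (hypothetical names, independent of nanobot):

```python
import asyncio
from typing import Callable

async def poll_until_stopped(
    queue: asyncio.Queue,
    handle: Callable[[object], None],
    *,
    stop: asyncio.Event,
) -> int:
    """Consume queue items until `stop` is set; return how many were handled."""
    handled = 0
    while not stop.is_set():
        try:
            # The short timeout keeps the stop flag responsive even when the
            # queue stays empty, mirroring the timeout=1.0 in run() above.
            item = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        handle(item)
        handled += 1
    return handled

async def demo() -> int:
    q: asyncio.Queue = asyncio.Queue()
    stop = asyncio.Event()
    seen: list[object] = []
    task = asyncio.create_task(poll_until_stopped(q, seen.append, stop=stop))
    await q.put("hello")
    await asyncio.sleep(0.2)  # let the consumer drain the queue
    stop.set()
    return await task
```

Without the timeout, a `stop` request would not take effect until the next message happened to arrive.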

Message Handling in Detail

A message travels from the MessageBus into the AgentLoop, which consults the SessionManager for history, the ContextBuilder for the prompt, the LLMProvider for inference, and the ToolRegistry for tool execution.

Context Building

Responsibilities of ContextBuilder

ContextBuilder assembles the complete conversation context:

class ContextBuilder:
    """
    Builds the context (system prompt + messages) for the agent.
  
    Assembles bootstrap files, memory, skills, and conversation history
    into a coherent prompt for the LLM.
    """
  
    BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]

Context Assembly Flow

The context has two parts: the System Prompt, built from the Identity, the Bootstrap Files, the Memory System, and the Skills System; and the Messages, built from the Session History plus the Current Message.

Building the System Prompt

def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
    """Build the system prompt from bootstrap files, memory, and skills."""
    parts = []
  
    # core identity
    parts.append(self._get_identity())
  
    # bootstrap files
    bootstrap = self._load_bootstrap_files()
    if bootstrap:
        parts.append(bootstrap)
  
    # memory context
    memory = self.memory.get_memory_context()
    if memory:
        parts.append(f"# Memory\n\n{memory}")
  
    # skills: progressive loading
    # 1. always-loaded skills: include their full content
    always_skills = self.skills.get_always_skills()
    if always_skills:
        always_content = self.skills.load_skills_for_context(always_skills)
        if always_content:
            parts.append(f"# Active Skills\n\n{always_content}")
  
    # 2. available skills: show summaries only
    skills_summary = self.skills.build_skills_summary()
    if skills_summary:
        parts.append(f"""# Skills

The following skills extend your capabilities. To use a skill, read its SKILL.md file using the read_file tool.
Skills with available="false" need dependencies installed first.

{skills_summary}""")
  
    return "\n\n---\n\n".join(parts)

Core Identity

def _get_identity(self) -> str:
    """Get the core identity section."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
    tz = _time.strftime("%Z") or "UTC"
    workspace_path = str(self.workspace.expanduser().resolve())
    system = platform.system()
    runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
  
    return f"""# nanobot

You are nanobot, a helpful AI assistant. You have access to tools that allow you to:
- Read, write, and edit files
- Execute shell commands
- Search the web and fetch web pages
- Send messages to users on chat channels
- Spawn subagents for complex background tasks

## Current Time
{now} ({tz})

## Runtime
{runtime}

## Workspace
Your workspace is at: {workspace_path}
- Memory files: {workspace_path}/memory/MEMORY.md
- Daily notes: {workspace_path}/memory/YYYY-MM-DD.md
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
"""

Tool Call Loop

Iterative Processing

The Agent Loop handles multi-round tool calls with an iterative loop:

async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """Process a single inbound message."""
  
    # get or create the session
    session = self.sessions.get_or_create(msg.session_key)
  
    # build the initial messages
    messages = self.context.build_messages(
        history=session.get_history(),
        current_message=msg.content,
        media=msg.media if msg.media else None,
        channel=msg.channel,
        chat_id=msg.chat_id,
    )
  
    # agent loop
    iteration = 0
    final_content = None
  
    while iteration < self.max_iterations:
        iteration += 1
      
        # call the LLM
        response = await self.provider.chat(
            messages=messages,
            tools=self.tools.get_definitions(),
            model=self.model
        )
      
        # handle tool calls
        if response.has_tool_calls:
            # append the assistant message (with its tool calls)
            tool_call_dicts = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {
                        "name": tc.name,
                        "arguments": json.dumps(tc.arguments)
                    }
                }
                for tc in response.tool_calls
            ]
            messages = self.context.add_assistant_message(
                messages, response.content, tool_call_dicts,
                reasoning_content=response.reasoning_content,
            )
          
            # execute the tools
            for tool_call in response.tool_calls:
                args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
                logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
                result = await self.tools.execute(tool_call.name, tool_call.arguments)
                messages = self.context.add_tool_result(
                    messages, tool_call.id, tool_call.name, result
                )
            # interleaved chain of thought: reflect before acting again
            messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
        else:
            # no tool calls: processing is complete
            final_content = response.content
            break
  
    if final_content is None:
        final_content = "I've completed processing but have no response to give."
  
    # save to the session
    session.add_message("user", msg.content)
    session.add_message("assistant", final_content)
    self.sessions.save(session)
  
    return OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=final_content,
        metadata=msg.metadata or {},
    )

Tool Call Flow

Start → Build Context → Call LLM → Tool Calls? If yes: Add Assistant Msg → Execute Tool → Add Tool Result → Add Reflection → Iteration Limit? If the limit is not yet reached, loop back to Call LLM; if it is, end with Timeout. If there are no tool calls: Final Response → Save Session → Return Response.

Session Management

The Session Data Structure

@dataclass
class Session:
    """
    A conversation session.
  
    Stores messages in JSONL format for easy reading and persistence.
    """
  
    key: str  # channel:chat_id
    messages: list[dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)
  
    def add_message(self, role: str, content: str, **kwargs: Any) -> None:
        """Add a message to the session."""
        msg = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        self.messages.append(msg)
        self.updated_at = datetime.now()
  
    def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
        """Get message history for LLM context."""
        recent = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
        return [{"role": m["role"], "content": m["content"]} for m in recent]

The SessionManager Implementation

class SessionManager:
    """Manages conversation sessions."""
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.sessions_dir = ensure_dir(Path.home() / ".nanobot" / "sessions")
        self._cache: dict[str, Session] = {}
  
    def get_or_create(self, key: str) -> Session:
        """Get an existing session or create a new one."""
        # check the cache
        if key in self._cache:
            return self._cache[key]
      
        # try loading from disk
        session = self._load(key)
        if session is None:
            session = Session(key=key)
      
        self._cache[key] = session
        return session

System Message Handling

Subagent Notifications

The Agent Loop also handles system messages, such as subagent completion notices:

async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """
    Process a system message (e.g., subagent announce).
  
    The chat_id field contains "original_channel:original_chat_id" to route
    the response back to the correct destination.
    """
    # parse the origin
    if ":" in msg.chat_id:
        parts = msg.chat_id.split(":", 1)
        origin_channel = parts[0]
        origin_chat_id = parts[1]
    else:
        origin_channel = "cli"
        origin_chat_id = msg.chat_id
  
    # use the original session's context
    session_key = f"{origin_channel}:{origin_chat_id}"
    session = self.sessions.get_or_create(session_key)
  
    # ... the rest mirrors normal message handling
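
The origin parsing above is easy to factor into a helper (hypothetical function name; the split logic is the same as in the snippet):

```python
def parse_origin(chat_id: str) -> tuple[str, str]:
    """Split 'channel:chat_id'; bare ids default to the 'cli' channel."""
    if ":" in chat_id:
        channel, origin_id = chat_id.split(":", 1)
        return channel, origin_id
    return "cli", chat_id
```

Note that `split(":", 1)` splits only on the first colon, so chat ids that themselves contain colons survive intact.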

Key Design Decisions

1. Maximum Iteration Limit

max_iterations: int = 20

This prevents infinite loops and keeps the system stable.

2. Interleaved Chain of Thought

messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})

Appending a reflection prompt after each round of tool execution helps the LLM plan its next action.

3. Error Recovery

except Exception as e:
    logger.error(f"Error processing message: {e}")
    await self.bus.publish_outbound(OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=f"Sorry, I encountered an error: {str(e)}"
    ))

This ensures that errors never crash the system and that the user gets meaningful feedback.

Performance Optimizations

1. Session Cache

self._cache: dict[str, Session] = {}

Caching sessions in memory avoids repeated disk I/O and speeds up responses.

2. History Window

def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:

Capping the history keeps the context within the LLM's token limit.
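The effect of the windowing, shown as a standalone function with the same slicing logic as `get_history`:

```python
from typing import Any

def window_history(messages: list[dict[str, Any]], max_messages: int = 50) -> list[dict[str, Any]]:
    """Keep only the most recent max_messages entries, stripping extra fields."""
    recent = messages[-max_messages:]  # slicing handles the short-list case too
    return [{"role": m["role"], "content": m["content"]} for m in recent]
```

Dropping the `timestamp` and other bookkeeping fields also keeps the payload sent to the LLM minimal.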

3. Asynchronous Processing

async def run(self) -> None:
    while self._running:
        msg = await asyncio.wait_for(
            self.bus.consume_inbound(),
            timeout=1.0
        )

Asynchronous I/O improves the system's capacity for concurrent work.

Summary

The Agent Loop is nanobot's core engine. Its design reflects the following principles:

| Design Principle | Implementation |
| --- | --- |
| Modularity | ContextBuilder, SessionManager, and ToolRegistry each own one concern |
| Extensibility | Tools and skills are added dynamically via registration |
| Reliability | Iteration limits, error recovery, session persistence |
| Observability | Thorough logging and state tracking |

At its core, the Agent Loop is a simple but powerful iterative loop: by combining context building, LLM calls, and tool execution, it delivers complete Agent functionality.

