OpenClaw 源码深度解析(二):执行篇 — Agent Loop 如何调用 LLM 和工具

OpenClaw v2026.2.25 | Node.js 22+ | TypeScript ESM


引言

规划阶段准备好了一切——session 恢复了、system prompt 组装好了、工具集过滤完毕、模型和 API Key 就位。现在,session.prompt("帮我查一下天气") 被调用,Agent 正式进入执行阶段

这是 OpenClaw 最核心、最精妙的代码路径。它不是简单的"调一次 LLM 拿结果",而是一个循环驱动的 Agentic Loop——LLM 可以反复调用工具、根据工具结果继续思考、直到生成最终回复。本文深入这个循环的每一个齿轮。


一、Agent Loop 全景

session.prompt(userMessage)
    │
    │  ┌─────────────────────────────────────────────────┐
    │  │              Pi Agent Loop                       │
    │  │                                                  │
    │  │  ┌──── 迭代 N ────────────────────────────┐     │
    │  │  │                                        │     │
    │  │  │  1. 构建上下文 (system + history + msg)│     │
    │  │  │  2. 调用 LLM API (流式)               │     │
    │  │  │  3. 解析响应                           │     │
    │  │  │     ├── 纯文本 → 回复用户 → 结束      │     │
    │  │  │     └── tool_calls → 进入步骤 4       │     │
    │  │  │  4. 逐个执行工具                       │     │
    │  │  │  5. 结果追加到历史                     │     │
    │  │  │  6. → 回到步骤 1 (迭代 N+1)           │     │
    │  │  │                                        │     │
    │  │  └────────────────────────────────────────┘     │
    │  │                                                  │
    │  │  终止条件: LLM 返回纯文本(无 tool_calls)       │
    │  └─────────────────────────────────────────────────┘

一次"帮我查天气"的请求,通常需要 2-3 次迭代

  1. LLM 返回 web_fetch 工具调用
  2. 执行工具获取天气数据,结果送回 LLM
  3. LLM 返回纯文本回复

而复杂任务(“帮我改这个 bug”)可能需要 10+ 次迭代——读文件、运行测试、编辑代码、再运行测试…


二、迭代 Step 1:构建上下文

每次迭代开始前,Pi SDK 需要构建发给 LLM 的完整上下文。

2.1 上下文组成

// 发给 LLM 的消息序列
const messages = [
  // 1. System Prompt (由 OpenClaw 预构建)
  { role: 'system', content: systemPrompt },

  // 2. 历史对话 (从 jsonl transcript 读取)
  ...historyMessages,   // user/assistant/tool 消息链

  // 3. 当前用户消息
  { role: 'user', content: '帮我查一下天气' },

  // 4. 之前的工具结果 (如果是第 2+ 次迭代)
  // ...{ role: 'tool', content: '北京 晴 15-25°C' }
];

2.2 历史裁剪

历史消息不是无限注入的。OpenClaw 会根据场景裁剪:

// src/agents/pi-embedded-runner/history.ts
function limitHistoryTurns(params: {
  history: Message[];
  chatType: 'direct' | 'group';
  maxTurns?: number;
}): Message[] {
  // 私聊: 保留更多历史 (完整上下文)
  // 群聊: 保留较少 (节省 token,群聊对话通常较碎片化)
  const defaultMaxTurns = params.chatType === 'group' ? 10 : 50;

  const maxTurns = params.maxTurns ?? defaultMaxTurns;
  if (params.history.length <= maxTurns) return params.history;

  // 保留最近的 turns,裁剪掉旧的
  return params.history.slice(-maxTurns);
}

2.3 Transcript Hygiene (转录卫生)

在构建上下文之前,OpenClaw 会对 transcript 做提供商特定的修复:

// src/agents/pi-embedded-runner/google.ts
function sanitizeSessionHistory(history: Message[], provider: string): Message[] {
  let sanitized = history;

  // 全局: 图片降采样 (防止 token 爆炸)
  sanitized = sanitizeImages(sanitized);

  // 全局: 清除不完整的 tool_calls
  sanitized = dropMalformedToolCalls(sanitized);

  // Google/Gemini 特有:
  if (provider === 'google') {
    sanitized = sanitizeToolCallIdsStrict(sanitized);    // ID 必须纯字母数字
    sanitized = repairToolResultPairing(sanitized);       // 修复 tool_use/tool_result 配对
    sanitized = applyGoogleTurnOrderingFix(sanitized);    // 修复对话顺序
  }

  // Anthropic 特有:
  if (provider === 'anthropic') {
    sanitized = mergeConsecutiveUserTurns(sanitized);     // 合并连续 user 消息
    sanitized = repairToolResultPairing(sanitized);       // 修复配对
  }

  // Mistral 特有:
  if (provider === 'mistral') {
    sanitized = sanitizeToolCallIdsLength9(sanitized);    // ID 必须 9 位字母数字
  }

  return sanitized;
}

这是一个被低估的设计——不同 LLM 对 API 格式的要求差异很大。如果不做这些修复,provider 会直接拒绝请求。OpenClaw 选择在上游解决这些问题,而不是让用户看到莫名其妙的报错。


三、迭代 Step 2:调用 LLM API

3.1 流式调用

// Pi SDK 内部 (简化)
import { streamSimple } from '@mariozechner/pi-ai';

const stream = streamSimple(model, messages, {
  maxTokens: contextWindow - currentTokens,
  thinking: thinkingLevel,
  // provider 特定参数
  ...getProviderExtraParams(provider),
});

3.2 Provider Extra Params

不同 provider 需要不同的额外参数:

// src/agents/pi-embedded-runner/extra-params.ts
function getProviderExtraParams(provider: string) {
  switch (provider) {
    case 'google':
      return {
        // Gemini 需要的特定配置
        turnLocation: 'last',
      };

    case 'openai':
      return {
        // OpenAI 的 reasoning effort
        reasoningEffort: mapThinkingToOpenAI(thinkingLevel),
      };

    case 'anthropic':
      return {
        // Claude 的特定参数
        cacheControl: getCacheControl(),
      };

    default:
      return {};
  }
}

3.3 AbortSignal 支持

每次 LLM 调用都绑定一个 AbortSignal,用户发 /stop 时可以立即中断:

// src/agents/pi-tools.abort.ts
function wrapToolWithAbort(tool: AnyAgentTool, getSignal: () => AbortSignal) {
  return {
    ...tool,
    execute: async (id, params, signal, onUpdate) => {
      const abortSignal = getSignal();
      // 组合两个 signal:工具自己的 + 外部 abort
      const combined = combineAbortSignals(signal, abortSignal);
      return tool.execute(id, params, combined, onUpdate);
    },
  };
}

四、迭代 Step 3:解析 LLM 响应

LLM 的流式响应分为两种情况:

情况 A:纯文本回复 → 结束循环

{
  "type": "text",
  "text": "今天北京晴,气温 15-25°C,适合外出。"
}

Agent Loop 结束,进入流式返回阶段。

情况 B:包含 tool_calls → 继续循环

{
  "type": "tool_calls",
  "text": "我来帮你查一下天气数据...",
  "tool_calls": [
    {
      "id": "call_abc123",
      "name": "web_fetch",
      "params": { "url": "https://wttr.in/Beijing?format=j1" }
    }
  ]
}

进入工具执行阶段。


五、迭代 Step 4:工具执行

5.1 执行流程

// Pi SDK 内部 (简化)
for (const toolCall of response.toolCalls) {
  // 1. 触发事件
  emit('tool_execution_start', { toolName: toolCall.name, callId: toolCall.id });

  // 2. 查找工具
  const tool = toolRegistry.get(toolCall.name);
  if (!tool) {
    emit('tool_execution_end', { error: `Unknown tool: ${toolCall.name}` });
    continue;
  }

  // 3. 执行
  try {
    const result = await tool.execute(
      toolCall.id,
      toolCall.params,
      abortSignal,
      (update) => {
        // 流式更新 (长耗时工具可以报告进度)
        emit('tool_execution_update', update);
      }
    );

    // 4. 触发完成事件
    emit('tool_execution_end', { callId: toolCall.id, result });

    // 5. 追加到历史
    history.push({ role: 'tool', content: result, toolCallId: toolCall.id });
  } catch (error) {
    if (error.name === 'AbortError') {
      emit('tool_execution_end', { callId: toolCall.id, error: 'aborted' });
      throw error; // 中断整个循环
    }
    history.push({ role: 'tool', content: `Error: ${error.message}`, toolCallId: toolCall.id });
  }
}

5.2 工具定义适配器

OpenClaw 的工具需要适配两个不同的签名——pi-agent-coreAgentToolpi-coding-agentToolDefinition

// src/agents/pi-tool-definition-adapter.ts
export function toToolDefinitions(tools: AnyAgentTool[]): ToolDefinition[] {
  return tools.map(tool => ({
    name: tool.name,
    label: tool.label ?? tool.name,
    description: tool.description ?? '',
    parameters: tool.parameters,
    execute: async (toolCallId, params, _ctx, signal, onUpdate) => {
      // 签名转换: pi-coding-agent 的 execute 有 5 个参数
      // AgentTool 的 execute 有 4 个参数
      // 这里做桥接
      return await tool.execute(toolCallId, params, signal, onUpdate);
    },
  }));
}

这看起来很小,但在实际中是一个反复出 bug 的地方——两个 SDK 版本的签名差异导致各种运行时错误。OpenClaw 用适配器层统一处理。

5.3 典型工具执行示例

web_fetch 为例:

// src/agents/tools/web-fetch.ts (简化)
const webFetchTool = {
  name: 'web_fetch',
  description: 'Fetch and extract readable content from a URL',
  parameters: {
    type: 'object',
    properties: {
      url: { type: 'string', description: 'HTTP(S) URL to fetch' },
      extractMode: { type: 'string', enum: ['markdown', 'text'] },
      maxChars: { type: 'number', default: 50000 },
    },
    required: ['url'],
  },
  execute: async (id, params, signal) => {
    // 1. 安全校验 (ClawDefender)
    validateUrl(params.url);  // 防止 SSRF

    // 2. 发起 HTTP 请求
    const response = await fetch(params.url, { signal });

    // 3. 提取正文
    const html = await response.text();
    const readable = extractReadability(html);  // 用 @mozilla/readability

    // 4. 格式化输出
    const output = params.extractMode === 'markdown'
      ? toMarkdown(readable)
      : toText(readable);

    // 5. 截断保护
    return output.slice(0, params.maxChars ?? 50000);
  },
};

exec(命令执行)为例:

// src/agents/tools/exec-tool.ts (简化)
const execTool = {
  name: 'exec',
  description: 'Run shell commands',
  parameters: {
    type: 'object',
    properties: {
      command: { type: 'string' },
      timeout: { type: 'number' },
      workdir: { type: 'string' },
      background: { type: 'boolean' },
    },
    required: ['command'],
  },
  execute: async (id, params, signal, onUpdate) => {
    // 1. 如果启用了沙箱,在容器中执行
    if (sandboxEnabled) {
      return await execInContainer(params, signal);
    }

    // 2. 否则在宿主机执行
    const child = spawn('powershell', ['/c', params.command], {
      cwd: params.workdir,
      signal,
      timeout: params.timeout ?? 30000,
    });

    // 3. 流式报告输出
    child.stdout.on('data', (chunk) => {
      onUpdate({ type: 'stdout', text: chunk.toString() });
    });

    // 4. 等待完成
    return await child.complete();
  },
};

六、流式响应处理

LLM 的输出不是等全部生成完才返回的。OpenClaw 通过事件订阅实时处理流式输出。

6.1 事件订阅系统

// src/agents/pi-embedded-subscribe.ts
function subscribeEmbeddedPiSession(params) {
  const { session, onBlockReply, onPartialReply, onToolResult, ... } = params;

  // 监听所有事件
  session.on('message_start', () => { /* 新消息开始 */ });
  session.on('message_update', (chunk) => { /* 流式文本块 */ });
  session.on('message_end', () => { /* 消息结束 */ });

  session.on('tool_execution_start', (event) => { /* 工具开始 */ });
  session.on('tool_execution_update', (event) => { /* 工具进度 */ });
  session.on('tool_execution_end', (event) => { /* 工具完成 */ });

  session.on('turn_start', () => { /* 一轮开始 */ });
  session.on('turn_end', () => { /* 一轮结束 */ });

  session.on('agent_start', () => { /* 整个 agent 运行开始 */ });
  session.on('agent_end', () => { /* 整个 agent 运行结束 */ });

  session.on('auto_compaction_start', () => { /* 压缩开始 */ });
  session.on('auto_compaction_end', (result) => { /* 压缩完成 */ });
}

6.2 Block Chunker(分块器)

流式输出是逐 token 到达的,但直接逐 token 发给用户会体验很差。OpenClaw 用 EmbeddedBlockChunker 做智能分块:

// src/agents/pi-embedded-block-chunker.ts
class EmbeddedBlockChunker {
  private buffer = '';
  private pendingCodeBlock = false;

  addChunk(text: string): string[] {
    this.buffer += text;

    const blocks: string[] = [];

    // 策略 1: 遇到双换行分段
    if (this.buffer.includes('\n\n')) {
      const parts = this.buffer.split('\n\n');
      blocks.push(...parts.slice(0, -1));
      this.buffer = parts[parts.length - 1];
    }

    // 策略 2: 代码块完整后发送
    if (this.buffer.includes('```')) {
      this.pendingCodeBlock = !this.pendingCodeBlock;
      if (!this.pendingCodeBlock) {
        blocks.push(this.buffer);
        this.buffer = '';
      }
    }

    // 策略 3: 超时强制发送 (避免长时间无输出)
    // ...

    return blocks;
  }
}

6.3 回复指令解析

Agent 可以在回复中嵌入特殊指令:

// src/agents/pi-embedded-subscribe.handlers.ts
function consumeReplyDirectives(chunk: string) {
  let text = chunk;
  const mediaUrls: string[] = [];
  let audioAsVoice = false;
  let replyToId: string | null = null;

  // [[media:url]] → 附加媒体文件
  text = text.replace(/\[\[media:(https?:\/\/[^\]]+)\]\]/g, (_, url) => {
    mediaUrls.push(url);
    return '';
  });

  // [[voice]] → 用 TTS 语音发送
  text = text.replace(/\[\[voice\]\]/g, () => {
    audioAsVoice = true;
    return '';
  });

  // [[reply_to:id]] → 引用回复
  text = text.replace(/\[\[reply_to:(\d+)\]\]/g, (_, id) => {
    replyToId = id;
    return '';
  });

  return { text, mediaUrls, audioAsVoice, replyToId };
}

6.4 Thinking 标签剥离

LLM 可能输出 thinking 内容(思考过程),这些不应该发给用户:

function stripBlockTags(text: string, state: { thinking: boolean }) {
  // 处理 🤔... 区块 (thinking 标记)
  let result = '';
  let i = 0;
  while (i < text.length) {
    if (text.startsWith('🤔', i)) {
      state.thinking = true;
      i += '🤔'.length;
      continue;
    }
    if (text.startsWith('</thinking>', i)) {
      state.thinking = false;
      i += '</thinking>'.length;
      continue;
    }
    if (!state.thinking) {
      result += text[i];
    }
    i++;
  }

  // 如果 enforceFinalTag 模式,只提取 <final>...</final>
  // ...

  return result;
}

七、并发控制与队列

7.1 同一 Session 串行执行

// src/agents/pi-embedded-runner/runs.ts
const activeRuns = new Map<string, RunEntry>();

async function enqueueRun(sessionKey: string, runFn: () => Promise<void>) {
  const existing = activeRuns.get(sessionKey);
  if (existing) {
    // 当前 session 有 run 在执行,排队
    existing.queue.push(runFn);
    return;
  }

  const entry = { current: runFn(), queue: [] };
  activeRuns.set(sessionKey, entry);

  try {
    await entry.current;
  } finally {
    // 执行队列中的下一个
    const next = entry.queue.shift();
    if (next) {
      activeRuns.set(sessionKey, { current: next(), queue: entry.queue });
    } else {
      activeRuns.delete(sessionKey);
    }
  }
}

为什么同一 session 必须串行? 因为 session 的 transcript 是 append-only 的,并发写入会导致数据损坏。

7.2 超时控制

const runResult = await Promise.race([
  session.prompt(prompt, { images }),
  new Promise((_, reject) =>
    setTimeout(() => reject(new Error('timeout')), params.timeoutMs)
  ),
]);

默认超时 120 秒。超时后自动清理 run 状态,释放并发锁。


八、错误处理与重试

8.1 错误分类

// src/agents/pi-embedded-helpers.ts
function classifyError(errorText: string): ErrorClass {
  if (isContextOverflowError(errorText))    return 'context_overflow';
  if (isCompactionFailureError(errorText))  return 'compaction_failure';
  if (isAuthError(errorText))               return 'auth';
  if (isRateLimitError(errorText))          return 'rate_limit';
  if (isQuotaError(errorText))              return 'quota';
  if (isTimeoutError(errorText))            return 'timeout';
  return 'unknown';
}

8.2 Context Overflow 自动恢复

// 上下文溢出时自动压缩后重试
if (classifyError(errorText) === 'context_overflow') {
  // 1. 触发 compaction (让 LLM 总结旧对话)
  await compactSession(sessionManager, model, provider);

  // 2. 用压缩后的历史重试
  return await retryPrompt(compactedHistory);
}

8.3 Thinking Level 降级

// 如果 thinking level 不被模型支持,自动降级
if (isThinkingLevelError(errorText)) {
  const fallback = pickFallbackThinkingLevel({
    attempted: currentThinkingLevel,
    message: errorText,
  });
  if (fallback) {
    return await retryWithThinking(fallback);
  }
}

8.4 Failover 链

// 可配置的多级降级
// models.providers.<p>.failover: { targets: [...] }
async function handleWithFailover(error: Error) {
  if (!config.failover?.targets?.length) throw error;

  const targets = config.failover.targets;
  for (const target of targets) {
    try {
      return await runWithModel(target.provider, target.model);
    } catch (e) {
      continue; // 尝试下一个
    }
  }
  throw error; // 所有 target 都失败
}

九、执行阶段的完整时序图

session.prompt("帮我查一下天气")
    │
    ▼
┌─ 迭代 1 ─────────────────────────────────────────────┐
│  ① 构建 messages: [system, ...history, user_msg]    │
│  ② sanitizeSessionHistory() (provider 适配)          │
│  ③ limitHistoryTurns() (裁剪历史)                    │
│  ④ streamSimple(model, messages) → 调用 LLM         │
│  ⑤ 响应: tool_calls: [{ name: "web_fetch",           │
│                         params: { url: "wttr.in" } }]│
│  ⑥ 执行 web_fetch("wttr.in/Beijing")                │
│  ⑦ 结果: "北京 晴 15-25°C..."                        │
│  ⑧ 追加到 history → 继续迭代                        │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─ 迭代 2 ─────────────────────────────────────────────┐
│  ① 构建 messages: [system, ...history, tool_result]  │
│  ② streamSimple(model, messages) → 调用 LLM         │
│  ③ 响应: text: "今天北京晴,15-25°C,适合外出。"     │
│  ④ 无 tool_calls → 循环结束                         │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─ 流式返回 ───────────────────────────────────────────┐
│  ① BlockChunker 分块                                │
│  ② stripBlockTags() 剥离 thinking                    │
│  ③ consumeReplyDirectives() 解析 [[media]] 等        │
│  ④ onBlockReply() → Channel Provider 发送到 Telegram  │
│  ⑤ 用户看到: "今天北京晴,15-25°C,适合外出。"       │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─ 后处理 ─────────────────────────────────────────────┐
│  ① 更新 sessions.json (token 计数, timestamp)       │
│  ② SessionManager 追加到 jsonl                       │
│  ③ 检查 contextTokens vs contextWindow               │
│  ④ 如果接近上限 → 触发 auto-compaction               │
│  ⑤ 释放 run 锁                                      │
└──────────────────────────────────────────────────────┘

设计亮点总结

  1. Agentic Loop 本质 — 不是单次调用,而是"思考→行动→观察→再思考"的循环
  2. Transcript Hygiene — 上游解决 provider 差异,用户无感知
  3. Block Chunker 智能分块 — 不是逐 token 推送,而是按语义分段
  4. 工具签名适配器 — 桥接两个 SDK 的接口差异
  5. 同 Session 串行执行 — 保护 transcript 一致性
  6. 多层错误恢复 — overflow→压缩重试、thinking→降级、auth→failover
  7. 回复指令系统 — Agent 可以声明式地控制发送格式(媒体/语音/引用)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐