Day 4 | OpenClaw 流式输出的艺术:从模型 Token 到用户消息

系列:《从 0 到 1 拆解 AI Agent 框架:OpenClaw 技术深度解析》


前言

你有没有注意到,ChatGPT 的回复是一个字一个字"打"出来的,而不是等全部生成完再一次性显示?

这不是什么视觉特效,而是 AI 模型的底层工作方式——它本来就是一个 token 一个 token 生成的。但从模型吐出第一个 token,到用户在手机屏幕上看到第一个字,中间发生了什么?

这中间有一条完整的流式输出链路:模型 → API → Agent 运行时 → Gateway → 客户端。每一个环节都有自己的缓冲、切割、传输逻辑。一旦某个环节设计不好,用户要么看到"打字机卡顿",要么等很久才看到第一个字,要么收到一条被截断的消息。

本文将沿着这条链路,逐层拆解 OpenClaw 的流式输出实现。


一、为什么要流式输出?

先回答一个基础问题:为什么不等模型生成完整回复再发送?

1.1 等待体验极差

GPT-4 生成一段 500 字的回复,大约需要 10-20 秒。如果用户需要等 20 秒后才能看到第一个字,大多数人会以为系统卡死了。

流式输出的核心价值是降低"首字延迟"(Time to First Token,TTFT)。通常模型在 1-2 秒内就能开始输出第一个 token,流式输出让用户立刻感知到"AI 在工作"。

1.2 消息平台的限制

Telegram、WhatsApp 等平台有消息长度限制(Telegram 单条最大 4096 字符)。如果 AI 生成了一篇 3000 字的文章,必须切割成多条消息发送。

流式输出还带来另一个好处:可以在生成过程中实时"编辑"消息,而不是发出去再撤回重发——大幅减少 API 调用次数。

1.3 工具调用的交织

现代 AI Agent 不只是生成文字,还会调用工具(读文件、搜索、执行代码)。流式输出让工具调用的过程对用户可见:

AI:我来帮你查一下...
[调用搜索工具]
AI:根据搜索结果,今天上海的天气是...

而不是让用户干等 30 秒后突然收到一条完整回复。


二、SSE:流式传输的基础协议

在深入 OpenClaw 实现之前,先了解流式输出的底层协议。

2.1 什么是 SSE?

SSE(Server-Sent Events) 是一种基于 HTTP 的单向推送协议。服务端可以通过一个长连接,持续向客户端推送数据,而不需要客户端轮询。

数据格式极其简单:

data: {"type":"content_block_delta","delta":{"text":"你好"}}

data: {"type":"content_block_delta","delta":{"text":",有什么"}}

data: {"type":"content_block_delta","delta":{"text":"可以帮你?"}}

data: [DONE]

每条消息以 data: 开头,消息之间用空行分隔,[DONE] 表示流结束。

2.2 OpenAI / Anthropic 的流式 API

主流模型提供商都采用 SSE 格式返回流式输出,但字段结构略有不同:

OpenAI 格式:

{
  "choices": [{
    "delta": { "content": "你好" },
    "finish_reason": null
  }]
}

Anthropic (Claude) 格式:

{
  "type": "content_block_delta",
  "index": 0,
  "delta": { "type": "text_delta", "text": "你好" }
}

OpenClaw 需要处理多种模型,因此在内部做了统一的流式抽象层,屏蔽不同 API 的格式差异。


三、Block Streaming:OpenClaw 的核心流式设计

OpenClaw 引入了一个关键概念——Block(块)

3.1 什么是 Block?

一次 AI 响应被切分为多个 Block,每个 Block 代表一种输出类型:

Block 类型 含义
text 普通文字内容
tool_use 工具调用请求
tool_result 工具执行结果
thinking 思维链(推理过程)

一次复杂响应可能包含多个 Block 交替出现:

[text: "我来帮你查一下天气"] 
[tool_use: search("上海天气")] 
[tool_result: "晴,25°C"]
[text: "上海今天天气不错,25°C 晴天。"]

3.2 为什么要抽象 Block?

原因一:统一多模型差异

不同模型的工具调用格式完全不同。抽象为 Block 后,上层逻辑只需处理统一格式,无需关心底层是 GPT 还是 Claude。

原因二:支持流式工具调用

模型在生成工具调用时,也是流式输出的——工具名称、参数都是一个字符一个字符流出来的。Block 设计让这个过程可以被正确"组装":

收到 delta: {"name": "sear"}
收到 delta: {"name": "ch", "input": {"q":}
收到 delta: {"input": {"query": "上海天气"}}
Block 完成,触发工具调用

原因三:流式渲染预览

Block 流允许客户端在响应完成前,就开始渲染已收到的部分——这是"打字机效果"的基础。

3.3 Block 的状态机

每个 Block 有完整的生命周期:

pending → streaming → complete
                   ↘ error
  • pending:Block 已创建,等待内容
  • streaming:正在接收 delta 增量
  • complete:Block 内容完整,可以处理
  • error:流中断,Block 不完整

上层逻辑监听 Block 状态变化,而不是直接处理原始 delta——这让代码结构清晰很多。


四、Chunk 算法:消息的切割与合并

拿到完整的文字内容后,还需要解决一个问题:怎么把它发出去?

4.1 消息平台的限制

不同平台有不同的消息长度限制:

平台 单条消息上限
Telegram 4096 字符
WhatsApp ~65536 字符
Discord 2000 字符
微信 ~2000 字符
短信 160 字符(ASCII)

AI 的回复动辄几千字,必须切割。

4.2 朴素切割的问题

最简单的切割是按字符数截断:每 4000 字符切一刀。但这会产生丑陋的结果:

第1条:这段代码实现了一个快速排序算法。核心思路是选取一个基准元素(pivot),将数组分为两部分:小于 pivot 的放左边,大于 pivot 的放右边,然后递归排序两个
第2条:子数组。时间复杂度为 O(n log n)。

中间一刀切断了语义,很不自然。

4.3 OpenClaw 的智能 Chunk 算法

OpenClaw 的 Chunk 算法遵循语义优先原则,按优先级寻找切割点:

优先级1:代码块边界(```前后)
优先级2:段落边界(连续两个换行)
优先级3:句子边界(。!?\n)
优先级4:词边界(空格)
优先级5:字符边界(兜底)

同时有两个约束:

  • 最大长度:不超过平台限制(留出安全余量)
  • 最小长度:不低于某个阈值(避免碎片化)
function findChunkBoundary(text: string, maxLen: number): number {
  if (text.length <= maxLen) return text.length;
  
  // 优先在代码块边界切割
  const codeBlockEnd = text.lastIndexOf('\n```', maxLen);
  if (codeBlockEnd > maxLen * 0.5) return codeBlockEnd + 4;
  
  // 段落边界
  const paragraphEnd = text.lastIndexOf('\n\n', maxLen);
  if (paragraphEnd > maxLen * 0.6) return paragraphEnd + 2;
  
  // 句子边界
  const sentenceEnd = text.search(/[。!?\n](?=.)/);
  if (sentenceEnd > 0 && sentenceEnd < maxLen) return sentenceEnd + 1;
  
  // 兜底:强制截断
  return maxLen;
}

4.4 流式发送 vs 完成后发送

Chunk 有两种发送时机:

完成后发送:等模型全部生成完毕,再一次性切割发送。

  • 优点:切割最准确,可以看到完整内容后再决定怎么切
  • 缺点:用户等待时间长

流式发送:生成过程中,达到长度阈值就发送。

  • 优点:用户几乎立刻看到内容
  • 缺点:切割点不一定理想(因为还不知道后续内容)

OpenClaw 根据消息平台的特性选择策略:

  • 支持"编辑消息"的平台(Telegram)→ 流式更新同一条消息,生成完毕后确认
  • 不支持编辑的平台(短信、邮件)→ 完成后发送,保证切割质量

五、渐进式消息更新:Telegram 的流式体验

Telegram 支持一个特殊能力:editMessage——可以修改已发送的消息内容。

OpenClaw 利用这个能力实现了丝滑的流式体验:

1. 收到前50个token → 发送第1条消息:"你好,我正在"
2. 收到100个token  → editMessage:"你好,我正在帮你查找"
3. 收到200个token  → editMessage:"你好,我正在帮你查找相关资料..."
4. 生成完毕        → editMessage:"你好,我正在帮你查找相关资料。根据最新数据..."

用户看到的效果是:消息内容在实时"生长",就像有人在打字一样。

5.1 更新频率控制

但不能每收到一个 token 就调用一次 editMessage——Telegram API 有频率限制(每条消息每秒最多更新 1 次)。

OpenClaw 使用防抖(Debounce) 控制更新频率:

class StreamingMessage {
  private pendingUpdate: string = '';
  private updateTimer: NodeJS.Timeout | null = null;
  
  appendText(text: string) {
    this.pendingUpdate += text;
    
    // 防抖:100ms 内没有新内容才触发更新
    if (this.updateTimer) clearTimeout(this.updateTimer);
    this.updateTimer = setTimeout(() => this.flush(), 100);
  }
  
  async flush() {
    if (this.pendingUpdate) {
      await telegram.editMessage(this.messageId, this.currentContent);
      this.updateTimer = null;
    }
  }
}

100ms 的防抖意味着每秒最多 10 次更新,既低于 API 限制,又足够流畅。

5.2 超长回复的分段更新

当回复超过 Telegram 的 4096 字符限制时,需要发送新的一条消息,而不是继续编辑旧消息:

消息1(编辑中):[内容增长到4000字] → 定稿,停止编辑
消息2(新建):[继续接收后续内容] → 继续编辑

这需要 StreamingMessage 维护一个"消息链",自动管理多条消息的创建和编辑。


六、工具调用的流式可见性

流式输出最复杂的场景是工具调用

6.1 让工具调用过程可见

当 AI 调用工具时,用户不应该看到一段沉默。OpenClaw 在工具调用的各个阶段都会发送状态更新:

[用户] 帮我查一下今天上海的天气

[AI 流式输出开始]
"好的,我来帮你查一下..."

[工具调用开始] → 发送提示:"🔍 正在搜索..."
[工具执行中]   → 可选:实时显示进度
[工具返回]     → 更新消息,显示结果摘要

[AI 继续生成]
"根据最新数据,上海今天..."

6.2 工具调用的流式组装

模型在生成工具调用时,参数也是流式输出的:

// delta 1
{"type": "tool_use", "id": "tool_1", "name": "web_se"}
// delta 2  
{"name": "arch", "input": {}}
// delta 3
{"input": {"query": "上海天"}}
// delta 4
{"input": {"query": "上海天气 2025"}}
// delta 完成

OpenClaw 在 Block 层面做流式 JSON 拼接,等工具调用 Block 状态变为 complete 后,才触发实际的工具执行——这保证了参数的完整性。


七、多渠道适配:同一套逻辑,不同的输出格式

AI 生成的是 Markdown 格式的内容,但不同平台对 Markdown 的支持完全不同:

平台 Markdown 支持
Telegram 支持部分(粗体代码、代码块)
Discord 支持大部分
WhatsApp 不支持标准 Markdown,有自己的格式
短信 纯文本,无格式
网页 完整支持

OpenClaw 在输出层做格式转换,将通用 Markdown 转换为目标平台的格式:

function formatForPlatform(markdown: string, platform: Platform): string {
  switch (platform) {
    case 'telegram':
      return convertToTelegramMarkdown(markdown);
    case 'whatsapp':
      return convertToWhatsAppFormat(markdown);  // *粗体* 而不是 **粗体**
    case 'sms':
      return stripAllMarkdown(markdown);
    default:
      return markdown;
  }
}

这让 AI 只需生成一套内容,由框架负责"翻译"成各平台能看懂的格式。


八、我踩过的坑

8.1 流中断的恢复

网络抖动可能导致 SSE 流中断。如果处理不当,用户会收到一条被截断的消息。

OpenClaw 的做法:

  • 检测到流中断 → 记录已接收内容的长度
  • 重试请求时带上 Last-Event-ID → 服务端从断点继续(如果支持)
  • 如果不支持续传 → 将已有内容作为"完整结果"发出,同时标注"(回复可能不完整)"

宁可发出不完整的回复,也比让用户一直等待要好。

8.2 Telegram editMessage 的幂等性

网络超时可能导致 editMessage 请求重发,同一条消息被更新两次。如果两次更新内容不同(因为中间又收到了新 token),可能导致内容倒退。

解决方案:给每次更新带上版本号,服务端拒绝版本号小于当前版本的更新请求。

8.3 代码块切割的边界问题

这是一个代码示例:

```python
def hello():
    print("Hello, World!")
```

上面的代码...

如果在代码块内部切割,会导致 Markdown 渲染错误(代码块没有正确闭合)。

解决方案:Chunk 算法追踪代码块状态,在代码块内部时禁止切割,必须等到 ` ````闭合后再切。


九、设计哲学:用户感知优先

回顾 OpenClaw 的流式输出设计,有一条贯穿始终的原则:

优化用户感知,而不是技术指标。

  • 首字延迟 > 完整延迟:哪怕总时间一样,先让用户看到第一个字
  • 渐进展示 > 完整呈现:边生成边展示,让等待变成"观看"
  • 语义完整 > 技术精确:切割时优先保证语义完整,哪怕技术上有点浪费
  • 容错优先:流中断时优雅降级,不让错误扩散给用户

流式输出不只是一个技术问题,它本质上是一个用户体验工程问题。


小结

本文沿着流式输出的完整链路,拆解了以下核心机制:

环节 技术实现 核心思想
模型 → 框架 SSE + 统一流式抽象 屏蔽多模型差异
Block 组装 状态机管理 delta 累积 流式工具调用
消息切割 语义优先 Chunk 算法 自然切割点
平台发送 editMessage 防抖更新 降低 API 压力
格式转换 多平台 Markdown 适配 一套内容多端展示

下一篇,我们将进入多 Agent 路由——当一个 Gateway 需要同时托管多个 AI 大脑时,消息是怎么被分发到正确的 Agent 的?


作者:一个在折腾 AI Agent 框架的工程师
系列索引:[Day 1 架构概览] | [Day 2 Gateway] | [Day 3 Agent 运行时] | Day 4 流式输出 | Day 5 多 Agent 路由 → 敬请期待

如果这篇文章对你有帮助,欢迎点赞收藏 🎯

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐