Day 4 | OpenClaw 流式输出的艺术:从模型 Token 到用户消息
Day 4 | OpenClaw 流式输出的艺术:从模型 Token 到用户消息
系列:《从 0 到 1 拆解 AI Agent 框架:OpenClaw 技术深度解析》
前言
你有没有注意到,ChatGPT 的回复是一个字一个字"打"出来的,而不是等全部生成完再一次性显示?
这不是什么视觉特效,而是 AI 模型的底层工作方式——它本来就是一个 token 一个 token 生成的。但从模型吐出第一个 token,到用户在手机屏幕上看到第一个字,中间发生了什么?
这中间有一条完整的流式输出链路:模型 → API → Agent 运行时 → Gateway → 客户端。每一个环节都有自己的缓冲、切割、传输逻辑。一旦某个环节设计不好,用户要么看到"打字机卡顿",要么等很久才看到第一个字,要么收到一条被截断的消息。
本文将沿着这条链路,逐层拆解 OpenClaw 的流式输出实现。
一、为什么要流式输出?
先回答一个基础问题:为什么不等模型生成完整回复再发送?
1.1 等待体验极差
GPT-4 生成一段 500 字的回复,大约需要 10-20 秒。如果用户需要等 20 秒后才能看到第一个字,大多数人会以为系统卡死了。
流式输出的核心价值是降低"首字延迟"(Time to First Token,TTFT)。通常模型在 1-2 秒内就能开始输出第一个 token,流式输出让用户立刻感知到"AI 在工作"。
1.2 消息平台的限制
Telegram、WhatsApp 等平台有消息长度限制(Telegram 单条最大 4096 字符)。如果 AI 生成了一篇 3000 字的文章,必须切割成多条消息发送。
流式输出还带来另一个好处:可以在生成过程中实时"编辑"消息,而不是发出去再撤回重发——大幅减少 API 调用次数。
1.3 工具调用的交织
现代 AI Agent 不只是生成文字,还会调用工具(读文件、搜索、执行代码)。流式输出让工具调用的过程对用户可见:
AI:我来帮你查一下...
[调用搜索工具]
AI:根据搜索结果,今天上海的天气是...
而不是让用户干等 30 秒后突然收到一条完整回复。
二、SSE:流式传输的基础协议
在深入 OpenClaw 实现之前,先了解流式输出的底层协议。
2.1 什么是 SSE?
SSE(Server-Sent Events) 是一种基于 HTTP 的单向推送协议。服务端可以通过一个长连接,持续向客户端推送数据,而不需要客户端轮询。
数据格式极其简单:
data: {"type":"content_block_delta","delta":{"text":"你好"}}
data: {"type":"content_block_delta","delta":{"text":",有什么"}}
data: {"type":"content_block_delta","delta":{"text":"可以帮你?"}}
data: [DONE]
每条消息以 data: 开头,消息之间用空行分隔,[DONE] 表示流结束。
2.2 OpenAI / Anthropic 的流式 API
主流模型提供商都采用 SSE 格式返回流式输出,但字段结构略有不同:
OpenAI 格式:
{
"choices": [{
"delta": { "content": "你好" },
"finish_reason": null
}]
}
Anthropic (Claude) 格式:
{
"type": "content_block_delta",
"index": 0,
"delta": { "type": "text_delta", "text": "你好" }
}
OpenClaw 需要处理多种模型,因此在内部做了统一的流式抽象层,屏蔽不同 API 的格式差异。
三、Block Streaming:OpenClaw 的核心流式设计
OpenClaw 引入了一个关键概念——Block(块)。
3.1 什么是 Block?
一次 AI 响应被切分为多个 Block,每个 Block 代表一种输出类型:
| Block 类型 | 含义 |
|---|---|
text |
普通文字内容 |
tool_use |
工具调用请求 |
tool_result |
工具执行结果 |
thinking |
思维链(推理过程) |
一次复杂响应可能包含多个 Block 交替出现:
[text: "我来帮你查一下天气"]
[tool_use: search("上海天气")]
[tool_result: "晴,25°C"]
[text: "上海今天天气不错,25°C 晴天。"]
3.2 为什么要抽象 Block?
原因一:统一多模型差异
不同模型的工具调用格式完全不同。抽象为 Block 后,上层逻辑只需处理统一格式,无需关心底层是 GPT 还是 Claude。
原因二:支持流式工具调用
模型在生成工具调用时,也是流式输出的——工具名称、参数都是一个字符一个字符流出来的。Block 设计让这个过程可以被正确"组装":
收到 delta: {"name": "sear"}
收到 delta: {"name": "ch", "input": {"q":}
收到 delta: {"input": {"query": "上海天气"}}
Block 完成,触发工具调用
原因三:流式渲染预览
Block 流允许客户端在响应完成前,就开始渲染已收到的部分——这是"打字机效果"的基础。
3.3 Block 的状态机
每个 Block 有完整的生命周期:
pending → streaming → complete
↘ error
pending:Block 已创建,等待内容streaming:正在接收 delta 增量complete:Block 内容完整,可以处理error:流中断,Block 不完整
上层逻辑监听 Block 状态变化,而不是直接处理原始 delta——这让代码结构清晰很多。
四、Chunk 算法:消息的切割与合并
拿到完整的文字内容后,还需要解决一个问题:怎么把它发出去?
4.1 消息平台的限制
不同平台有不同的消息长度限制:
| 平台 | 单条消息上限 |
|---|---|
| Telegram | 4096 字符 |
| ~65536 字符 | |
| Discord | 2000 字符 |
| 微信 | ~2000 字符 |
| 短信 | 160 字符(ASCII) |
AI 的回复动辄几千字,必须切割。
4.2 朴素切割的问题
最简单的切割是按字符数截断:每 4000 字符切一刀。但这会产生丑陋的结果:
第1条:这段代码实现了一个快速排序算法。核心思路是选取一个基准元素(pivot),将数组分为两部分:小于 pivot 的放左边,大于 pivot 的放右边,然后递归排序两个
第2条:子数组。时间复杂度为 O(n log n)。
中间一刀切断了语义,很不自然。
4.3 OpenClaw 的智能 Chunk 算法
OpenClaw 的 Chunk 算法遵循语义优先原则,按优先级寻找切割点:
优先级1:代码块边界(```前后)
优先级2:段落边界(连续两个换行)
优先级3:句子边界(。!?\n)
优先级4:词边界(空格)
优先级5:字符边界(兜底)
同时有两个约束:
- 最大长度:不超过平台限制(留出安全余量)
- 最小长度:不低于某个阈值(避免碎片化)
function findChunkBoundary(text: string, maxLen: number): number {
if (text.length <= maxLen) return text.length;
// 优先在代码块边界切割
const codeBlockEnd = text.lastIndexOf('\n```', maxLen);
if (codeBlockEnd > maxLen * 0.5) return codeBlockEnd + 4;
// 段落边界
const paragraphEnd = text.lastIndexOf('\n\n', maxLen);
if (paragraphEnd > maxLen * 0.6) return paragraphEnd + 2;
// 句子边界
const sentenceEnd = text.search(/[。!?\n](?=.)/);
if (sentenceEnd > 0 && sentenceEnd < maxLen) return sentenceEnd + 1;
// 兜底:强制截断
return maxLen;
}
4.4 流式发送 vs 完成后发送
Chunk 有两种发送时机:
完成后发送:等模型全部生成完毕,再一次性切割发送。
- 优点:切割最准确,可以看到完整内容后再决定怎么切
- 缺点:用户等待时间长
流式发送:生成过程中,达到长度阈值就发送。
- 优点:用户几乎立刻看到内容
- 缺点:切割点不一定理想(因为还不知道后续内容)
OpenClaw 根据消息平台的特性选择策略:
- 支持"编辑消息"的平台(Telegram)→ 流式更新同一条消息,生成完毕后确认
- 不支持编辑的平台(短信、邮件)→ 完成后发送,保证切割质量
五、渐进式消息更新:Telegram 的流式体验
Telegram 支持一个特殊能力:editMessage——可以修改已发送的消息内容。
OpenClaw 利用这个能力实现了丝滑的流式体验:
1. 收到前50个token → 发送第1条消息:"你好,我正在"
2. 收到100个token → editMessage:"你好,我正在帮你查找"
3. 收到200个token → editMessage:"你好,我正在帮你查找相关资料..."
4. 生成完毕 → editMessage:"你好,我正在帮你查找相关资料。根据最新数据..."
用户看到的效果是:消息内容在实时"生长",就像有人在打字一样。
5.1 更新频率控制
但不能每收到一个 token 就调用一次 editMessage——Telegram API 有频率限制(每条消息每秒最多更新 1 次)。
OpenClaw 使用防抖(Debounce) 控制更新频率:
class StreamingMessage {
private pendingUpdate: string = '';
private updateTimer: NodeJS.Timeout | null = null;
appendText(text: string) {
this.pendingUpdate += text;
// 防抖:100ms 内没有新内容才触发更新
if (this.updateTimer) clearTimeout(this.updateTimer);
this.updateTimer = setTimeout(() => this.flush(), 100);
}
async flush() {
if (this.pendingUpdate) {
await telegram.editMessage(this.messageId, this.currentContent);
this.updateTimer = null;
}
}
}
100ms 的防抖意味着每秒最多 10 次更新,既低于 API 限制,又足够流畅。
5.2 超长回复的分段更新
当回复超过 Telegram 的 4096 字符限制时,需要发送新的一条消息,而不是继续编辑旧消息:
消息1(编辑中):[内容增长到4000字] → 定稿,停止编辑
消息2(新建):[继续接收后续内容] → 继续编辑
这需要 StreamingMessage 维护一个"消息链",自动管理多条消息的创建和编辑。
六、工具调用的流式可见性
流式输出最复杂的场景是工具调用。
6.1 让工具调用过程可见
当 AI 调用工具时,用户不应该看到一段沉默。OpenClaw 在工具调用的各个阶段都会发送状态更新:
[用户] 帮我查一下今天上海的天气
[AI 流式输出开始]
"好的,我来帮你查一下..."
[工具调用开始] → 发送提示:"🔍 正在搜索..."
[工具执行中] → 可选:实时显示进度
[工具返回] → 更新消息,显示结果摘要
[AI 继续生成]
"根据最新数据,上海今天..."
6.2 工具调用的流式组装
模型在生成工具调用时,参数也是流式输出的:
// delta 1
{"type": "tool_use", "id": "tool_1", "name": "web_se"}
// delta 2
{"name": "arch", "input": {}}
// delta 3
{"input": {"query": "上海天"}}
// delta 4
{"input": {"query": "上海天气 2025"}}
// delta 完成
OpenClaw 在 Block 层面做流式 JSON 拼接,等工具调用 Block 状态变为 complete 后,才触发实际的工具执行——这保证了参数的完整性。
七、多渠道适配:同一套逻辑,不同的输出格式
AI 生成的是 Markdown 格式的内容,但不同平台对 Markdown 的支持完全不同:
| 平台 | Markdown 支持 |
|---|---|
| Telegram | 支持部分(粗体、代码、代码块) |
| Discord | 支持大部分 |
| 不支持标准 Markdown,有自己的格式 | |
| 短信 | 纯文本,无格式 |
| 网页 | 完整支持 |
OpenClaw 在输出层做格式转换,将通用 Markdown 转换为目标平台的格式:
function formatForPlatform(markdown: string, platform: Platform): string {
switch (platform) {
case 'telegram':
return convertToTelegramMarkdown(markdown);
case 'whatsapp':
return convertToWhatsAppFormat(markdown); // *粗体* 而不是 **粗体**
case 'sms':
return stripAllMarkdown(markdown);
default:
return markdown;
}
}
这让 AI 只需生成一套内容,由框架负责"翻译"成各平台能看懂的格式。
八、我踩过的坑
8.1 流中断的恢复
网络抖动可能导致 SSE 流中断。如果处理不当,用户会收到一条被截断的消息。
OpenClaw 的做法:
- 检测到流中断 → 记录已接收内容的长度
- 重试请求时带上
Last-Event-ID→ 服务端从断点继续(如果支持) - 如果不支持续传 → 将已有内容作为"完整结果"发出,同时标注"(回复可能不完整)"
宁可发出不完整的回复,也比让用户一直等待要好。
8.2 Telegram editMessage 的幂等性
网络超时可能导致 editMessage 请求重发,同一条消息被更新两次。如果两次更新内容不同(因为中间又收到了新 token),可能导致内容倒退。
解决方案:给每次更新带上版本号,服务端拒绝版本号小于当前版本的更新请求。
8.3 代码块切割的边界问题
这是一个代码示例:
```python
def hello():
print("Hello, World!")
```
上面的代码...
如果在代码块内部切割,会导致 Markdown 渲染错误(代码块没有正确闭合)。
解决方案:Chunk 算法追踪代码块状态,在代码块内部时禁止切割,必须等到 ` ````闭合后再切。
九、设计哲学:用户感知优先
回顾 OpenClaw 的流式输出设计,有一条贯穿始终的原则:
优化用户感知,而不是技术指标。
- 首字延迟 > 完整延迟:哪怕总时间一样,先让用户看到第一个字
- 渐进展示 > 完整呈现:边生成边展示,让等待变成"观看"
- 语义完整 > 技术精确:切割时优先保证语义完整,哪怕技术上有点浪费
- 容错优先:流中断时优雅降级,不让错误扩散给用户
流式输出不只是一个技术问题,它本质上是一个用户体验工程问题。
小结
本文沿着流式输出的完整链路,拆解了以下核心机制:
| 环节 | 技术实现 | 核心思想 |
|---|---|---|
| 模型 → 框架 | SSE + 统一流式抽象 | 屏蔽多模型差异 |
| Block 组装 | 状态机管理 delta 累积 | 流式工具调用 |
| 消息切割 | 语义优先 Chunk 算法 | 自然切割点 |
| 平台发送 | editMessage 防抖更新 | 降低 API 压力 |
| 格式转换 | 多平台 Markdown 适配 | 一套内容多端展示 |
下一篇,我们将进入多 Agent 路由——当一个 Gateway 需要同时托管多个 AI 大脑时,消息是怎么被分发到正确的 Agent 的?
作者:一个在折腾 AI Agent 框架的工程师
系列索引:[Day 1 架构概览] | [Day 2 Gateway] | [Day 3 Agent 运行时] | Day 4 流式输出 | Day 5 多 Agent 路由 → 敬请期待
如果这篇文章对你有帮助,欢迎点赞收藏 🎯
更多推荐



所有评论(0)