Flutter SSE 流式响应用 Dio 实现 OpenAI 兼容接口的逐 Token 输出
在做 AI 提示词优化器时,等 10 秒一次性返回大段文本”的体验通常很差。更好的体验是:模型一边生成,你的 UI 一边展示(像打字机逐字出现)
SSE 流式响应用 Dio 实现 OpenAI 兼容接口的逐 Token 输出(可直接复用)

在做 AI 提示词优化器 时,“等 10 秒一次性返回大段文本”的体验通常很差。
更好的体验是:
- 模型一边生成,你的 UI 一边展示(像打字机逐字出现)
- 用户能更快确认方向,必要时可以提前取消
这通常依赖 SSE(Server-Sent Events)流式响应。
本项目(PromptOptimizer)已经实现了一个可复用的、OpenAI 兼容的 SSE 解析方案:
- 网络层:
dio+ResponseType.stream - 协议层:解析
data:行,识别结束符[DONE] - 业务层:把“逐 token 的片段”包装成
Stream<String> - 状态层(MVI):Riverpod
StateNotifier订阅 stream,拼接到optimizedPrompt
本文会用“原理 + 实践 + 踩坑”的方式,带你从 0 到 1 搭出来。
1. 环境与版本(可复现)
- Flutter:项目约定 Flutter 3.10+
- Dart SDK:
pubspec.yaml中为sdk: ^3.10.3 - 网络库:
dio(见pubspec.yaml)
运行命令(Windows / PowerShell):
flutter pub get
dart run build_runner build --delete-conflicting-outputs
flutter run -d windows
2. SSE 是什么?一句话理解
SSE 是一种“服务器主动推送”的 HTTP 响应形式。
它的特点是:
- 响应
Content-Type一般是text/event-stream - 服务器会不断发送类似这样的文本行:
data: {"choices":[{"delta":{"content":"你"}}]}
data: {"choices":[{"delta":{"content":"好"}}]}
data: [DONE]
你的客户端要做的事情就是:
- 按行读取
- 只处理以
data:开头的行 - 把 JSON 中的
delta.content提取出来,作为“本次新增的 token” - 遇到
[DONE]就结束
3. 代码结构:本项目把 SSE 拆成了 3 层(推荐复用)
为了让逻辑清晰、好测试、好复用,本项目把它拆成:
-
Data 层(网络 + SSE 解析)
lib/features/optimization/data/openai_api_service.dart- 负责:请求、读流、解析
data:
-
Domain 层(用例编排)
lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart- 负责:拿配置、组装 messages、调用 API service,并把 stream 向上透传
-
Presentation 层(状态管理 + UI 拼接)
lib/features/optimization/presentation/providers/optimization_provider.dart- 负责:订阅 stream,把 chunk 拼接到状态里,驱动 UI 刷新
你要在自己的项目中复用时,也建议用这个分层方式。
4. 核心实现 1:用 Dio 拿到“字节流”
对应文件:
lib/features/optimization/data/openai_api_service.dart
关键点:
responseType: ResponseType.stream- 请求头
Accept: text/event-stream
项目中的关键代码(节选,建议你对照原文件阅读):
- 请求:
_dio.post<ResponseBody>(..., responseType: ResponseType.stream)
- 拿到 stream:
final stream = response.data?.stream;
为什么要用 ResponseBody?
- 因为 SSE 本质是“服务器不断写入响应体”,你需要一个“可持续读取”的 byte stream
5. 核心实现 2:把 byte stream 变成“逐行的 SSE data”
仍在:openai_api_service.dart
项目的解析策略是:
utf8.decode(chunk)把字节转成字符串- 拼到
buffer里(因为 chunk 可能会把一行截断) while (buffer.contains('\n'))按行切- 过滤:只处理
data: jsonDecode(data),提取choices[0].delta.contentyield content;把 token 片段作为Stream<String>输出
这里有一个“看起来小,但非常关键”的点:
- 必须用 buffer
因为真实网络环境下:
- 一个 chunk 可能不是“完整的一行”
- 甚至
data:的 JSON 可能被拆成两段
没有 buffer 你会出现:
- JSON decode 失败
- UI 断断续续
本项目还定义了 SSE 结束符:
AppConstants.sseEndSignal(在lib/core/constants/app_constants.dart)- 值是:
[DONE]
6. 核心实现 3:业务用例只关心“我拿到一个 Stream”
对应文件:
lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart
OptimizePromptUseCase.execute() 做了 4 件事:
- 获取 API 配置
- 解密 API Key(细节在
ApiConfigRepository,本文不展开) - 用模板构建 messages
- 调用
OpenAiApiService.streamChatCompletion()并yield*转发
重点是第 4 步:
- 通过
yield*,让上层感知的仍然是Stream<String> - 上层不需要知道 SSE 的任何细节
7. 核心实现 4(最有用):Riverpod 订阅 stream 并拼接到状态
对应文件:
lib/features/optimization/presentation/providers/optimization_provider.dart
在 OptimizationNotifier.optimize() 里:
- 拿到
stream - 设置状态
OptimizationStatus.streaming - 订阅
_streamSubscription = stream.listen(...)
拼接逻辑(项目原理):
- 每收到一次
chunk,就:optimizedPrompt: state.optimizedPrompt + chunk
这样 UI 每次 rebuild 都能拿到“当前累积内容”,实现逐字展示。
同时它还做了两件“真实产品必须做”的事:
- 取消旧的 stream:
cancelOptimization(),避免用户连续点击导致多个订阅叠加 - 只记录第一次响应时间:用于“网络响应速度”指标(对优化体验很重要)
8. 你可以直接复制的最小可用版本(伪代码)
如果你想在自己的项目里快速搭一个原型,最小实现如下:
final response = await dio.post<ResponseBody>(
url,
options: Options(
responseType: ResponseType.stream,
headers: {
'Accept': 'text/event-stream',
'Authorization': 'Bearer $apiKey',
'Content-Type': 'application/json',
},
),
data: {'stream': true, 'messages': messages, 'model': modelId},
);
final stream = response.data!.stream;
String buffer = '';
await for (final bytes in stream) {
buffer += utf8.decode(bytes);
while (buffer.contains('\n')) {
final idx = buffer.indexOf('\n');
final line = buffer.substring(0, idx).trim();
buffer = buffer.substring(idx + 1);
if (!line.startsWith('data:')) continue;
final data = line.substring(5).trim();
if (data == '[DONE]') return;
final json = jsonDecode(data);
final content = json['choices']?[0]?['delta']?['content'];
if (content is String && content.isNotEmpty) {
yield content;
}
}
}
9. 遇到的踩坑清单
9.1 chunk 不等于一行,必须做 buffer
- 现象:偶发 JSON 解析失败 / 输出断裂
- 原因:网络分片导致一行被截断
- 解决:像本项目一样累积到
buffer,按\n切行
9.2 SSE 里不止 data: 行
- 现象:解析到
event:/id:/ 空行导致异常 - 解决:只处理
line.startsWith('data:')
9.3 不是每个 data 都有 content
- 现象:有些 provider 返回的 delta 里 content 为空(比如只返回 role)
- 解决:
content != null && content.isNotEmpty才 yield
9.4 UI 卡顿(逐 token rebuild 太频繁)
- 现象:低端机/长文本时卡顿
- 思路:
- 可以做节流(例如 16ms/33ms 合并一次 chunk)
- 或按句子/按 N 字合并后再 setState
- 本项目当前策略:直接拼接(实现简单),后续可优化
10. 总结:这套方案为什么“可复用”
- 协议解析清晰:只做
data:→ JSON →delta.content - 数据流清晰:网络输出
Stream<String>→ 用例yield*→ Notifier 订阅 - UI 不背锅:UI 只需要展示
optimizedPrompt,不碰网络细节
如果你要在自己的 Flutter 项目里做“流式 AI 输出”,直接参考并复用:
OpenAiApiService.streamChatCompletion()OptimizePromptUseCase.execute()OptimizationNotifier.optimize()的订阅与拼接
源码
https://github.com/JIULANG9/PromptOptimizer
11. 可追溯引用(本项目文件路径)
- SSE 解析:
lib/features/optimization/data/openai_api_service.dart - 结束符常量:
lib/core/constants/app_constants.dart(sseEndSignal = '[DONE]') - 用例编排:
lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart - 状态订阅拼接:
lib/features/optimization/presentation/providers/optimization_provider.dart
更多推荐



所有评论(0)