SSE 流式响应用 Dio 实现 OpenAI 兼容接口的逐 Token 输出(可直接复用)

请添加图片描述
在做 AI 提示词优化器 时,“等 10 秒一次性返回大段文本”的体验通常很差。

更好的体验是:

  • 模型一边生成,你的 UI 一边展示(像打字机逐字出现)
  • 用户能更快确认方向,必要时可以提前取消

这通常依赖 SSE(Server-Sent Events)流式响应

本项目(PromptOptimizer)已经实现了一个可复用的、OpenAI 兼容的 SSE 解析方案

  • 网络层:dio + ResponseType.stream
  • 协议层:解析 data: 行,识别结束符 [DONE]
  • 业务层:把“逐 token 的片段”包装成 Stream<String>
  • 状态层(MVI):Riverpod StateNotifier 订阅 stream,拼接到 optimizedPrompt

本文会用“原理 + 实践 + 踩坑”的方式,带你从 0 到 1 搭出来。


1. 环境与版本(可复现)

  • Flutter:项目约定 Flutter 3.10+
  • Dart SDKpubspec.yaml 中为 sdk: ^3.10.3
  • 网络库dio(见 pubspec.yaml

运行命令(Windows / PowerShell):

flutter pub get

dart run build_runner build --delete-conflicting-outputs

flutter run -d windows

2. SSE 是什么?一句话理解

SSE 是一种“服务器主动推送”的 HTTP 响应形式。

它的特点是:

  • 响应 Content-Type 一般是 text/event-stream
  • 服务器会不断发送类似这样的文本行:
data: {"choices":[{"delta":{"content":"你"}}]}

data: {"choices":[{"delta":{"content":"好"}}]}

data: [DONE]

你的客户端要做的事情就是:

  • 按行读取
  • 只处理以 data: 开头的行
  • 把 JSON 中的 delta.content 提取出来,作为“本次新增的 token”
  • 遇到 [DONE] 就结束

3. 代码结构:本项目把 SSE 拆成了 3 层(推荐复用)

为了让逻辑清晰、好测试、好复用,本项目把它拆成:

  • Data 层(网络 + SSE 解析)

    • lib/features/optimization/data/openai_api_service.dart
    • 负责:请求、读流、解析 data:
  • Domain 层(用例编排)

    • lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart
    • 负责:拿配置、组装 messages、调用 API service,并把 stream 向上透传
  • Presentation 层(状态管理 + UI 拼接)

    • lib/features/optimization/presentation/providers/optimization_provider.dart
    • 负责:订阅 stream,把 chunk 拼接到状态里,驱动 UI 刷新

你要在自己的项目中复用时,也建议用这个分层方式。


4. 核心实现 1:用 Dio 拿到“字节流”

对应文件:

  • lib/features/optimization/data/openai_api_service.dart

关键点:

  • responseType: ResponseType.stream
  • 请求头 Accept: text/event-stream

项目中的关键代码(节选,建议你对照原文件阅读):

  • 请求
    • _dio.post<ResponseBody>(..., responseType: ResponseType.stream)
  • 拿到 stream
    • final stream = response.data?.stream;

为什么要用 ResponseBody

  • 因为 SSE 本质是“服务器不断写入响应体”,你需要一个“可持续读取”的 byte stream

5. 核心实现 2:把 byte stream 变成“逐行的 SSE data”

仍在:openai_api_service.dart

项目的解析策略是:

  1. utf8.decode(chunk) 把字节转成字符串
  2. 拼到 buffer 里(因为 chunk 可能会把一行截断)
  3. while (buffer.contains('\n')) 按行切
  4. 过滤:只处理 data:
  5. jsonDecode(data),提取 choices[0].delta.content
  6. yield content; 把 token 片段作为 Stream<String> 输出

这里有一个“看起来小,但非常关键”的点:

  • 必须用 buffer

因为真实网络环境下:

  • 一个 chunk 可能不是“完整的一行”
  • 甚至 data: 的 JSON 可能被拆成两段

没有 buffer 你会出现:

  • JSON decode 失败
  • UI 断断续续

本项目还定义了 SSE 结束符:

  • AppConstants.sseEndSignal(在 lib/core/constants/app_constants.dart
  • 值是:[DONE]

6. 核心实现 3:业务用例只关心“我拿到一个 Stream”

对应文件:

  • lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart

OptimizePromptUseCase.execute() 做了 4 件事:

  1. 获取 API 配置
  2. 解密 API Key(细节在 ApiConfigRepository,本文不展开)
  3. 用模板构建 messages
  4. 调用 OpenAiApiService.streamChatCompletion()yield* 转发

重点是第 4 步:

  • 通过 yield*,让上层感知的仍然是 Stream<String>
  • 上层不需要知道 SSE 的任何细节

7. 核心实现 4(最有用):Riverpod 订阅 stream 并拼接到状态

对应文件:

  • lib/features/optimization/presentation/providers/optimization_provider.dart

OptimizationNotifier.optimize() 里:

  • 拿到 stream
  • 设置状态 OptimizationStatus.streaming
  • 订阅 _streamSubscription = stream.listen(...)

拼接逻辑(项目原理):

  • 每收到一次 chunk,就:
    • optimizedPrompt: state.optimizedPrompt + chunk

这样 UI 每次 rebuild 都能拿到“当前累积内容”,实现逐字展示。

同时它还做了两件“真实产品必须做”的事:

  • 取消旧的 streamcancelOptimization(),避免用户连续点击导致多个订阅叠加
  • 只记录第一次响应时间:用于“网络响应速度”指标(对优化体验很重要)

8. 你可以直接复制的最小可用版本(伪代码)

如果你想在自己的项目里快速搭一个原型,最小实现如下:

final response = await dio.post<ResponseBody>(
  url,
  options: Options(
    responseType: ResponseType.stream,
    headers: {
      'Accept': 'text/event-stream',
      'Authorization': 'Bearer $apiKey',
      'Content-Type': 'application/json',
    },
  ),
  data: {'stream': true, 'messages': messages, 'model': modelId},
);

final stream = response.data!.stream;
String buffer = '';

await for (final bytes in stream) {
  buffer += utf8.decode(bytes);
  while (buffer.contains('\n')) {
    final idx = buffer.indexOf('\n');
    final line = buffer.substring(0, idx).trim();
    buffer = buffer.substring(idx + 1);

    if (!line.startsWith('data:')) continue;
    final data = line.substring(5).trim();
    if (data == '[DONE]') return;

    final json = jsonDecode(data);
    final content = json['choices']?[0]?['delta']?['content'];
    if (content is String && content.isNotEmpty) {
      yield content;
    }
  }
}

9. 遇到的踩坑清单

9.1 chunk 不等于一行,必须做 buffer

  • 现象:偶发 JSON 解析失败 / 输出断裂
  • 原因:网络分片导致一行被截断
  • 解决:像本项目一样累积到 buffer,按 \n 切行

9.2 SSE 里不止 data:

  • 现象:解析到 event: / id: / 空行导致异常
  • 解决:只处理 line.startsWith('data:')

9.3 不是每个 data 都有 content

  • 现象:有些 provider 返回的 delta 里 content 为空(比如只返回 role)
  • 解决content != null && content.isNotEmpty 才 yield

9.4 UI 卡顿(逐 token rebuild 太频繁)

  • 现象:低端机/长文本时卡顿
  • 思路
    • 可以做节流(例如 16ms/33ms 合并一次 chunk)
    • 或按句子/按 N 字合并后再 setState
  • 本项目当前策略:直接拼接(实现简单),后续可优化

10. 总结:这套方案为什么“可复用”

  • 协议解析清晰:只做 data: → JSON → delta.content
  • 数据流清晰:网络输出 Stream<String> → 用例 yield* → Notifier 订阅
  • UI 不背锅:UI 只需要展示 optimizedPrompt,不碰网络细节

如果你要在自己的 Flutter 项目里做“流式 AI 输出”,直接参考并复用:

  • OpenAiApiService.streamChatCompletion()
  • OptimizePromptUseCase.execute()
  • OptimizationNotifier.optimize() 的订阅与拼接

源码

https://github.com/JIULANG9/PromptOptimizer

11. 可追溯引用(本项目文件路径)

  • SSE 解析:lib/features/optimization/data/openai_api_service.dart
  • 结束符常量:lib/core/constants/app_constants.dartsseEndSignal = '[DONE]'
  • 用例编排:lib/features/optimization/domain/usecases/optimize_prompt_usecase.dart
  • 状态订阅拼接:lib/features/optimization/presentation/providers/optimization_provider.dart
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐