uni-app 中实现 AI 对话功能的流式数据(SSE - Server-Sent Events)处理
H5 端:最简单,直接使用标准的API。App 端不支持流式,需要借助fetchpolyfill 库(如)来实现,这也是最现代、最接近 Web 标准的做法。微信小程序端:能力最受限,但可以通过标志接收分块数据,然后手动编写解析器来处理 SSE 格式的字符串流。这是最复杂但也是唯一可行的方法。代码组织:利用uni-app强大的条件编译#ifdef#endif),可以将所有平台的逻辑封装在统一的接口后
下面我将为你详细讲解这个功能的实现,包括 后端 SSE 规范、H5 端、App 端 和 小程序端 的具体实现,并附上完整的代码和注释。
核心概念:SSE (Server-Sent Events)
AI 对话(如 ChatGPT)的打字机效果,是通过 SSE 技术实现的。它与 WebSocket 不同,是一个单向的、从服务器到客户端的持久化 HTTP 连接。
- 服务器:保持连接打开,并不断地以
data: ...\n\n
的格式推送消息片段。 - 客户端:接收这些消息片段并实时渲染到界面上。
步骤一:后端 SSE 接口规范 (关键前提)
无论前端如何实现,你都需要一个符合 SSE 规范的后端接口。这个接口必须:
- 设置正确的响应头:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- 以
data: <your_json_string>\n\n
的格式推送数据。 - 在数据流结束时,发送一个特殊的结束标志,例如
data: [DONE]\n\n
。
Node.js (Express) 后端示例:
// server.js
const express = require('express');
const app = express();
app.post('/ai-stream', (req, res) => {
// 1. 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 立即发送响应头
const messages = ["你", "好", ",", "我", "是", "一", "个", "AI", "模", "型", "。"];
let index = 0;
const intervalId = setInterval(() => {
if (index < messages.length) {
const chunk = { text: messages[index] };
// 2. 按照 SSE 格式发送数据
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
index++;
} else {
// 3. 发送结束标志
res.write(`data: [DONE]\n\n`);
clearInterval(intervalId);
res.end(); // 关闭连接
}
}, 200); // 每200ms发送一个字
req.on('close', () => {
clearInterval(intervalId);
res.end();
});
});
app.listen(3000, () => console.log('SSE server listening on port 3000'));
步骤二:uni-app 在不同端的实现
现在我们来看 uni-app
前端的实现。由于各端对网络请求的能力支持不同,我们需要使用条件编译来编写特定平台的代码。
1. H5 平台:使用原生 EventSource
(最简单、最标准)
H5 运行在标准浏览器环境中,可以直接使用 Web API EventSource
,这是处理 SSE 的标准方式。
代码实现:
// H5 端的处理逻辑
// #ifdef H5
let eventSource = null;
function startH5Stream(prompt, onMessage, onDone, onError) {
const url = 'http://localhost:3000/ai-stream'; // 你的后端接口
// 创建 EventSource 实例
eventSource = new EventSource(url); // 注意:EventSource 不支持 POST,通常通过 GET 参数传递
// 如果必须用POST,需要用 fetch API 的流式读取来模拟,这里用 GET 简化演示
// 监听 message 事件
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
console.log("H5 Stream finished.");
eventSource.close(); // 关闭连接
if (onDone) onDone();
return;
}
try {
const data = JSON.parse(event.data);
if (onMessage) onMessage(data.text);
} catch (e) {
if (onError) onError(e);
}
};
// 监听 error 事件
eventSource.onerror = (err) => {
console.error("EventSource failed:", err);
if (onError) onError(err);
eventSource.close();
};
}
function stopH5Stream() {
if (eventSource) {
eventSource.close();
eventSource = null;
console.log("H5 Stream manually stopped.");
}
}
// #endif
2. App 平台:使用 fetch
Polyfill (推荐方案)
uni-app
在 App 端的 uni.request
不支持流式返回。因此,我们不能直接用它。最佳实践是引入一个支持流式 fetch
的 Polyfill 库,如 @microsoft/fetch-event-source
。
a) 安装依赖:
npm install @microsoft/fetch-event-source
b) 代码实现:
// App 端的处理逻辑
// #ifdef APP-PLUS
import { fetchEventSource } from '@microsoft/fetch-event-source';
let abortController = null;
function startAppStream(prompt, onMessage, onDone, onError) {
const url = 'http://localhost:3000/ai-stream';
abortController = new AbortController();
fetchEventSource(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ prompt: prompt }),
signal: abortController.signal,
onmessage(event) {
if (event.data === '[DONE]') {
console.log("App Stream finished.");
if (onDone) onDone();
abortController.abort(); // 结束
return;
}
try {
const data = JSON.parse(event.data);
if (onMessage) onMessage(data.text);
} catch (e) {
if (onError) onError(e);
}
},
onerror(err) {
console.error("App Stream error:", err);
if (onError) onError(err);
abortController.abort();
throw err; // 必须抛出错误以停止重试
}
});
}
function stopAppStream() {
if (abortController) {
abortController.abort();
abortController = null;
console.log("App Stream manually stopped.");
}
}
// #endif
3. 微信小程序平台:使用 uni.request
+ enableChunked
(最特殊)
微信小程序环境最封闭,没有 EventSource
或 fetch
。但微信为 wx.request
(即 uni.request
在微信端的底层实现)提供了一个实验性参数 enableChunked: true
,允许我们接收分块数据。我们需要手动解析这些数据块。
代码实现:
// 微信小程序端的处理逻辑
// #ifdef MP-WEIXIN
let requestTask = null;
let buffer = ''; // 用于存储不完整的消息片段
function startWeixinStream(prompt, onMessage, onDone, onError) {
buffer = ''; // 重置缓冲区
requestTask = uni.request({
url: 'http://localhost:3000/ai-stream',
method: 'POST',
data: { prompt: prompt },
enableChunked: true, // 开启分块接收
success: () => {
// 整个请求结束,如果是正常结束,[DONE]信号会处理
},
fail: (err) => {
if (onError) onError(err);
}
});
// 监听接收到的分块数据
requestTask.onChunkReceived((res) => {
const arrayBuffer = res.data;
const uint8Array = new Uint8Array(arrayBuffer);
// 将 ArrayBuffer 转换为字符串
const textChunk = new TextDecoder().decode(uint8Array);
// --- 手动解析 SSE 格式 ---
buffer += textChunk;
const messages = buffer.split('\n\n');
// 最后一个消息可能不完整,保留在 buffer 中
buffer = messages.pop();
messages.forEach(msg => {
if (msg.startsWith('data: ')) {
const dataStr = msg.substring(6); // 去掉 "data: "
if (dataStr === '[DONE]') {
console.log("Weixin Stream finished.");
if (onDone) onDone();
stopWeixinStream(); // 结束任务
return;
}
try {
const data = JSON.parse(dataStr);
if (onMessage) onMessage(data.text);
} catch (e) {
if (onError) onError(e);
}
}
});
});
}
function stopWeixinStream() {
if (requestTask) {
requestTask.abort();
requestTask = null;
console.log("Weixin Stream manually stopped.");
}
}
// #endif
步骤三:整合到 Vue 页面中
现在,我们将上述平台特定的代码整合到一个 Vue 组件中,使用条件编译来调用正确的函数。
pages/ai-chat/ai-chat.vue
示例:
<template>
<view class="chat-container">
<view class="response-area">
{{ aiResponse }}
</view>
<button @click="startStream" :disabled="isLoading">发送</button>
<button @click="stopStream" v-if="isLoading">停止</button>
</view>
</template>
<script>
// 导入所有平台的代码(实际开发中可以组织成单独的 JS 文件)
// #ifdef H5
// ... 粘贴 H5 代码 ...
// #endif
// #ifdef APP-PLUS
// ... 粘贴 App 代码 ...
// #endif
// #ifdef MP-WEIXIN
// ... 粘贴小程序代码 ...
// #endif
export default {
data() {
return {
aiResponse: '',
isLoading: false
};
},
methods: {
startStream() {
if (this.isLoading) return;
this.isLoading = true;
this.aiResponse = '';
const prompt = "你好"; // 示例 prompt
const onMessage = (text) => {
this.aiResponse += text;
};
const onDone = () => {
this.isLoading = false;
};
const onError = (err) => {
this.aiResponse += "\n[出错了]";
this.isLoading = false;
console.error("Stream Error:", err);
};
// --- 使用条件编译调用对应平台的函数 ---
// #ifdef H5
startH5Stream(prompt, onMessage, onDone, onError);
// #endif
// #ifdef APP-PLUS
startAppStream(prompt, onMessage, onDone, onError);
// #endif
// #ifdef MP-WEIXIN
startWeixinStream(prompt, onMessage, onDone, onError);
// #endif
},
stopStream() {
// --- 使用条件编译调用对应的停止函数 ---
// #ifdef H5
stopH5Stream();
// #endif
// #ifdef APP-PLUS
stopAppStream();
// #endif
// #ifdef MP-WEIXIN
stopWeixinStream();
// #endif
this.isLoading = false;
}
}
}
</script>
<style>
.chat-container {
padding: 20px;
}
.response-area {
width: 100%;
min-height: 200px;
border: 1px solid #ccc;
padding: 10px;
white-space: pre-wrap; /* 保持换行和空格 */
}
</style>
总结
- H5 端:最简单,直接使用标准的
EventSource
API。 - App 端:
uni.request
不支持流式,需要借助fetch
polyfill 库(如@microsoft/fetch-event-source
)来实现,这也是最现代、最接近 Web 标准的做法。 - 微信小程序端:能力最受限,但可以通过
enableChunked
标志接收分块数据,然后手动编写解析器来处理 SSE 格式的字符串流。这是最复杂但也是唯一可行的方法。 - 代码组织:利用
uni-app
强大的条件编译 (#ifdef
/#endif
),可以将所有平台的逻辑封装在统一的接口后面,让业务代码保持干净整洁。
更多推荐
所有评论(0)