下面我将为你详细讲解这个功能的实现,包括 后端 SSE 规范H5 端App 端小程序端 的具体实现,并附上完整的代码和注释。

核心概念:SSE (Server-Sent Events)

AI 对话(如 ChatGPT)的打字机效果,是通过 SSE 技术实现的。它与 WebSocket 不同,是一个单向的、从服务器到客户端的持久化 HTTP 连接。

  • 服务器:保持连接打开,并不断地以 data: ...\n\n 的格式推送消息片段。
  • 客户端:接收这些消息片段并实时渲染到界面上。

步骤一:后端 SSE 接口规范 (关键前提)

无论前端如何实现,你都需要一个符合 SSE 规范的后端接口。这个接口必须:

  1. 设置正确的响应头:
    • Content-Type: text/event-stream
    • Cache-Control: no-cache
    • Connection: keep-alive
  2. data: <your_json_string>\n\n 的格式推送数据。
  3. 在数据流结束时,发送一个特殊的结束标志,例如 data: [DONE]\n\n

Node.js (Express) 后端示例:

// server.js
const express = require('express');
const app = express();

app.post('/ai-stream', (req, res) => {
    // 1. 设置 SSE 响应头
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // 立即发送响应头

    const messages = ["你", "好", ",", "我", "是", "一", "个", "AI", "模", "型", "。"];
    let index = 0;

    const intervalId = setInterval(() => {
        if (index < messages.length) {
            const chunk = { text: messages[index] };
            // 2. 按照 SSE 格式发送数据
            res.write(`data: ${JSON.stringify(chunk)}\n\n`);
            index++;
        } else {
            // 3. 发送结束标志
            res.write(`data: [DONE]\n\n`);
            clearInterval(intervalId);
            res.end(); // 关闭连接
        }
    }, 200); // 每200ms发送一个字

    req.on('close', () => {
        clearInterval(intervalId);
        res.end();
    });
});

app.listen(3000, () => console.log('SSE server listening on port 3000'));

步骤二:uni-app 在不同端的实现

现在我们来看 uni-app 前端的实现。由于各端对网络请求的能力支持不同,我们需要使用条件编译来编写特定平台的代码。

1. H5 平台:使用原生 EventSource (最简单、最标准)

H5 运行在标准浏览器环境中,可以直接使用 Web API EventSource,这是处理 SSE 的标准方式。

代码实现:

// H5 端的处理逻辑
// #ifdef H5
let eventSource = null;

function startH5Stream(prompt, onMessage, onDone, onError) {
    const url = 'http://localhost:3000/ai-stream'; // 你的后端接口
    
    // 创建 EventSource 实例
    eventSource = new EventSource(url); // 注意:EventSource 不支持 POST,通常通过 GET 参数传递
    // 如果必须用POST,需要用 fetch API 的流式读取来模拟,这里用 GET 简化演示

    // 监听 message 事件
    eventSource.onmessage = (event) => {
        if (event.data === '[DONE]') {
            console.log("H5 Stream finished.");
            eventSource.close(); // 关闭连接
            if (onDone) onDone();
            return;
        }
        try {
            const data = JSON.parse(event.data);
            if (onMessage) onMessage(data.text);
        } catch (e) {
            if (onError) onError(e);
        }
    };

    // 监听 error 事件
    eventSource.onerror = (err) => {
        console.error("EventSource failed:", err);
        if (onError) onError(err);
        eventSource.close();
    };
}

function stopH5Stream() {
    if (eventSource) {
        eventSource.close();
        eventSource = null;
        console.log("H5 Stream manually stopped.");
    }
}
// #endif
2. App 平台:使用 fetch Polyfill (推荐方案)

uni-app 在 App 端的 uni.request 不支持流式返回。因此,我们不能直接用它。最佳实践是引入一个支持流式 fetch 的 Polyfill 库,如 @microsoft/fetch-event-source

a) 安装依赖:

npm install @microsoft/fetch-event-source

b) 代码实现:

// App 端的处理逻辑
// #ifdef APP-PLUS
import { fetchEventSource } from '@microsoft/fetch-event-source';

let abortController = null;

function startAppStream(prompt, onMessage, onDone, onError) {
    const url = 'http://localhost:3000/ai-stream';
    abortController = new AbortController();

    fetchEventSource(url, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({ prompt: prompt }),
        signal: abortController.signal,

        onmessage(event) {
            if (event.data === '[DONE]') {
                console.log("App Stream finished.");
                if (onDone) onDone();
                abortController.abort(); // 结束
                return;
            }
            try {
                const data = JSON.parse(event.data);
                if (onMessage) onMessage(data.text);
            } catch (e) {
                if (onError) onError(e);
            }
        },
        onerror(err) {
            console.error("App Stream error:", err);
            if (onError) onError(err);
            abortController.abort();
            throw err; // 必须抛出错误以停止重试
        }
    });
}

function stopAppStream() {
    if (abortController) {
        abortController.abort();
        abortController = null;
        console.log("App Stream manually stopped.");
    }
}
// #endif
3. 微信小程序平台:使用 uni.request + enableChunked (最特殊)

微信小程序环境最封闭,没有 EventSourcefetch。但微信为 wx.request(即 uni.request 在微信端的底层实现)提供了一个实验性参数 enableChunked: true,允许我们接收分块数据。我们需要手动解析这些数据块。

代码实现:

// 微信小程序端的处理逻辑
// #ifdef MP-WEIXIN
let requestTask = null;
let buffer = ''; // 用于存储不完整的消息片段

function startWeixinStream(prompt, onMessage, onDone, onError) {
    buffer = ''; // 重置缓冲区
    requestTask = uni.request({
        url: 'http://localhost:3000/ai-stream',
        method: 'POST',
        data: { prompt: prompt },
        enableChunked: true, // 开启分块接收
        success: () => {
             // 整个请求结束,如果是正常结束,[DONE]信号会处理
        },
        fail: (err) => {
            if (onError) onError(err);
        }
    });

    // 监听接收到的分块数据
    requestTask.onChunkReceived((res) => {
        const arrayBuffer = res.data;
        const uint8Array = new Uint8Array(arrayBuffer);
        // 将 ArrayBuffer 转换为字符串
        const textChunk = new TextDecoder().decode(uint8Array);
        
        // --- 手动解析 SSE 格式 ---
        buffer += textChunk;
        const messages = buffer.split('\n\n');
        
        // 最后一个消息可能不完整,保留在 buffer 中
        buffer = messages.pop();

        messages.forEach(msg => {
            if (msg.startsWith('data: ')) {
                const dataStr = msg.substring(6); // 去掉 "data: "
                if (dataStr === '[DONE]') {
                    console.log("Weixin Stream finished.");
                    if (onDone) onDone();
                    stopWeixinStream(); // 结束任务
                    return;
                }
                try {
                    const data = JSON.parse(dataStr);
                    if (onMessage) onMessage(data.text);
                } catch (e) {
                    if (onError) onError(e);
                }
            }
        });
    });
}

function stopWeixinStream() {
    if (requestTask) {
        requestTask.abort();
        requestTask = null;
        console.log("Weixin Stream manually stopped.");
    }
}
// #endif

步骤三:整合到 Vue 页面中

现在,我们将上述平台特定的代码整合到一个 Vue 组件中,使用条件编译来调用正确的函数。

pages/ai-chat/ai-chat.vue 示例:

<template>
    <view class="chat-container">
        <view class="response-area">
            {{ aiResponse }}
        </view>
        <button @click="startStream" :disabled="isLoading">发送</button>
        <button @click="stopStream" v-if="isLoading">停止</button>
    </view>
</template>

<script>
// 导入所有平台的代码(实际开发中可以组织成单独的 JS 文件)
// #ifdef H5
// ... 粘贴 H5 代码 ...
// #endif
// #ifdef APP-PLUS
// ... 粘贴 App 代码 ...
// #endif
// #ifdef MP-WEIXIN
// ... 粘贴小程序代码 ...
// #endif

export default {
    data() {
        return {
            aiResponse: '',
            isLoading: false
        };
    },
    methods: {
        startStream() {
            if (this.isLoading) return;
            this.isLoading = true;
            this.aiResponse = '';
            
            const prompt = "你好"; // 示例 prompt

            const onMessage = (text) => {
                this.aiResponse += text;
            };
            const onDone = () => {
                this.isLoading = false;
            };
            const onError = (err) => {
                this.aiResponse += "\n[出错了]";
                this.isLoading = false;
                console.error("Stream Error:", err);
            };

            // --- 使用条件编译调用对应平台的函数 ---
            // #ifdef H5
            startH5Stream(prompt, onMessage, onDone, onError);
            // #endif
            
            // #ifdef APP-PLUS
            startAppStream(prompt, onMessage, onDone, onError);
            // #endif
            
            // #ifdef MP-WEIXIN
            startWeixinStream(prompt, onMessage, onDone, onError);
            // #endif
        },
        
        stopStream() {
            // --- 使用条件编译调用对应的停止函数 ---
            // #ifdef H5
            stopH5Stream();
            // #endif

            // #ifdef APP-PLUS
            stopAppStream();
            // #endif
            
            // #ifdef MP-WEIXIN
            stopWeixinStream();
            // #endif
            
            this.isLoading = false;
        }
    }
}
</script>

<style>
.chat-container {
    padding: 20px;
}
.response-area {
    width: 100%;
    min-height: 200px;
    border: 1px solid #ccc;
    padding: 10px;
    white-space: pre-wrap; /* 保持换行和空格 */
}
</style>

总结

  1. H5 端:最简单,直接使用标准的 EventSource API。
  2. App 端uni.request 不支持流式,需要借助 fetch polyfill 库(如 @microsoft/fetch-event-source)来实现,这也是最现代、最接近 Web 标准的做法。
  3. 微信小程序端:能力最受限,但可以通过 enableChunked 标志接收分块数据,然后手动编写解析器来处理 SSE 格式的字符串流。这是最复杂但也是唯一可行的方法。
  4. 代码组织:利用 uni-app 强大的条件编译 (#ifdef/#endif),可以将所有平台的逻辑封装在统一的接口后面,让业务代码保持干净整洁。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐