技术选型思路

在工作中, 有一项工作是需要将大模型返回的数据流式透传给前端
因此在评估技术栈使用时, 我们需要对常见的传输方式进行比较, 如下:

方案对比表

方案	优点	缺点	为什么选/不选
方案A:同步等待	简单	前端会超时	❌ 不选
方案B:SSE + AsyncContext	实时、简单	有并发限制	✅ 选了(够用)
方案C:SSE + WebFlux	高性能	复杂、要重构	❌ 不选(过度设计)
方案D:WebSocket	双向通信	更复杂	❌ 不选(不需要双向)

下面我们来了解下各个技术栈的作用

1. SSE(Server-Sent Events)

是什么:服务器单向推送技术

类比:

普通HTTP:你问一句,我答一句(同步)
SSE:你问一句,我一句一句慢慢告诉你(流式)

工作原理:

前端:老板,帮我优化这个提示词
后端:好的,我开始想...
后端:data: 首先...\n\n (过1秒)
后端:data: 然后...\n\n (过2秒)  
后端:data: 最后...\n\n (过3秒)
后端:data: [DONE]\n\n (结束)

为什么用SSE:

  • 大模型生成很慢(可能要10-30秒)
  • 如果等全部生成完再返回,前端会超时
  • SSE可以边生成边返回,用户看到实时效果

2. AsyncContext(异步上下文)

是什么:Servlet的"等我一下"机制

普通Servlet流程:

1. 收到请求
2. 处理请求(阻塞)
3. 返回响应
4. 释放连接
问题:大模型要10秒,连接被占用10秒,别人进不来

AsyncContext流程:

1. 收到请求
2. 说:好的,我收到了(立即释放连接)
3. 后台慢慢处理(10秒)
4. 处理完了再通知前端

为什么用AsyncContext:

  • 大模型处理时间长
  • 不能让一个请求占用连接太久
  • 释放连接给别人用,提高并发能力

3. WebFlux(响应式编程)

是什么:另一种处理高并发的方式

传统方式(我们用的):

new Thread(() -> {
    // 开一个新线程处理
}).start();

WebFlux方式:

// 使用"事件驱动",像Node.js
// 不创建线程,用更少的资源处理更多请求

为什么我们不用WebFlux:

  • 学习成本高:响应式编程概念复杂
  • 兼容性:要改现有代码结构
  • 杀鸡用牛刀:我们需求不复杂,不用这么高级的技术

技术实现思路

整体流程

1. 前端用户填写表单
   ↓
2. 前端收集所有字段(prompt + type + model + opening)
   ↓
3. 前端发送POST请求(Accept: text/event-stream)
   ↓
4. Java后端接收请求,开启AsyncContext
   ↓
5. Java构建完整请求体(添加stream:true)
   ↓
6. Java调用Python大模型服务
   ↓
7. Python返回SSE流式响应
   ↓
8. Java读取流式数据并直接转发给前端
   ↓
9. 前端使用ReadableStream解析SSE数据
   ↓
10. 前端实时显示优化结果

1. 核心:SSE + AsyncContext

代码对应关系:

// 1. 声明这是SSE流
@PostMapping(value = "/optimize-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

// 2. 开启异步处理
AsyncContext asyncContext = servletRequest.startAsync();

// 3. 新线程处理(不阻塞主连接)
new Thread(() -> {
    // 4. 调用Python大模型
    // 5. 每收到一点数据就发给前端
    writer.write("data: 内容\n\n");
    writer.flush();
}).start();

2. 为什么是这个组合?

需求决定技术:

需求:大模型慢 → 需要流式 → 选SSE
需求:不想阻塞 → 需要异步 → 选AsyncContext  
需求:要简单 → 不选复杂方案 → 不选WebFlux

技术匹配度:

  • ✅ SSE:专门为服务器推送设计(我们就是推送)
  • ✅ AsyncContext:Servlet自带,兼容性好
  • ❌ WebSocket:双向通信,我们不需要
  • ❌ WebFlux:要改架构,成本高

3. 实际代码

    @ApiOperation(value = "流式优化提示词", notes = "实时流式返回优化结果")
    @PostMapping(value = "/optimize-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public void optimizeStream(@RequestBody PromptRequest request,
                               HttpServletRequest servletRequest,
                               HttpServletResponse servletResponse) {
        log.info("收到流式优化请求");
        // 开启异步处理
        AsyncContext asyncContext = servletRequest.startAsync();
        // 设置超时时间(1分钟)
        asyncContext.setTimeout(60000L);
        // 调用Service层处理
        verbalTrickService.optimizePromptStream(request, (AsyncContext) asyncContext);
    }



    /**
     * 流式优化提示词 - 使用Servlet异步支持
     */
    public void optimizePromptStream(PromptRequest request, AsyncContext asyncContext) {
        String apiUrl = pythonServiceUrl + "/prompt/optimize-prompt";
        log.info("调用大模型AI优化服务(流式转发): {}, 请求内容: {}", apiUrl, new JSONObject(request));
        // 获取响应对象
        HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
        // 设置SSE响应头
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        response.setCharacterEncoding(StandardCharsets.UTF_8.name());
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");
        // 添加异步监听器
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                log.info("异步处理完成");
            }
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                log.warn("异步处理超时");
                try {
                    PrintWriter writer = event.getAsyncContext().getResponse().getWriter();
                    writer.write("event: timeout\n");
                    writer.write("data: 请求超时\n\n");
                    writer.flush();
                } catch (Exception e) {
                    log.error("发送超时信息失败", e);
                } finally {
                    event.getAsyncContext().complete();
                }
            }
            @Override
            public void onError(AsyncEvent event) throws IOException {
                log.error("异步处理出错", event.getThrowable());
            }
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                log.info("异步处理开始");
            }
        });

        // 在单独的线程中处理,但保持异步上下文有效
        new Thread(() -> {
            try {
                // ---------构建符合大模型接口要求的请求体--------------------------
                Map<String, Object> requestBody = new HashMap<>();
                requestBody.put("prompt", request.getPrompt());
                requestBody.put("type", request.getType() != null ? request.getType() : "优化");
                requestBody.put("model", request.getModel() != null ? request.getModel() : "qwen-max");
                requestBody.put("stream", true);
                //----------------------------------------------------------------
                log.debug("发送请求体: {}", requestBody);
                // 调用Python服务
                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON);
                headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
                // 使用RestTemplate流式读取
                restTemplate.execute(apiUrl, HttpMethod.POST,
                        httpRequest -> {
                            httpRequest.getHeaders().putAll(headers);
                            String jsonBody = new ObjectMapper().writeValueAsString(requestBody);
                            httpRequest.getBody().write(jsonBody.getBytes(StandardCharsets.UTF_8));
                        },
                        httpResponse -> {
                            try (BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(httpResponse.getBody(), StandardCharsets.UTF_8))) {
                                PrintWriter writer = asyncContext.getResponse().getWriter();
                                String line;
                                int lineCount = 0;
                                boolean isCompleted = false;
                                while ((line = reader.readLine()) != null && !isCompleted) {
                                    lineCount++;
                                    // 记录收到的数据(调试用)
                                    if (lineCount <= 10 || line.contains("[DONE]")) {
                                        log.debug("收到第{}行: {}", lineCount, line);
                                    }
                                    // 直接转发给前端
                                    writer.write(line);
                                    writer.write("\n");
                                    writer.flush();
                                    // 检查是否结束
                                    if (line.trim().equals("data: [DONE]")) {
                                        log.info("收到结束标记,传输完成,共{}行", lineCount);
                                        break;
                                    }
                                }
                            } catch (Exception e) {
                                log.error("读取Python响应失败", e);
                                try {
                                    PrintWriter writer = asyncContext.getResponse().getWriter();
                                    writer.write("event: error\n");
                                    writer.write("data: 读取失败: " + e.getMessage() + "\n\n");
                                    writer.flush();
                                } catch (IOException ex) {
                                    log.error("发送错误信息失败", ex);
                                }
                            } finally {
                                // 完成异步处理
                                asyncContext.complete();
                            }
                            return null;
                        }
                );
            } catch (Exception e) {
                log.error("流式处理失败", e);
                try {
                    PrintWriter writer = asyncContext.getResponse().getWriter();
                    writer.write("event: error\n");
                    writer.write("data: " + e.getMessage() + "\n\n");
                    writer.flush();
                } catch (IOException ex) {
                    log.error("发送错误信息失败", ex);
                } finally {
                    asyncContext.complete();
                }
            }
        }).start();
    }

前端测试代码

  1. HTML表单部分(确保所有必填字段)
<!-- 核心表单元素 -->
<textarea id="promptInput">你是一个客服助手...</textarea>

<select id="typeSelect">
    <option value="优化">优化</option>
    <option value="扩写">扩写</option>
    <option value="缩写">缩写</option>
</select>

<select id="modelSelect">
    <option value="qwen-max">qwen-max</option>
    <option value="gpt-3.5-turbo">gpt-3.5-turbo</option>
</select>

<input id="openingInput" placeholder="开场白(可选)">
  1. JavaScript核心逻辑
async function startOptimization() {
    // 1. 收集所有字段(必须包含type和model)
    const requestData = {
        prompt: document.getElementById('promptInput').value,
        type: document.getElementById('typeSelect').value,    // 必填
        model: document.getElementById('modelSelect').value,  // 必填
        opening: document.getElementById('openingInput').value || null,
        stream: true  // 流式标志
    };
    
    console.log('发送请求数据:', requestData);
    
    // 2. 发送POST请求(关键:Accept头)
    const response = await fetch('/prod-api/optimize-stream', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream'  // 声明接受SSE
        },
        body: JSON.stringify(requestData)
    });
    
    // 3. 使用ReadableStream处理SSE响应
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        const lines = chunk.split('\n');
        
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.substring(6);
                
                if (data === '[DONE]') {
                    console.log('传输完成');
                    return;
                }
                
                // 4. 解析JSON并提取content
                try {
                    const parsed = JSON.parse(data);
                    if (parsed.choices && parsed.choices[0] && parsed.choices[0].delta) {
                        const content = parsed.choices[0].delta.content || '';
                        // 实时显示到页面
                        updateResultDisplay(content);
                    }
                } catch (e) {
                    console.log('非JSON数据:', data);
                }
            }
        }
    }
}

效果测试
在这里插入图片描述

4. 场景类比:餐厅点餐

普通HTTP(同步):

你:老板,来碗牛肉面
老板:(在后厨做10分钟)
你:(干等10分钟)
老板:好了,给你

SSE + AsyncContext(我们的方案):

你:老板,来碗牛肉面
老板:好的,你先坐(AsyncContext释放连接)
服务员:(后台线程)开始做面
服务员:正在切牛肉...(SSE推送)
服务员:正在煮面...(SSE推送)  
服务员:正在加调料...(SSE推送)
服务员:好了,请享用(SSE结束)

WebFlux(高级方案):

整个餐厅用"事件驱动"模式
一个服务员同时服务10桌客人
效率更高,但培训成本高

总结

技术栈总结

核心技术:

  • SSE:解决"等太久"问题 → 边生成边显示
  • AsyncContext:解决"占着茅坑"问题 → 释放连接

辅助技术:

  • RestTemplate:调用Python服务
  • Thread:后台处理(简单直接)
  • JSON解析:处理大模型返回的数据格式

没用的技术:
WebFlux:太复杂,没必要
WebSocket:双向通信,用不上
消息队列:还没到那个规模

一句话总结

我们用SSE实现"慢慢说",
用AsyncContext实现"你先忙别的",
组合起来就是:前端不等,后端不堵,简单够用。

这个方案:

  • 满足业务需求(实时显示大模型结果)
  • 技术简单(团队成员都懂)
  • 易于维护(代码清晰)
  • 够用就好(不追求过度优化)

技术选择的核心原则:用最简单的技术解决业务问题,不炫技,不复杂化。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐