基于SSE(服务器单向推送)+ AsyncContext(异步上下文)实现流式传输
本文介绍了流式传输大模型返回数据的技术选型与实现方案。通过对比同步等待、SSE+AsyncContext、WebFlux和WebSocket四种方案,最终选择了SSE+AsyncContext组合。SSE实现服务器单向推送,AsyncContext释放连接提高并发能力,二者结合满足大模型处理慢、前端需要实时显示的需求。文章详细说明了技术选型依据、SSE和AsyncContext的工作原理,并提供了
·
技术选型思路
在工作中, 有一项工作是需要将大模型返回的数据流式透传给前端
因此在评估技术栈使用时, 我们需要对常见的传输方式进行比较, 如下:
方案对比表

下面我们来了解下各个技术栈的作用
1. SSE(Server-Sent Events)
是什么:服务器单向推送技术
类比:
普通HTTP:你问一句,我答一句(同步)
SSE:你问一句,我一句一句慢慢告诉你(流式)
工作原理:
前端:老板,帮我优化这个提示词
后端:好的,我开始想...
后端:data: 首先...\n\n (过1秒)
后端:data: 然后...\n\n (过2秒)
后端:data: 最后...\n\n (过3秒)
后端:data: [DONE]\n\n (结束)
为什么用SSE:
- 大模型生成很慢(可能要10-30秒)
- 如果等全部生成完再返回,前端会超时
- SSE可以边生成边返回,用户看到实时效果
2. AsyncContext(异步上下文)
是什么:Servlet的"等我一下"机制
普通Servlet流程:
1. 收到请求
2. 处理请求(阻塞)
3. 返回响应
4. 释放连接
问题:大模型要10秒,连接被占用10秒,别人进不来
AsyncContext流程:
1. 收到请求
2. 说:好的,我收到了(立即释放连接)
3. 后台慢慢处理(10秒)
4. 处理完了再通知前端
为什么用AsyncContext:
- 大模型处理时间长
- 不能让一个请求占用连接太久
- 释放连接给别人用,提高并发能力
3. WebFlux(响应式编程)
是什么:另一种处理高并发的方式
传统方式(我们用的):
new Thread(() -> {
// 开一个新线程处理
}).start();
WebFlux方式:
// 使用"事件驱动",像Node.js
// 不创建线程,用更少的资源处理更多请求
为什么我们不用WebFlux:
- 学习成本高:响应式编程概念复杂
- 兼容性:要改现有代码结构
- 杀鸡用牛刀:我们需求不复杂,不用这么高级的技术
技术实现思路
整体流程
1. 前端用户填写表单
↓
2. 前端收集所有字段(prompt + type + model + opening)
↓
3. 前端发送POST请求(Accept: text/event-stream)
↓
4. Java后端接收请求,开启AsyncContext
↓
5. Java构建完整请求体(添加stream:true)
↓
6. Java调用Python大模型服务
↓
7. Python返回SSE流式响应
↓
8. Java读取流式数据并直接转发给前端
↓
9. 前端使用ReadableStream解析SSE数据
↓
10. 前端实时显示优化结果
1. 核心:SSE + AsyncContext
代码对应关系:
// 1. 声明这是SSE流
@PostMapping(value = "/optimize-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
// 2. 开启异步处理
AsyncContext asyncContext = servletRequest.startAsync();
// 3. 新线程处理(不阻塞主连接)
new Thread(() -> {
// 4. 调用Python大模型
// 5. 每收到一点数据就发给前端
writer.write("data: 内容\n\n");
writer.flush();
}).start();
2. 为什么是这个组合?
需求决定技术:
需求:大模型慢 → 需要流式 → 选SSE
需求:不想阻塞 → 需要异步 → 选AsyncContext
需求:要简单 → 不选复杂方案 → 不选WebFlux
技术匹配度:
- ✅ SSE:专门为服务器推送设计(我们就是推送)
- ✅ AsyncContext:Servlet自带,兼容性好
- ❌ WebSocket:双向通信,我们不需要
- ❌ WebFlux:要改架构,成本高
3. 实际代码
@ApiOperation(value = "流式优化提示词", notes = "实时流式返回优化结果")
@PostMapping(value = "/optimize-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void optimizeStream(@RequestBody PromptRequest request,
HttpServletRequest servletRequest,
HttpServletResponse servletResponse) {
log.info("收到流式优化请求");
// 开启异步处理
AsyncContext asyncContext = servletRequest.startAsync();
// 设置超时时间(1分钟)
asyncContext.setTimeout(60000L);
// 调用Service层处理
verbalTrickService.optimizePromptStream(request, (AsyncContext) asyncContext);
}
/**
* 流式优化提示词 - 使用Servlet异步支持
*/
public void optimizePromptStream(PromptRequest request, AsyncContext asyncContext) {
String apiUrl = pythonServiceUrl + "/prompt/optimize-prompt";
log.info("调用大模型AI优化服务(流式转发): {}, 请求内容: {}", apiUrl, new JSONObject(request));
// 获取响应对象
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
// 设置SSE响应头
response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
// 添加异步监听器
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("异步处理完成");
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.warn("异步处理超时");
try {
PrintWriter writer = event.getAsyncContext().getResponse().getWriter();
writer.write("event: timeout\n");
writer.write("data: 请求超时\n\n");
writer.flush();
} catch (Exception e) {
log.error("发送超时信息失败", e);
} finally {
event.getAsyncContext().complete();
}
}
@Override
public void onError(AsyncEvent event) throws IOException {
log.error("异步处理出错", event.getThrowable());
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("异步处理开始");
}
});
// 在单独的线程中处理,但保持异步上下文有效
new Thread(() -> {
try {
// ---------构建符合大模型接口要求的请求体--------------------------
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("prompt", request.getPrompt());
requestBody.put("type", request.getType() != null ? request.getType() : "优化");
requestBody.put("model", request.getModel() != null ? request.getModel() : "qwen-max");
requestBody.put("stream", true);
//----------------------------------------------------------------
log.debug("发送请求体: {}", requestBody);
// 调用Python服务
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(Collections.singletonList(MediaType.TEXT_EVENT_STREAM));
// 使用RestTemplate流式读取
restTemplate.execute(apiUrl, HttpMethod.POST,
httpRequest -> {
httpRequest.getHeaders().putAll(headers);
String jsonBody = new ObjectMapper().writeValueAsString(requestBody);
httpRequest.getBody().write(jsonBody.getBytes(StandardCharsets.UTF_8));
},
httpResponse -> {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(httpResponse.getBody(), StandardCharsets.UTF_8))) {
PrintWriter writer = asyncContext.getResponse().getWriter();
String line;
int lineCount = 0;
boolean isCompleted = false;
while ((line = reader.readLine()) != null && !isCompleted) {
lineCount++;
// 记录收到的数据(调试用)
if (lineCount <= 10 || line.contains("[DONE]")) {
log.debug("收到第{}行: {}", lineCount, line);
}
// 直接转发给前端
writer.write(line);
writer.write("\n");
writer.flush();
// 检查是否结束
if (line.trim().equals("data: [DONE]")) {
log.info("收到结束标记,传输完成,共{}行", lineCount);
break;
}
}
} catch (Exception e) {
log.error("读取Python响应失败", e);
try {
PrintWriter writer = asyncContext.getResponse().getWriter();
writer.write("event: error\n");
writer.write("data: 读取失败: " + e.getMessage() + "\n\n");
writer.flush();
} catch (IOException ex) {
log.error("发送错误信息失败", ex);
}
} finally {
// 完成异步处理
asyncContext.complete();
}
return null;
}
);
} catch (Exception e) {
log.error("流式处理失败", e);
try {
PrintWriter writer = asyncContext.getResponse().getWriter();
writer.write("event: error\n");
writer.write("data: " + e.getMessage() + "\n\n");
writer.flush();
} catch (IOException ex) {
log.error("发送错误信息失败", ex);
} finally {
asyncContext.complete();
}
}
}).start();
}
前端测试代码
- HTML表单部分(确保所有必填字段)
<!-- 核心表单元素 -->
<textarea id="promptInput">你是一个客服助手...</textarea>
<select id="typeSelect">
<option value="优化">优化</option>
<option value="扩写">扩写</option>
<option value="缩写">缩写</option>
</select>
<select id="modelSelect">
<option value="qwen-max">qwen-max</option>
<option value="gpt-3.5-turbo">gpt-3.5-turbo</option>
</select>
<input id="openingInput" placeholder="开场白(可选)">
- JavaScript核心逻辑
async function startOptimization() {
// 1. 收集所有字段(必须包含type和model)
const requestData = {
prompt: document.getElementById('promptInput').value,
type: document.getElementById('typeSelect').value, // 必填
model: document.getElementById('modelSelect').value, // 必填
opening: document.getElementById('openingInput').value || null,
stream: true // 流式标志
};
console.log('发送请求数据:', requestData);
// 2. 发送POST请求(关键:Accept头)
const response = await fetch('/prod-api/optimize-stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream' // 声明接受SSE
},
body: JSON.stringify(requestData)
});
// 3. 使用ReadableStream处理SSE响应
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.substring(6);
if (data === '[DONE]') {
console.log('传输完成');
return;
}
// 4. 解析JSON并提取content
try {
const parsed = JSON.parse(data);
if (parsed.choices && parsed.choices[0] && parsed.choices[0].delta) {
const content = parsed.choices[0].delta.content || '';
// 实时显示到页面
updateResultDisplay(content);
}
} catch (e) {
console.log('非JSON数据:', data);
}
}
}
}
}
效果测试
4. 场景类比:餐厅点餐
普通HTTP(同步):
你:老板,来碗牛肉面
老板:(在后厨做10分钟)
你:(干等10分钟)
老板:好了,给你
SSE + AsyncContext(我们的方案):
你:老板,来碗牛肉面
老板:好的,你先坐(AsyncContext释放连接)
服务员:(后台线程)开始做面
服务员:正在切牛肉...(SSE推送)
服务员:正在煮面...(SSE推送)
服务员:正在加调料...(SSE推送)
服务员:好了,请享用(SSE结束)
WebFlux(高级方案):
整个餐厅用"事件驱动"模式
一个服务员同时服务10桌客人
效率更高,但培训成本高
总结
技术栈总结
核心技术:
- SSE:解决"等太久"问题 → 边生成边显示
- AsyncContext:解决"占着茅坑"问题 → 释放连接
辅助技术:
- RestTemplate:调用Python服务
- Thread:后台处理(简单直接)
- JSON解析:处理大模型返回的数据格式
没用的技术:
WebFlux:太复杂,没必要
WebSocket:双向通信,用不上
消息队列:还没到那个规模
一句话总结
我们用SSE实现"慢慢说",
用AsyncContext实现"你先忙别的",
组合起来就是:前端不等,后端不堵,简单够用。
这个方案:
- 满足业务需求(实时显示大模型结果)
- 技术简单(团队成员都懂)
- 易于维护(代码清晰)
- 够用就好(不追求过度优化)
技术选择的核心原则:用最简单的技术解决业务问题,不炫技,不复杂化。
更多推荐



所有评论(0)