Spring AI 使用阿里百炼平台实现流式对话:基于 SSE 的实践
·
Spring AI阿里百炼平台实现流式对话:基于 SSE 的实践指南
在大模型应用开发中,流式对话是提升用户体验的关键特性。本文将详细介绍如何利用 Spring AI 结合 Spring Boot,基于 SSE(Server-Sent Events)协议实现高效的流式对话功能,包括动态中断机制和前端交互优化。
技术选型与协议解析
SSE 协议与 WebSocket 的区别
SSE(Server-Sent Events)是一种基于 HTTP 的轻量级服务器向客户端推送信息的协议,与 WebSocket 相比有显著差异:
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务端 → 客户端) | 全双工(双向通信) |
| 连接方式 | 基于 HTTP 长连接 | 独立的 WebSocket 协议 |
| 复杂度 | 简单(无需复杂握手) | 复杂(需要专门握手过程) |
| 适用场景 | 消息推送、实时通知、流式输出 | 即时通讯、游戏等双向交互场景 |
| 数据格式 | 文本/event-stream 格式 | 二进制/文本,需自定义格式 |
对于仅需服务端向客户端推送流式响应的对话场景,SSE 是更简洁高效的选择。
项目环境搭建
核心依赖配置
首先在 pom.xml 中添加必要依赖(注意修正版本号格式错误):
<!-- Spring AI 依赖管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-bom</artifactId>
<version>1.0.0-M6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 核心依赖 -->
<dependencies>
<!-- Spring AI OpenAI 适配 -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- 阿里百炼 SDK(兼容 OpenAI 协议) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dashscope-sdk-java</artifactId>
<version>2.16.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring WebFlux(响应式编程支持) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
配置文件设置
在 application.yml 中配置模型服务信息:
spring:
ai:
openai:
base-url: https://dashscope.aliyuncs.com/compatible-mode # 阿里百炼兼容接口
api-key: sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx # 替换为实际API密钥
chat:
options:
model: qwen-plus # 可替换为实际使用的模型,如qwen-max、qwen-turbo等
核心功能实现
1. ChatClient 配置
创建 ChatClient 实例,配置对话模型、记忆机制和系统提示:
import org.springframework.ai.chat.ChatClient;
import org.springframework.ai.chat.ChatMemory;
import org.springframework.ai.chat.advisor.MessageChatMemoryAdvisor;
import org.springframework.ai.chat.advisor.SimpleLoggerAdvisor;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AiConfig {
@Bean
public ChatClient chatClient(ChatModel chatModel, ChatMemory chatMemory) {
return ChatClient.builder(chatModel)
.defaultOptions(options -> options
.withModel("qwen-plus") // 模型名称
.withTemperature(0.7f)) // 新增温度参数,控制输出随机性
.defaultSystem("你是一个友好的智能助手,负责解答用户问题") // 修正引号格式
.defaultAdvisors(
new SimpleLoggerAdvisor(), // 日志记录
new MessageChatMemoryAdvisor(chatMemory) // 对话记忆
)
.build();
}
}
2. 流式对话控制器
实现支持 SSE 的控制器,包含流式响应和中断功能:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.springframework.ai.chat.client.advisor.AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY;
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
@Slf4j
public class ChatController {
private final ChatClient chatClient;
private final AtomicBoolean isStreaming = new AtomicBoolean(true); // 线程安全的状态标识
/**
* 流式对话接口
* @param prompt 用户输入
* @param chatId 对话ID(用于记忆上下文)
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChat(
@RequestParam String prompt,
@RequestParam(required = false, defaultValue = "default") String chatId) {
log.info("收到对话请求 [{}]: {}", chatId, prompt);
isStreaming.set(true); // 重置流式状态
return chatClient.prompt()
.user(prompt)
.advisors(advisorSpec -> advisorSpec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId))
.stream()
.content()
.takeWhile(data -> isStreaming.get()) // 动态中断控制
.doOnCancel(() -> log.info("对话流已取消 [{}]", chatId)) // 取消时日志
.concatWithValues("\u0003"); // 结束标记(ETX字符)
}
/**
* 中断流式输出接口
*/
@PostMapping("/cancel")
public void cancelStream() {
isStreaming.set(false);
log.info("已触发流式输出中断");
}
}
3. 前端交互实现
完善前端页面,处理流式数据接收、中断控制和用户体验优化:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
<title>Spring AI 流式对话演示</title>
<style>
.container { max-width: 800px; margin: 20px auto; padding: 0 15px; }
#output { border: 1px solid #e0e0e0; padding: 15px; min-height: 300px; border-radius: 8px; }
.controls { margin: 15px 0; display: flex; gap: 10px; }
input[type="text"] { flex: 1; padding: 8px 12px; border: 1px solid #ccc; border-radius: 4px; }
button { padding: 8px 16px; cursor: pointer; border: none; border-radius: 4px; }
button.send { background: #4285f4; color: white; }
button.cancel { background: #ea4335; color: white; }
</style>
</head>
<body>
<div class="container">
<div class="controls">
<input type="text" id="prompt" placeholder="请输入问题...">
<button class="send" onclick="sendMessage()">发送</button>
<button class="cancel" onclick="cancelStream()">中断</button>
</div>
<h3>AI 回复:</h3>
<div id="output"></div>
</div>
<script>
const outputDiv = document.getElementById('output');
const promptInput = document.getElementById('prompt');
let eventSource = null;
// 发送消息
function sendMessage() {
const prompt = promptInput.value.trim();
if (!prompt) return;
// 清空输入和输出
promptInput.value = '';
outputDiv.innerHTML = '';
// 创建对话ID(可基于时间戳生成)
const chatId = 'chat_' + Date.now();
// 建立SSE连接
eventSource = new EventSource(`/api/chat/stream?prompt=${encodeURIComponent(prompt)}&chatId=${chatId}`);
// 处理收到的消息
eventSource.onmessage = (event) => {
const data = event.data;
// 检测结束标记
if (data === '\u0003') {
eventSource.close();
return;
}
// 追加内容到输出区域
outputDiv.textContent += data;
};
// 处理错误
eventSource.onerror = () => {
console.error('连接发生错误');
eventSource.close();
};
}
// 中断流式输出
function cancelStream() {
if (eventSource) {
eventSource.close();
eventSource = null;
}
// 通知后端停止生成
fetch('/api/chat/cancel', { method: 'POST' })
.catch(err => console.error('取消请求失败', err));
}
</script>
</body>
</html>
关键技术点解析
1. 响应式编程与 Flux
Spring AI 的流式响应基于 Reactor 框架的 Flux 实现:
Flux代表一个异步的序列数据流,适合处理流式输出takeWhile操作符用于根据条件(isStreaming状态)控制流的生命周期concatWithValues用于在流结束时添加终止标记,方便前端处理
2. 动态中断机制
- 使用
AtomicBoolean保证多线程环境下的状态安全性 - 前端通过
/cancel接口触发中断,后端通过takeWhile终止流 - 结合
eventSource.close()确保前端资源正确释放
3. 对话记忆管理
- 通过
ChatMemory和MessageChatMemoryAdvisor实现上下文记忆 chatId参数用于区分不同对话会话,实现多用户/多会话隔离
常见问题与优化建议
- 依赖版本问题:确保 Spring AI 版本与 Spring Boot 版本兼容(建议使用 Spring Boot 3.2+)
通过以上实现,我们基于 Spring AI 和 SSE 协议构建了一个完整的流式对话系统,支持实时响应、动态中断和上下文记忆,为用户提供流畅的对话体验。在实际应用中,可根据业务需求进一步扩展功能,如添加消息加密、内容过滤或多模型切换等特性。
更多推荐

所有评论(0)