LangChain4j实战之八:响应流式传输
大模型对话时,实现实时展示内容到页面的效果,而不是等所有内容凑齐了再显示
·
欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
LangChain4j实战全系列链接
本篇概览
- 本文继续学习LangChain4j的知识点,前几篇的图像模型、聊天记忆等篇幅都较大导致学起来有点累,今天就稍微放松一下吧,学个简单的知识点:响应流式传输
- 在前面的实战中,每次验证功能都是发http请求到后台,等完整的内容一次性返回,效果如下所示

- 若内容太多会等待较长时间,此时上述方式的体验就不太友好了,更好的交互是不要等完整的内容都凑齐才返回,而是大模型返回多少就立即展示多少,如下所示,今天咱们就一起来编码实现这个效果

- 为了让项目更有实用性,本篇会提供两种方案,分别是基于低级API和高级API的实现
额外需要的知识点
- 接下来的开发会涉及到一些额外的的技术,这里提前列出
- 前端知识,能做出最简单的页面,有输入框、按钮、输出,以及对应的事件
- 前后端交互方式,Web API的EventSource
- Spring 提供的Server-Sent Events 实现,本质是HTTP 长连接 + 流式文本
- 以上就是本篇需要的额外知识点,主要是为了完成前后端交互,没有难度,相信您可以轻松理解
- 现在少说废话,开始编码吧
源码下载(觉得作者啰嗦的,直接在这里下载)
- 如果您只想快速浏览完整源码,可以在GitHub下载代码直接运行,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
| 名称 | 链接 | 备注 |
|---|---|---|
| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
| git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
| git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
- 这个git项目中有多个文件夹,本篇的源码在langchain4j-tutorials文件夹下,如下图红色箭头所示:

编码:父工程调整
- 《准备工作》中创建了整个《LangChain4j实战》系列代码的父工程,本篇实战会在父工程下新建一个子工程,所以这里要对父工程的pom.xml做少量修改
- modules中增加一个子工程,如下图黄框所示

编码:新增子工程
- 新增名为streaming-response的子工程
- langchain4j-totorials目录下新增名streaming-response为的文件夹
- streaming-response文件夹下新增pom.xml,内容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.bolingcavalry</groupId>
<artifactId>langchain4j-totorials</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>streaming-response</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- JUnit Jupiter Engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<!-- Mockito Core -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Mockito JUnit Jupiter -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<!-- LangChain4j Core -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-core</artifactId>
</dependency>
<!-- LangChain4j OpenAI支持(用于通义千问的OpenAI兼容接口) -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId>
</dependency>
<!-- 官方 langchain4j(包含 AiServices 等服务类) -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j</artifactId>
</dependency>
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-community-dashscope</artifactId>
</dependency>
<!-- langchain4j 1.x版本中的服务功能已包含在core模块中 -->
<!-- 日志依赖由Spring Boot Starter自动管理,无需单独声明 -->
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven Plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.3.5</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 在langchain4j-totorials/streaming-response/src/main/resources新增配置文件application.properties,内容如下,主要是三个模型的配置信息,记得把your-api-key换成您自己的apikey
# Spring Boot 应用配置
server.port=8080
server.servlet.context-path=/
# LangChain4j 配置通义千问模型
# 注意:请将your-api-key替换为您实际的通义千问API密钥
langchain4j.open-ai.chat-model.api-key=your-api-key
# 通义千问模型名称
langchain4j.open-ai.chat-model.model-name=qwen3-max
# 通义千问OpenAI兼容接口地址
langchain4j.open-ai.chat-model.base-url=https://dashscope.aliyuncs.com/compatible-mode/v1
# 日志配置
logging.level.root=INFO
logging.level.com.bolingcavalry=DEBUG
logging.pattern.console=%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
# 应用名称
spring.application.name=streaming-response
- 依旧平淡无奇的启动类
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Spring Boot应用程序的主类
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
- 现在新的子工程已经创建好了,接下来开始写功能代码
编码,前端
- 鉴于欣宸不入流的前端技术,前端相关的代码如果有问题还请帮忙指出…
- 新增文件streaming-response/src/main/resources/static/index.html,内容如下
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>流式聊天演示</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background-color: white;
border-radius: 8px;
padding: 20px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
text-align: center;
color: #333;
}
.input-area {
margin-bottom: 20px;
}
textarea {
width: 100%;
height: 100px;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 16px;
resize: vertical;
}
button {
background-color: #4CAF50;
color: white;
border: none;
padding: 10px 20px;
font-size: 16px;
border-radius: 4px;
cursor: pointer;
margin-top: 10px;
}
button:hover {
background-color: #45a049;
}
button:disabled {
background-color: #cccccc;
cursor: not-allowed;
}
.response-area {
border: 1px solid #ddd;
border-radius: 4px;
padding: 15px;
min-height: 200px;
background-color: #f9f9f9;
font-family: 'Courier New', monospace;
white-space: pre-wrap;
}
.status {
margin-top: 10px;
color: #666;
font-size: 14px;
}
.loading {
display: inline-block;
width: 20px;
height: 20px;
border: 3px solid #f3f3f3;
border-top: 3px solid #4CAF50;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
</style>
</head>
<body>
<div class="container">
<h1>流式聊天演示</h1>
<div class="input-area">
<textarea id="promptInput" placeholder="请输入您的问题..."></textarea>
<button id="sendBtn" onclick="sendMessage()">低级API发送</button>
<button id="highLevelSendBtn" onclick="sendHighLevelMessage()">高级API发送</button>
</div>
<div class="status">
<span id="statusText">就绪</span>
<span id="loading" class="loading" style="display: none;"></span>
</div>
<div class="response-area" id="responseArea"></div>
</div>
<script>
let eventSource = null;
function disableAllButtons() {
document.getElementById('sendBtn').disabled = true;
document.getElementById('highLevelSendBtn').disabled = true;
document.getElementById('promptInput').disabled = true;
}
function enableAllButtons() {
document.getElementById('sendBtn').disabled = false;
document.getElementById('highLevelSendBtn').disabled = false;
document.getElementById('promptInput').disabled = false;
}
function sendMessage() {
const prompt = document.getElementById('promptInput').value.trim();
if (!prompt) {
alert('请输入问题!');
return;
}
// 禁用按钮和输入框
disableAllButtons();
document.getElementById('statusText').textContent = '正在发送低级API请求...';
document.getElementById('loading').style.display = 'inline-block';
document.getElementById('responseArea').textContent = '';
// 使用Server-Sent Events建立连接(低级API)
eventSource = new EventSource(`/api/qwen/sse-streaming-chat?prompt=${encodeURIComponent(prompt)}&userId=1`);
setupEventSourceListeners();
}
function sendHighLevelMessage() {
const prompt = document.getElementById('promptInput').value.trim();
if (!prompt) {
alert('请输入问题!');
return;
}
// 禁用按钮和输入框
disableAllButtons();
document.getElementById('statusText').textContent = '正在发送高级API请求...';
document.getElementById('loading').style.display = 'inline-block';
document.getElementById('responseArea').textContent = '';
// 使用Server-Sent Events建立连接(高级API)
eventSource = new EventSource(`/api/qwen/high-level-sse-streaming-chat?prompt=${encodeURIComponent(prompt)}&userId=1`);
setupEventSourceListeners();
}
function setupEventSourceListeners() {
// 处理接收到的消息
eventSource.onmessage = function(event) {
const data = event.data;
if (data) {
const responseArea = document.getElementById('responseArea');
responseArea.textContent += data;
responseArea.scrollTop = responseArea.scrollHeight; // 自动滚动到底部
}
};
// 处理错误
eventSource.onerror = function(error) {
console.error('SSE Error:', error);
document.getElementById('statusText').textContent = '连接错误';
document.getElementById('loading').style.display = 'none';
enableAllButtons();
if (eventSource.readyState !== EventSource.CLOSED) {
eventSource.close();
}
};
// 处理连接关闭
eventSource.onclose = function() {
document.getElementById('statusText').textContent = '会话结束';
document.getElementById('loading').style.display = 'none';
enableAllButtons();
};
}
// 监听窗口关闭事件,确保关闭连接
window.addEventListener('beforeunload', function() {
if (eventSource && eventSource.readyState !== EventSource.CLOSED) {
eventSource.close();
}
});
</script>
</body>
</html>
- 上述代码要有几个重点要注意的:
- 点击按钮后,通过创建EventSource的方式和后端建立长连接,低级API和高级API的分别有不同的path
- 建立连接后,收到后台数据时,和responseArea的已有内容拼接起来再重新展示,效果就是只要后台数据不断,前端的内容就不断增加
编码,后端,低级API
- 先实现低级API的相应流式传输,这个最简单
- 首先是配置类,这里创建模型对象实例,注意和前面几篇创建的ChatModel不同,本篇要用StreamingChatModel
package com.bolingcavalry.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.bolingcavalry.service.StreamingAssistant;
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.service.AiServices;
/**
* LangChain4j配置类
*/
@Configuration
public class LangChain4jConfig {
@Value("${langchain4j.open-ai.chat-model.api-key}")
private String apiKey;
@Value("${langchain4j.open-ai.chat-model.model-name:qwen3-max}")
private String modelName;
@Value("${langchain4j.open-ai.chat-model.base-url}")
private String baseUrl;
/**
* 创建并配置StreamingChatModel实例(使用通义千问的OpenAI兼容接口)
*
* @return StreamingChatModel实例
*/
@Bean
public StreamingChatModel streamingChatModel() {
return OpenAiStreamingChatModel.builder()
.apiKey(apiKey)
.modelName(modelName)
.baseUrl(baseUrl)
.build();
}
}
- 然后是服务类QwenService.java,要注意的是streamingChatModel.chat方法的第二个入参,是StreamingChatResponseHandler的实现类,每当大模型返回token的时候,onPartialResponse方法就会被调用,传入最新的内容,所以只要在onPartialResponse方法中把新的内容给到前端即可,也就是执行emitter.send
package com.bolingcavalry.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import dev.langchain4j.service.TokenStream;
/**
* 通义千问服务类,用于与通义千问模型进行交互
*/
@Service
public class QwenService {
private static final Logger logger = LoggerFactory.getLogger(QwenService.class);
@Autowired
private StreamingChatModel streamingChatModel;
/**
* SSE流式聊天方法(用于网页实时显示)
*
* @param prompt 提示词
* @return SseEmitter实例,用于向客户端发送流式响应
*/
public SseEmitter lowLevelStreamingChat(String prompt) {
// 创建SseEmitter,设置超时时间为30分钟
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);
// 使用streamingChatModel进行流式聊天
streamingChatModel.chat(prompt, new StreamingChatResponseHandler() {
@Override
public void onPartialResponse(String partialResponse) {
if (partialResponse == null || partialResponse.trim().isEmpty()) {
logger.warn("空的部分响应,忽略");
return;
}
try {
// 发送部分响应到客户端
emitter.send(partialResponse);
logger.info("partialResponse: {}", partialResponse);
} catch (Exception e) {
logger.error("发送部分响应失败: {}", e.getMessage(), e);
emitter.completeWithError(e);
}
}
@Override
public void onCompleteResponse(ChatResponse completeResponse) {
try {
// 发送完成标志
emitter.send(SseEmitter.event().name("complete"));
// 完成响应
emitter.complete();
logger.info("completeResponse: {}", completeResponse);
} catch (Exception e) {
logger.error("发送完成响应失败: {}", e.getMessage(), e);
emitter.completeWithError(e);
}
}
@Override
public void onError(Throwable throwable) {
logger.error("流式聊天发生错误: {}", throwable.getMessage(), throwable);
emitter.completeWithError(throwable);
}
});
// 设置超时处理
emitter.onTimeout(() -> {
logger.error("SSE连接超时");
emitter.completeWithError(new RuntimeException("连接超时"));
});
// 设置完成处理
emitter.onCompletion(() -> {
logger.info("SSE连接完成");
});
return emitter;
}
}
- 最后是controller类QwenController.java,返回SseEmitter是Spring 提供的 Server-Sent Events 实现的常规做法,这样就能做到把数据实时推送到前端
package com.bolingcavalry.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.bolingcavalry.service.QwenService;
import lombok.Data;
/**
* 通义千问控制器,处理与大模型交互的HTTP请求
*/
@RestController
@RequestMapping("/api/qwen")
public class QwenController {
private static final Logger logger = LoggerFactory.getLogger(QwenController.class);
private final QwenService qwenService;
/**
* 构造函数,通过依赖注入获取QwenService实例
*
* @param qwenService QwenService实例
*/
public QwenController(QwenService qwenService) {
this.qwenService = qwenService;
}
/**
* 提示词请求实体类
*/
@Data
static class PromptRequest {
private String prompt;
private int userId;
}
/**
* 响应实体类
*/
@Data
static class Response {
private String result;
public Response(String result) {
this.result = result;
}
}
/**
* 检查提示词参数是否有效
*
* @param prompt 提示词参数
* @return 如果有效则返回null,否则返回包含错误信息的SseEmitter
*/
private SseEmitter checkPrompt(String prompt) {
if (prompt == null || prompt.trim().isEmpty()) {
SseEmitter emitter = new SseEmitter();
emitter.completeWithError(new IllegalArgumentException("提示词不能为空"));
return emitter;
}
return null;
}
/**
* SSE流式聊天接口(用于网页实时显示)
*
* @param prompt 提示词
* @param userId 用户ID
* @return SseEmitter实例,用于向客户端发送流式响应
*/
@GetMapping("/sse-streaming-chat")
public SseEmitter streamingChat(@RequestParam(name = "prompt") String prompt,
@RequestParam(name = "userId") int userId) {
logger.info("收到来自用户[{}]的请求, 提示词: {}", userId, prompt);
// 检查提示词是否有效
SseEmitter checkRlt = checkPrompt(prompt);
if (checkRlt != null) {
return checkRlt;
}
try {
// 调用QwenService的流式聊天方法
return qwenService.lowLevelStreamingChat(prompt);
} catch (Exception e) {
// 捕获异常并返回错误的SseEmitter
SseEmitter emitter = new SseEmitter();
emitter.completeWithError(e);
return emitter;
}
}
}
- 低级API的响应流式传输功能就完成了,可见非常简单,只要实现StreamingChatModel,并且实现StreamingChatResponseHandler即可
- 把服务运行起来试试,执行命令mvn spring-boot:run
- 浏览器打开网页,地址是localhost:8080,操作如下

- 此时就会看到响应内容实时增加的效果

- 看服务端的日志就会发现onPartialResponse方法在被频繁调用,每次的入参都是少量token

- 现在低级API的响应流式传输功能已经验证通过了,接下来实现高级API的版本
编码,后端,高级API
- 对于高级API,首先是定义一个接口,然后由LangChain4j来创建其代理类实例
- 接口定义文件StreamingAssistant.java,代码如下,要注意的是其返回值必须是TokenStream接口,该接口的onPartialResponse方法用来注册大模型返回token时候的回调
package com.bolingcavalry.service;
import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.TokenStream;
import dev.langchain4j.service.UserMessage;
/**
* 流式助手接口,用于定义高级API的AI服务方法
*/
public interface StreamingAssistant {
@SystemMessage("你是资深中国历史学者,回答问题的风格是清晰简洁")
TokenStream chat(@UserMessage String message);
}
- 在配置类中增加一个bean的实例化,AiServices.builder是典型的高级API创建方式,还要注意绑定模型实例的方法是streamingChatModel
@Bean
public StreamingAssistant streamingAssistant(StreamingChatModel streamingChatModel) {
return AiServices.builder(StreamingAssistant.class)
.streamingChatModel(streamingChatModel)
.chatMemory(MessageWindowChatMemory.withMaxMessages(10))
.build();
}
- 再看高级API具体的用法,这里至关重要,在QwenService类中增加以下代码,可见返回值和之前的低级API一致都是SseEmitter,而具体实现流式传输的关键,就是通过streamingAssistant.chat拿到TokenStream实例后,用该实例的onPartialResponse方法完成每次token生成的响应
@Autowired
private StreamingAssistant streamingAssistant;
/**
* 基于高级API的SSE流式聊天方法
*
* @param prompt 提示词
* @return SseEmitter实例,用于向客户端发送流式响应
*/
public SseEmitter highLevelStreamingChat(String prompt) {
// 创建SseEmitter,设置超时时间为30分钟
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);
try {
// 使用高级API的TokenStream
TokenStream tokenStream = streamingAssistant.chat(prompt);
// 注册回调函数
tokenStream.onPartialResponse(token -> {
if (token == null || token.trim().isEmpty()) {
logger.warn("空的token,忽略");
return;
}
try {
// 发送token到客户端
emitter.send(token);
logger.info("token: {}", token);
} catch (Exception e) {
logger.error("发送token失败: {}", e.getMessage(), e);
emitter.completeWithError(e);
}
});
tokenStream.onError(throwable -> {
logger.error("高级API流式聊天发生错误: {}", throwable.getMessage(), throwable);
emitter.completeWithError(throwable);
});
tokenStream.onCompleteResponse(completeResponse -> {
try {
// 发送完成标志
emitter.send(SseEmitter.event().name("complete"));
// 完成响应
emitter.complete();
logger.info("高级API流式聊天完成,完整响应: {}", completeResponse);
} catch (Exception e) {
logger.error("发送完成响应失败: {}", e.getMessage(), e);
emitter.completeWithError(e);
}
});
// 启动流处理
tokenStream.start();
} catch (Exception e) {
logger.error("创建高级API流式响应失败: {}", e.getMessage(), e);
emitter.completeWithError(e);
}
// 设置超时处理
emitter.onTimeout(() -> {
logger.error("高级API SSE连接超时");
emitter.completeWithError(new RuntimeException("连接超时"));
});
// 设置完成处理
emitter.onCompletion(() -> {
logger.info("高级API SSE连接完成");
});
return emitter;
}
- 最后是controller类中增加EventSource的响应
/**
* 基于高级API的SSE流式聊天接口(用于网页实时显示)
*
* @param prompt 提示词
* @param userId 用户ID
* @return SseEmitter实例,用于向客户端发送流式响应
*/
@GetMapping("/high-level-sse-streaming-chat")
public SseEmitter highLevelStreamingChat(@RequestParam(name = "prompt") String prompt,
@RequestParam(name = "userId") int userId) {
logger.info("收到来自用户[{}]的高级API请求, 提示词: {}", userId, prompt);
// 检查提示词是否有效
SseEmitter checkRlt = checkPrompt(prompt);
if (checkRlt != null) {
return checkRlt;
}
try {
// 调用QwenService的高级API流式聊天方法
return qwenService.highLevelStreamingChat(prompt);
} catch (Exception e) {
// 捕获异常并返回错误的SseEmitter
SseEmitter emitter = new SseEmitter();
emitter.completeWithError(e);
return emitter;
}
}
- 把服务运行起来,体验效果如下

- 再看服务端日志,可见用tokenStream.onPartialResponse注册的方法会被不断的调用

- 至此,响应流式传输功能就全部完成了,可见技术上十分简单,不论是低级API还是高级API,实现方法都是把回调方法准备好,然后等着被LangChain4j框架回调即可
- 《LangChain4j实战》系列还在继续,下一篇会有更精彩的实践等待您的参与
你不孤单,欣宸原创一路相伴
更多推荐


所有评论(0)