深入浅出:LangChain.js 中的流式输出(Streaming)与 SSE 协议实战
在构建大语言模型(LLM)应用时,开发者常面临一个棘手的性能瓶颈:大模型的推理存在固有的延迟,尤其是当涉及长文本生成或复杂的 Agent 推理链(Thinking Process)时。如果采用传统的“请求-响应”模式,用户必须面对长时间的空白加载界面,这不仅极其耗时,更会严重损害用户体验(UX)。
流式输出(Streaming) 是解决这一痛点的关键技术。它允许服务端将生成的内容以“数据流”的形式,按 Token 分块实时推送给客户端,从而极大地降低了首字响应时间(TTFT, Time To First Token),让用户感受到“即时响应”的交互体验。
本文将深入探讨如何在 LangChain.js 生态中实现 Agent 和 LangGraph 的流式输出,并解析其底层依赖的 SSE(Server-Sent Events) 协议原理。
一、 Agent 流式输出
1.1 流式输出简介
在 LangChain 的架构中,Agent 的执行过程远比单一文本生成复杂。它包含了一个完整的认知循环:思考(Thought) -> 调用工具(Action) -> 观察结果(Observation) -> 最终回答(Final Answer)。
传统的 .invoke() 方法是一个黑盒,只能在所有步骤完成后返回最终结果。但在实际业务中,我们往往需要向用户展示 Agent 的思考过程(例如:“正在检索 Google…”、“正在分析数据…”),以增加系统的透明度和可信度。
LangChain 提供的 .stream() 方法正是为此设计,它支持多种流式模式,其中最常用的是:
updates模式:流式传输 Agent 执行过程中每个节点(Node)的状态更新,适合监控 Agent 的每一步动作。messages模式:流式传输整个对话过程中生成的消息(MessageChunk),适合实现类似 ChatGPT 的“打字机”效果。
1.2 代码示例
我们将构建一个简单的 Agent,包含服务端(Node.js/Express)和客户端(原生 JS)。
服务端 (Node.js + Express + LangChain)
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { createAgent, tool } from "langchain";
import { z } from "zod";
const app = express();
const model = new ChatOpenAI({
temperature: 0.7,
model: "",
configuration: { baseURL: "https://ark.cn-beijing.volces.com/api/v3" },
apiKey: "",
});
const search = tool(
async ({ query }) => {
// 2s后返回结果
await new Promise((resolve) => setTimeout(resolve, 2000));
return `
搜索结果:这里是关于"${query}"的搜索结果摘要。
- 头部模型(Gemini 3、Claude 4、DeepSeek-R1 等)全面转向原生多模态,支持文本 / 图像 / 视频 / 音频 / 3D / 代码的端到端统一处理,上下文窗口普遍达百万 token 级,推理成本较 2023 年骤降 99.9% 以上。
- DeepSeek-R1 以纯强化学习路径实现顶尖推理,训练成本大幅降低并开源,推动全球竞争从 “算力比拼” 转向 “算法与工程效率”。
- MoE 架构成为主流,模型性能与体积 “剪刀差” 扩大,小参数量模型也能实现高性能。
`;
},
{
name: "search_tool",
description: "一个用于搜索信息的工具",
schema: z.object({ query: z.string() }),
}
);
app.get("/chat", async (req, res) => {
const agent = createAgent({
model,
tools: [search],
});
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
try {
// streamMode: "messages":打字机效果,分段吐字
for await (const [token, metadata] of await agent.stream(
{ messages: [{ role: "user", content: "请帮我搜索关于人工智能的最新发展。" }] },
{ streamMode: "messages" }
)) {
console.log(`node: ${metadata.langgraph_node}`);
// node: model_request | tools
console.log(`content: ${JSON.stringify(token.contentBlocks, null, 2)}`);
// content: [
// {
// "type": "text",
// "text": "显著"
// }
// ]
res.write(`data: ${token.contentBlocks}\n\n`);
}
// streamMode: "updates":打字机效果,分段吐字
for await (const chunk of await agent.stream(
{
messages: [
{ role: "user", content: "请帮我搜索关于人工智能的最新发展。" },
],
},
{ streamMode: "updates" }
)) {
const [step, content] = Object.entries(chunk)[0];
console.log(`step: ${step}`);
if (content.messages?.[0]?.content) {
console.log(
`messages: ${JSON.stringify(content.messages?.[0]?.content, null, 2)}`
);
} else {
console.log(`messages: ${JSON.stringify(content.messages, null, 2)}`);
}
res.write(`data: ${chunk}\n\n`);
}
res.write("data: [DONE]\n\n");
} catch (error: any) {
console.error("Error during agent execution:", error);
res.write(`data: Error: ${error.message}\n\n`);
} finally {
res.end();
}
});
app.listen(3000, () => {
console.log("Server is running on http://localhost:3000");
});
客户端 (Frontend Fetch)
import http from "http";
const options = {
hostname: "localhost",
port: 3000,
path: "/chat",
method: "GET",
headers: {
"Content-Type": "application/json",
},
};
const req = http.request(options, (res) => {
console.log(`Status: ${res.statusCode}`);
console.log(`Headers: ${JSON.stringify(res.headers)}`);
let data = "";
res.on("data", (chunk) => {
data += chunk;
console.log(`Received chunk: ${chunk}`);
});
res.on("end", () => {
console.log("Stream ended");
console.log("Full response:", data);
});
});
req.on("error", (error) => {
console.error("Request error:", error);
});
req.end();
console.log("Request sent to http://localhost:3000/chat");
二、 LangGraph 流式输出
2.1 流式输出简介
LangGraph 是 LangChain 体系下用于构建有状态、多角色(Multi-Actor)应用的编排引擎。如果说 Agent 是单一的大脑,LangGraph 就是协调多个大脑和工具的工作流(Workflow)。
LangGraph 的流式输出机制与 Agent 类似,底层同样依赖 .stream()。
2.2 代码示例
服务端 (Node.js + LangGraph)
import { StateGraph, END } from "@langchain/langgraph";
// ... 假设已定义好 State 和 Nodes (agentNode, toolNode)
// 定义图
const workflow = new StateGraph(GraphState)
.addNode("agent", agentNode)
.addNode("tools", toolNode)
.addEdge("start", "agent")
.addEdge("agent", "tools") // 简化版逻辑
.addEdge("tools", "agent");
const app = workflow.compile();
// Express 路由
app.get("/stream-graph", async (req, res) => {
res.setHeader("Content-Type", "text/event-stream");
// ... 其他 header
const inputs = { messages: [new HumanMessage(req.query.input)] };
// 使用 .stream 方法
const stream = await app.stream(inputs, {
streamMode: "updates", // 可选: 'values', 'updates'
});
for await (const chunk of stream) {
// LangGraph 的 chunk 通常是包含节点更新的对象
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.write("data: [DONE]\n\n");
res.end();
});
客户端
客户端逻辑与 Agent 类似,主要是解析的数据结构不同。LangGraph 的 chunk 通常是 {[nodeName]: { ...stateUpdate }} 的形式。
// 在处理逻辑中
if (jsonStr !== "[DONE]") {
const chunk = JSON.parse(jsonStr);
// 例如:chunk = { agent: { messages: [...] } }
if (chunk.agent) {
console.log("Agent 节点完成了思考");
} else if (chunk.tools) {
console.log("Tool 节点完成了执行");
}
}
三、 SSE 协议 (Server-Sent Events)
3.1 协议介绍
我们上面代码中反复出现的 Content-Type: text/event-stream 就是 SSE 的核心标志。
SSE (Server-Sent Events) 是一种基于 HTTP 的服务器推送技术。与 WebSocket 不同,SSE 是 单向 的(Server -> Client)。
- 适用场景:LLM 流式响应、股票行情更新、新闻推送。
- 对比 WebSocket:
- WebSocket 是全双工(双向),协议复杂,不仅限于 HTTP。
- SSE 只是 HTTP 长连接,更轻量,更易于穿越防火墙,且浏览器原生支持自动重连。
对于 LLM 这种“发一个 Prompt,回一大段流式文本”的场景,SSE 是最完美、最轻量的选择。
3.2 通信原理
SSE 的工作流程非常简单,遵循 HTTP 协议:
- 建立连接:
客户端发起一个普通的 HTTP GET 请求。 - 保持开启:
服务端收到请求后,不立即关闭连接,而是设置响应头:Content-Type: text/event-stream:告诉客户端这是事件流。Cache-Control: no-cache:防止缓存。Connection: keep-alive:保持 TCP 连接开启。
- 数据传输:
服务端通过这个未关闭的连接,不断地发送数据块(Chunk)。
SSE 定义了严格的文本格式:event: message <-- 可选,事件类型 id: 123 <-- 可选,ID data: {"text": "Hello"} <-- 数据负载 <-- 必须有一个空行来表示该条消息结束 - 关闭连接:
当生成结束时,服务端调用res.end()关闭 HTTP 连接,或者发送一个特定的结束标记(如[DONE])让客户端主动关闭EventSource。
总结
LangChain.js 封装了 LLM 复杂的生成逻辑,通过 .stream() 接口暴露了内部的事件流;而 SSE 协议则充当了“管道”,将这些细粒度的事件流丝滑地传输到前端。两者结合,使得构建一个具备 ChatGPT 般流畅体验的 AI 应用变得触手可及。
更多推荐



所有评论(0)