在构建大语言模型(LLM)应用时,开发者常面临一个棘手的性能瓶颈:大模型的推理存在固有的延迟,尤其是当涉及长文本生成或复杂的 Agent 推理链(Thinking Process)时。如果采用传统的“请求-响应”模式,用户必须面对长时间的空白加载界面,这不仅极其耗时,更会严重损害用户体验(UX)。

流式输出(Streaming) 是解决这一痛点的关键技术。它允许服务端将生成的内容以“数据流”的形式,按 Token 分块实时推送给客户端,从而极大地降低了首字响应时间(TTFT, Time To First Token),让用户感受到“即时响应”的交互体验。

本文将深入探讨如何在 LangChain.js 生态中实现 Agent 和 LangGraph 的流式输出,并解析其底层依赖的 SSE(Server-Sent Events) 协议原理。


一、 Agent 流式输出

1.1 流式输出简介

在 LangChain 的架构中,Agent 的执行过程远比单一文本生成复杂。它包含了一个完整的认知循环:思考(Thought) -> 调用工具(Action) -> 观察结果(Observation) -> 最终回答(Final Answer)

传统的 .invoke() 方法是一个黑盒,只能在所有步骤完成后返回最终结果。但在实际业务中,我们往往需要向用户展示 Agent 的思考过程(例如:“正在检索 Google…”、“正在分析数据…”),以增加系统的透明度和可信度。

LangChain 提供的 .stream() 方法正是为此设计,它支持多种流式模式,其中最常用的是:

  • updates 模式:流式传输 Agent 执行过程中每个节点(Node)的状态更新,适合监控 Agent 的每一步动作。
  • messages 模式:流式传输整个对话过程中生成的消息(MessageChunk),适合实现类似 ChatGPT 的“打字机”效果。

1.2 代码示例

我们将构建一个简单的 Agent,包含服务端(Node.js/Express)和客户端(原生 JS)。

服务端 (Node.js + Express + LangChain)
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { createAgent, tool } from "langchain";
import { z } from "zod";

const app = express();

const model = new ChatOpenAI({
  temperature: 0.7,
  model: "", 
  configuration: { baseURL: "https://ark.cn-beijing.volces.com/api/v3" },
  apiKey: "",
});

const search = tool(
  async ({ query }) => {
    // 2s后返回结果
    await new Promise((resolve) => setTimeout(resolve, 2000));
    return `
    搜索结果:这里是关于"${query}"的搜索结果摘要。
    - 头部模型(Gemini 3、Claude 4、DeepSeek-R1 等)全面转向原生多模态,支持文本 / 图像 / 视频 / 音频 / 3D / 代码的端到端统一处理,上下文窗口普遍达百万 token 级,推理成本较 2023 年骤降 99.9% 以上。
    - DeepSeek-R1 以纯强化学习路径实现顶尖推理,训练成本大幅降低并开源,推动全球竞争从 “算力比拼” 转向 “算法与工程效率”。
    - MoE 架构成为主流,模型性能与体积 “剪刀差” 扩大,小参数量模型也能实现高性能。
    `;
  },
  {
    name: "search_tool",
    description: "一个用于搜索信息的工具",
    schema: z.object({ query: z.string() }),
  }
);

app.get("/chat", async (req, res) => {
  const agent = createAgent({
    model,
    tools: [search],
  });

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  try {
    // streamMode: "messages":打字机效果,分段吐字
    for await (const [token, metadata] of await agent.stream(
      { messages: [{ role: "user", content: "请帮我搜索关于人工智能的最新发展。" }] },
      { streamMode: "messages" }
    )) {
      console.log(`node: ${metadata.langgraph_node}`);
      // node: model_request | tools
      console.log(`content: ${JSON.stringify(token.contentBlocks, null, 2)}`);
      // content: [
      //   {
      //     "type": "text",
      //     "text": "显著"
      //   }
      // ]
      res.write(`data: ${token.contentBlocks}\n\n`);
    }

    // streamMode: "updates":打字机效果,分段吐字
    for await (const chunk of await agent.stream(
      {
        messages: [
          { role: "user", content: "请帮我搜索关于人工智能的最新发展。" },
        ],
      },
      { streamMode: "updates" }
    )) {
      const [step, content] = Object.entries(chunk)[0];
      console.log(`step: ${step}`);
      if (content.messages?.[0]?.content) {
        console.log(
          `messages: ${JSON.stringify(content.messages?.[0]?.content, null, 2)}`
        );
      } else {
        console.log(`messages: ${JSON.stringify(content.messages, null, 2)}`);
      }

      res.write(`data: ${chunk}\n\n`);
    }

    res.write("data: [DONE]\n\n");
  } catch (error: any) {
    console.error("Error during agent execution:", error);
    res.write(`data: Error: ${error.message}\n\n`);
  } finally {
    res.end();
  }
});

app.listen(3000, () => {
  console.log("Server is running on http://localhost:3000");
});
客户端 (Frontend Fetch)
import http from "http";

const options = {
  hostname: "localhost",
  port: 3000,
  path: "/chat",
  method: "GET",
  headers: {
    "Content-Type": "application/json",
  },
};

const req = http.request(options, (res) => {
  console.log(`Status: ${res.statusCode}`);
  console.log(`Headers: ${JSON.stringify(res.headers)}`);

  let data = "";

  res.on("data", (chunk) => {
    data += chunk;
    console.log(`Received chunk: ${chunk}`);
  });

  res.on("end", () => {
    console.log("Stream ended");
    console.log("Full response:", data);
  });
});

req.on("error", (error) => {
  console.error("Request error:", error);
});

req.end();

console.log("Request sent to http://localhost:3000/chat");

二、 LangGraph 流式输出

2.1 流式输出简介

LangGraph 是 LangChain 体系下用于构建有状态、多角色(Multi-Actor)应用的编排引擎。如果说 Agent 是单一的大脑,LangGraph 就是协调多个大脑和工具的工作流(Workflow)。

LangGraph 的流式输出机制与 Agent 类似,底层同样依赖 .stream()

2.2 代码示例

服务端 (Node.js + LangGraph)
import { StateGraph, END } from "@langchain/langgraph";
// ... 假设已定义好 State 和 Nodes (agentNode, toolNode)

// 定义图
const workflow = new StateGraph(GraphState)
  .addNode("agent", agentNode)
  .addNode("tools", toolNode)
  .addEdge("start", "agent")
  .addEdge("agent", "tools") // 简化版逻辑
  .addEdge("tools", "agent");

const app = workflow.compile();

// Express 路由
app.get("/stream-graph", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  // ... 其他 header

  const inputs = { messages: [new HumanMessage(req.query.input)] };

  // 使用 .stream 方法
  const stream = await app.stream(inputs, {
    streamMode: "updates", // 可选: 'values', 'updates'
  });

  for await (const chunk of stream) {
    // LangGraph 的 chunk 通常是包含节点更新的对象
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  
  res.write("data: [DONE]\n\n");
  res.end();
});
客户端

客户端逻辑与 Agent 类似,主要是解析的数据结构不同。LangGraph 的 chunk 通常是 {[nodeName]: { ...stateUpdate }} 的形式。

// 在处理逻辑中
if (jsonStr !== "[DONE]") {
    const chunk = JSON.parse(jsonStr);
    // 例如:chunk = { agent: { messages: [...] } }
    if (chunk.agent) {
        console.log("Agent 节点完成了思考");
    } else if (chunk.tools) {
        console.log("Tool 节点完成了执行");
    }
}

三、 SSE 协议 (Server-Sent Events)

3.1 协议介绍

我们上面代码中反复出现的 Content-Type: text/event-stream 就是 SSE 的核心标志。

SSE (Server-Sent Events) 是一种基于 HTTP 的服务器推送技术。与 WebSocket 不同,SSE 是 单向 的(Server -> Client)。

  • 适用场景:LLM 流式响应、股票行情更新、新闻推送。
  • 对比 WebSocket
    • WebSocket 是全双工(双向),协议复杂,不仅限于 HTTP。
    • SSE 只是 HTTP 长连接,更轻量,更易于穿越防火墙,且浏览器原生支持自动重连。

对于 LLM 这种“发一个 Prompt,回一大段流式文本”的场景,SSE 是最完美、最轻量的选择。

3.2 通信原理

SSE 的工作流程非常简单,遵循 HTTP 协议:
在这里插入图片描述

  1. 建立连接
    客户端发起一个普通的 HTTP GET 请求。
  2. 保持开启
    服务端收到请求后,不立即关闭连接,而是设置响应头:
    • Content-Type: text/event-stream:告诉客户端这是事件流。
    • Cache-Control: no-cache:防止缓存。
    • Connection: keep-alive:保持 TCP 连接开启。
  3. 数据传输
    服务端通过这个未关闭的连接,不断地发送数据块(Chunk)。
    SSE 定义了严格的文本格式:
    event: message         <-- 可选,事件类型
    id: 123                <-- 可选,ID
    data: {"text": "Hello"} <-- 数据负载
                           <-- 必须有一个空行来表示该条消息结束
    
  4. 关闭连接
    当生成结束时,服务端调用 res.end() 关闭 HTTP 连接,或者发送一个特定的结束标记(如 [DONE])让客户端主动关闭 EventSource

总结

LangChain.js 封装了 LLM 复杂的生成逻辑,通过 .stream() 接口暴露了内部的事件流;而 SSE 协议则充当了“管道”,将这些细粒度的事件流丝滑地传输到前端。两者结合,使得构建一个具备 ChatGPT 般流畅体验的 AI 应用变得触手可及。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐