在这里插入图片描述

如何流式传输可运行项

先决条件
本指南假定您熟悉以下概念:

聊天模型
LangChain 表达式语言
输出解析器

此界面提供了两种通用的流式内容方法:

  1. 同步 stream 和异步 astream:从链中流式传输最终输出的 默认实现
  2. 异步 astream_events 和异步 astream_log:这些提供了从链中流式传输 中间步骤最终输出 的方式。

使用流

  1. 统一接口
    • 所有 Runnable 都内置 stream(同步)与 astream(异步)两种流式方法。

  2. 设计目标
    • 一旦某一块(chunk)结果就绪,立即推送给调用方,减少等待时间。

  3. 可行条件
    • 整条链路中的每一步必须能“按块处理”:
    – 输入逐块进入 → 即时产生对应的输出块。
    • 若任何步骤只能一次性处理完整输入,则流式中断。

  4. 复杂度梯度
    • 简单:逐 token 转发 LLM 输出。
    • 复杂:在完整 JSON 尚未生成前,逐步解析并流式返回部分 JSON 片段。

  5. 入门建议
    • 从最核心的 LLM 开始体验流式,再逐步扩展到整条链路的每一步。

LLMs 和聊天模型

核心知识点提炼

  1. 性能瓶颈
    • 大型语言模型(LLM)及其聊天变体是系统性能的主要瓶颈。
    • 单次完整响应耗时通常以秒计,远超用户感知“流畅”的上限(≈200–300 ms)。

  2. 用户感知优化目标
    • 关键策略:让应用“感觉”更响应——即时呈现中间结果,而非等待整段答案。

  3. 实现手段
    • 流式处理(Streaming):逐个 token 实时推送给前端,持续刷新界面。
    • 用聊天模型即可完成流式输出,无需额外架构。

一句话记忆
“LLM 慢于人类耐心;用流式把 token 打点滴,200 ms 内先让用户看到‘正在回答’。”~~~~

让我们从同步 stream API 开始:

另外,如果您在异步环境中工作,您可以考虑使用异步 astream API

from typing import Any
from langchain.chains.qa_with_sources.stuff_prompt import template
from langchain.chat_models import init_chat_model
from pykg import get_config
app_key = get_config("ZAI_API_KEY")
print(app_key)

class _ChatModelZhipu:
    """
    使用聊天模型:openai
     zhipu openai 接入形式,接入模型 封装调用类
    """
    def __init__(self):
        self.llm = init_chat_model(
                model="glm-4.5",
                model_provider="openai",
                temperature=0.5,
                base_url="https://open.bigmodel.cn/api/paas/v4/",
                api_key=get_config("ZAI_API_KEY"),  # 请确保 get_config 能正确获取 API Key
                max_tokens=1024*64
            )

    def get_model(self):
        return self.llm


    def invoke(self,messages:list|str):
        return self.llm.invoke(messages)

    def stream(self,prompt: str):
        """
        同步 stream API
        :param prompt:
        :return:
        """
        chunks = []
        for chunk in self.llm.stream(prompt):
            chunks.append(chunk)
            content = chunk.content if hasattr(chunk, 'content') else str(chunk)
            print(content , end="", flush=True)

    async def astream(self, prompt: str| type):
        """
        异步 astream API
        :param prompt:
        :return:
        """
        chunks = []
        # 修正:使用 self.llm.astream 而不是 model.astream
        async for chunk in self.llm.astream(prompt):  # 修正这一行
            chunks.append(chunk)
            content = chunk.content if hasattr(chunk, 'content') else str(chunk)
            print(content, end="", flush=True)
        return chunks

    async def astream_yield(self,prompt: str | dict[str, Any]):
        """
        异步 astream API
        使用 yield 的主要优势是:
            内存效率:不需要将所有 chunks 存储在内存中
            实时处理:可以立即处理每个 chunk,而不需要等待完整响应
            灵活性:调用者可以决定如何处理每个 chunk
        :param prompt:
        :return:
        """
        # 修正:使用 self.llm.astream 而不是 model.astream
        async for chunk in self.llm.astream(prompt):  # 修正这一行
            yield chunk

import asyncio
model = _ChatModelZhipu()


## 测试main函数
# print(model.invoke("请用中文写一个Hello World"))
# model.stream("请用中文写一个Hello World")

print("开始异步流式传输...")
# chunks =  await model.astream("你好")
# print(f"\nchunk type == {type(chunks[0])}")
# print("\n异步执行完成")

async for chunk in model.astream_yield("你好"):
    # 可以在这里对每个 chunk 进行实时处理
    content = chunk.content if hasattr(chunk, 'content') else str(chunk)
    print(content,end='',flush=True)


让我们检查其中一个片段

print(chunks[0])
chunk type == <class 'langchain_core.messages.ai.AIMessageChunk'>

我们得到了一个名为 AIMessageChunk 的东西。这个块代表了一个 AIMessage 的部分。

消息块按设计是累加的——只需简单地将它们相加即可得到迄今为止的响应状态!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

链 (Chain)

几乎所有LLM应用程序都涉及比仅调用语言模型更多的步骤。
让我们使用LangChain 表达式语言(LCEL)构建一个简单的链,该语言结合了提示、模型和解析器,并验证流式传输是否正常工作

我们将使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器,它从 content 字段中提取 AIMessageChunk,为我们提供模型返回的 token。

[!NOTE]
LCEL 是一种通过链式组合不同的 LangChain 原语来指定“程序”的声明性方法。使用 LCEL 创建的链可以从自动实现的流和异步流中受益,允许最终输出的流式传输。实际上,使用 LCEL 创建的链实现了整个标准 Runnable 接口。

API 参考:StrOutputParser | ChatPromptTemplate

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

class _ChainChatModel03(_ChatModelZhipu):
    """
    重写 llm 属性以构建链
    """
    def __init__(self):
        # 调用父类初始化
        super().__init__()

        # 保存原始模型
        original_llm = self.llm

        # 创建链组件
        self.prompt = ChatPromptTemplate.from_template("请讲一个关于{in}的故事")
        self.parser = StrOutputParser()

        # 重新定义 llm 为完整的链
        self.llm = self.prompt | original_llm | self.parser

    # 父类的 astream_yield 方法可以直接使用新的链



class _ChainChatModel01(_ChatModelZhipu):
    """
    添加新的链属性而不覆盖 llm
    """
    def __init__(self):
        super().__init__()
        # 保留父类的 self.llm,同时创建新的链属性
        self.prompt = ChatPromptTemplate.from_template("请讲一个关于{topic}的故事")
        self.parser = StrOutputParser()
        self.chain = self.prompt | self.llm | self.parser

    async def astream_chain(self, prompt_data: dict[str, Any]):
        """
        通过链进行异步流式传输
        """
        async for chunk in self.chain.astream(prompt_data):
            yield chunk

    def invoke_chain(self, prompt_data: dict[str, Any]):
        """
        通过链调用
        """
        return self.chain.invoke(prompt_data)


from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Any

class _ChainChatModel02(_ChatModelZhipu):
    """
    方案一:继承父类并使用父类的 llm
    """
    def __init__(self):
        # 调用父类的初始化方法
        super().__init__()

        # 创建提示模板
        self.prompt = ChatPromptTemplate.from_template("请讲一个关于{topic}的故事")

        # 创建输出解析器
        self.parser = StrOutputParser()

        # 构建链:提示模板 | 模型 | 解析器
        # 使用父类的 self.llm
        self.chain = self.prompt | self.llm | self.parser

    async def astream_yield(self, prompt_data: dict[str, Any]):
        """
        异步流式传输,使用父类的模型
        """
        async for chunk in self.chain.astream(prompt_data):
            yield chunk

    def invoke(self, prompt_data: dict[str, Any]):
        """
        调用链
        """
        return self.chain.invoke(prompt_data)

class _ChainChatModel:
    def __init__(self):
       self.llm = init_chat_model(
                model="glm-4.5",
                model_provider="openai",
                temperature=0.5,
                base_url="https://open.bigmodel.cn/api/paas/v4/",
                api_key=get_config("ZAI_API_KEY"),  # 请确保 get_config 能正确获取 API Key
                max_tokens=1024*64
            )

    async def astream_yield(self,prompt1: str | dict[str, Any]):
        """
        异步 astream API
        使用 yield 的主要优势是:
            内存效率:不需要将所有 chunks 存储在内存中
            实时处理:可以立即处理每个 chunk,而不需要等待完整响应
            灵活性:调用者可以决定如何处理每个 chunk
        :param prompt:
        :return:
        """

        prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
        parser = StrOutputParser()
        chain = prompt | self.llm | parser

        async for chunk in chain.astream(prompt1):
             yield chunk
        # # 修正:使用 self.llm.astream 而不是 model.astream
        # async for chunk in self.llm.astream(prompt):  # 修正这一行
        #     yield chunk

# model = _ChainChatModel02()
model = _ChainChatModel03()
## 测试main函数
# print(model.invoke({"input":"小偷和农夫"}))

async for chunk in model.astream_yield({"in":"小偷和农夫"}):
    # 可以在这里对每个 chunk 进行实时处理
    content = chunk.content if hasattr(chunk, 'content') else str(chunk)
    print(content,end='',flush=True)
# 使用添加新的链属性而不覆盖 llm
model = _ChainChatModel01()

# 测试 invoke 方法
# result = model.invoke({"topic": "小偷和农夫"})
# print(result)

# 测试流式传输
print("开始流式传输...")
async for chunk in model.astream_chain({"topic": "小偷和农夫"}):
    content = chunk.content if hasattr(chunk, 'content') else str(chunk)
    print(content, end='', flush=True)

print("\n流式传输完成")
请注意,尽管我们在上述链的末尾使用了解析器,但我们仍然获得了流式输出。该解析器对每个流式块单独操作。许多`LCEL` 原语也支持这种转换风格的透传流式,这在构建应用程序时非常方便。

与输入流一起工作

如果您想边生成边将 JSON 输出流式传输,会怎样?

如果依赖json.loads来解析部分 JSON,解析将失败,因为部分 JSON 不是有效的 JSON。

你可能会完全不知所措,声称无法流式传输 JSON。

好吧,原来有一种方法可以实现——解析器需要在上操作,并尝试将部分 json“自动完成”到一个有效状态。

让我们看看这样的解析器是如何工作的,以了解这意味着什么。

API 参考:JsonOutputParser

from langchain_core.output_parsers import JsonOutputParser

class _ChatModeJsonParser:
    def __init__(self):
        self.llm = init_chat_model(
                model="glm-4.5",
                model_provider="openai",
                temperature=0.5,
                base_url="https://open.bigmodel.cn/api/paas/v4/",
                api_key=get_config("ZAI_API_KEY"),  # 请确保 get_config 能正确获取 API Key
                max_tokens=1024
        )
        self.chain = (self.llm | JsonOutputParser() | self._extract_country_names)

        # A function that operates on finalized inputs
    # rather than on an input_stream
    def _extract_country_names(_ChatModeJsonParser, inputs):
        print(inputs,_ChatModeJsonParser)
        """A function that does not operates on input streams and breaks streaming."""
        if not isinstance(inputs, dict):
            return ""

        if "countries" not in inputs:
            return ""

        countries = inputs["countries"]

        if not isinstance(countries, list):
            return ""

        country_names = [
            country.get("name") for country in countries if isinstance(country, dict)
        ]
        return country_names


    async def astream(self):
        # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
        async for text in  self.chain.astream(
              "output a list of the countries france, spain and japan and their populations in JSON format. "
                'Use a dict with an outer key of "countries" which contains a list of countries. '
                "Each country should have the key `name` and `population`"
        ):
            print(text, end="|", flush=True)

model = _ChatModeJsonParser()
await model.astream()

# Output : ['France', 'Spain', 'Japan']|

生成器函数

让我们使用一个可以操作 输入流 的生成器函数来修复流。

生成器函数(使用 yield 的函数)允许编写操作 输入流 的代码

from langchain_core.output_parsers import JsonOutputParser

class _ChatModeJsonParserStream:
    def __init__(self):
        self.llm = init_chat_model(
                model="glm-4.5",
                model_provider="openai",
                temperature=0.5,
                base_url="https://open.bigmodel.cn/api/paas/v4/",
                api_key=get_config("ZAI_API_KEY"),  # 请确保 get_config 能正确获取 API Key
                max_tokens=1024
        )
        self.chain = (self.llm | JsonOutputParser() | self._extract_country_names_streaming)

    async def _extract_country_names_streaming(dataType,input_stream):
        """A function that operates on input streams.
         dataType:  前一个JSON Parse对象
         input_stream : Json 输出对象
        """
        country_names_so_far = set()

        async for input in input_stream:
            if not isinstance(input, dict):
                continue

            if "countries" not in input:
                continue

            countries = input["countries"]

            if not isinstance(countries, list):
                continue

            for country in countries:
                name = country.get("name")
                if not name:
                    continue
                if name not in country_names_so_far:
                    yield name
                    country_names_so_far.add(name)


    async def astream(self):
        # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
        async for text in  self.chain.astream(
              "output a list of the countries france, spain and japan and their populations in JSON format. "
                'Use a dict with an outer key of "countries" which contains a list of countries. '
                "Each country should have the key `name` and `population`"
        ):
            print(text, end="/", flush=True)

model = _ChatModeJsonParserStream()
await model.astream()


非流式组件

一些内置组件如检索器不提供任何流式传输。如果我们尝试流式传输它们会发生什么?🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.chat_models import init_chat_model
from pykg import get_config,embeddings_scill
# 硅基流动的嵌入模型
# 初始化智谱嵌入模型
from langchain_openai import OpenAIEmbeddings
print(get_config("SCILL_BASE_URL"),get_config("SCILL_API_KEY"),)

model = init_chat_model(
                model="Qwen/Qwen3-8B",
                model_provider="openai",
                temperature=0.5,
                base_url=get_config("SCILL_BASE_URL"),
                api_key=get_config("SCILL_API_KEY"),  # 请确保 get_config 能正确获取 API Key
                max_tokens=1024
        )

embeddings_scill.validate_environment()

# embeddings_scill.embed_query("hello world")



template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)


vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=embeddings_scill,
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
print(chunks)

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

for chunk in retrieval_chain.stream(
    "Where did harrison work? Write 3 made up sentences about this place.使用中文回答"
):
    print(chunk, end="", flush=True)

现在我们已经了解了stream和astream的工作原理,让我们进入流式事件的世界。🏞️


在这里插入图片描述

使用流事件

事件流是一个 beta API。此 API 可能会根据反馈进行一些调整。
本指南演示了 V2 API,并需要 langchain-core >= 0.2。对于与 LangChain 旧版本兼容的 V1 API,请参阅 此处。

为了使 astream_events API 正确工作:

  • 尽可能在整个代码中使用async(例如,异步工具等)
  • 传播回调,如果定义自定义函数/可运行项
  • 每次使用不带 LCEL 的 runnables 时,请确保在LLMs上调用.astream()而不是调用.ainvoke,以强制LLM流式传输令牌。
  • 让我们知道如果有什么不符合预期! 😃

事件参考

当流式传输正确实现时,可运行对象的输入将在完全消耗输入流之后才会知晓。这意味着通常仅在结束事件中包含输入,而不是在开始事件中。

这个表格展示了LangChain中不同组件的事件流,包括聊天模型、LLM、链、工具、检索器和提示模板等组件在不同阶段触发的事件类型及其相关的输入输出信息。

event name chunk input output
on_chat_model_start [model name] {“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream [model name] AIMessageChunk(content=“hello”)
on_chat_model_end [model name] {“messages”: [[SystemMessage, HumanMessage]]} AIMessageChunk(content=“hello world”)
on_llm_start [model name] {‘input’: ‘hello’}
on_llm_stream [model name] ‘Hello’
on_llm_end [model name] ‘Hello human!’
on_chain_start format_docs
on_chain_stream format_docs “hello world!, goodbye world!”
on_chain_end format_docs [Document(…)] “hello world!, goodbye world!”
on_tool_start some_tool {“x”: 1, “y”: “2”}
on_tool_end some_tool {“x”: 1, “y”: “2”}
on_retriever_start [retriever name] {“query”: “hello”}
on_retriever_end [retriever name] {“query”: “hello”} [Document(…), …]
on_prompt_start [template_name] {“question”: “hello”}
on_prompt_end [template_name] {“question”: “hello”} ChatPromptValue(messages: [SystemMessage, …])

聊天模型

让我们首先看看聊天模型产生的事件。

events = []
async for event in model.astream_events("hello"):
    events.append(event)
注意
对于 langchain-core<0.3.37,请显式设置 version 参数(例如, model.astream_events("hello", version="v2") )。

让我们看看一些开始事件和结束事件。

from pykg import openai_scill_chat_model as model

events = []
chain = model
async for event in model.astream_events("hello"):
    events.append(event)
print(events[:3])

print(events[-2:])

让我们重新审视那个解析流式 JSON 的示例链,以探索流式事件 API。

from langchain_core.output_parsers import JsonOutputParser
from pykg import openai_scill_chat_model as model
chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
    )
]
print(events)

如果您查看前几个事件,您会注意到有 3 个不同的起始事件,而不是 2 个起始事件。

三个起始事件对应于:

  • 链(模型 + 解析器)
  • 该模型
  • 解析器
print(events[:3])

你认为如果你查看最后三个事件会看到什么?中间的呢?

让我们使用这个 API 来输出模型和解析器的流事件。我们正在忽略开始事件、结束事件和链事件。

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        if event['data']['chunk'].content == '':
            continue
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1

Chat model chunk: '",\n'
Chat model chunk: '     '
Chat model chunk: ' "'
Chat model chunk: 'population'
Chat model chunk: '":'
Chat model chunk: ' '
Chat model chunk: '1'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 1}]}
Chat model chunk: '2'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 12}]}
Chat model chunk: '5'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 125}]}
Chat model chunk: '5'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 1255}]}
Chat model chunk: '0'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 12550}]}
Chat model chunk: '0'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 125500}]}
Chat model chunk: '0'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 1255000}]}
Chat model chunk: '0'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 12550000}]}
Chat model chunk: '0'
Parser chunk: {'countries': [{'name': 'France', 'population': 65300000}, {'name': 'Spain', 'population': 47400000}, {'name': 'Japan', 'population': 125500000}]}
Chat model chunk: '\n'
Chat model chunk: '   '
Chat model chunk: ' }\n'
Chat model chunk: ' '
Chat model chunk: ' ]\n'
Chat model chunk: '}'

因为模型和解析器都支持流式处理,所以我们实时地看到了这两个组件的流式事件!有点酷,不是吗?🦜

过滤事件

因为此 API 产生了很多事件,所以能够对事件进行筛选是有用的。

您可以按组件名称name、组件标签或组件类型进行筛选。

按名称
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"} # 1 Add a run name to the parser for filter
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    include_names=["my_parser"], # 2 Filter by run name
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

通过类型
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_types=["chat_model"], # Only include events of type "chat_model"
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
通过标签
title: 谨慎
标签由给定可运行组件的子组件继承。

如果您正在使用标签进行筛选,请确保这是您想要的。
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]}) #1. 添加标签

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_tags=["my_chain"], # 2. 筛选标签
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

非流式组件

记得有些组件因为不操作于输入流而无法很好地流式传输吗?

title: 重要
虽然这些组件在使用astream时可能会打断最终输出的流,但astream_events仍然会从支持流的中间步骤产生流式事件!
# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
)  # This parser only works with OpenAI right now

如预期,astream API 无法正确工作(非流式输出),因为_extract_country_names不作用于流(提取最终的内容json信息)

async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)

现在,让我们确认一下,使用 astream_events 我们是否仍然从模型和解析器中看到流式输出。【可以流式输出,因为中间过程是流】

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

传播回调

如果您在工具中使用调用可运行项,需要将回调传播到可运行项;否则,

使用RunnableLambdas或@chain装饰器时,回调将在幕后自动传播。

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello"):
    print(event)

API 参考:RunnableLambda | 工具

这里是一个正确传播回调的重实现。您会注意到现在我们不仅从 reverse_word 可运行项中获取事件。

@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello"):
    print(event)

如果您在 Runnable Lambda 或@chains内部调用 runnables,那么回调将自动代为传递。

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)

并且使用 @chain 装饰器:

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)

现在你已经学会了使用 LangChain 流式传输最终输出和内部步骤的一些方法。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐