本系列共 4 部分,从输出数据结构到 Output Parser 全家族,完整拆解 LangChain 的"模型输出 → 结构化结果"链路。

  • 第 1 部分:5 分钟上手——Generation 与 ChatGeneration
  • 第 2 部分:Output Parser 基类体系
  • 第 3 部分:常用 Parser 全解
  • 第 4 部分:Tool Parser 与完整数据流

LangChain Outputs 深度解析(一):5 分钟上手——Generation 与 ChatGeneration

一个能跑的例子

from langchain_core.outputs import (
    Generation, GenerationChunk,
    ChatGeneration, ChatGenerationChunk,
    ChatResult, LLMResult,
)
from langchain_core.messages import AIMessage, AIMessageChunk

# 1. 最基础的 Generation
gen = Generation(text="Hello world", generation_info={"finish_reason": "stop"})
print(gen.text)             # Hello world
print(gen.generation_info)  # {'finish_reason': 'stop'}

# 2. ChatGeneration —— text 自动从 message 提取
chat_gen = ChatGeneration(message=AIMessage(content="你好!"))
print(chat_gen.text)            # 你好!  ← 自动设置
print(chat_gen.message.content) # 你好!

# 3. ChatResult —— _generate 的返回值
result = ChatResult(
    generations=[chat_gen],
    llm_output={"token_usage": {"total_tokens": 10}},
)
print(len(result.generations))  # 1
print(result.llm_output)       # {'token_usage': {'total_tokens': 10}}

# 4. LLMResult —— 二维结构
llm_result = LLMResult(
    generations=[[chat_gen], [Generation(text="another")]],
    llm_output={"model": "fake"},
)
print(len(llm_result.generations))     # 2(两个 prompt 的结果)
print(len(llm_result.generations[0]))  # 1(第一个 prompt 的候选数)

# 5. GenerationChunk 拼接
c1 = GenerationChunk(text="Hel", generation_info={"a": 1})
c2 = GenerationChunk(text="lo", generation_info={"b": 2})
merged = c1 + c2
print(merged.text)             # Hello
print(merged.generation_info)  # {'a': 1, 'b': 2}

# 6. ChatGenerationChunk 拼接
cc1 = ChatGenerationChunk(message=AIMessageChunk(content="你"))
cc2 = ChatGenerationChunk(message=AIMessageChunk(content="好"))
merged_chat = cc1 + cc2
print(merged_chat.text)             # 你好
print(merged_chat.message.content)  # 你好

不需要 API key,不需要网络。这些都是纯数据结构,可以直接构造和操作。


Generation:最基础的输出单元

Generation 是所有输出类型的根基。源码只有 3 个字段(generation.py:11):

# outputs/generation.py:11-53
class Generation(Serializable):
    """A single text generation output."""

    text: str
    """Generated text output."""

    generation_info: dict[str, Any] | None = None
    """Raw response from the provider.
    May include things like the reason for finishing or token log probabilities.
    """

    type: Literal["Generation"] = "Generation"
    """Type is used exclusively for serialization purposes."""
字段 类型 作用
text str 生成的文本内容
generation_info dict | None 供应商特有的元数据(finish_reason、log_probs 等)
type Literal 序列化标识,固定为 "Generation"

Generation 继承自 Serializable(即 Pydantic BaseModel + LangChain 序列化协议),所以可以 model_dump() / model_dump_json()


GenerationChunk:可拼接的流式 Generation

GenerationChunk 继承 Generation,唯一新增的是 __add__ 方法(generation.py:55):

# outputs/generation.py:55-81
class GenerationChunk(Generation):
    def __add__(self, other: GenerationChunk) -> GenerationChunk:
        if isinstance(other, GenerationChunk):
            generation_info = merge_dicts(
                self.generation_info or {},
                other.generation_info or {},
            )
            return GenerationChunk(
                text=self.text + other.text,            # ← text 直接拼接
                generation_info=generation_info or None, # ← generation_info 用 merge_dicts 合并
            )
        raise TypeError(...)

关键点:

  • text 直接字符串拼接
  • generation_infomerge_dicts 深度合并(来自 langchain_core.utils._merge
  • 只接受同类型的 GenerationChunk,否则抛 TypeError

ChatGeneration:消息级别的输出

ChatGeneration 继承 Generation,新增 message 字段(chat_generation.py:17):

# outputs/chat_generation.py:17-73
class ChatGeneration(Generation):
    text: str = ""    # ← 默认空字符串,不需要手动设置
    message: BaseMessage
    type: Literal["ChatGeneration"] = "ChatGeneration"

    @model_validator(mode="after")
    def set_text(self) -> Self:
        """从 message.content 自动提取 text。"""
        text = ""
        if isinstance(self.message.content, str):
            text = self.message.content                    # ← 字符串直接用
        elif isinstance(self.message.content, list):
            for block in self.message.content:
                if isinstance(block, str):
                    text = block                           # ← 列表中第一个字符串
                    break
                if isinstance(block, dict) and "text" in block:
                    block_type = block.get("type")
                    if block_type is None or block_type == "text":
                        text = block["text"]               # ← 第一个 text 类型的 block
                        break
        self.text = text
        return self

set_text 是一个 Pydantic model_validator,在对象创建后自动执行。它的逻辑:

message.content 是什么类型?
    │
    ├─ str → 直接用
    │
    └─ list → 遍历 content blocks
                │
                ├─ 遇到 str → 用它,break
                │
                └─ 遇到 dict 且有 "text" 键
                    │
                    ├─ type 为 None 或 "text" → 用 block["text"],break
                    │
                    └─ type 为 "thinking"/"reasoning" 等 → 跳过

为什么要跳过 thinking/reasoning block?因为 Anthropic 等模型的 extended thinking 返回的 content 是 [{"type": "thinking", ...}, {"type": "text", ...}]text 字段应该取实际回复,而不是推理过程。


ChatGenerationChunk:流式 Chat 输出

ChatGenerationChunk 继承 ChatGenerationmessage 类型收窄为 BaseMessageChunkchat_generation.py:76):

# outputs/chat_generation.py:76-127
class ChatGenerationChunk(ChatGeneration):
    message: BaseMessageChunk          # ← 类型从 BaseMessage 收窄为 BaseMessageChunk

    type: Literal["ChatGenerationChunk"] = "ChatGenerationChunk"

    def __add__(
        self, other: ChatGenerationChunk | list[ChatGenerationChunk]
    ) -> ChatGenerationChunk:
        if isinstance(other, ChatGenerationChunk):                  # ← 单个 chunk
            generation_info = merge_dicts(
                self.generation_info or {},
                other.generation_info or {},
            )
            return ChatGenerationChunk(
                message=self.message + other.message,               # ← 委托给 BaseMessageChunk.__add__
                generation_info=generation_info or None,
            )
        if isinstance(other, list) and all(                         # ← chunk 列表
            isinstance(x, ChatGenerationChunk) for x in other
        ):
            generation_info = merge_dicts(
                self.generation_info or {},
                *[chunk.generation_info for chunk in other if chunk.generation_info],
            )
            return ChatGenerationChunk(
                message=self.message + [chunk.message for chunk in other],
                generation_info=generation_info or None,
            )
        raise TypeError(...)

GenerationChunk 的区别:

  1. 拼接 message 而非 texttextset_text validator 自动从合并后的 message 提取
  2. 支持列表拼接chunk + [chunk1, chunk2, ...] 一次性合并多个(message.__add__ 也支持列表)
  3. message 类型:必须是 BaseMessageChunk(有 __add__ 能力的消息)

merge_chat_generation_chunks:便捷合并函数

merge_chat_generation_chunkschat_generation.py:129-146)是合并流式 chunk 列表的便捷函数:

# outputs/chat_generation.py:129-146
def merge_chat_generation_chunks(
    chunks: list[ChatGenerationChunk],
) -> ChatGenerationChunk | None:
    if not chunks:          # 空列表 → None
        return None
    if len(chunks) == 1:    # 单个 → 直接返回
        return chunks[0]
    return chunks[0] + chunks[1:]  # 多个 → 利用列表拼接一次性合并

这个函数在 ChatModel 的 generate_from_streamchat_models.py)中被调用——把 _stream 产生的所有 chunk 合并成最终的 ChatGeneration

_stream() yield chunk1, chunk2, chunk3, ...
    │
    ▼
收集到 list[ChatGenerationChunk]
    │
    ▼
merge_chat_generation_chunks([chunk1, chunk2, chunk3])
    │
    ▼
chunk1 + [chunk2, chunk3]  → 最终的 ChatGenerationChunk
    │
    ▼
包装成 ChatResult → 作为 generate() 的返回值

ChatResult:_generate 的返回值

ChatResult_generate 方法的返回类型(chat_result.py:8):

# outputs/chat_result.py:8-37
class ChatResult(BaseModel):
    generations: list[ChatGeneration]
    """List of the chat generations.
    Generations is a list to allow for multiple candidate generations
    for a single input prompt.
    """

    llm_output: dict | None = None
    """For arbitrary LLM provider specific output."""
字段 类型 作用
generations list[ChatGeneration] 候选输出列表(通常只有 1 个,n>1 时有多个)
llm_output dict | None 供应商特有信息(token 用量、模型版本等)

注意:ChatResult 继承的是 pydantic.BaseModel不是 Serializable。它是内部数据传递用的,不需要 LangChain 序列化协议。

用户通常不直接接触 ChatResult,它在 invoke 的调用栈中被拆解:

invoke()
  → generate()
    → _generate() 返回 ChatResult
  → ChatResult.generations[0].message  ← 提取第一个候选的 AIMessage
→ 返回 AIMessage 给用户

LLMResult:二维容器

LLMResult 是更高层的容器(llm_result.py:15),支持多 prompt 多候选的二维结构:

# outputs/llm_result.py:15-112
class LLMResult(BaseModel):
    generations: list[
        list[Generation | ChatGeneration | GenerationChunk | ChatGenerationChunk]
    ]
    """第一维:不同输入 prompt
       第二维:同一 prompt 的不同候选"""

    llm_output: dict | None = None
    run: list[RunInfo] | None = None  # ← 已废弃
    type: Literal["LLMResult"] = "LLMResult"

二维结构示意:

LLMResult.generations = [
    [ChatGeneration("回答1a"), ChatGeneration("回答1b")],  ← prompt 1 的候选
    [ChatGeneration("回答2a")],                            ← prompt 2 的候选
]

flatten():展平为一维

# outputs/llm_result.py:59-93
def flatten(self) -> list[LLMResult]:
    llm_results = []
    for i, gen_list in enumerate(self.generations):
        if i == 0:
            llm_results.append(
                LLMResult(generations=[gen_list], llm_output=self.llm_output)
            )
        else:
            if self.llm_output is not None:
                llm_output = deepcopy(self.llm_output)
                llm_output["token_usage"] = {}  # ← 避免重复计算 token
            else:
                llm_output = None
            llm_results.append(
                LLMResult(generations=[gen_list], llm_output=llm_output)
            )
    return llm_results

注意第 74 行的注释 Avoid double counting tokens in OpenAICallback:只有第一个 LLMResult 保留 token_usage,其余的清空,避免在回调中重复统计。

eq:忽略 run 元数据

# outputs/llm_result.py:95-111
def __eq__(self, other: object) -> bool:
    if not isinstance(other, LLMResult):
        return NotImplemented
    return (
        self.generations == other.generations
        and self.llm_output == other.llm_output
        # ← 故意不比较 self.run,因为 run_id 每次不同
    )

__hash__ = None  # ← 不可哈希,因为包含 list

RunInfo:即将废弃

RunInforun_info.py:10)只有一个字段:

# outputs/run_info.py:10-22
class RunInfo(BaseModel):
    """Defined for backwards compatibility with older versions.
    This model will likely be deprecated in the future.
    """
    run_id: UUID

现在 run_id 信息可以通过回调或 astream_events API 获取,RunInfo 已经不再需要。


继承关系总览

Serializable (Pydantic BaseModel + LC 序列化)
    │
    ├── Generation
    │     │   text: str
    │     │   generation_info: dict | None
    │     │
    │     ├── GenerationChunk
    │     │     + __add__(GenerationChunk) → GenerationChunk
    │     │
    │     └── ChatGeneration
    │           │   message: BaseMessage
    │           │   + set_text validator(自动从 message 提取 text)
    │           │
    │           └── ChatGenerationChunk
    │                 message: BaseMessageChunk(收窄类型)
    │                 + __add__(ChatGenerationChunk | list) → ChatGenerationChunk


BaseModel (纯 Pydantic)
    │
    ├── ChatResult
    │     generations: list[ChatGeneration]
    │     llm_output: dict | None
    │
    ├── LLMResult
    │     generations: list[list[Generation]](二维)
    │     llm_output: dict | None
    │     run: list[RunInfo] | None
    │
    └── RunInfo
          run_id: UUID

与 ChatModel 的关系

这些数据结构在 ChatModel 调用链中的位置:

用户调用 model.invoke(messages)
    │
    ▼
_convert_input() → PromptValue → messages: list[BaseMessage]
    │
    │  ╭─ 非流式路径 ──────────────────────────╮
    ├──┤  _generate(messages) → ChatResult      │
    │  │    .generations[0].message → AIMessage  │
    │  ╰────────────────────────────────────────╯
    │
    │  ╭─ 流式路径 ─────────────────────────────────────────────────╮
    └──┤  _stream(messages) → Iterator[ChatGenerationChunk]         │
       │    yield ChatGenerationChunk(message=AIMessageChunk(...))  │
       │                                                            │
       │  merge_chat_generation_chunks(all_chunks)                  │
       │    → ChatGenerationChunk → ChatResult                      │
       ╰────────────────────────────────────────────────────────────╯
    │
    ▼
返回 AIMessage(invoke)或 Iterator[AIMessageChunk](stream)

小结

本篇梳理了 LangChain 输出层的 7 个核心类:

  1. Generationtext + generation_info,最基础的输出单元
  2. GenerationChunk:可拼接的流式 Generation,text 直接拼、generation_info 深度合并
  3. ChatGeneration:新增 message: BaseMessageset_text validator 自动从 message 提取 text
  4. ChatGenerationChunkmessage: BaseMessageChunk,支持单个和列表拼接
  5. ChatResult_generate 的返回值,generations + llm_output
  6. LLMResult:二维容器,flatten() 展平且避免重复计 token
  7. RunInfo:仅 run_id,即将废弃

下一篇问题:ChatModel 返回了 ChatResult/AIMessage,但应用层通常需要 JSON、Pydantic 对象、列表等结构化数据。谁来做这个转换?Output Parser 体系是怎么设计的?


LangChain Outputs 深度解析(二):Output Parser 基类体系

一个能跑的例子

from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import AIMessage

parser = StrOutputParser()

# 直接解析 AIMessage
result = parser.invoke(AIMessage(content="Hello, World!"))
print(result)       # Hello, World!
print(type(result)) # <class 'str'>

# 解析纯字符串
result2 = parser.invoke("Hello again!")
print(result2)      # Hello again!

# Parser 也是 Runnable!可以用 | 组合
from langchain_core.language_models.fake_chat_models import FakeListChatModel

model = FakeListChatModel(responses=["I am a fake model"])
chain = model | StrOutputParser()

print(chain.invoke("hello"))  # I am a fake model
print(type(chain.invoke("hello")))  # <class 'str'>  ← 不再是 AIMessage

# 流式也支持
for chunk in chain.stream("hello"):
    print(repr(chunk), end=" ")
# 'I' ' ' 'a' 'm' ' ' 'a' ' ' 'f' 'a' 'k' 'e' ' ' 'm' 'o' 'd' 'e' 'l'

StrOutputParserAIMessage 变成 str——这是最简单的 Output Parser。但它背后的基类体系支撑着所有复杂的解析逻辑。


5 层继承链总览

Output Parser 的基类层层递进,每层增加一种能力:

BaseLLMOutputParser (ABC)         ← 最底层:parse_result 抽象方法
    │
    ├── BaseGenerationOutputParser    ← + RunnableSerializable,invoke 路由 str/BaseMessage
    │       (较少直接使用)
    │
    └── BaseOutputParser              ← + RunnableSerializable,parse_result → parse(text)
            │
            └── BaseTransformOutputParser     ← + 流式支持(_transform)
                    │
                    └── BaseCumulativeTransformOutputParser  ← + 累积式流式(diff 模式)

后面两层(BaseGenerationOutputParserBaseOutputParser)都继承了 BaseLLMOutputParser,但它们是平行分支,不是继承关系。大多数 Parser 继承自 BaseOutputParser 这条线。


第 1 层:BaseLLMOutputParser

最底层的抽象,只定义了 parse_result 接口(base.py:30):

# output_parsers/base.py:30-67
class BaseLLMOutputParser(ABC, Generic[T]):
    """Abstract base class for parsing the outputs of a model."""

    @abstractmethod
    def parse_result(self, result: list[Generation], *, partial: bool = False) -> T:
        """Parse a list of candidate model Generation objects into a specific format."""

    async def aparse_result(
        self, result: list[Generation], *, partial: bool = False
    ) -> T:
        return await run_in_executor(None, self.parse_result, result, partial=partial)
方法 作用
parse_result(list[Generation]) 核心抽象——从候选 Generation 列表中解析出目标类型 T
aparse_result 异步版,默认用线程池包装同步方法

注意:parse_result 接收的是 list[Generation]——候选列表。但实际上绝大多数 Parser 只用 result[0]


第 2 层(分支 A):BaseGenerationOutputParser

BaseLLMOutputParser 基础上混入 RunnableSerializablebase.py:70):

# output_parsers/base.py:70-133
class BaseGenerationOutputParser(
    BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]
):
    @property
    def InputType(self) -> Any:
        return str | AnyMessage          # ← 接受 str 或 BaseMessage

    def invoke(self, input: str | BaseMessage, config=None, **kwargs) -> T:
        if isinstance(input, BaseMessage):
            return self._call_with_config(
                lambda inner_input: self.parse_result(
                    [ChatGeneration(message=inner_input)]   # ← BaseMessage 包装成 ChatGeneration
                ),
                input, config, run_type="parser",
            )
        return self._call_with_config(
            lambda inner_input: self.parse_result(
                [Generation(text=inner_input)]              # ← str 包装成 Generation
            ),
            input, config, run_type="parser",
        )

它的 invoke 做了输入路由

  • BaseMessage → 包装成 ChatGeneration,调用 parse_result
  • str → 包装成 Generation,调用 parse_result

这使得 Parser 可以同时处理 ChatModel 的输出(AIMessage)和 LLM 的输出(str)。


第 2 层(分支 B):BaseOutputParser——核心基类

大多数 Parser 继承自这条线(base.py:136)。它和 BaseGenerationOutputParser 是平行分支,但增加了 parse(text) 抽象方法:

# output_parsers/base.py:136-349
class BaseOutputParser(
    BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]
):
    @property
    def InputType(self) -> Any:
        return str | AnyMessage

    # ──── invoke:和 BaseGenerationOutputParser 相同的路由逻辑 ────
    def invoke(self, input: str | BaseMessage, config=None, **kwargs) -> T:
        if isinstance(input, BaseMessage):
            return self._call_with_config(
                lambda inner_input: self.parse_result(
                    [ChatGeneration(message=inner_input)]
                ),
                input, config, run_type="parser",
            )
        return self._call_with_config(
            lambda inner_input: self.parse_result(
                [Generation(text=inner_input)]
            ),
            input, config, run_type="parser",
        )

    # ──── parse_result → parse(text) 的桥接 ────
    def parse_result(self, result: list[Generation], *, partial: bool = False) -> T:
        return self.parse(result[0].text)   # ← 取第一个候选的 text,交给 parse

    @abstractmethod
    def parse(self, text: str) -> T:
        """Parse a single string model output into some structure."""

    # ──── 格式指令 ────
    def get_format_instructions(self) -> str:
        """Instructions on how the LLM output should be formatted."""
        raise NotImplementedError

BaseOutputParser vs BaseGenerationOutputParser 的关键区别:

对比项 BaseGenerationOutputParser BaseOutputParser
需要实现 parse_result(list[Generation]) parse(text: str)
输入粒度 能访问完整的 Generation 对象 只看到文本
适用场景 需要 generation_infomessage 只需文本解析
额外能力 get_format_instructions()

绝大多数 Parser 只需要文本就够了,所以继承 BaseOutputParser + 实现 parse(text) 是最常见的模式。

parse_result 的桥接逻辑

invoke(AIMessage("hello"))
    │
    ▼
包装成 [ChatGeneration(message=AIMessage("hello"))]
    │
    ▼
parse_result([ChatGeneration(...)])
    │
    ▼
result[0].text  ← ChatGeneration.text 由 set_text validator 从 message 提取
    │
    ▼
parse("hello")  ← 子类只需实现这个
    │
    ▼
返回 T

这就是 Output Parser 和第 1 篇的连接点——ChatGeneration.set_text validator 把 message.content 提取为 text,然后 BaseOutputParser.parse_result 拿到 text 交给 parse


第 3 层:BaseTransformOutputParser——流式支持

BaseOutputParser 基础上增加流式处理能力(transform.py:28):

# output_parsers/transform.py:28-96
class BaseTransformOutputParser(BaseOutputParser[T]):
    """Base class for an output parser that can handle streaming input."""

    def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[T]:
        for chunk in input:
            if isinstance(chunk, BaseMessage):
                yield self.parse_result([ChatGeneration(message=chunk)])
            else:
                yield self.parse_result([Generation(text=chunk)])

    def transform(
        self, input: Iterator[str | BaseMessage], config=None, **kwargs
    ) -> Iterator[T]:
        yield from self._transform_stream_with_config(
            input, self._transform, config, run_type="parser"
        )

默认的 _transform 实现非常简单:逐 chunk 独立解析。每收到一个 chunk,就调用一次 parse_result

stream 输入:  "He"    "llo"    " Wo"    "rld"
                │        │        │        │
                ▼        ▼        ▼        ▼
_transform:  parse("He") parse("llo") parse(" Wo") parse("rld")
                │        │        │        │
                ▼        ▼        ▼        ▼
stream 输出:  "He"    "llo"    " Wo"    "rld"    ← 对 StrOutputParser 来说,直接透传

这对 StrOutputParser 这种"透传"Parser 来说完美——每个 chunk 独立处理,不需要等待完整输出。


第 4 层:BaseCumulativeTransformOutputParser——累积式流式

对于 JSON、Pydantic 等需要完整结构才能解析的 Parser,逐 chunk 独立解析行不通。BaseCumulativeTransformOutputParser累积后重新解析的策略(transform.py:99):

# output_parsers/transform.py:99-175
class BaseCumulativeTransformOutputParser(BaseTransformOutputParser[T]):
    diff: bool = False
    """In streaming mode, whether to yield diffs between the previous
    and current parsed output."""

    def _diff(self, prev: T | None, next: T) -> T:
        """Convert parsed outputs into a diff format."""
        raise NotImplementedError

    def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[Any]:
        prev_parsed = None
        acc_gen: GenerationChunk | ChatGenerationChunk | None = None  # ← 累积器

        for chunk in input:
            # 1. 构造 chunk_gen
            if isinstance(chunk, BaseMessageChunk):
                chunk_gen = ChatGenerationChunk(message=chunk)
            elif isinstance(chunk, BaseMessage):
                chunk_gen = ChatGenerationChunk(
                    message=BaseMessageChunk(**chunk.model_dump())
                )
            else:
                chunk_gen = GenerationChunk(text=chunk)

            # 2. 累积
            acc_gen = chunk_gen if acc_gen is None else acc_gen + chunk_gen

            # 3. 尝试 partial 解析
            parsed = self.parse_result([acc_gen], partial=True)

            # 4. 有新结果且不同于上一次 → yield
            if parsed is not None and parsed != prev_parsed:
                if self.diff:
                    yield self._diff(prev_parsed, parsed)  # ← diff 模式
                else:
                    yield parsed                            # ← 完整模式
                prev_parsed = parsed

核心流程:

stream 输入:  '{"na'   'me":'   '"Al'   'ice"}'
                │        │         │        │
                ▼        ▼         ▼        ▼
累积:      '{"na'  '{"name":' '{"name":"Al' '{"name":"Alice"}'
                │        │         │        │
                ▼        ▼         ▼        ▼
partial 解析:  None    None    {"name":"Al"}  {"name":"Alice"}
                                    │              │
                                    ▼              ▼
yield(非 diff):              {"name":"Al"}  {"name":"Alice"}

yield(diff 模式):           [add /name "Al"]  [replace /name "Alice"]
策略 BaseTransformOutputParser BaseCumulativeTransformOutputParser
累积 不累积,逐 chunk 独立 累积所有 chunk 后重新解析
partial 不用 parse_result(partial=True)
diff 不支持 self.diff=True 时调用 _diff
适用 文本透传(StrOutputParser) 结构化输出(JSON / Pydantic)
去重 parsed != prev_parsed 防止重复 yield

Parser 也是 Runnable!

所有 Output Parser 都继承了 RunnableSerializable,这意味着它们拥有 Runnable 的全套能力:

parser = StrOutputParser()

# invoke
parser.invoke(AIMessage(content="hello"))  # "hello"

# stream / transform
for chunk in parser.stream(AIMessage(content="hello")):
    print(chunk)

# batch
parser.batch([AIMessage(content="a"), AIMessage(content="b")])  # ["a", "b"]

# | 管道
chain = some_model | parser  # 等价于 RunnableSequence(some_model, parser)

# 异步
import asyncio
asyncio.run(parser.ainvoke(AIMessage(content="hello")))

这也是为什么 model | StrOutputParser() 可以工作——| 调用的是 Runnable 的 __or__,把两个 Runnable 组合成 RunnableSequence


invoke 的完整调用栈

chain = model | StrOutputParser() 为例:

chain.invoke("hello")
    │
    ▼
RunnableSequence.invoke("hello")
    │
    ├── model.invoke("hello")
    │       → AIMessage(content="I am fake")
    │
    ▼
StrOutputParser.invoke(AIMessage(content="I am fake"))
    │
    ▼
_call_with_config(
    lambda msg: parse_result([ChatGeneration(message=msg)]),
    AIMessage(...),
    run_type="parser"
)
    │
    ▼
parse_result([ChatGeneration(message=AIMessage("I am fake"))])
    │
    ▼
parse(result[0].text)      ← text 由 set_text validator 从 message 提取
    │
    ▼
parse("I am fake")
    │
    ▼
return "I am fake"          ← StrOutputParser.parse 直接返回 text

小结

本篇拆解了 Output Parser 的 5 层基类体系:

  1. BaseLLMOutputParser:最底层抽象,定义 parse_result(list[Generation]) → T
  2. BaseGenerationOutputParser:+ Runnable,invoke 路由 str/BaseMessage,适合需要访问完整 Generation 的场景
  3. BaseOutputParser:+ Runnable + parse(text),大多数 Parser 的基类,通过 parse_result → parse 桥接
  4. BaseTransformOutputParser:+ 流式,逐 chunk 独立解析
  5. BaseCumulativeTransformOutputParser:+ 累积式流式,支持 partial 解析和 diff 模式

下一篇问题:有了基类体系,实际的 Parser 是怎么实现的?StrOutputParserJsonOutputParserPydanticOutputParserListOutputParserXMLOutputParser 分别解决什么问题?它们给模型的 format_instructions 有什么不同?


LangChain Outputs 深度解析(三):常用 Parser 全解

一个能跑的例子

from langchain_core.output_parsers import (
    StrOutputParser,
    JsonOutputParser,
    PydanticOutputParser,
    CommaSeparatedListOutputParser,
)
from langchain_core.messages import AIMessage
from pydantic import BaseModel, Field

# 同一个"原始输入"
raw_json = '{"name": "Alice", "age": 30}'

# 1. StrOutputParser → 原样返回字符串
str_parser = StrOutputParser()
print(str_parser.invoke(raw_json))
# '{"name": "Alice", "age": 30}'

# 2. JsonOutputParser → 解析为 dict
json_parser = JsonOutputParser()
print(json_parser.invoke(raw_json))
# {'name': 'Alice', 'age': 30}

# 3. PydanticOutputParser → 解析为 Pydantic 对象
class Person(BaseModel):
    name: str = Field(description="人名")
    age: int = Field(description="年龄")

pydantic_parser = PydanticOutputParser(pydantic_object=Person)
person = pydantic_parser.invoke(raw_json)
print(person)        # name='Alice' age=30
print(person.name)   # Alice

# 4. CommaSeparatedListOutputParser → 解析为列表
csv_parser = CommaSeparatedListOutputParser()
print(csv_parser.invoke("apple, banana, cherry"))
# ['apple', 'banana', 'cherry']

# 查看各 Parser 给模型的格式指令
print("--- JsonOutputParser ---")
print(json_parser.get_format_instructions())
print("--- PydanticOutputParser ---")
print(pydantic_parser.get_format_instructions())
print("--- CommaSeparatedListOutputParser ---")
print(csv_parser.get_format_instructions())

4 种 Parser,同样的输入,不同的输出类型。接下来逐个拆解。


StrOutputParser:最简单的 Parser

源码只有一个方法(string.py:8):

# output_parsers/string.py:8-63
class StrOutputParser(BaseTransformOutputParser[str]):
    """Extract text content from model outputs as a string."""

    def parse(self, text: str) -> str:
        return text                # ← 直接返回,就这么简单

继承自 BaseTransformOutputParser,所以天然支持流式。流式时的行为:

model.stream("hello") → AIMessageChunk("I") → AIMessageChunk(" am") → ...
         │
         ▼
StrOutputParser._transform:
    parse_result([ChatGeneration(msg)]) → result[0].text → "I"
    parse_result([ChatGeneration(msg)]) → result[0].text → " am"
         │
         ▼
输出:  "I"  " am"  " fake"  ...    ← 逐 chunk 透传文本

StrOutputParser 是使用频率最高的 Parser——几乎所有"只要文本"的场景都用它。


JsonOutputParser:JSON 解析

继承自 BaseCumulativeTransformOutputParser,用累积式流式解析 JSON(json.py:31):

# output_parsers/json.py:31-131
class JsonOutputParser(BaseCumulativeTransformOutputParser[Any]):
    pydantic_object: type[TBaseModel] | None = None
    """Optional Pydantic object for validation."""

    def _diff(self, prev: Any | None, next: Any) -> Any:
        return jsonpatch.make_patch(prev, next).patch     # ← JSON Patch 格式

    def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
        text = result[0].text.strip()
        if partial:
            try:
                return parse_json_markdown(text)           # ← partial 模式:解析失败返回 None
            except JSONDecodeError:
                return None
        else:
            try:
                return parse_json_markdown(text)           # ← 完整模式:解析失败抛异常
            except JSONDecodeError as e:
                raise OutputParserException(f"Invalid json output: {text}") from e

    def parse(self, text: str) -> Any:
        return self.parse_result([Generation(text=text)])

关键点:

  1. parse_json_markdown:能处理 json 代码块(`` ```json {…} `````)或裸 JSON
  2. partial 模式:不完整的 JSON 也尝试解析(如 {"name": "Ali → 可能返回 {"name": "Ali"}),失败返回 None
  3. diff 模式_diffjsonpatch 库生成 JSON Patch,描述两次解析结果之间的差异

流式解析流程

stream 输入:  '```json\n'  '{"na'  'me":'  '"Al'  'ice",'  '"age":'  '30}'  '\n```'
                │            │       │       │       │         │         │       │
                ▼            ▼       ▼       ▼       ▼         ▼         ▼       ▼
累积后解析:    None         None   None    None   {"name":  {"name":   {"name":  {"name":
                                                  "Alice"}  "Alice"}  "Alice",  "Alice",
                                                                      "age":30} "age":30}
                                                    │                    │
                                                    ▼                    ▼
yield(diff=False):                          {"name":"Alice"}    {"name":"Alice","age":30}

get_format_instructions

# output_parsers/json.py:104-123
def get_format_instructions(self) -> str:
    if self.pydantic_object is None:
        return "Return a JSON object."
    schema = dict(self._get_schema(self.pydantic_object).items())
    # 移除 title 和 type(减少冗余)
    reduced_schema = schema
    if "title" in reduced_schema:
        del reduced_schema["title"]
    if "type" in reduced_schema:
        del reduced_schema["type"]
    schema_str = json.dumps(reduced_schema, ensure_ascii=False)
    return JSON_FORMAT_INSTRUCTIONS.format(schema=schema_str)

如果提供了 pydantic_object,会把 JSON Schema 注入到格式指令中。


PydanticOutputParser:Pydantic 验证

继承自 JsonOutputParser,增加 Pydantic 验证层(pydantic.py:19):

# output_parsers/pydantic.py:19-132
class PydanticOutputParser(JsonOutputParser, Generic[TBaseModel]):
    pydantic_object: type[TBaseModel]   # ← 必填,不像 JsonOutputParser 可以为 None

    def _parse_obj(self, obj: dict) -> TBaseModel:
        try:
            if issubclass(self.pydantic_object, pydantic.BaseModel):
                return self.pydantic_object.model_validate(obj)       # ← Pydantic v2
            if issubclass(self.pydantic_object, pydantic.v1.BaseModel):
                return self.pydantic_object.parse_obj(obj)            # ← Pydantic v1
            raise OutputParserException("Unsupported model version")
        except (pydantic.ValidationError, pydantic.v1.ValidationError) as e:
            raise self._parser_exception(e, obj) from e

    def parse_result(self, result: list[Generation], *, partial: bool = False):
        try:
            json_object = super().parse_result(result)   # ← 先用 JsonOutputParser 解析为 dict
            return self._parse_obj(json_object)           # ← 再用 Pydantic 验证
        except OutputParserException:
            if partial:
                return None                               # ← partial 模式:验证失败返回 None
            raise

调用链:

PydanticOutputParser.parse_result(result)
    │
    ├── JsonOutputParser.parse_result(result)    ← 第 1 步:JSON 文本 → dict
    │       → parse_json_markdown(text) → {"name": "Alice", "age": 30}
    │
    └── _parse_obj(dict)                          ← 第 2 步:dict → Pydantic 对象
            → Person.model_validate({"name": "Alice", "age": 30})
            → Person(name="Alice", age=30)

format_instructions 区别

PydanticOutputParser 有自己的格式指令模板,比 JsonOutputParser 多了一个示例:

# output_parsers/pydantic.py:124-132
_PYDANTIC_FORMAT_INSTRUCTIONS = """The output should be formatted as a JSON instance \
that conforms to the JSON schema below.

As an example, for the schema {{"properties": {{"foo": ...}}}}, ...

Here is the output schema:

{schema}


ListOutputParser 家族:列表解析

ListOutputParser 是列表 Parser 的抽象基类(list.py:43),有 3 个具体实现:

# output_parsers/list.py:43-102
class ListOutputParser(BaseTransformOutputParser[list[str]]):
    @abstractmethod
    def parse(self, text: str) -> list[str]: ...

    def parse_iter(self, text: str) -> Iterator[re.Match]:
        """增量解析——子类可选实现,用于流式。"""
        raise NotImplementedError

    def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[list[str]]:
        buffer = ""
        for chunk in input:
            if isinstance(chunk, BaseMessage):
                chunk_content = chunk.content
                if not isinstance(chunk_content, str):
                    continue
                buffer += chunk_content
            else:
                buffer += chunk
            try:
                done_idx = 0
                for m in droplastn(self.parse_iter(buffer), 1):  # ← 保留最后一个(可能不完整)
                    done_idx = m.end()
                    yield [m.group(1)]                            # ← 每次 yield 一个完整项
                buffer = buffer[done_idx:]
            except NotImplementedError:
                parts = self.parse(buffer)
                if len(parts) > 1:
                    for part in parts[:-1]:
                        yield [part]
                    buffer = parts[-1]
        for part in self.parse(buffer):                           # ← 最后把 buffer 剩余内容解析
            yield [part]

流式策略的关键是 droplastn(iter, 1)——丢掉最后一个匹配(因为可能不完整),只 yield 确定完整的项:

buffer 累积:  "1. app"  →  "1. apple\n2. ban"  →  "1. apple\n2. banana\n3. cherry"
                │              │                        │
                ▼              ▼                        ▼
parse_iter:   [m("app")]    [m("apple"), m("ban")]   [m("apple"), m("banana"), m("cherry")]
droplastn(1): []            [m("apple")]             [m("apple"), m("banana")]
                              │                        │
                              ▼                        ▼
yield:                      ["apple"]                ["banana"]

最后 flush:                                         ["cherry"]

CommaSeparatedListOutputParser

# output_parsers/list.py:139-185
class CommaSeparatedListOutputParser(ListOutputParser):
    def get_format_instructions(self) -> str:
        return (
            "Your response should be a list of comma separated values, "
            "eg: `foo, bar, baz` or `foo,bar,baz`"
        )

    def parse(self, text: str) -> list[str]:
        try:
            reader = csv.reader(
                StringIO(text), quotechar='"', delimiter=",", skipinitialspace=True
            )
            return [item for sublist in reader for item in sublist]
        except csv.Error:
            return [part.strip() for part in text.split(",")]  # ← fallback

csv.reader 解析,能正确处理引号内的逗号(如 "hello, world", foo)。解析失败则 fallback 到简单的 split(",").

NumberedListOutputParser

# output_parsers/list.py:188-218
class NumberedListOutputParser(ListOutputParser):
    pattern: str = r"\d+\.\s([^\n]+)"

    def get_format_instructions(self) -> str:
        return (
            "Your response should be a numbered list with each item on a new line. "
            "For example: \n\n1. foo\n\n2. bar\n\n3. baz"
        )

    def parse(self, text: str) -> list[str]:
        return re.findall(self.pattern, text)

    def parse_iter(self, text: str) -> Iterator[re.Match]:
        return re.finditer(self.pattern, text)    # ← 支持流式增量匹配

正则 \d+\.\s([^\n]+) 匹配 1. xxx 格式,捕获组 1 是内容。

MarkdownListOutputParser

# output_parsers/list.py:221-249
class MarkdownListOutputParser(ListOutputParser):
    pattern: str = r"^\s*[-*]\s([^\n]+)$"

    def get_format_instructions(self) -> str:
        return "Your response should be a markdown list, eg: `- foo\n- bar\n- baz`"

    def parse(self, text: str) -> list[str]:
        return re.findall(self.pattern, text, re.MULTILINE)

    def parse_iter(self, text: str) -> Iterator[re.Match]:
        return re.finditer(self.pattern, text, re.MULTILINE)

正则 ^\s*[-*]\s([^\n]+)$ 匹配 - xxx* xxx 格式。注意 re.MULTILINE 使 ^$ 匹配每行。


XMLOutputParser:增量 XML 解析

XMLOutputParserxml.py:151)继承 BaseTransformOutputParser,有自己的流式实现——不用累积式,而是用 ET.XMLPullParser 做真正的增量解析:

# output_parsers/xml.py:151-285
class XMLOutputParser(BaseTransformOutputParser):
    tags: list[str] | None = None
    parser: Literal["defusedxml", "xml"] = "defusedxml"  # ← 默认用安全的 defusedxml

    def parse(self, text: str) -> dict[str, str | list[Any]]:
        # 支持 ```xml ... ```代码块
        match = re.search(r"```(xml)?(.*)```", text, re.DOTALL)
        if match is not None:
            text = match.group(2)
        # 处理 encoding 声明
        encoding_match = self.encoding_matcher.search(text)
        if encoding_match:
            text = encoding_match.group(2)
        text = text.strip()
        root = et.fromstring(text)
        return self._root_to_dict(root)

    def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[AddableDict]:
        streaming_parser = _StreamingParser(self.parser)
        for chunk in input:
            yield from streaming_parser.parse(chunk)    # ← 增量解析,有结果就 yield
        streaming_parser.close()

_StreamingParser 内部机制

_StreamingParserxml.py:42)是流式 XML 解析的核心:

# output_parsers/xml.py:42-148
class _StreamingParser:
    def __init__(self, parser):
        self.pull_parser = ET.XMLPullParser(["start", "end"])
        self.xml_start_re = re.compile(r"<[a-zA-Z:_]")
        self.current_path: list[str] = []
        self.current_path_has_children = False
        self.buffer = ""
        self.xml_started = False

    def parse(self, chunk) -> Iterator[AddableDict]:
        # 1. 等待 XML 开始(跳过前导文本)
        self.buffer += chunk
        if not self.xml_started:
            if match := self.xml_start_re.search(self.buffer):
                self.buffer = self.buffer[match.start():]
                self.xml_started = True
            else:
                return

        # 2. feed 给 PullParser
        self.pull_parser.feed(self.buffer)
        self.buffer = ""

        # 3. 读取事件
        for event, elem in self.pull_parser.read_events():
            if event == "start":
                self.current_path.append(elem.tag)
                self.current_path_has_children = False
            elif event == "end":
                self.current_path.pop()
                if not self.current_path_has_children:
                    yield nested_element(self.current_path, elem)  # ← 叶子节点才 yield
                if self.current_path:
                    self.current_path_has_children = True

流式解析过程:

输入:  "<root>"  "<name>"  "Alice"  "</name>"  "<age>"  "30"  "</age>"  "</root>"

事件流:
  start root  → path=[root]
  start name  → path=[root, name]
  end name    → path=[root],yield {"root": [{"name": "Alice"}]}
  start age   → path=[root, age]
  end age     → path=[root],yield {"root": [{"age": "30"}]}
  end root    → path=[]

输出(AddableDict):
  {"root": [{"name": "Alice"}]}
  {"root": [{"age": "30"}]}

两个 AddableDict 相加后:
  {"root": [{"name": "Alice"}, {"age": "30"}]}

AddableDict__add__ 会把列表值 append 而非覆盖,使得流式输出可以自然合并。


get_format_instructions 对比表

Parser 给模型的格式指令
StrOutputParser (不提供,直接返回文本)
JsonOutputParser "Return a JSON object." 或带 JSON Schema 的详细指令
PydanticOutputParser 带 JSON Schema + 示例的详细指令
CommaSeparatedListOutputParser "Your response should be a list of comma separated values, eg: foo, bar, baz"
NumberedListOutputParser "Your response should be a numbered list... 1. foo 2. bar 3. baz"
MarkdownListOutputParser "Your response should be a markdown list, eg: - foo\n- bar\n- baz"
XMLOutputParser 带 tags 列表的 XML 格式说明

这些指令通常注入到 Prompt 中,告诉模型如何格式化输出:

# 典型用法
from langchain_core.prompts import ChatPromptTemplate

parser = PydanticOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_messages([
    ("system", "按以下格式输出:\n{format_instructions}"),
    ("human", "{query}"),
])

chain = prompt | model | parser
# prompt 会把 parser.get_format_instructions() 注入到 system message 中

继承关系总览

BaseTransformOutputParser[str]
    └── StrOutputParser                          ← parse(text) → text

BaseCumulativeTransformOutputParser[Any]
    └── JsonOutputParser                         ← parse_json_markdown + partial
            └── PydanticOutputParser             ← + Pydantic model_validate

BaseTransformOutputParser[list[str]]
    └── ListOutputParser (ABC)                   ← buffer + parse_iter 流式
            ├── CommaSeparatedListOutputParser    ← csv.reader
            ├── NumberedListOutputParser          ← r"\d+\.\s([^\n]+)"
            └── MarkdownListOutputParser          ← r"^\s*[-*]\s([^\n]+)$"

BaseTransformOutputParser
    └── XMLOutputParser                          ← _StreamingParser + ET.XMLPullParser

小结

本篇拆解了 7 种常用 Parser 的实现:

  1. StrOutputParserparse(text) → text,最简单,流式直接透传
  2. JsonOutputParserparse_json_markdown + 累积式流式 + jsonpatch diff
  3. PydanticOutputParser:JsonOutputParser + model_validate 验证,支持 v1/v2
  4. CommaSeparatedListOutputParsercsv.reader 解析,有 fallback
  5. NumberedListOutputParser\d+\.\s 正则匹配,支持 parse_iter 流式
  6. MarkdownListOutputParser[-*]\s 正则匹配,re.MULTILINE
  7. XMLOutputParserET.XMLPullParser 真正增量解析,AddableDict 可合并输出

下一篇问题:上面的 Parser 都是解析文本输出。但当模型返回的是 tool_calls(结构化的函数调用),该怎么解析?PydanticToolsParser 是如何把 AIMessage.tool_calls 变成 Pydantic 对象的?完整的数据流从头到尾是怎样的?


LangChain Outputs 深度解析(四):Tool Parser 与完整数据流

一个能跑的例子

from langchain_core.messages import AIMessage
from langchain_core.outputs import ChatGeneration
from langchain_core.output_parsers.openai_tools import (
    JsonOutputToolsParser,
    PydanticToolsParser,
)
from pydantic import BaseModel, Field

# 模拟一个带 tool_calls 的 AIMessage
ai_msg = AIMessage(
    content="",
    tool_calls=[
        {
            "name": "GetWeather",
            "args": {"city": "Beijing", "unit": "celsius"},
            "id": "call_001",
        }
    ],
)
gen = ChatGeneration(message=ai_msg)

# 1. JsonOutputToolsParser → dict 列表
json_tool_parser = JsonOutputToolsParser()
result = json_tool_parser.parse_result([gen])
print(result)
# [{'type': 'GetWeather', 'args': {'city': 'Beijing', 'unit': 'celsius'}}]
# 注意:name → type(向后兼容重命名)

# 2. PydanticToolsParser → Pydantic 对象列表
class GetWeather(BaseModel):
    """获取天气信息。"""
    city: str = Field(description="城市名称")
    unit: str = Field(default="celsius", description="温度单位")

pydantic_tool_parser = PydanticToolsParser(tools=[GetWeather])
result2 = pydantic_tool_parser.parse_result([gen])
print(result2)
# [GetWeather(city='Beijing', unit='celsius')]
print(result2[0].city)  # Beijing

# 3. first_tool_only 模式
parser_single = PydanticToolsParser(tools=[GetWeather], first_tool_only=True)
result3 = parser_single.parse_result([gen])
print(result3)           # GetWeather(city='Beijing', unit='celsius')  ← 不是列表
print(type(result3))     # <class '__main__.GetWeather'>

不需要 API key。Tool Parser 解析的是 AIMessage.tool_calls——结构化数据,不是自由文本。


辅助函数:parse_tool_call 和 parse_tool_calls

在 Parser 类之前,先看两个辅助函数(用于处理旧格式的 additional_kwargs["tool_calls"])。

parse_tool_call

# output_parsers/openai_tools.py:27-78
def parse_tool_call(
    raw_tool_call: dict[str, Any],
    *,
    partial: bool = False,
    strict: bool = False,
    return_id: bool = True,
) -> dict[str, Any] | None:
    if "function" not in raw_tool_call:
        return None

    arguments = raw_tool_call["function"]["arguments"]

    if partial:
        try:
            function_args = parse_partial_json(arguments, strict=strict)
        except (JSONDecodeError, TypeError):
            return None
    elif not arguments:             # ← 无参工具
        function_args = {}
    else:
        try:
            function_args = json.loads(arguments, strict=strict)
        except JSONDecodeError as e:
            raise OutputParserException(...) from e

    parsed = {
        "name": raw_tool_call["function"]["name"] or "",
        "args": function_args or {},
    }
    if return_id:
        parsed["id"] = raw_tool_call.get("id")
        parsed = create_tool_call(**parsed)   # ← 标准化为 ToolCall TypedDict
    return parsed

这个函数处理的是 OpenAI 原始格式的 tool call:

# OpenAI 原始格式
{
    "id": "call_001",
    "function": {
        "name": "GetWeather",
        "arguments": '{"city": "Beijing"}'  # ← 注意:arguments 是 JSON 字符串
    }
}

parse_tool_call 把它转换为 LangChain 标准格式:

# LangChain 标准格式
{
    "name": "GetWeather",
    "args": {"city": "Beijing"},   # ← 已解析为 dict
    "id": "call_001",
}

parse_tool_calls

# output_parsers/openai_tools.py:102-137
def parse_tool_calls(
    raw_tool_calls: list[dict],
    *, partial: bool = False, strict: bool = False, return_id: bool = True,
) -> list[dict[str, Any]]:
    final_tools = []
    exceptions = []
    for tool_call in raw_tool_calls:
        try:
            parsed = parse_tool_call(tool_call, partial=partial, strict=strict, return_id=return_id)
            if parsed:
                final_tools.append(parsed)
        except OutputParserException as e:
            exceptions.append(str(e))      # ← 收集所有错误
            continue
    if exceptions:
        raise OutputParserException("\n\n".join(exceptions))  # ← 汇总抛出
    return final_tools

批量版本。注意先收集所有错误再抛出,而不是遇到第一个错误就停。


JsonOutputToolsParser

JsonOutputToolsParseropenai_tools.py:140)是 Tool Parser 的基础类:

# output_parsers/openai_tools.py:140-222
class JsonOutputToolsParser(BaseCumulativeTransformOutputParser[Any]):
    strict: bool = False
    return_id: bool = False
    first_tool_only: bool = False

    def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
        generation = result[0]
        if not isinstance(generation, ChatGeneration):
            raise OutputParserException("This output parser can only be used with a chat generation.")

        message = generation.message

        # ──── 新格式:直接从 message.tool_calls 取 ────
        if isinstance(message, AIMessage) and message.tool_calls:
            tool_calls = [dict(tc) for tc in message.tool_calls]
            for tool_call in tool_calls:
                if not self.return_id:
                    _ = tool_call.pop("id")

        # ──── 旧格式:从 additional_kwargs["tool_calls"] 取 ────
        else:
            try:
                raw_tool_calls = copy.deepcopy(message.additional_kwargs["tool_calls"])
            except KeyError:
                return []
            tool_calls = parse_tool_calls(
                raw_tool_calls, partial=partial, strict=self.strict, return_id=self.return_id,
            )

        # ──── 向后兼容:name → type ────
        for tc in tool_calls:
            tc["type"] = tc.pop("name")

        if self.first_tool_only:
            return tool_calls[0] if tool_calls else None
        return tool_calls

两个关键设计:

1. 双路径取 tool_calls

AIMessage
    │
    ├── message.tool_calls 有值?         ← 新路径(推荐)
    │       → 直接使用,已经是标准格式
    │
    └── message.additional_kwargs["tool_calls"]  ← 旧路径(向后兼容)
            → 需要 parse_tool_calls() 解析 JSON 字符串

新版 LangChain 的 AIMessage 在构造时就已经解析好了 tool_calls。旧版或缓存的消息可能只有 additional_kwargs["tool_calls"]

2. name → type 重命名

for tc in tool_calls:
    tc["type"] = tc.pop("name")
# {"name": "GetWeather", "args": {...}} → {"type": "GetWeather", "args": {...}}

这是向后兼容——早期版本的 Tool Parser 用 type 而非 name,为了不破坏下游代码,保留了这个重命名。


JsonOutputKeyToolsParser

继承 JsonOutputToolsParser,增加按 key_name 过滤的能力(openai_tools.py:225):

# output_parsers/openai_tools.py:225-295
class JsonOutputKeyToolsParser(JsonOutputToolsParser):
    key_name: str
    """The type of tools to return."""

    def parse_result(self, result, *, partial=False):
        # ... 获取 parsed_tool_calls(和父类相同)...

        # 按 key_name 过滤
        if self.first_tool_only:
            parsed_result = list(
                filter(lambda x: x["type"] == self.key_name, parsed_tool_calls)
            )
            single_result = parsed_result[0] if parsed_result else None
            if self.return_id:
                return single_result
            if single_result:
                return single_result["args"]   # ← 不返回 id 时,直接返回 args
            return None

        return (
            [res for res in parsed_tool_calls if res["type"] == self.key_name]
            if self.return_id
            else [res["args"] for res in parsed_tool_calls if res["type"] == self.key_name]
        )

使用场景:当模型可能返回多种 tool call,你只关心某一种时:

parser = JsonOutputKeyToolsParser(key_name="GetWeather")
# 只返回 name=="GetWeather" 的 tool calls

PydanticToolsParser

继承 JsonOutputToolsParser,把 dict 转为 Pydantic 对象(openai_tools.py:306):

# output_parsers/openai_tools.py:306-384
class PydanticToolsParser(JsonOutputToolsParser):
    tools: list[TypeBaseModel]

    def parse_result(self, result, *, partial=False):
        # 1. 调用父类获取 JSON tool calls
        json_results = super().parse_result(result, partial=partial)
        if not json_results:
            return None if self.first_tool_only else []

        json_results = [json_results] if self.first_tool_only else json_results

        # 2. 构建 name → Pydantic class 映射
        name_dict_v2 = {
            tool.model_config.get("title") or tool.__name__: tool
            for tool in self.tools
            if is_pydantic_v2_subclass(tool)
        }
        name_dict_v1 = {
            tool.__name__: tool
            for tool in self.tools
            if is_pydantic_v1_subclass(tool)
        }
        name_dict = {**name_dict_v2, **name_dict_v1}

        # 3. 逐个实例化
        pydantic_objects = []
        for res in json_results:
            if not isinstance(res["args"], dict):
                if partial: continue
                raise ValueError(f"Tool arguments must be specified as a dict, received: {res['args']}")

            try:
                tool = name_dict[res["type"]]    # ← 按 name(已重命名为 type)查找类
            except KeyError as e:
                raise OutputParserException(f"Unknown tool type: {res['type']!r}") from e

            try:
                pydantic_objects.append(tool(**res["args"]))  # ← 实例化 Pydantic 对象
            except (ValidationError, ValueError):
                if partial: continue
                # ── max_tokens 检测 ──
                has_max_tokens_stop_reason = any(
                    generation.message.response_metadata.get("stop_reason") == "max_tokens"
                    for generation in result
                    if isinstance(generation, ChatGeneration)
                )
                if has_max_tokens_stop_reason:
                    logger.exception(
                        "Output parser received a `max_tokens` stop reason. "
                        "The output is likely incomplete—please increase `max_tokens`."
                    )
                raise

        if self.first_tool_only:
            return pydantic_objects[0] if pydantic_objects else None
        return pydantic_objects

完整流程:

PydanticToolsParser.parse_result([ChatGeneration(message=ai_msg)])
    │
    ├── super().parse_result() → JsonOutputToolsParser
    │       → [{"type": "GetWeather", "args": {"city": "Beijing"}}]
    │
    ├── 构建映射:{"GetWeather": <class GetWeather>}
    │
    └── 实例化:GetWeather(**{"city": "Beijing"})
            → GetWeather(city="Beijing", unit="celsius")

name → class 映射的细节

# Pydantic v2:优先用 model_config["title"],其次用 __name__
name_dict_v2 = {
    tool.model_config.get("title") or tool.__name__: tool
    for tool in self.tools
    if is_pydantic_v2_subclass(tool)
}

这意味着如果你的 Pydantic 模型配置了 title,会用 title 作为 key(而非类名)。这在 bind_tools 场景中很重要——convert_to_openai_tool 会把 Pydantic 模型的 schema title 作为 function name。

max_tokens 错误检测

has_max_tokens_stop_reason = any(
    generation.message.response_metadata.get("stop_reason") == "max_tokens"
    for generation in result
    if isinstance(generation, ChatGeneration)
)
if has_max_tokens_stop_reason:
    logger.exception(_MAX_TOKENS_ERROR)

当 Pydantic 验证失败时,Parser 会检查是否因为 max_tokens 导致输出被截断。如果是,会打印一条明确的提示日志,帮助用户排查问题。


Tool Parser 继承关系

BaseCumulativeTransformOutputParser[Any]
    │
    └── JsonOutputToolsParser
            │   parse_result: AIMessage.tool_calls → [{"type": ..., "args": ...}]
            │   name → type 重命名(向后兼容)
            │
            ├── JsonOutputKeyToolsParser
            │       + key_name 过滤
            │
            └── PydanticToolsParser
                    + tools: list[TypeBaseModel]
                    + name→class 映射 → Pydantic 实例化
                    + max_tokens 错误检测

完整数据流总览

把 4 个系列串起来,看数据从用户输入到结构化结果的完整链路:

╔═══════════════════════════════════════════════════════════════════════════╗
║                        用户输入                                          ║
║   str / list[BaseMessage] / PromptValue                                  ║
╚═══════════════════════════════════════════════════════════════════════════╝
                            │
                            ▼
               ┌─── _convert_input() ───┐
               │  str → StringPromptValue     │
               │  list → ChatPromptValue      │
               │  PromptValue → 直接用        │
               └────────────────────────┘
                            │
                            ▼
                  PromptValue.to_messages()
                            │
                            ▼
              messages: list[BaseMessage]
                  (Messages 系列 ✓)
                            │
            ┌───────────────┴───────────────┐
            │                               │
      ╔═════╧══════╗                 ╔══════╧══════╗
      ║  非流式路径  ║                 ║  流式路径    ║
      ╠════════════╣                 ╠═════════════╣
      ║ _generate()║                 ║  _stream()   ║
      ║     │      ║                 ║     │        ║
      ║     ▼      ║                 ║     ▼        ║
      ║ ChatResult ║                 ║ Iterator     ║
      ║ .generations[0]              ║ [ChatGeneration ║
      ║ .message   ║                 ║  Chunk]      ║
      ╚═════╤══════╝                 ╚══════╤══════╝
            │                               │
            │                    merge_chat_generation_chunks()
            │                               │
            └───────────────┬───────────────┘
                            │
                            ▼
                ChatGeneration.message
                        │
                        ▼
                    AIMessage
              (ChatModel 系列 ✓)
                        │
         ┌──────────────┼──────────────┐
         │              │              │
   StrOutput     JsonOutput     PydanticTools
   Parser        Parser         Parser
         │              │              │
         ▼              ▼              ▼
       str           dict       Pydantic 对象
              (Outputs 系列 ✓)

各系列的职责

系列 核心职责 关键抽象
Messages 数据格式层 定义对话中的数据单元 BaseMessage / BaseMessageChunk
Runnables 执行框架层 提供 invoke/stream/batch + | 组合 Runnable / RunnableSequence
ChatModel 生产者层 调用 LLM,生成消息 BaseChatModel._generate / _stream
Outputs + Parsers 消费者层 把模型输出转换为结构化结果 GenerationOutputParserT

一条完整的 chain

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel

class Answer(BaseModel):
    answer: str
    confidence: float

parser = PydanticOutputParser(pydantic_object=Answer)

prompt = ChatPromptTemplate.from_messages([
    ("system", "回答用户问题。\n{format_instructions}"),
    ("human", "{question}"),
]).partial(format_instructions=parser.get_format_instructions())

# chain = prompt | model | parser
# 每个 | 都是 Runnable.__or__,组合成 RunnableSequence

# 数据流:
# "question" → prompt.invoke() → ChatPromptValue
#   → model.invoke() → AIMessage(content='{"answer":"...", "confidence":0.9}')
#     → parser.invoke() → Answer(answer="...", confidence=0.9)

4 篇系列回顾

主题 核心源码
第 1 篇 Generation 与 ChatGeneration GenerationChatGeneration.set_textChatGenerationChunk.__add__merge_chat_generation_chunksChatResultLLMResult.flatten
第 2 篇 Output Parser 基类体系 BaseLLMOutputParser.parse_resultBaseOutputParser.parseBaseTransformOutputParser._transformBaseCumulativeTransformOutputParser 累积+diff
第 3 篇 常用 Parser 全解 StrOutputParserJsonOutputParser(partial+jsonpatch)、PydanticOutputParser(model_validate)、ListOutputParser(droplastn 流式)、XMLOutputParser(_StreamingParser)
第 4 篇 Tool Parser 与完整数据流 parse_tool_callJsonOutputToolsParser(双路径+name→type)、PydanticToolsParser(name→class 映射+max_tokens 检测)、完整数据流

Outputs 系列是 ChatModel 系列的"输出端"——_generate 返回 ChatResult_stream 返回 ChatGenerationChunk,Output Parser 消费它们并产出结构化结果。加上 Messages(数据格式)和 Runnables(执行框架),四个系列一起构成了 LangChain 核心的完整图景:

Messages(数据格式)→ Runnables(执行框架)→ ChatModel(生产者)→ Outputs + Parsers(消费者)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐