【LangChain 源码解析五:Output Parser】
本系列共 4 部分,从输出数据结构到 Output Parser 全家族,完整拆解 LangChain 的"模型输出 → 结构化结果"链路。
本系列共 4 部分,从输出数据结构到 Output Parser 全家族,完整拆解 LangChain 的"模型输出 → 结构化结果"链路。
- 第 1 部分:5 分钟上手——Generation 与 ChatGeneration
- 第 2 部分:Output Parser 基类体系
- 第 3 部分:常用 Parser 全解
- 第 4 部分:Tool Parser 与完整数据流
LangChain Outputs 深度解析(一):5 分钟上手——Generation 与 ChatGeneration
一个能跑的例子
from langchain_core.outputs import (
Generation, GenerationChunk,
ChatGeneration, ChatGenerationChunk,
ChatResult, LLMResult,
)
from langchain_core.messages import AIMessage, AIMessageChunk
# 1. 最基础的 Generation
gen = Generation(text="Hello world", generation_info={"finish_reason": "stop"})
print(gen.text) # Hello world
print(gen.generation_info) # {'finish_reason': 'stop'}
# 2. ChatGeneration —— text 自动从 message 提取
chat_gen = ChatGeneration(message=AIMessage(content="你好!"))
print(chat_gen.text) # 你好! ← 自动设置
print(chat_gen.message.content) # 你好!
# 3. ChatResult —— _generate 的返回值
result = ChatResult(
generations=[chat_gen],
llm_output={"token_usage": {"total_tokens": 10}},
)
print(len(result.generations)) # 1
print(result.llm_output) # {'token_usage': {'total_tokens': 10}}
# 4. LLMResult —— 二维结构
llm_result = LLMResult(
generations=[[chat_gen], [Generation(text="another")]],
llm_output={"model": "fake"},
)
print(len(llm_result.generations)) # 2(两个 prompt 的结果)
print(len(llm_result.generations[0])) # 1(第一个 prompt 的候选数)
# 5. GenerationChunk 拼接
c1 = GenerationChunk(text="Hel", generation_info={"a": 1})
c2 = GenerationChunk(text="lo", generation_info={"b": 2})
merged = c1 + c2
print(merged.text) # Hello
print(merged.generation_info) # {'a': 1, 'b': 2}
# 6. ChatGenerationChunk 拼接
cc1 = ChatGenerationChunk(message=AIMessageChunk(content="你"))
cc2 = ChatGenerationChunk(message=AIMessageChunk(content="好"))
merged_chat = cc1 + cc2
print(merged_chat.text) # 你好
print(merged_chat.message.content) # 你好
不需要 API key,不需要网络。这些都是纯数据结构,可以直接构造和操作。
Generation:最基础的输出单元
Generation 是所有输出类型的根基。源码只有 3 个字段(generation.py:11):
# outputs/generation.py:11-53
class Generation(Serializable):
"""A single text generation output."""
text: str
"""Generated text output."""
generation_info: dict[str, Any] | None = None
"""Raw response from the provider.
May include things like the reason for finishing or token log probabilities.
"""
type: Literal["Generation"] = "Generation"
"""Type is used exclusively for serialization purposes."""
| 字段 | 类型 | 作用 |
|---|---|---|
text |
str |
生成的文本内容 |
generation_info |
dict | None |
供应商特有的元数据(finish_reason、log_probs 等) |
type |
Literal |
序列化标识,固定为 "Generation" |
Generation 继承自 Serializable(即 Pydantic BaseModel + LangChain 序列化协议),所以可以 model_dump() / model_dump_json()。
GenerationChunk:可拼接的流式 Generation
GenerationChunk 继承 Generation,唯一新增的是 __add__ 方法(generation.py:55):
# outputs/generation.py:55-81
class GenerationChunk(Generation):
def __add__(self, other: GenerationChunk) -> GenerationChunk:
if isinstance(other, GenerationChunk):
generation_info = merge_dicts(
self.generation_info or {},
other.generation_info or {},
)
return GenerationChunk(
text=self.text + other.text, # ← text 直接拼接
generation_info=generation_info or None, # ← generation_info 用 merge_dicts 合并
)
raise TypeError(...)
关键点:
text直接字符串拼接generation_info用merge_dicts深度合并(来自langchain_core.utils._merge)- 只接受同类型的
GenerationChunk,否则抛TypeError
ChatGeneration:消息级别的输出
ChatGeneration 继承 Generation,新增 message 字段(chat_generation.py:17):
# outputs/chat_generation.py:17-73
class ChatGeneration(Generation):
text: str = "" # ← 默认空字符串,不需要手动设置
message: BaseMessage
type: Literal["ChatGeneration"] = "ChatGeneration"
@model_validator(mode="after")
def set_text(self) -> Self:
"""从 message.content 自动提取 text。"""
text = ""
if isinstance(self.message.content, str):
text = self.message.content # ← 字符串直接用
elif isinstance(self.message.content, list):
for block in self.message.content:
if isinstance(block, str):
text = block # ← 列表中第一个字符串
break
if isinstance(block, dict) and "text" in block:
block_type = block.get("type")
if block_type is None or block_type == "text":
text = block["text"] # ← 第一个 text 类型的 block
break
self.text = text
return self
set_text 是一个 Pydantic model_validator,在对象创建后自动执行。它的逻辑:
message.content 是什么类型?
│
├─ str → 直接用
│
└─ list → 遍历 content blocks
│
├─ 遇到 str → 用它,break
│
└─ 遇到 dict 且有 "text" 键
│
├─ type 为 None 或 "text" → 用 block["text"],break
│
└─ type 为 "thinking"/"reasoning" 等 → 跳过
为什么要跳过 thinking/reasoning block?因为 Anthropic 等模型的 extended thinking 返回的 content 是 [{"type": "thinking", ...}, {"type": "text", ...}],text 字段应该取实际回复,而不是推理过程。
ChatGenerationChunk:流式 Chat 输出
ChatGenerationChunk 继承 ChatGeneration,message 类型收窄为 BaseMessageChunk(chat_generation.py:76):
# outputs/chat_generation.py:76-127
class ChatGenerationChunk(ChatGeneration):
message: BaseMessageChunk # ← 类型从 BaseMessage 收窄为 BaseMessageChunk
type: Literal["ChatGenerationChunk"] = "ChatGenerationChunk"
def __add__(
self, other: ChatGenerationChunk | list[ChatGenerationChunk]
) -> ChatGenerationChunk:
if isinstance(other, ChatGenerationChunk): # ← 单个 chunk
generation_info = merge_dicts(
self.generation_info or {},
other.generation_info or {},
)
return ChatGenerationChunk(
message=self.message + other.message, # ← 委托给 BaseMessageChunk.__add__
generation_info=generation_info or None,
)
if isinstance(other, list) and all( # ← chunk 列表
isinstance(x, ChatGenerationChunk) for x in other
):
generation_info = merge_dicts(
self.generation_info or {},
*[chunk.generation_info for chunk in other if chunk.generation_info],
)
return ChatGenerationChunk(
message=self.message + [chunk.message for chunk in other],
generation_info=generation_info or None,
)
raise TypeError(...)
与 GenerationChunk 的区别:
- 拼接 message 而非 text:
text由set_textvalidator 自动从合并后的message提取 - 支持列表拼接:
chunk + [chunk1, chunk2, ...]一次性合并多个(message.__add__也支持列表) - message 类型:必须是
BaseMessageChunk(有__add__能力的消息)
merge_chat_generation_chunks:便捷合并函数
merge_chat_generation_chunks(chat_generation.py:129-146)是合并流式 chunk 列表的便捷函数:
# outputs/chat_generation.py:129-146
def merge_chat_generation_chunks(
chunks: list[ChatGenerationChunk],
) -> ChatGenerationChunk | None:
if not chunks: # 空列表 → None
return None
if len(chunks) == 1: # 单个 → 直接返回
return chunks[0]
return chunks[0] + chunks[1:] # 多个 → 利用列表拼接一次性合并
这个函数在 ChatModel 的 generate_from_stream(chat_models.py)中被调用——把 _stream 产生的所有 chunk 合并成最终的 ChatGeneration。
_stream() yield chunk1, chunk2, chunk3, ...
│
▼
收集到 list[ChatGenerationChunk]
│
▼
merge_chat_generation_chunks([chunk1, chunk2, chunk3])
│
▼
chunk1 + [chunk2, chunk3] → 最终的 ChatGenerationChunk
│
▼
包装成 ChatResult → 作为 generate() 的返回值
ChatResult:_generate 的返回值
ChatResult 是 _generate 方法的返回类型(chat_result.py:8):
# outputs/chat_result.py:8-37
class ChatResult(BaseModel):
generations: list[ChatGeneration]
"""List of the chat generations.
Generations is a list to allow for multiple candidate generations
for a single input prompt.
"""
llm_output: dict | None = None
"""For arbitrary LLM provider specific output."""
| 字段 | 类型 | 作用 |
|---|---|---|
generations |
list[ChatGeneration] |
候选输出列表(通常只有 1 个,n>1 时有多个) |
llm_output |
dict | None |
供应商特有信息(token 用量、模型版本等) |
注意:ChatResult 继承的是 pydantic.BaseModel,不是 Serializable。它是内部数据传递用的,不需要 LangChain 序列化协议。
用户通常不直接接触 ChatResult,它在 invoke 的调用栈中被拆解:
invoke()
→ generate()
→ _generate() 返回 ChatResult
→ ChatResult.generations[0].message ← 提取第一个候选的 AIMessage
→ 返回 AIMessage 给用户
LLMResult:二维容器
LLMResult 是更高层的容器(llm_result.py:15),支持多 prompt 多候选的二维结构:
# outputs/llm_result.py:15-112
class LLMResult(BaseModel):
generations: list[
list[Generation | ChatGeneration | GenerationChunk | ChatGenerationChunk]
]
"""第一维:不同输入 prompt
第二维:同一 prompt 的不同候选"""
llm_output: dict | None = None
run: list[RunInfo] | None = None # ← 已废弃
type: Literal["LLMResult"] = "LLMResult"
二维结构示意:
LLMResult.generations = [
[ChatGeneration("回答1a"), ChatGeneration("回答1b")], ← prompt 1 的候选
[ChatGeneration("回答2a")], ← prompt 2 的候选
]
flatten():展平为一维
# outputs/llm_result.py:59-93
def flatten(self) -> list[LLMResult]:
llm_results = []
for i, gen_list in enumerate(self.generations):
if i == 0:
llm_results.append(
LLMResult(generations=[gen_list], llm_output=self.llm_output)
)
else:
if self.llm_output is not None:
llm_output = deepcopy(self.llm_output)
llm_output["token_usage"] = {} # ← 避免重复计算 token
else:
llm_output = None
llm_results.append(
LLMResult(generations=[gen_list], llm_output=llm_output)
)
return llm_results
注意第 74 行的注释 Avoid double counting tokens in OpenAICallback:只有第一个 LLMResult 保留 token_usage,其余的清空,避免在回调中重复统计。
eq:忽略 run 元数据
# outputs/llm_result.py:95-111
def __eq__(self, other: object) -> bool:
if not isinstance(other, LLMResult):
return NotImplemented
return (
self.generations == other.generations
and self.llm_output == other.llm_output
# ← 故意不比较 self.run,因为 run_id 每次不同
)
__hash__ = None # ← 不可哈希,因为包含 list
RunInfo:即将废弃
RunInfo(run_info.py:10)只有一个字段:
# outputs/run_info.py:10-22
class RunInfo(BaseModel):
"""Defined for backwards compatibility with older versions.
This model will likely be deprecated in the future.
"""
run_id: UUID
现在 run_id 信息可以通过回调或 astream_events API 获取,RunInfo 已经不再需要。
继承关系总览
Serializable (Pydantic BaseModel + LC 序列化)
│
├── Generation
│ │ text: str
│ │ generation_info: dict | None
│ │
│ ├── GenerationChunk
│ │ + __add__(GenerationChunk) → GenerationChunk
│ │
│ └── ChatGeneration
│ │ message: BaseMessage
│ │ + set_text validator(自动从 message 提取 text)
│ │
│ └── ChatGenerationChunk
│ message: BaseMessageChunk(收窄类型)
│ + __add__(ChatGenerationChunk | list) → ChatGenerationChunk
BaseModel (纯 Pydantic)
│
├── ChatResult
│ generations: list[ChatGeneration]
│ llm_output: dict | None
│
├── LLMResult
│ generations: list[list[Generation]](二维)
│ llm_output: dict | None
│ run: list[RunInfo] | None
│
└── RunInfo
run_id: UUID
与 ChatModel 的关系
这些数据结构在 ChatModel 调用链中的位置:
用户调用 model.invoke(messages)
│
▼
_convert_input() → PromptValue → messages: list[BaseMessage]
│
│ ╭─ 非流式路径 ──────────────────────────╮
├──┤ _generate(messages) → ChatResult │
│ │ .generations[0].message → AIMessage │
│ ╰────────────────────────────────────────╯
│
│ ╭─ 流式路径 ─────────────────────────────────────────────────╮
└──┤ _stream(messages) → Iterator[ChatGenerationChunk] │
│ yield ChatGenerationChunk(message=AIMessageChunk(...)) │
│ │
│ merge_chat_generation_chunks(all_chunks) │
│ → ChatGenerationChunk → ChatResult │
╰────────────────────────────────────────────────────────────╯
│
▼
返回 AIMessage(invoke)或 Iterator[AIMessageChunk](stream)
小结
本篇梳理了 LangChain 输出层的 7 个核心类:
- Generation:
text+generation_info,最基础的输出单元 - GenerationChunk:可拼接的流式 Generation,
text直接拼、generation_info深度合并 - ChatGeneration:新增
message: BaseMessage,set_textvalidator 自动从 message 提取 text - ChatGenerationChunk:
message: BaseMessageChunk,支持单个和列表拼接 - ChatResult:
_generate的返回值,generations+llm_output - LLMResult:二维容器,
flatten()展平且避免重复计 token - RunInfo:仅
run_id,即将废弃
下一篇问题:ChatModel 返回了 ChatResult/AIMessage,但应用层通常需要 JSON、Pydantic 对象、列表等结构化数据。谁来做这个转换?Output Parser 体系是怎么设计的?
LangChain Outputs 深度解析(二):Output Parser 基类体系
一个能跑的例子
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import AIMessage
parser = StrOutputParser()
# 直接解析 AIMessage
result = parser.invoke(AIMessage(content="Hello, World!"))
print(result) # Hello, World!
print(type(result)) # <class 'str'>
# 解析纯字符串
result2 = parser.invoke("Hello again!")
print(result2) # Hello again!
# Parser 也是 Runnable!可以用 | 组合
from langchain_core.language_models.fake_chat_models import FakeListChatModel
model = FakeListChatModel(responses=["I am a fake model"])
chain = model | StrOutputParser()
print(chain.invoke("hello")) # I am a fake model
print(type(chain.invoke("hello"))) # <class 'str'> ← 不再是 AIMessage
# 流式也支持
for chunk in chain.stream("hello"):
print(repr(chunk), end=" ")
# 'I' ' ' 'a' 'm' ' ' 'a' ' ' 'f' 'a' 'k' 'e' ' ' 'm' 'o' 'd' 'e' 'l'
StrOutputParser 把 AIMessage 变成 str——这是最简单的 Output Parser。但它背后的基类体系支撑着所有复杂的解析逻辑。
5 层继承链总览
Output Parser 的基类层层递进,每层增加一种能力:
BaseLLMOutputParser (ABC) ← 最底层:parse_result 抽象方法
│
├── BaseGenerationOutputParser ← + RunnableSerializable,invoke 路由 str/BaseMessage
│ (较少直接使用)
│
└── BaseOutputParser ← + RunnableSerializable,parse_result → parse(text)
│
└── BaseTransformOutputParser ← + 流式支持(_transform)
│
└── BaseCumulativeTransformOutputParser ← + 累积式流式(diff 模式)
后面两层(BaseGenerationOutputParser 和 BaseOutputParser)都继承了 BaseLLMOutputParser,但它们是平行分支,不是继承关系。大多数 Parser 继承自 BaseOutputParser 这条线。
第 1 层:BaseLLMOutputParser
最底层的抽象,只定义了 parse_result 接口(base.py:30):
# output_parsers/base.py:30-67
class BaseLLMOutputParser(ABC, Generic[T]):
"""Abstract base class for parsing the outputs of a model."""
@abstractmethod
def parse_result(self, result: list[Generation], *, partial: bool = False) -> T:
"""Parse a list of candidate model Generation objects into a specific format."""
async def aparse_result(
self, result: list[Generation], *, partial: bool = False
) -> T:
return await run_in_executor(None, self.parse_result, result, partial=partial)
| 方法 | 作用 |
|---|---|
parse_result(list[Generation]) |
核心抽象——从候选 Generation 列表中解析出目标类型 T |
aparse_result |
异步版,默认用线程池包装同步方法 |
注意:parse_result 接收的是 list[Generation]——候选列表。但实际上绝大多数 Parser 只用 result[0]。
第 2 层(分支 A):BaseGenerationOutputParser
在 BaseLLMOutputParser 基础上混入 RunnableSerializable(base.py:70):
# output_parsers/base.py:70-133
class BaseGenerationOutputParser(
BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]
):
@property
def InputType(self) -> Any:
return str | AnyMessage # ← 接受 str 或 BaseMessage
def invoke(self, input: str | BaseMessage, config=None, **kwargs) -> T:
if isinstance(input, BaseMessage):
return self._call_with_config(
lambda inner_input: self.parse_result(
[ChatGeneration(message=inner_input)] # ← BaseMessage 包装成 ChatGeneration
),
input, config, run_type="parser",
)
return self._call_with_config(
lambda inner_input: self.parse_result(
[Generation(text=inner_input)] # ← str 包装成 Generation
),
input, config, run_type="parser",
)
它的 invoke 做了输入路由:
BaseMessage→ 包装成ChatGeneration,调用parse_resultstr→ 包装成Generation,调用parse_result
这使得 Parser 可以同时处理 ChatModel 的输出(AIMessage)和 LLM 的输出(str)。
第 2 层(分支 B):BaseOutputParser——核心基类
大多数 Parser 继承自这条线(base.py:136)。它和 BaseGenerationOutputParser 是平行分支,但增加了 parse(text) 抽象方法:
# output_parsers/base.py:136-349
class BaseOutputParser(
BaseLLMOutputParser, RunnableSerializable[LanguageModelOutput, T]
):
@property
def InputType(self) -> Any:
return str | AnyMessage
# ──── invoke:和 BaseGenerationOutputParser 相同的路由逻辑 ────
def invoke(self, input: str | BaseMessage, config=None, **kwargs) -> T:
if isinstance(input, BaseMessage):
return self._call_with_config(
lambda inner_input: self.parse_result(
[ChatGeneration(message=inner_input)]
),
input, config, run_type="parser",
)
return self._call_with_config(
lambda inner_input: self.parse_result(
[Generation(text=inner_input)]
),
input, config, run_type="parser",
)
# ──── parse_result → parse(text) 的桥接 ────
def parse_result(self, result: list[Generation], *, partial: bool = False) -> T:
return self.parse(result[0].text) # ← 取第一个候选的 text,交给 parse
@abstractmethod
def parse(self, text: str) -> T:
"""Parse a single string model output into some structure."""
# ──── 格式指令 ────
def get_format_instructions(self) -> str:
"""Instructions on how the LLM output should be formatted."""
raise NotImplementedError
BaseOutputParser vs BaseGenerationOutputParser 的关键区别:
| 对比项 | BaseGenerationOutputParser |
BaseOutputParser |
|---|---|---|
| 需要实现 | parse_result(list[Generation]) |
parse(text: str) |
| 输入粒度 | 能访问完整的 Generation 对象 |
只看到文本 |
| 适用场景 | 需要 generation_info 或 message |
只需文本解析 |
| 额外能力 | 无 | get_format_instructions() |
绝大多数 Parser 只需要文本就够了,所以继承 BaseOutputParser + 实现 parse(text) 是最常见的模式。
parse_result 的桥接逻辑
invoke(AIMessage("hello"))
│
▼
包装成 [ChatGeneration(message=AIMessage("hello"))]
│
▼
parse_result([ChatGeneration(...)])
│
▼
result[0].text ← ChatGeneration.text 由 set_text validator 从 message 提取
│
▼
parse("hello") ← 子类只需实现这个
│
▼
返回 T
这就是 Output Parser 和第 1 篇的连接点——ChatGeneration.set_text validator 把 message.content 提取为 text,然后 BaseOutputParser.parse_result 拿到 text 交给 parse。
第 3 层:BaseTransformOutputParser——流式支持
在 BaseOutputParser 基础上增加流式处理能力(transform.py:28):
# output_parsers/transform.py:28-96
class BaseTransformOutputParser(BaseOutputParser[T]):
"""Base class for an output parser that can handle streaming input."""
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[T]:
for chunk in input:
if isinstance(chunk, BaseMessage):
yield self.parse_result([ChatGeneration(message=chunk)])
else:
yield self.parse_result([Generation(text=chunk)])
def transform(
self, input: Iterator[str | BaseMessage], config=None, **kwargs
) -> Iterator[T]:
yield from self._transform_stream_with_config(
input, self._transform, config, run_type="parser"
)
默认的 _transform 实现非常简单:逐 chunk 独立解析。每收到一个 chunk,就调用一次 parse_result。
stream 输入: "He" "llo" " Wo" "rld"
│ │ │ │
▼ ▼ ▼ ▼
_transform: parse("He") parse("llo") parse(" Wo") parse("rld")
│ │ │ │
▼ ▼ ▼ ▼
stream 输出: "He" "llo" " Wo" "rld" ← 对 StrOutputParser 来说,直接透传
这对 StrOutputParser 这种"透传"Parser 来说完美——每个 chunk 独立处理,不需要等待完整输出。
第 4 层:BaseCumulativeTransformOutputParser——累积式流式
对于 JSON、Pydantic 等需要完整结构才能解析的 Parser,逐 chunk 独立解析行不通。BaseCumulativeTransformOutputParser 用累积后重新解析的策略(transform.py:99):
# output_parsers/transform.py:99-175
class BaseCumulativeTransformOutputParser(BaseTransformOutputParser[T]):
diff: bool = False
"""In streaming mode, whether to yield diffs between the previous
and current parsed output."""
def _diff(self, prev: T | None, next: T) -> T:
"""Convert parsed outputs into a diff format."""
raise NotImplementedError
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[Any]:
prev_parsed = None
acc_gen: GenerationChunk | ChatGenerationChunk | None = None # ← 累积器
for chunk in input:
# 1. 构造 chunk_gen
if isinstance(chunk, BaseMessageChunk):
chunk_gen = ChatGenerationChunk(message=chunk)
elif isinstance(chunk, BaseMessage):
chunk_gen = ChatGenerationChunk(
message=BaseMessageChunk(**chunk.model_dump())
)
else:
chunk_gen = GenerationChunk(text=chunk)
# 2. 累积
acc_gen = chunk_gen if acc_gen is None else acc_gen + chunk_gen
# 3. 尝试 partial 解析
parsed = self.parse_result([acc_gen], partial=True)
# 4. 有新结果且不同于上一次 → yield
if parsed is not None and parsed != prev_parsed:
if self.diff:
yield self._diff(prev_parsed, parsed) # ← diff 模式
else:
yield parsed # ← 完整模式
prev_parsed = parsed
核心流程:
stream 输入: '{"na' 'me":' '"Al' 'ice"}'
│ │ │ │
▼ ▼ ▼ ▼
累积: '{"na' '{"name":' '{"name":"Al' '{"name":"Alice"}'
│ │ │ │
▼ ▼ ▼ ▼
partial 解析: None None {"name":"Al"} {"name":"Alice"}
│ │
▼ ▼
yield(非 diff): {"name":"Al"} {"name":"Alice"}
yield(diff 模式): [add /name "Al"] [replace /name "Alice"]
| 策略 | BaseTransformOutputParser |
BaseCumulativeTransformOutputParser |
|---|---|---|
| 累积 | 不累积,逐 chunk 独立 | 累积所有 chunk 后重新解析 |
| partial | 不用 | parse_result(partial=True) |
| diff | 不支持 | self.diff=True 时调用 _diff |
| 适用 | 文本透传(StrOutputParser) | 结构化输出(JSON / Pydantic) |
| 去重 | 无 | parsed != prev_parsed 防止重复 yield |
Parser 也是 Runnable!
所有 Output Parser 都继承了 RunnableSerializable,这意味着它们拥有 Runnable 的全套能力:
parser = StrOutputParser()
# invoke
parser.invoke(AIMessage(content="hello")) # "hello"
# stream / transform
for chunk in parser.stream(AIMessage(content="hello")):
print(chunk)
# batch
parser.batch([AIMessage(content="a"), AIMessage(content="b")]) # ["a", "b"]
# | 管道
chain = some_model | parser # 等价于 RunnableSequence(some_model, parser)
# 异步
import asyncio
asyncio.run(parser.ainvoke(AIMessage(content="hello")))
这也是为什么 model | StrOutputParser() 可以工作——| 调用的是 Runnable 的 __or__,把两个 Runnable 组合成 RunnableSequence。
invoke 的完整调用栈
以 chain = model | StrOutputParser() 为例:
chain.invoke("hello")
│
▼
RunnableSequence.invoke("hello")
│
├── model.invoke("hello")
│ → AIMessage(content="I am fake")
│
▼
StrOutputParser.invoke(AIMessage(content="I am fake"))
│
▼
_call_with_config(
lambda msg: parse_result([ChatGeneration(message=msg)]),
AIMessage(...),
run_type="parser"
)
│
▼
parse_result([ChatGeneration(message=AIMessage("I am fake"))])
│
▼
parse(result[0].text) ← text 由 set_text validator 从 message 提取
│
▼
parse("I am fake")
│
▼
return "I am fake" ← StrOutputParser.parse 直接返回 text
小结
本篇拆解了 Output Parser 的 5 层基类体系:
- BaseLLMOutputParser:最底层抽象,定义
parse_result(list[Generation]) → T - BaseGenerationOutputParser:+ Runnable,
invoke路由 str/BaseMessage,适合需要访问完整 Generation 的场景 - BaseOutputParser:+ Runnable +
parse(text),大多数 Parser 的基类,通过parse_result → parse桥接 - BaseTransformOutputParser:+ 流式,逐 chunk 独立解析
- BaseCumulativeTransformOutputParser:+ 累积式流式,支持 partial 解析和 diff 模式
下一篇问题:有了基类体系,实际的 Parser 是怎么实现的?StrOutputParser、JsonOutputParser、PydanticOutputParser、ListOutputParser、XMLOutputParser 分别解决什么问题?它们给模型的 format_instructions 有什么不同?
LangChain Outputs 深度解析(三):常用 Parser 全解
一个能跑的例子
from langchain_core.output_parsers import (
StrOutputParser,
JsonOutputParser,
PydanticOutputParser,
CommaSeparatedListOutputParser,
)
from langchain_core.messages import AIMessage
from pydantic import BaseModel, Field
# 同一个"原始输入"
raw_json = '{"name": "Alice", "age": 30}'
# 1. StrOutputParser → 原样返回字符串
str_parser = StrOutputParser()
print(str_parser.invoke(raw_json))
# '{"name": "Alice", "age": 30}'
# 2. JsonOutputParser → 解析为 dict
json_parser = JsonOutputParser()
print(json_parser.invoke(raw_json))
# {'name': 'Alice', 'age': 30}
# 3. PydanticOutputParser → 解析为 Pydantic 对象
class Person(BaseModel):
name: str = Field(description="人名")
age: int = Field(description="年龄")
pydantic_parser = PydanticOutputParser(pydantic_object=Person)
person = pydantic_parser.invoke(raw_json)
print(person) # name='Alice' age=30
print(person.name) # Alice
# 4. CommaSeparatedListOutputParser → 解析为列表
csv_parser = CommaSeparatedListOutputParser()
print(csv_parser.invoke("apple, banana, cherry"))
# ['apple', 'banana', 'cherry']
# 查看各 Parser 给模型的格式指令
print("--- JsonOutputParser ---")
print(json_parser.get_format_instructions())
print("--- PydanticOutputParser ---")
print(pydantic_parser.get_format_instructions())
print("--- CommaSeparatedListOutputParser ---")
print(csv_parser.get_format_instructions())
4 种 Parser,同样的输入,不同的输出类型。接下来逐个拆解。
StrOutputParser:最简单的 Parser
源码只有一个方法(string.py:8):
# output_parsers/string.py:8-63
class StrOutputParser(BaseTransformOutputParser[str]):
"""Extract text content from model outputs as a string."""
def parse(self, text: str) -> str:
return text # ← 直接返回,就这么简单
继承自 BaseTransformOutputParser,所以天然支持流式。流式时的行为:
model.stream("hello") → AIMessageChunk("I") → AIMessageChunk(" am") → ...
│
▼
StrOutputParser._transform:
parse_result([ChatGeneration(msg)]) → result[0].text → "I"
parse_result([ChatGeneration(msg)]) → result[0].text → " am"
│
▼
输出: "I" " am" " fake" ... ← 逐 chunk 透传文本
StrOutputParser 是使用频率最高的 Parser——几乎所有"只要文本"的场景都用它。
JsonOutputParser:JSON 解析
继承自 BaseCumulativeTransformOutputParser,用累积式流式解析 JSON(json.py:31):
# output_parsers/json.py:31-131
class JsonOutputParser(BaseCumulativeTransformOutputParser[Any]):
pydantic_object: type[TBaseModel] | None = None
"""Optional Pydantic object for validation."""
def _diff(self, prev: Any | None, next: Any) -> Any:
return jsonpatch.make_patch(prev, next).patch # ← JSON Patch 格式
def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
text = result[0].text.strip()
if partial:
try:
return parse_json_markdown(text) # ← partial 模式:解析失败返回 None
except JSONDecodeError:
return None
else:
try:
return parse_json_markdown(text) # ← 完整模式:解析失败抛异常
except JSONDecodeError as e:
raise OutputParserException(f"Invalid json output: {text}") from e
def parse(self, text: str) -> Any:
return self.parse_result([Generation(text=text)])
关键点:
parse_json_markdown:能处理json代码块(`` ```json {…} `````)或裸 JSON- partial 模式:不完整的 JSON 也尝试解析(如
{"name": "Ali→ 可能返回{"name": "Ali"}),失败返回None - diff 模式:
_diff用jsonpatch库生成 JSON Patch,描述两次解析结果之间的差异
流式解析流程
stream 输入: '```json\n' '{"na' 'me":' '"Al' 'ice",' '"age":' '30}' '\n```'
│ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
累积后解析: None None None None {"name": {"name": {"name": {"name":
"Alice"} "Alice"} "Alice", "Alice",
"age":30} "age":30}
│ │
▼ ▼
yield(diff=False): {"name":"Alice"} {"name":"Alice","age":30}
get_format_instructions
# output_parsers/json.py:104-123
def get_format_instructions(self) -> str:
if self.pydantic_object is None:
return "Return a JSON object."
schema = dict(self._get_schema(self.pydantic_object).items())
# 移除 title 和 type(减少冗余)
reduced_schema = schema
if "title" in reduced_schema:
del reduced_schema["title"]
if "type" in reduced_schema:
del reduced_schema["type"]
schema_str = json.dumps(reduced_schema, ensure_ascii=False)
return JSON_FORMAT_INSTRUCTIONS.format(schema=schema_str)
如果提供了 pydantic_object,会把 JSON Schema 注入到格式指令中。
PydanticOutputParser:Pydantic 验证
继承自 JsonOutputParser,增加 Pydantic 验证层(pydantic.py:19):
# output_parsers/pydantic.py:19-132
class PydanticOutputParser(JsonOutputParser, Generic[TBaseModel]):
pydantic_object: type[TBaseModel] # ← 必填,不像 JsonOutputParser 可以为 None
def _parse_obj(self, obj: dict) -> TBaseModel:
try:
if issubclass(self.pydantic_object, pydantic.BaseModel):
return self.pydantic_object.model_validate(obj) # ← Pydantic v2
if issubclass(self.pydantic_object, pydantic.v1.BaseModel):
return self.pydantic_object.parse_obj(obj) # ← Pydantic v1
raise OutputParserException("Unsupported model version")
except (pydantic.ValidationError, pydantic.v1.ValidationError) as e:
raise self._parser_exception(e, obj) from e
def parse_result(self, result: list[Generation], *, partial: bool = False):
try:
json_object = super().parse_result(result) # ← 先用 JsonOutputParser 解析为 dict
return self._parse_obj(json_object) # ← 再用 Pydantic 验证
except OutputParserException:
if partial:
return None # ← partial 模式:验证失败返回 None
raise
调用链:
PydanticOutputParser.parse_result(result)
│
├── JsonOutputParser.parse_result(result) ← 第 1 步:JSON 文本 → dict
│ → parse_json_markdown(text) → {"name": "Alice", "age": 30}
│
└── _parse_obj(dict) ← 第 2 步:dict → Pydantic 对象
→ Person.model_validate({"name": "Alice", "age": 30})
→ Person(name="Alice", age=30)
format_instructions 区别
PydanticOutputParser 有自己的格式指令模板,比 JsonOutputParser 多了一个示例:
# output_parsers/pydantic.py:124-132
_PYDANTIC_FORMAT_INSTRUCTIONS = """The output should be formatted as a JSON instance \
that conforms to the JSON schema below.
As an example, for the schema {{"properties": {{"foo": ...}}}}, ...
Here is the output schema:
{schema}
ListOutputParser 家族:列表解析
ListOutputParser 是列表 Parser 的抽象基类(list.py:43),有 3 个具体实现:
# output_parsers/list.py:43-102
class ListOutputParser(BaseTransformOutputParser[list[str]]):
@abstractmethod
def parse(self, text: str) -> list[str]: ...
def parse_iter(self, text: str) -> Iterator[re.Match]:
"""增量解析——子类可选实现,用于流式。"""
raise NotImplementedError
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[list[str]]:
buffer = ""
for chunk in input:
if isinstance(chunk, BaseMessage):
chunk_content = chunk.content
if not isinstance(chunk_content, str):
continue
buffer += chunk_content
else:
buffer += chunk
try:
done_idx = 0
for m in droplastn(self.parse_iter(buffer), 1): # ← 保留最后一个(可能不完整)
done_idx = m.end()
yield [m.group(1)] # ← 每次 yield 一个完整项
buffer = buffer[done_idx:]
except NotImplementedError:
parts = self.parse(buffer)
if len(parts) > 1:
for part in parts[:-1]:
yield [part]
buffer = parts[-1]
for part in self.parse(buffer): # ← 最后把 buffer 剩余内容解析
yield [part]
流式策略的关键是 droplastn(iter, 1)——丢掉最后一个匹配(因为可能不完整),只 yield 确定完整的项:
buffer 累积: "1. app" → "1. apple\n2. ban" → "1. apple\n2. banana\n3. cherry"
│ │ │
▼ ▼ ▼
parse_iter: [m("app")] [m("apple"), m("ban")] [m("apple"), m("banana"), m("cherry")]
droplastn(1): [] [m("apple")] [m("apple"), m("banana")]
│ │
▼ ▼
yield: ["apple"] ["banana"]
最后 flush: ["cherry"]
CommaSeparatedListOutputParser
# output_parsers/list.py:139-185
class CommaSeparatedListOutputParser(ListOutputParser):
def get_format_instructions(self) -> str:
return (
"Your response should be a list of comma separated values, "
"eg: `foo, bar, baz` or `foo,bar,baz`"
)
def parse(self, text: str) -> list[str]:
try:
reader = csv.reader(
StringIO(text), quotechar='"', delimiter=",", skipinitialspace=True
)
return [item for sublist in reader for item in sublist]
except csv.Error:
return [part.strip() for part in text.split(",")] # ← fallback
用 csv.reader 解析,能正确处理引号内的逗号(如 "hello, world", foo)。解析失败则 fallback 到简单的 split(",").
NumberedListOutputParser
# output_parsers/list.py:188-218
class NumberedListOutputParser(ListOutputParser):
pattern: str = r"\d+\.\s([^\n]+)"
def get_format_instructions(self) -> str:
return (
"Your response should be a numbered list with each item on a new line. "
"For example: \n\n1. foo\n\n2. bar\n\n3. baz"
)
def parse(self, text: str) -> list[str]:
return re.findall(self.pattern, text)
def parse_iter(self, text: str) -> Iterator[re.Match]:
return re.finditer(self.pattern, text) # ← 支持流式增量匹配
正则 \d+\.\s([^\n]+) 匹配 1. xxx 格式,捕获组 1 是内容。
MarkdownListOutputParser
# output_parsers/list.py:221-249
class MarkdownListOutputParser(ListOutputParser):
pattern: str = r"^\s*[-*]\s([^\n]+)$"
def get_format_instructions(self) -> str:
return "Your response should be a markdown list, eg: `- foo\n- bar\n- baz`"
def parse(self, text: str) -> list[str]:
return re.findall(self.pattern, text, re.MULTILINE)
def parse_iter(self, text: str) -> Iterator[re.Match]:
return re.finditer(self.pattern, text, re.MULTILINE)
正则 ^\s*[-*]\s([^\n]+)$ 匹配 - xxx 或 * xxx 格式。注意 re.MULTILINE 使 ^$ 匹配每行。
XMLOutputParser:增量 XML 解析
XMLOutputParser(xml.py:151)继承 BaseTransformOutputParser,有自己的流式实现——不用累积式,而是用 ET.XMLPullParser 做真正的增量解析:
# output_parsers/xml.py:151-285
class XMLOutputParser(BaseTransformOutputParser):
tags: list[str] | None = None
parser: Literal["defusedxml", "xml"] = "defusedxml" # ← 默认用安全的 defusedxml
def parse(self, text: str) -> dict[str, str | list[Any]]:
# 支持 ```xml ... ```代码块
match = re.search(r"```(xml)?(.*)```", text, re.DOTALL)
if match is not None:
text = match.group(2)
# 处理 encoding 声明
encoding_match = self.encoding_matcher.search(text)
if encoding_match:
text = encoding_match.group(2)
text = text.strip()
root = et.fromstring(text)
return self._root_to_dict(root)
def _transform(self, input: Iterator[str | BaseMessage]) -> Iterator[AddableDict]:
streaming_parser = _StreamingParser(self.parser)
for chunk in input:
yield from streaming_parser.parse(chunk) # ← 增量解析,有结果就 yield
streaming_parser.close()
_StreamingParser 内部机制
_StreamingParser(xml.py:42)是流式 XML 解析的核心:
# output_parsers/xml.py:42-148
class _StreamingParser:
def __init__(self, parser):
self.pull_parser = ET.XMLPullParser(["start", "end"])
self.xml_start_re = re.compile(r"<[a-zA-Z:_]")
self.current_path: list[str] = []
self.current_path_has_children = False
self.buffer = ""
self.xml_started = False
def parse(self, chunk) -> Iterator[AddableDict]:
# 1. 等待 XML 开始(跳过前导文本)
self.buffer += chunk
if not self.xml_started:
if match := self.xml_start_re.search(self.buffer):
self.buffer = self.buffer[match.start():]
self.xml_started = True
else:
return
# 2. feed 给 PullParser
self.pull_parser.feed(self.buffer)
self.buffer = ""
# 3. 读取事件
for event, elem in self.pull_parser.read_events():
if event == "start":
self.current_path.append(elem.tag)
self.current_path_has_children = False
elif event == "end":
self.current_path.pop()
if not self.current_path_has_children:
yield nested_element(self.current_path, elem) # ← 叶子节点才 yield
if self.current_path:
self.current_path_has_children = True
流式解析过程:
输入: "<root>" "<name>" "Alice" "</name>" "<age>" "30" "</age>" "</root>"
事件流:
start root → path=[root]
start name → path=[root, name]
end name → path=[root],yield {"root": [{"name": "Alice"}]}
start age → path=[root, age]
end age → path=[root],yield {"root": [{"age": "30"}]}
end root → path=[]
输出(AddableDict):
{"root": [{"name": "Alice"}]}
{"root": [{"age": "30"}]}
两个 AddableDict 相加后:
{"root": [{"name": "Alice"}, {"age": "30"}]}
AddableDict 的 __add__ 会把列表值 append 而非覆盖,使得流式输出可以自然合并。
get_format_instructions 对比表
| Parser | 给模型的格式指令 |
|---|---|
StrOutputParser |
(不提供,直接返回文本) |
JsonOutputParser |
"Return a JSON object." 或带 JSON Schema 的详细指令 |
PydanticOutputParser |
带 JSON Schema + 示例的详细指令 |
CommaSeparatedListOutputParser |
"Your response should be a list of comma separated values, eg: foo, bar, baz" |
NumberedListOutputParser |
"Your response should be a numbered list... 1. foo 2. bar 3. baz" |
MarkdownListOutputParser |
"Your response should be a markdown list, eg: - foo\n- bar\n- baz" |
XMLOutputParser |
带 tags 列表的 XML 格式说明 |
这些指令通常注入到 Prompt 中,告诉模型如何格式化输出:
# 典型用法
from langchain_core.prompts import ChatPromptTemplate
parser = PydanticOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_messages([
("system", "按以下格式输出:\n{format_instructions}"),
("human", "{query}"),
])
chain = prompt | model | parser
# prompt 会把 parser.get_format_instructions() 注入到 system message 中
继承关系总览
BaseTransformOutputParser[str]
└── StrOutputParser ← parse(text) → text
BaseCumulativeTransformOutputParser[Any]
└── JsonOutputParser ← parse_json_markdown + partial
└── PydanticOutputParser ← + Pydantic model_validate
BaseTransformOutputParser[list[str]]
└── ListOutputParser (ABC) ← buffer + parse_iter 流式
├── CommaSeparatedListOutputParser ← csv.reader
├── NumberedListOutputParser ← r"\d+\.\s([^\n]+)"
└── MarkdownListOutputParser ← r"^\s*[-*]\s([^\n]+)$"
BaseTransformOutputParser
└── XMLOutputParser ← _StreamingParser + ET.XMLPullParser
小结
本篇拆解了 7 种常用 Parser 的实现:
- StrOutputParser:
parse(text) → text,最简单,流式直接透传 - JsonOutputParser:
parse_json_markdown+ 累积式流式 + jsonpatch diff - PydanticOutputParser:JsonOutputParser +
model_validate验证,支持 v1/v2 - CommaSeparatedListOutputParser:
csv.reader解析,有 fallback - NumberedListOutputParser:
\d+\.\s正则匹配,支持parse_iter流式 - MarkdownListOutputParser:
[-*]\s正则匹配,re.MULTILINE - XMLOutputParser:
ET.XMLPullParser真正增量解析,AddableDict可合并输出
下一篇问题:上面的 Parser 都是解析文本输出。但当模型返回的是 tool_calls(结构化的函数调用),该怎么解析?PydanticToolsParser 是如何把 AIMessage.tool_calls 变成 Pydantic 对象的?完整的数据流从头到尾是怎样的?
LangChain Outputs 深度解析(四):Tool Parser 与完整数据流
一个能跑的例子
from langchain_core.messages import AIMessage
from langchain_core.outputs import ChatGeneration
from langchain_core.output_parsers.openai_tools import (
JsonOutputToolsParser,
PydanticToolsParser,
)
from pydantic import BaseModel, Field
# 模拟一个带 tool_calls 的 AIMessage
ai_msg = AIMessage(
content="",
tool_calls=[
{
"name": "GetWeather",
"args": {"city": "Beijing", "unit": "celsius"},
"id": "call_001",
}
],
)
gen = ChatGeneration(message=ai_msg)
# 1. JsonOutputToolsParser → dict 列表
json_tool_parser = JsonOutputToolsParser()
result = json_tool_parser.parse_result([gen])
print(result)
# [{'type': 'GetWeather', 'args': {'city': 'Beijing', 'unit': 'celsius'}}]
# 注意:name → type(向后兼容重命名)
# 2. PydanticToolsParser → Pydantic 对象列表
class GetWeather(BaseModel):
"""获取天气信息。"""
city: str = Field(description="城市名称")
unit: str = Field(default="celsius", description="温度单位")
pydantic_tool_parser = PydanticToolsParser(tools=[GetWeather])
result2 = pydantic_tool_parser.parse_result([gen])
print(result2)
# [GetWeather(city='Beijing', unit='celsius')]
print(result2[0].city) # Beijing
# 3. first_tool_only 模式
parser_single = PydanticToolsParser(tools=[GetWeather], first_tool_only=True)
result3 = parser_single.parse_result([gen])
print(result3) # GetWeather(city='Beijing', unit='celsius') ← 不是列表
print(type(result3)) # <class '__main__.GetWeather'>
不需要 API key。Tool Parser 解析的是 AIMessage.tool_calls——结构化数据,不是自由文本。
辅助函数:parse_tool_call 和 parse_tool_calls
在 Parser 类之前,先看两个辅助函数(用于处理旧格式的 additional_kwargs["tool_calls"])。
parse_tool_call
# output_parsers/openai_tools.py:27-78
def parse_tool_call(
raw_tool_call: dict[str, Any],
*,
partial: bool = False,
strict: bool = False,
return_id: bool = True,
) -> dict[str, Any] | None:
if "function" not in raw_tool_call:
return None
arguments = raw_tool_call["function"]["arguments"]
if partial:
try:
function_args = parse_partial_json(arguments, strict=strict)
except (JSONDecodeError, TypeError):
return None
elif not arguments: # ← 无参工具
function_args = {}
else:
try:
function_args = json.loads(arguments, strict=strict)
except JSONDecodeError as e:
raise OutputParserException(...) from e
parsed = {
"name": raw_tool_call["function"]["name"] or "",
"args": function_args or {},
}
if return_id:
parsed["id"] = raw_tool_call.get("id")
parsed = create_tool_call(**parsed) # ← 标准化为 ToolCall TypedDict
return parsed
这个函数处理的是 OpenAI 原始格式的 tool call:
# OpenAI 原始格式
{
"id": "call_001",
"function": {
"name": "GetWeather",
"arguments": '{"city": "Beijing"}' # ← 注意:arguments 是 JSON 字符串
}
}
parse_tool_call 把它转换为 LangChain 标准格式:
# LangChain 标准格式
{
"name": "GetWeather",
"args": {"city": "Beijing"}, # ← 已解析为 dict
"id": "call_001",
}
parse_tool_calls
# output_parsers/openai_tools.py:102-137
def parse_tool_calls(
raw_tool_calls: list[dict],
*, partial: bool = False, strict: bool = False, return_id: bool = True,
) -> list[dict[str, Any]]:
final_tools = []
exceptions = []
for tool_call in raw_tool_calls:
try:
parsed = parse_tool_call(tool_call, partial=partial, strict=strict, return_id=return_id)
if parsed:
final_tools.append(parsed)
except OutputParserException as e:
exceptions.append(str(e)) # ← 收集所有错误
continue
if exceptions:
raise OutputParserException("\n\n".join(exceptions)) # ← 汇总抛出
return final_tools
批量版本。注意先收集所有错误再抛出,而不是遇到第一个错误就停。
JsonOutputToolsParser
JsonOutputToolsParser(openai_tools.py:140)是 Tool Parser 的基础类:
# output_parsers/openai_tools.py:140-222
class JsonOutputToolsParser(BaseCumulativeTransformOutputParser[Any]):
strict: bool = False
return_id: bool = False
first_tool_only: bool = False
def parse_result(self, result: list[Generation], *, partial: bool = False) -> Any:
generation = result[0]
if not isinstance(generation, ChatGeneration):
raise OutputParserException("This output parser can only be used with a chat generation.")
message = generation.message
# ──── 新格式:直接从 message.tool_calls 取 ────
if isinstance(message, AIMessage) and message.tool_calls:
tool_calls = [dict(tc) for tc in message.tool_calls]
for tool_call in tool_calls:
if not self.return_id:
_ = tool_call.pop("id")
# ──── 旧格式:从 additional_kwargs["tool_calls"] 取 ────
else:
try:
raw_tool_calls = copy.deepcopy(message.additional_kwargs["tool_calls"])
except KeyError:
return []
tool_calls = parse_tool_calls(
raw_tool_calls, partial=partial, strict=self.strict, return_id=self.return_id,
)
# ──── 向后兼容:name → type ────
for tc in tool_calls:
tc["type"] = tc.pop("name")
if self.first_tool_only:
return tool_calls[0] if tool_calls else None
return tool_calls
两个关键设计:
1. 双路径取 tool_calls
AIMessage
│
├── message.tool_calls 有值? ← 新路径(推荐)
│ → 直接使用,已经是标准格式
│
└── message.additional_kwargs["tool_calls"] ← 旧路径(向后兼容)
→ 需要 parse_tool_calls() 解析 JSON 字符串
新版 LangChain 的 AIMessage 在构造时就已经解析好了 tool_calls。旧版或缓存的消息可能只有 additional_kwargs["tool_calls"]。
2. name → type 重命名
for tc in tool_calls:
tc["type"] = tc.pop("name")
# {"name": "GetWeather", "args": {...}} → {"type": "GetWeather", "args": {...}}
这是向后兼容——早期版本的 Tool Parser 用 type 而非 name,为了不破坏下游代码,保留了这个重命名。
JsonOutputKeyToolsParser
继承 JsonOutputToolsParser,增加按 key_name 过滤的能力(openai_tools.py:225):
# output_parsers/openai_tools.py:225-295
class JsonOutputKeyToolsParser(JsonOutputToolsParser):
key_name: str
"""The type of tools to return."""
def parse_result(self, result, *, partial=False):
# ... 获取 parsed_tool_calls(和父类相同)...
# 按 key_name 过滤
if self.first_tool_only:
parsed_result = list(
filter(lambda x: x["type"] == self.key_name, parsed_tool_calls)
)
single_result = parsed_result[0] if parsed_result else None
if self.return_id:
return single_result
if single_result:
return single_result["args"] # ← 不返回 id 时,直接返回 args
return None
return (
[res for res in parsed_tool_calls if res["type"] == self.key_name]
if self.return_id
else [res["args"] for res in parsed_tool_calls if res["type"] == self.key_name]
)
使用场景:当模型可能返回多种 tool call,你只关心某一种时:
parser = JsonOutputKeyToolsParser(key_name="GetWeather")
# 只返回 name=="GetWeather" 的 tool calls
PydanticToolsParser
继承 JsonOutputToolsParser,把 dict 转为 Pydantic 对象(openai_tools.py:306):
# output_parsers/openai_tools.py:306-384
class PydanticToolsParser(JsonOutputToolsParser):
tools: list[TypeBaseModel]
def parse_result(self, result, *, partial=False):
# 1. 调用父类获取 JSON tool calls
json_results = super().parse_result(result, partial=partial)
if not json_results:
return None if self.first_tool_only else []
json_results = [json_results] if self.first_tool_only else json_results
# 2. 构建 name → Pydantic class 映射
name_dict_v2 = {
tool.model_config.get("title") or tool.__name__: tool
for tool in self.tools
if is_pydantic_v2_subclass(tool)
}
name_dict_v1 = {
tool.__name__: tool
for tool in self.tools
if is_pydantic_v1_subclass(tool)
}
name_dict = {**name_dict_v2, **name_dict_v1}
# 3. 逐个实例化
pydantic_objects = []
for res in json_results:
if not isinstance(res["args"], dict):
if partial: continue
raise ValueError(f"Tool arguments must be specified as a dict, received: {res['args']}")
try:
tool = name_dict[res["type"]] # ← 按 name(已重命名为 type)查找类
except KeyError as e:
raise OutputParserException(f"Unknown tool type: {res['type']!r}") from e
try:
pydantic_objects.append(tool(**res["args"])) # ← 实例化 Pydantic 对象
except (ValidationError, ValueError):
if partial: continue
# ── max_tokens 检测 ──
has_max_tokens_stop_reason = any(
generation.message.response_metadata.get("stop_reason") == "max_tokens"
for generation in result
if isinstance(generation, ChatGeneration)
)
if has_max_tokens_stop_reason:
logger.exception(
"Output parser received a `max_tokens` stop reason. "
"The output is likely incomplete—please increase `max_tokens`."
)
raise
if self.first_tool_only:
return pydantic_objects[0] if pydantic_objects else None
return pydantic_objects
完整流程:
PydanticToolsParser.parse_result([ChatGeneration(message=ai_msg)])
│
├── super().parse_result() → JsonOutputToolsParser
│ → [{"type": "GetWeather", "args": {"city": "Beijing"}}]
│
├── 构建映射:{"GetWeather": <class GetWeather>}
│
└── 实例化:GetWeather(**{"city": "Beijing"})
→ GetWeather(city="Beijing", unit="celsius")
name → class 映射的细节
# Pydantic v2:优先用 model_config["title"],其次用 __name__
name_dict_v2 = {
tool.model_config.get("title") or tool.__name__: tool
for tool in self.tools
if is_pydantic_v2_subclass(tool)
}
这意味着如果你的 Pydantic 模型配置了 title,会用 title 作为 key(而非类名)。这在 bind_tools 场景中很重要——convert_to_openai_tool 会把 Pydantic 模型的 schema title 作为 function name。
max_tokens 错误检测
has_max_tokens_stop_reason = any(
generation.message.response_metadata.get("stop_reason") == "max_tokens"
for generation in result
if isinstance(generation, ChatGeneration)
)
if has_max_tokens_stop_reason:
logger.exception(_MAX_TOKENS_ERROR)
当 Pydantic 验证失败时,Parser 会检查是否因为 max_tokens 导致输出被截断。如果是,会打印一条明确的提示日志,帮助用户排查问题。
Tool Parser 继承关系
BaseCumulativeTransformOutputParser[Any]
│
└── JsonOutputToolsParser
│ parse_result: AIMessage.tool_calls → [{"type": ..., "args": ...}]
│ name → type 重命名(向后兼容)
│
├── JsonOutputKeyToolsParser
│ + key_name 过滤
│
└── PydanticToolsParser
+ tools: list[TypeBaseModel]
+ name→class 映射 → Pydantic 实例化
+ max_tokens 错误检测
完整数据流总览
把 4 个系列串起来,看数据从用户输入到结构化结果的完整链路:
╔═══════════════════════════════════════════════════════════════════════════╗
║ 用户输入 ║
║ str / list[BaseMessage] / PromptValue ║
╚═══════════════════════════════════════════════════════════════════════════╝
│
▼
┌─── _convert_input() ───┐
│ str → StringPromptValue │
│ list → ChatPromptValue │
│ PromptValue → 直接用 │
└────────────────────────┘
│
▼
PromptValue.to_messages()
│
▼
messages: list[BaseMessage]
(Messages 系列 ✓)
│
┌───────────────┴───────────────┐
│ │
╔═════╧══════╗ ╔══════╧══════╗
║ 非流式路径 ║ ║ 流式路径 ║
╠════════════╣ ╠═════════════╣
║ _generate()║ ║ _stream() ║
║ │ ║ ║ │ ║
║ ▼ ║ ║ ▼ ║
║ ChatResult ║ ║ Iterator ║
║ .generations[0] ║ [ChatGeneration ║
║ .message ║ ║ Chunk] ║
╚═════╤══════╝ ╚══════╤══════╝
│ │
│ merge_chat_generation_chunks()
│ │
└───────────────┬───────────────┘
│
▼
ChatGeneration.message
│
▼
AIMessage
(ChatModel 系列 ✓)
│
┌──────────────┼──────────────┐
│ │ │
StrOutput JsonOutput PydanticTools
Parser Parser Parser
│ │ │
▼ ▼ ▼
str dict Pydantic 对象
(Outputs 系列 ✓)
各系列的职责
| 系列 | 层 | 核心职责 | 关键抽象 |
|---|---|---|---|
| Messages | 数据格式层 | 定义对话中的数据单元 | BaseMessage / BaseMessageChunk |
| Runnables | 执行框架层 | 提供 invoke/stream/batch + | 组合 |
Runnable / RunnableSequence |
| ChatModel | 生产者层 | 调用 LLM,生成消息 | BaseChatModel._generate / _stream |
| Outputs + Parsers | 消费者层 | 把模型输出转换为结构化结果 | Generation → OutputParser → T |
一条完整的 chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel
class Answer(BaseModel):
answer: str
confidence: float
parser = PydanticOutputParser(pydantic_object=Answer)
prompt = ChatPromptTemplate.from_messages([
("system", "回答用户问题。\n{format_instructions}"),
("human", "{question}"),
]).partial(format_instructions=parser.get_format_instructions())
# chain = prompt | model | parser
# 每个 | 都是 Runnable.__or__,组合成 RunnableSequence
# 数据流:
# "question" → prompt.invoke() → ChatPromptValue
# → model.invoke() → AIMessage(content='{"answer":"...", "confidence":0.9}')
# → parser.invoke() → Answer(answer="...", confidence=0.9)
4 篇系列回顾
| 篇 | 主题 | 核心源码 |
|---|---|---|
| 第 1 篇 | Generation 与 ChatGeneration | Generation、ChatGeneration.set_text、ChatGenerationChunk.__add__、merge_chat_generation_chunks、ChatResult、LLMResult.flatten |
| 第 2 篇 | Output Parser 基类体系 | BaseLLMOutputParser.parse_result、BaseOutputParser.parse、BaseTransformOutputParser._transform、BaseCumulativeTransformOutputParser 累积+diff |
| 第 3 篇 | 常用 Parser 全解 | StrOutputParser、JsonOutputParser(partial+jsonpatch)、PydanticOutputParser(model_validate)、ListOutputParser(droplastn 流式)、XMLOutputParser(_StreamingParser) |
| 第 4 篇 | Tool Parser 与完整数据流 | parse_tool_call、JsonOutputToolsParser(双路径+name→type)、PydanticToolsParser(name→class 映射+max_tokens 检测)、完整数据流 |
Outputs 系列是 ChatModel 系列的"输出端"——_generate 返回 ChatResult,_stream 返回 ChatGenerationChunk,Output Parser 消费它们并产出结构化结果。加上 Messages(数据格式)和 Runnables(执行框架),四个系列一起构成了 LangChain 核心的完整图景:
Messages(数据格式)→ Runnables(执行框架)→ ChatModel(生产者)→ Outputs + Parsers(消费者)
更多推荐


所有评论(0)