【LangChain 源码解析四: Message】
本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的消息系统。
- 第 1 部分:5 分钟上手(本文)
- 第 2 部分:Tool Call 全流程
- 第 3 部分:Content Block 标准化
- 第 4 部分:源码架构与设计哲学
LangChain 消息系统深度解析(一):5 分钟上手
一个能跑的例子
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
messages = [
SystemMessage(content="你是一个天气助手"),
HumanMessage(content="北京今天天气怎么样?"),
AIMessage(content="今天北京晴天,气温25°C,适合出行。"),
]
# 看看 AIMessage 内部长什么样
print(messages[2].model_dump())
输出:
{
"content": "今天北京晴天,气温25°C,适合出行。",
"type": "ai",
"id": None,
"name": None,
"additional_kwargs": {},
"response_metadata": {},
"tool_calls": [],
"invalid_tool_calls": [],
"usage_metadata": None,
}
content 是消息内容,type 标识消息类型——就这么简单。其他字段后面会用到,先不管。
6 种消息类型
LangChain 把聊天中的每条消息都封装成一个对象。不同角色,不同类型:
1. SystemMessage —— 系统指令
SystemMessage(content="你是一个专业的翻译助手,只翻译,不解释。")
给模型的"规则手册",通常放在对话最开头。
2. HumanMessage —— 用户输入
HumanMessage(content="把这句话翻译成英文:今天天气真好")
用户说的话。
3. AIMessage —— 模型回复
AIMessage(content="The weather is really nice today.")
模型的回复。这是 6 种消息里最复杂的一种——它除了文本内容,还可能包含工具调用 (tool_calls)、token 用量 (usage_metadata)、推理过程等。
4. ToolMessage —— 工具返回结果
from langchain_core.messages import ToolMessage
ToolMessage(
content="北京 25°C 晴天",
tool_call_id="call_abc123", # ← 必须填,关联到 AI 的哪次工具调用
)
工具执行完后,把结果包成 ToolMessage 返回给模型。tool_call_id 是必填的——因为 AI 可能同时调用多个工具,需要靠这个 ID 对应"哪个结果是哪个调用的"。
5. ChatMessage —— 自定义角色
from langchain_core.messages import ChatMessage
ChatMessage(role="moderator", content="请保持对话友善。")
当内置的 human/ai/system 不够用时,可以自定义角色名。
6. RemoveMessage —— 删除消息
from langchain_core.messages import RemoveMessage
RemoveMessage(id="msg_to_delete")
不是真正的聊天消息,而是一个"指令"——告诉 LangGraph 等框架删除 ID 对应的消息。在需要管理对话历史时使用。
content 的两种写法
纯文本
HumanMessage(content="你好")
最简单的方式,content 就是一个字符串。
多模态(文本 + 图片 + 音频…)
当你需要发送图片、音频等内容时,content 变成一个列表,每个元素是一个 dict:
HumanMessage(content=[
{"type": "text", "text": "这张图片里是什么?"},
{
"type": "image",
"url": "https://example.com/cat.jpg",
},
])
也可以用 base64:
HumanMessage(content=[
{"type": "text", "text": "描述这张图片"},
{
"type": "image",
"base64": "iVBORw0KGgo...", # 图片的 base64 编码
"mime_type": "image/png",
},
])
还可以用工厂函数,获得类型提示和自动 ID 生成:
from langchain_core.messages.content import create_text_block, create_image_block
HumanMessage(content=[
create_text_block("描述这张图片"),
create_image_block(url="https://example.com/cat.jpg"),
])
这些 dict 在 LangChain 内部叫 Content Block,后续第 3 部分会详细讲。
3 个最实用的工具函数
filter_messages —— 按条件过滤
from langchain_core.messages import filter_messages
messages = [
SystemMessage(content="你是助手"),
HumanMessage(content="你好", name="alice"),
AIMessage(content="你好!", id="ai_1"),
HumanMessage(content="天气呢?", name="bob"),
AIMessage(content="今天晴天", id="ai_2"),
]
# 只要 AI 消息
filter_messages(messages, include_types=["ai"])
# → [AIMessage("你好!"), AIMessage("今天晴天")]
# 排除特定 ID
filter_messages(messages, exclude_ids=["ai_1"])
# → [SystemMessage("你是助手"), HumanMessage("你好"), HumanMessage("天气呢?"), AIMessage("今天晴天")]
# 只要特定用户
filter_messages(messages, include_names=["alice"])
# → [HumanMessage("你好")]
merge_message_runs —— 合并连续同类型消息
有些场景下,连续多条同类型消息需要合并成一条(比如模型要求 human/ai 必须交替出现):
from langchain_core.messages import merge_message_runs
messages = [
SystemMessage(content="你是助手"),
HumanMessage(content="你好"),
HumanMessage(content="天气呢?"), # 连续两条 human
AIMessage(content="让我查查"),
AIMessage(content="北京 25°C 晴"), # 连续两条 ai
]
merged = merge_message_runs(messages)
# → [
# SystemMessage("你是助手"),
# HumanMessage("你好\n天气呢?"), ← 合并了
# AIMessage("让我查查\n北京 25°C 晴"), ← 合并了
# ]
注意:ToolMessage 永远不会被合并,因为每条都有独立的 tool_call_id。
trim_messages —— 按 token 裁剪
对话历史太长时,需要裁剪到模型的 token 限制以内:
from langchain_core.messages import trim_messages
messages = [
SystemMessage(content="你是助手"),
HumanMessage(content="第一个问题..."),
AIMessage(content="第一个回答..."),
HumanMessage(content="第二个问题..."),
AIMessage(content="第二个回答..."),
HumanMessage(content="最新的问题"),
]
# 保留最近的消息,总 token 不超过 100
# 同时保留 SystemMessage 并确保从 human 消息开始
trimmed = trim_messages(
messages,
max_tokens=100,
token_counter="approximate", # 用近似计数,速度快
strategy="last", # 保留最近的
include_system=True, # 保留 system 消息
start_on="human", # 确保从 human 开始
)
# → [SystemMessage("你是助手"), HumanMessage("最新的问题")]
放进 Chain 里用
这三个函数都支持双模式——传入消息直接执行,不传消息则返回一个 Runnable:
# 模式一:直接调用
result = filter_messages(messages, include_types=["ai"])
# 模式二:放进 chain(不传 messages 参数)
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
chain = filter_messages(include_types=["human", "ai"]) | llm
# 后续 chain.invoke(messages) 时,先过滤再送入模型
两个重要的属性
.text —— 总是给你纯文本
不管 content 是字符串还是多模态列表,.text 都只提取文本部分:
msg1 = AIMessage(content="你好世界")
print(msg1.text) # → "你好世界"
msg2 = AIMessage(content=[
{"type": "text", "text": "这是文字"},
{"type": "image", "url": "https://..."},
{"type": "text", "text": "还有文字"},
])
print(msg2.text) # → "这是文字还有文字" (只提取 text 类型的块)
.content_blocks —— 标准化的内容块
不同模型提供商返回的 content 格式天差地别,但 .content_blocks 把它们统一成标准格式:
# Anthropic 返回的格式
msg = AIMessage(
content=[
{"type": "text", "text": "让我想想..."},
{"type": "thinking", "thinking": "用户问的是...", "signature": "EpoWCpc..."},
],
response_metadata={"model_provider": "anthropic"},
)
# 不管原始格式是什么,content_blocks 输出统一格式
blocks = msg.content_blocks
# → [
# {"type": "text", "text": "让我想想..."},
# {"type": "reasoning", "reasoning": "用户问的是...", "extras": {"signature": "EpoWCpc..."}},
# ]
注意看:Anthropic 的 thinking 变成了标准的 reasoning,而 Anthropic 特有的 signature 字段被放进了 extras 中,不会丢失。
这个标准化机制是 LangChain 消息系统最精妙的设计之一,第 3 篇会深入拆解。
所有消息的公共字段
所有消息类型都继承自 BaseMessage,共享这些字段:
| 字段 | 类型 | 说明 |
|---|---|---|
content |
`str | list[str |
type |
str |
消息类型标识("human", "ai", "system"等) |
id |
`str | None` |
name |
`str | None` |
additional_kwargs |
dict |
提供商原始附加数据(如 OpenAI 的原始 tool_calls) |
response_metadata |
dict |
响应元数据(如 token 计数、模型名称、model_provider) |
AIMessage 额外多出的字段:
| 字段 | 类型 | 说明 |
|---|---|---|
tool_calls |
list[ToolCall] |
已解析的工具调用请求 |
invalid_tool_calls |
list[InvalidToolCall] |
解析失败的工具调用 |
usage_metadata |
`UsageMetadata | None` |
小结
本篇覆盖了日常使用中最常接触的部分:
- 6 种消息类型,各有角色分工
- content 支持纯文本和多模态列表
- 3 个工具函数(filter / merge / trim),既能直接调用也能放进 chain
- .text 取纯文本,.content_blocks 取标准化内容块
但还有两个关键问题没回答:
- AI 的 tool_calls 是怎么从不同提供商的原始格式自动解析出来的?
- content_blocks 是怎么做到不同提供商返回统一格式的?
下一部分讲 Tool Call 的完整流程。
LangChain 消息系统深度解析(二):Tool Call 全流程
本系列共 4 部分。本文是第 2 部分,拆解工具调用从请求到返回的完整链路。
一个完整的工具调用往返
先看全貌——AI 请求调用工具,工具返回结果,再送回模型:
from langchain_core.messages import AIMessage, ToolMessage, HumanMessage
# 1. 用户提问
user = HumanMessage(content="北京今天天气怎么样?")
# 2. AI 决定调用工具(模型返回的)
ai = AIMessage(
content="让我查一下天气",
tool_calls=[
{
"name": "get_weather",
"args": {"city": "北京"},
"id": "call_abc123",
"type": "tool_call",
}
],
)
# 3. 你执行工具,把结果包成 ToolMessage
tool_result = ToolMessage(
content="北京 25°C 晴天,湿度40%",
tool_call_id="call_abc123", # ← 必须和上面的 id 对应
)
# 4. 把整个对话送回模型
messages = [user, ai, tool_result]
# response = model.invoke(messages)
# → AI 拿到工具结果后生成最终回答
关键在第 3 步:tool_call_id="call_abc123" 必须和 AIMessage.tool_calls 里的 id 对应,否则模型无法将结果和请求关联。
两种构造 Tool Call 的方式
方式一:直接写 tool_calls(推荐)
ai = AIMessage(
content="让我查一下天气",
tool_calls=[
{
"name": "get_weather",
"args": {"city": "北京"},
"id": "call_abc123",
"type": "tool_call",
}
],
)
print(ai.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]
方式二:放在 additional_kwargs(OpenAI 原始格式)
OpenAI API 返回的工具调用是这种格式,arguments 是 JSON 字符串:
ai2 = AIMessage(
content="",
additional_kwargs={
"tool_calls": [
{
"id": "call_abc123",
"function": {
"name": "get_weather",
"arguments": '{"city": "北京"}', # ← JSON 字符串,不是 dict
},
}
]
},
)
# 但你访问 tool_calls,发现已经自动解析好了!
print(ai2.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]
两种方式的 .tool_calls 输出完全一样。方式二的 JSON 字符串被自动解析成了 dict。
自动解析的秘密:model_validator
这个"自动解析"不是魔法,是 Pydantic 的 model_validator 在对象创建时触发的。
AIMessage 的源码(ai.py)里有这段逻辑:
class AIMessage(BaseMessage):
tool_calls: list[ToolCall] = []
invalid_tool_calls: list[InvalidToolCall] = []
@model_validator(mode="before")
@classmethod
def _backwards_compat_tool_calls(cls, values: dict) -> Any:
# 如果 tool_calls 和 invalid_tool_calls 都是空的
check = not any(
values.get(k) for k in ("tool_calls", "invalid_tool_calls", "tool_call_chunks")
)
if check and (raw := values.get("additional_kwargs", {}).get("tool_calls")):
# 从 additional_kwargs 里取出原始数据,自动解析!
parsed_tool_calls, parsed_invalid_tool_calls = default_tool_parser(raw)
values["tool_calls"] = parsed_tool_calls
values["invalid_tool_calls"] = parsed_invalid_tool_calls
return values
执行流程:
创建 AIMessage
↓
model_validator 触发(mode="before",在字段赋值之前)
↓
检查 tool_calls 是否为空?
↓ 是
检查 additional_kwargs["tool_calls"] 有数据吗?
↓ 有
调用 default_tool_parser 解析
↓
自动填充 tool_calls 和 invalid_tool_calls
default_tool_parser 在 tool.py 里,核心就是把 OpenAI 格式的 function.arguments JSON 字符串解析出来:
def default_tool_parser(raw_tool_calls):
tool_calls = []
invalid_tool_calls = []
for raw in raw_tool_calls:
if "function" not in raw:
continue
name = raw["function"]["name"]
try:
args = json.loads(raw["function"]["arguments"]) # 解析 JSON 字符串
tool_calls.append(tool_call(name=name, args=args, id=raw.get("id")))
except json.JSONDecodeError:
# JSON 解析失败 → 放进 invalid_tool_calls
invalid_tool_calls.append(
invalid_tool_call(name=name, args=raw["function"]["arguments"], id=raw.get("id"), error=None)
)
return tool_calls, invalid_tool_calls
解析失败时会发生什么
模型有时候会生成不合法的 JSON(尤其是小模型或复杂参数时):
ai_bad = AIMessage(
content="",
additional_kwargs={
"tool_calls": [
{
"id": "call_xyz",
"function": {
"name": "search",
"arguments": '{"query": "天气"', # ← 缺少右花括号!
},
}
]
},
)
print(ai_bad.tool_calls)
# [] ← 空的!
print(ai_bad.invalid_tool_calls)
# [{'name': 'search', 'args': '{"query": "天气"', 'id': 'call_xyz', 'error': None, 'type': 'invalid_tool_call'}]
解析失败的调用被放进了 invalid_tool_calls,args 保留原始字符串。你可以据此做错误处理或重试。
ToolCall 的两套数据结构
LangChain 内部有两套 ToolCall 定义,这是历史遗留:
旧版:tool.py 中的 ToolCall
# tool.py —— 用于 AIMessage.tool_calls 字段类型
class ToolCall(TypedDict):
name: str
args: dict[str, Any]
id: str | None
type: NotRequired[Literal["tool_call"]]
新版:content.py 中的 ToolCall
# content.py —— 用于 content_blocks 输出的 v1 标准格式
class ToolCall(TypedDict):
type: Literal["tool_call"]
id: str | None
name: str
args: dict[str, Any]
index: NotRequired[int | str] # ← 多了这个,用于流式
extras: NotRequired[dict[str, Any]] # ← 多了这个,存提供商特有数据
| 旧版 (tool.py) | 新版 (content.py) | |
|---|---|---|
| 用途 | AIMessage.tool_calls字段 |
.content_blocks输出 |
type字段 |
NotRequired,可选 |
Required,固定为 "tool_call" |
index |
无 | 有,流式聚合用 |
extras |
无 | 有,存提供商特有数据 |
日常使用中你不需要关心这个区别——直接用 AIMessage.tool_calls 就好。
流式场景:Tool Call 是碎片化的
流式(streaming)时,模型不会一次性返回完整的 tool call。JSON 参数是一块一块吐出来的:
from langchain_core.messages import AIMessageChunk
# 第 1 个 chunk:拿到了函数名和 JSON 的开头
chunk1 = AIMessageChunk(
content="",
tool_call_chunks=[
{"name": "get_weather", "args": '{"ci', "id": "call_abc123", "index": 0}
],
)
# 第 2 个 chunk:拿到了 JSON 的剩余部分
chunk2 = AIMessageChunk(
content="",
tool_call_chunks=[
{"name": None, "args": 'ty": "北京"}', "id": None, "index": 0}
],
)
注意 index=0——两个 chunk 的 index 相同,表示它们属于同一个工具调用。
拼接
full = chunk1 + chunk2
# tool_call_chunks 被按 index 合并
print(full.tool_call_chunks)
# [{'name': 'get_weather', 'args': '{"city": "北京"}', 'id': 'call_abc123', 'index': 0, 'type': 'tool_call_chunk'}]
# tool_calls 自动尝试解析完整的 JSON
print(full.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]
碎片化的 '{"ci' + 'ty": "北京"}' 被拼成了完整的 '{"city": "北京"}',然后自动 json.loads 成 dict。
底层实现
拼接时调用的是 add_ai_message_chunks 函数(ai.py),其中关键一步:
# 按 index 合并 tool_call_chunks
raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))
merge_lists(来自 _merge.py)的逻辑:两个 dict 如果 index 相同且不为 None,就合并(字符串拼接、非空值优先)。这就是为什么 index=0 的两个 chunk 会被合并到一起。
合并完成后,init_tool_calls 这个 model_validator 自动触发:
class AIMessageChunk(AIMessage, BaseMessageChunk):
@model_validator(mode="after")
def init_tool_calls(self) -> Self:
tool_calls = []
invalid_tool_calls = []
for chunk in self.tool_call_chunks:
# 用 parse_partial_json 做容错解析
args_ = parse_partial_json(chunk["args"]) if chunk["args"] else {}
if isinstance(args_, dict):
tool_calls.append(create_tool_call(name=chunk["name"] or "", args=args_, id=chunk["id"]))
else:
invalid_tool_calls.append(...)
self.tool_calls = tool_calls
self.invalid_tool_calls = invalid_tool_calls
return self
parse_partial_json 是一个容错 JSON 解析器——即使 JSON 不完整(比如只收到了 '{"ci'),也不会报错,而是尽量解析。当所有 chunk 拼完后,JSON 变完整了,就能正确解析成 dict。
chunk_position=“last” 的特殊处理
当流结束时,最后一个 chunk 会带 chunk_position="last" 标记。此时如果 content 里有 tool_call_chunk 类型的 block,会被替换成完整的 tool_call:
# 流式聚合的最后一步
if self.chunk_position == "last" and self.tool_call_chunks:
# 把 content 里的 tool_call_chunk 替换成完整的 tool_call
for idx, block in enumerate(self.content):
if block.get("type") == "tool_call_chunk" and call_id in id_to_tc:
self.content[idx] = id_to_tc[call_id] # 替换!
多个工具并发调用
AI 可以在一条消息里同时调用多个工具:
ai = AIMessage(
content="让我同时查天气和汇率",
tool_calls=[
{"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"},
{"name": "get_exchange_rate", "args": {"from": "USD", "to": "CNY"}, "id": "call_2", "type": "tool_call"},
],
)
# 每个工具结果都要有对应的 tool_call_id
tool1 = ToolMessage(content="北京 25°C 晴", tool_call_id="call_1")
tool2 = ToolMessage(content="1 USD = 7.24 CNY", tool_call_id="call_2")
# 注意顺序:AI → Tool1 → Tool2 → 再送回模型
messages = [user, ai, tool1, tool2]
在流式场景下,多个并发调用的 chunk 通过不同的 index 值来区分:
# 两个工具调用同时流式返回
chunk = AIMessageChunk(
content="",
tool_call_chunks=[
{"name": "get_weather", "args": '{"city":', "id": "call_1", "index": 0}, # 第 1 个工具
{"name": "get_exchange_rate", "args": '{"from":', "id": "call_2", "index": 1}, # 第 2 个工具
],
)
merge_lists 会按 index 分别合并——index=0 的归一堆,index=1 的归另一堆。
ToolMessage 的数据结构
class ToolMessage(BaseMessage):
tool_call_id: str # 必填,关联 AIMessage 的 tool_call id
type: Literal["tool"] = "tool"
artifact: Any = None # 完整工具输出(不送给模型)
status: Literal["success", "error"] = "success"
几个重要的细节:
artifact —— 保存完整输出但不送给模型
tool_output = {
"summary": "北京 25°C 晴天",
"raw_data": {"temp": 25, "humidity": 40, "wind_speed": 12, ...}, # 模型不需要这些
}
ToolMessage(
content=tool_output["summary"], # 只把摘要送给模型
artifact=tool_output, # 完整数据保存在 artifact 里
tool_call_id="call_abc123",
)
status —— 标记执行成功或失败
# 工具执行失败
ToolMessage(
content="API 请求超时,请稍后重试",
tool_call_id="call_abc123",
status="error",
)
tool_calls vs content_blocks 里的 tool_call
AIMessage 上有两个地方可以看到 tool call 的信息:
**message.tool_calls**—— 顶层字段,已解析的结构体列表**message.content_blocks**—— content 里type="tool_call"的 block
它们的关系:
ai = AIMessage(
content="让我查一下",
tool_calls=[
{"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
],
)
# 方式一:直接访问
print(ai.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_1', 'type': 'tool_call'}]
# 方式二:通过 content_blocks
print(ai.content_blocks)
# [
# {'type': 'text', 'text': '让我查一下'},
# {'type': 'tool_call', 'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_1'},
# ]
**.tool_calls**** 是"快捷方式"**——直接拿到已解析的工具调用。
**.content_blocks**** 是"完整视图"**——把文本、推理、工具调用等所有内容按顺序排列。如果 tool_calls 里的内容在 content 中没有对应的 block,content_blocks 属性会自动补上。
源码中的逻辑(ai.py 的 content_blocks 属性):
@property
def content_blocks(self):
blocks = super().content_blocks # 先从 content 解析
if self.tool_calls:
# 检查 content 里已有的 tool_call id
content_tool_call_ids = {
block.get("id") for block in self.content
if isinstance(block, dict) and block.get("type") == "tool_call"
}
# content 里没有的,从 tool_calls 补上
for tool_call in self.tool_calls:
if tool_call.get("id") not in content_tool_call_ids:
blocks.append({
"type": "tool_call",
"id": tool_call.get("id"),
"name": tool_call["name"],
"args": tool_call["args"],
})
return blocks
完整数据流总结
提供商原始返回(OpenAI / Anthropic / ...)
↓
AIMessage 创建
↓
model_validator 触发:
├── additional_kwargs["tool_calls"] 有数据?
│ → default_tool_parser 解析
│ → JSON 成功 → tool_calls
│ → JSON 失败 → invalid_tool_calls
│
└── 已经有 tool_calls?→ 跳过
↓
访问 .tool_calls → 直接拿到已解析的结构体
访问 .content_blocks → 文本 + 推理 + 工具调用的完整有序列表
↓
流式场景额外步骤:
chunk1 + chunk2 + ... + chunkN
↓
merge_lists 按 index 合并 tool_call_chunks
↓
init_tool_calls validator:parse_partial_json 解析
↓
chunk_position="last":tool_call_chunk → tool_call
小结
AIMessage.tool_calls是标准化的工具调用列表,推荐使用additional_kwargs["tool_calls"]存原始格式,model_validator自动解析- JSON 解析失败不会报错,而是放进
invalid_tool_calls - 流式场景下,
tool_call_chunks按index合并,parse_partial_json容错解析 ToolMessage通过tool_call_id关联请求和结果
下一部分讲 Content Block 标准化——不同提供商的格式是如何被统一的。
LangChain 消息系统深度解析(三):Content Block 标准化
本系列共 4 部分。本文是第 3 部分,拆解 LangChain 如何用一套标准格式统一所有模型提供商的输出。
问题:同一张图片,4 种格式
同一张 base64 图片,不同提供商返回的格式完全不同:
# OpenAI
{"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBOR..."}}
# Anthropic
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "iVBOR..."}}
# Bedrock Converse
{"image": {"format": "png", "source": {"bytes": b"\x89PNG..."}}}
# Google GenAI
{"type": "image_url", "image_url": {"url": "iVBOR..."}} # 裸 base64,没有 data: 前缀
如果你要写代码处理模型返回的图片,就得针对每个提供商写一套解析逻辑。
LangChain 的解决方案:**.content_blocks**** 属性把所有格式统一成一种标准**。
# 不管原始格式是什么,content_blocks 输出都是:
{"type": "image", "base64": "iVBOR...", "mime_type": "image/png"}
标准 ContentBlock 类型一览
所有标准类型定义在 content.py 中,都是 TypedDict:
| type 值 | TypedDict 名 | 用途 | 关键字段 |
|---|---|---|---|
"text" |
TextContentBlock |
文本输出 | text, annotations |
"reasoning" |
ReasoningContentBlock |
推理过程 | reasoning |
"image" |
ImageContentBlock |
图片 | url/ base64/ file_id, mime_type |
"audio" |
AudioContentBlock |
音频 | url/ base64/ file_id, mime_type |
"video" |
VideoContentBlock |
视频 | url/ base64/ file_id, mime_type |
"file" |
FileContentBlock |
文件(PDF 等) | url/ base64/ file_id, mime_type |
"text-plain" |
PlainTextContentBlock |
纯文本文档 | text, title, context |
"tool_call" |
ToolCall |
工具调用请求 | name, args, id |
"tool_call_chunk" |
ToolCallChunk |
流式工具调用片段 | name, args, id, index |
"invalid_tool_call" |
InvalidToolCall |
解析失败的工具调用 | name, args, error |
"server_tool_call" |
ServerToolCall |
服务端工具调用 | name, args, id |
"server_tool_result" |
ServerToolResult |
服务端工具结果 | tool_call_id, status, output |
"non_standard" |
NonStandardContentBlock |
兜底(未知格式) | value |
每个 block 都有 extras
除了 NonStandardContentBlock 外,所有 block 都有一个可选的 extras: dict 字段,用来存放提供商特有的元数据。
这是一个核心设计决策:标准化不等于丢数据。
# Anthropic 返回的 thinking block 带有 signature
{"type": "thinking", "thinking": "让我想想...", "signature": "EpoWCpc..."}
# 翻译成标准格式后:
{
"type": "reasoning",
"reasoning": "让我想想...",
"extras": {"signature": "EpoWCpc..."}, # ← signature 不丢,放进 extras
}
NonStandardContentBlock —— 兜底
当一个 block 完全无法识别时,不会报错,而是包成 NonStandardContentBlock:
# 某个提供商返回了 LangChain 不认识的类型
{"type": "some_new_thing", "data": "..."}
# 变成:
{"type": "non_standard", "value": {"type": "some_new_thing", "data": "..."}}
原始数据完整保留在 value 里,不会丢失。
翻译是怎么发生的:三条路径
当你访问 message.content_blocks 时,内部按以下优先级选择翻译路径:
┌─────────────────────────────────────────────────┐
│ 检查 response_metadata["output_version"] == "v1" │
│ → 是:直接返回 content(已经是标准格式) │
└────────────────────┬────────────────────────────┘
│ 否
▼
┌─────────────────────────────────────────────────┐
│ 检查 response_metadata["model_provider"] │
│ → 有值:查注册表,找到对应的 translator │
│ → 调用 translator["translate_content"](msg) │
└────────────────────┬────────────────────────────┘
│ 没有 provider 或没有 translator
▼
┌─────────────────────────────────────────────────┐
│ Best-effort 多轮管道 │
│ 第一轮:快速分类 │
│ 已知类型 → 保留 │
│ 未知类型 → 包成 non_standard │
│ 第二轮:依次尝试 5 个提供商的输入格式解析器 │
│ v0 旧格式 → OpenAI → Anthropic → Google → Bedrock │
│ 每步只动 non_standard block │
└─────────────────────────────────────────────────┘
路径一:已经是标准格式
# 如果集成包已经把 content 转成了 v1 格式,会设置 output_version
ai = AIMessage(
content=[{"type": "text", "text": "你好"}],
response_metadata={"output_version": "v1"},
)
ai.content_blocks # → 直接返回 content,零开销
路径二:有 model_provider,用专门的 translator
# 集成包设置了 model_provider
ai = AIMessage(
content=[
{"type": "text", "text": "让我想想..."},
{"type": "thinking", "thinking": "用户问的是...", "signature": "abc"},
],
response_metadata={"model_provider": "anthropic"},
)
ai.content_blocks
# → 调用 Anthropic 的 translator,输出标准格式
# [
# {"type": "text", "text": "让我想想..."},
# {"type": "reasoning", "reasoning": "用户问的是...", "extras": {"signature": "abc"}},
# ]
路径三:没有 provider,best-effort 猜测
# 没有设置 model_provider(比如手动构造的消息)
msg = HumanMessage(content=[
{"type": "text", "text": "看看这个图"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBOR..."}},
])
msg.content_blocks
# 第一轮:text 是已知类型保留,image_url 不是已知类型 → 包成 non_standard
# 第二轮:依次尝试各提供商的解析器
# _convert_v0_multimodal_input_to_v1 → 不认识,跳过
# _convert_to_v1_from_chat_completions_input → 认识 image_url!转换!
# → [
# {"type": "text", "text": "看看这个图"},
# {"type": "image", "base64": "iVBOR...", "mime_type": "image/png"},
# ]
源码在 base.py 的 content_blocks 属性里:
@property
def content_blocks(self):
blocks = []
content = [self.content] if isinstance(self.content, str) else self.content
# 第一轮:快速分类
for item in content:
if isinstance(item, str):
blocks.append({"type": "text", "text": item})
elif isinstance(item, dict):
if item.get("type") in KNOWN_BLOCK_TYPES:
blocks.append(item) # 已知类型
else:
blocks.append({"type": "non_standard", "value": item}) # 未知类型
# ...
# 第二轮:依次尝试各提供商的输入格式解析器
for parsing_step in [
_convert_v0_multimodal_input_to_v1, # LangChain v0 旧格式
_convert_to_v1_from_chat_completions_input, # OpenAI
_convert_to_v1_from_anthropic_input, # Anthropic
_convert_to_v1_from_genai_input, # Google
_convert_to_v1_from_converse_input, # Bedrock
]:
blocks = parsing_step(blocks)
# 每一步只处理 type="non_standard" 的 block
# 能认识 → 转换成标准 block
# 不认识 → 保持 non_standard,交给下一步
return blocks
每个解析步骤的套路都一样——拆开 non_standard 的 value,看能不能识别,能就转换,不能就保持原样。
注册表机制
PROVIDER_TRANSLATORS
所有 translator 都注册在一个全局字典里(block_translators/__init__.py):
PROVIDER_TRANSLATORS: dict[str, dict[str, Callable]] = {}
def register_translator(provider, translate_content, translate_content_chunk):
PROVIDER_TRANSLATORS[provider] = {
"translate_content": translate_content, # 处理 AIMessage
"translate_content_chunk": translate_content_chunk, # 处理 AIMessageChunk
}
def get_translator(provider):
return PROVIDER_TRANSLATORS.get(provider)
自动注册
模块加载时,所有内置 translator 自动注册:
# block_translators/__init__.py 底部
def _register_translators():
_register_anthropic_translator()
_register_bedrock_translator()
_register_bedrock_converse_translator()
_register_google_genai_translator()
_register_google_vertexai_translator()
_register_groq_translator()
_register_openai_translator()
_register_translators() # ← 模块 import 时自动执行
每个提供商文件底部也有自注册:
# anthropic.py 底部
def _register_anthropic_translator():
register_translator("anthropic", translate_content, translate_content_chunk)
_register_anthropic_translator()
提供商之间的复用
不是每个提供商都需要从头写 translator:
# bedrock.py —— Claude on Bedrock,格式和 Anthropic 一样
from langchain_core.messages.block_translators.anthropic import _convert_to_v1_from_anthropic
def _convert_to_v1_from_bedrock(message):
out = _convert_to_v1_from_anthropic(message) # ← 直接复用 Anthropic 的
# ... 补充 Bedrock 特有的逻辑
return out
# google_vertexai.py —— 直接复用 GenAI 的 translator
from langchain_core.messages.block_translators.google_genai import (
translate_content,
translate_content_chunk,
)
def _register_google_vertexai_translator():
register_translator("google_vertexai", translate_content, translate_content_chunk)
可扩展性
如果你有自己的提供商,也能注册:
from langchain_core.messages.block_translators import register_translator
def my_translate(message):
# 你的翻译逻辑
...
def my_translate_chunk(message):
# 你的流式翻译逻辑
...
register_translator("my_provider", my_translate, my_translate_chunk)
拆解一个具体的 translator:Anthropic
以 Anthropic 为例,看看 _convert_to_v1_from_anthropic(anthropic.py)是怎么逐个 block 翻译的。
text → text(带 citations)
# 输入
{"type": "text", "text": "西班牙赢了", "citations": [{"type": "web_search_result_location", ...}]}
# 处理逻辑
if block_type == "text":
if citations := block.get("citations"):
text_block = {
"type": "text",
"text": block.get("text", ""),
"annotations": [_convert_citation_to_v1(a) for a in citations],
}
else:
text_block = {"type": "text", "text": block["text"]}
# 输出
{"type": "text", "text": "西班牙赢了", "annotations": [{"type": "citation", "url": "...", ...}]}
Anthropic 的 citations 被转成了标准的 annotations,每个 citation 的类型也从 web_search_result_location 变成了通用的 citation。
thinking → reasoning
# 输入
{"type": "thinking", "thinking": "用户问的是天气...", "signature": "EpoWCpc..."}
# 处理逻辑
elif block_type == "thinking":
reasoning_block = {
"type": "reasoning", # thinking → reasoning
"reasoning": block.get("thinking", ""), # thinking → reasoning
}
# 未知字段塞进 extras
known_fields = {"type", "thinking", "index", "extras"}
for key in block:
if key not in known_fields:
reasoning_block.setdefault("extras", {})[key] = block[key]
# signature → extras["signature"]
# 输出
{"type": "reasoning", "reasoning": "用户问的是天气...", "extras": {"signature": "EpoWCpc..."}}
两个关键操作:
- 字段重命名:
thinking→reasoning(语义统一) - 未知字段保留:
signature→extras["signature"](不丢数据)
tool_use → tool_call
这个分支比较复杂,因为要区分流式和非流式:
elif block_type == "tool_use":
if isinstance(message, AIMessageChunk) and len(message.tool_call_chunks) == 1 and message.chunk_position != "last":
# 流式单 chunk → 输出 tool_call_chunk
chunk = message.tool_call_chunks[0]
yield {
"type": "tool_call_chunk",
"name": chunk.get("name"),
"id": chunk.get("id"),
"args": chunk.get("args"),
}
else:
# 非流式或已聚合 → 输出 tool_call
# 优先从 message.tool_calls 取已解析的数据
for tc in message.tool_calls:
if tc.get("id") == block.get("id"):
yield {
"type": "tool_call",
"name": tc["name"],
"args": tc["args"], # ← 已经是 dict,不是 JSON 字符串
"id": tc.get("id"),
}
break
注意它优先从 message.tool_calls 取数据——因为 tool_calls 已经经过了 model_validator 的解析,args 是 dict 而不是 JSON 字符串。
server_tool_use → server_tool_call
Anthropic 的服务端工具(如 code execution、web search)走单独的分支:
elif block_type == "server_tool_use":
# 特殊命名映射
if block.get("name") == "code_execution":
name = "code_interpreter" # Anthropic 叫 code_execution,标准化叫 code_interpreter
else:
name = block.get("name", "")
server_tool_call = {
"type": "server_tool_call",
"name": name,
"args": block.get("input", {}),
"id": block.get("id", ""),
}
# ... extras 处理
兜底
不认识的 block 统一包成 non_standard:
else:
yield {"type": "non_standard", "value": block}
_populate_extras:不丢数据的公共函数
多个 translator 都用到了这个函数:
def _populate_extras(standard_block, block, known_fields):
if standard_block.get("type") == "non_standard":
return standard_block # non_standard 不需要 extras,数据在 value 里
for key, value in block.items():
if key not in known_fields:
standard_block.setdefault("extras", {})[key] = value
return standard_block
它的逻辑很简单:遍历原始 block 的所有字段,不在 known_fields 里的,全塞进 extras。
这保证了即使提供商加了新字段,标准化过程也不会丢掉它们。
完整翻译示例
Anthropic 消息翻译
ai = AIMessage(
content=[
{"type": "text", "text": "让我想想..."},
{"type": "thinking", "thinking": "用户问天气...", "signature": "abc123"},
{"type": "tool_use", "id": "call_1", "name": "get_weather", "input": {"city": "北京"}},
],
tool_calls=[
{"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
],
response_metadata={"model_provider": "anthropic"},
)
print(ai.content_blocks)
# [
# {"type": "text", "text": "让我想想..."},
# {"type": "reasoning", "reasoning": "用户问天气...", "extras": {"signature": "abc123"}},
# {"type": "tool_call", "name": "get_weather", "args": {"city": "北京"}, "id": "call_1"},
# ]
OpenAI Chat Completions 消息翻译
ai = AIMessage(
content="今天天气不错",
tool_calls=[
{"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
],
response_metadata={"model_provider": "openai"},
)
print(ai.content_blocks)
# [
# {"type": "text", "text": "今天天气不错"},
# {"type": "tool_call", "name": "get_weather", "args": {"city": "北京"}, "id": "call_1"},
# ]
OpenAI Chat Completions 格式比较简单——content 就是字符串,tool calls 从 message.tool_calls 取。
OpenAI Responses API 消息翻译
OpenAI 的 Responses API 返回更复杂的结构,包含 web_search、file_search、code_interpreter 等服务端工具:
ai = AIMessage(
content=[
{"type": "web_search_call", "id": "ws_1", "action": {"type": "search", "query": "北京天气"}, "status": "completed"},
{"type": "text", "text": "北京今天 25°C 晴天", "annotations": [{"type": "url_citation", "url": "https://...", "title": "天气预报"}]},
],
response_metadata={"model_provider": "openai"},
)
print(ai.content_blocks)
# [
# {"type": "server_tool_call", "name": "web_search", "args": {"type": "search", "query": "北京天气"}, "id": "ws_1"},
# {"type": "server_tool_result", "tool_call_id": "ws_1", "status": "success"},
# {"type": "text", "text": "北京今天 25°C 晴天", "annotations": [{"type": "citation", "url": "https://...", "title": "天气预报"}]},
# ]
web_search_call 被拆成了 server_tool_call + server_tool_result 两个标准 block。url_citation 被转成了通用的 citation。
Best-effort 猜测
# 没有 model_provider,手动构造的包含 Anthropic 格式图片的消息
msg = HumanMessage(content=[
{"type": "text", "text": "看看这个"},
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "iVBOR..."}},
])
print(msg.content_blocks)
# 第一轮:text 保留,image 是已知类型但有 source 子结构...
# 实际上 base.py 会因为有 "source" 而把它标为 non_standard
# 第二轮:
# _convert_v0_multimodal_input_to_v1 → 没有 source_type,不认识
# _convert_to_v1_from_chat_completions_input → 不是 image_url,不认识
# _convert_to_v1_from_anthropic_input → 认识!source.type == "base64"
# → [
# {"type": "text", "text": "看看这个"},
# {"type": "image", "base64": "iVBOR...", "mime_type": "image/png"},
# ]
标准化的全景图
提供商原始格式 标准 ContentBlock
───────────── ──────────────
Anthropic:
thinking ──→ reasoning (+ extras)
tool_use ──→ tool_call
server_tool_use ──→ server_tool_call
*_tool_result ──→ server_tool_result
image.source ──→ image (base64/url/file_id)
document.source ──→ file / text-plain
citations ──→ annotations[citation]
OpenAI Chat Completions:
str content ──→ text
image_url ──→ image
input_audio ──→ audio
file ──→ file
OpenAI Responses:
reasoning ──→ reasoning (explode summary)
function_call ──→ tool_call
web_search_call ──→ server_tool_call + server_tool_result
file_search_call ──→ server_tool_call + server_tool_result
code_interpreter ──→ server_tool_call + server_tool_result
mcp_call ──→ server_tool_call + server_tool_result
image_gen_call ──→ image (from result base64)
url_citation ──→ citation
file_citation ──→ citation
Google GenAI:
thinking ──→ reasoning
function_call ──→ tool_call
executable_code ──→ server_tool_call (code_interpreter)
code_exec_result ──→ server_tool_result
file_data ──→ file (url)
grounding_metadata ──→ annotations[citation]
Bedrock Converse:
reasoning_content ──→ reasoning (+ extras.signature)
tool_use ──→ tool_call
{image: {...}} ──→ image (bytes → base64)
{document: {...}} ──→ file / text-plain
Groq:
executed_tools ──→ server_tool_call + server_tool_result
reasoning_content ──→ reasoning (from additional_kwargs)
所有提供商:
不认识的类型 ──→ non_standard (value 保留原始数据)
不认识的字段 ──→ extras (保留在标准 block 上)
小结
- Content Block 是 TypedDict,定义在
content.py,纯数据结构,零依赖 - 三条翻译路径:output_version=“v1” → model_provider translator → best-effort 管道
- 注册表模式:
PROVIDER_TRANSLATORS全局 dict,模块加载时自动注册,可扩展 - 不丢数据:未知字段 →
extras,未知类型 →non_standard - 提供商间复用:Bedrock 复用 Anthropic,VertexAI 复用 GenAI
下一部分讲源码架构——目录为什么这么分、流式聚合怎么实现、工具函数层的双模式设计。
LangChain 消息系统深度解析(四):源码架构与设计哲学
本系列共 4 部分。本文是第 4 部分,从目录结构、继承体系、流式聚合到工具函数层,拆解 LangChain 消息系统的架构设计。
完整目录结构
langchain_core/messages/
│
├── __init__.py # 入口:懒加载 + 统一导出
│
├── base.py # 基类:BaseMessage, BaseMessageChunk, merge_content
├── content.py # 数据定义:所有 ContentBlock TypedDict
│
├── ai.py # ── 角色层 ──
├── human.py # ~70 行
├── system.py # ~70 行
├── tool.py # ~300 行(含 ToolCall 数据结构 + 解析函数)
├── chat.py # ~60 行
├── function.py # ~50 行(旧版,保留兼容)
├── modifier.py # ~30 行(RemoveMessage)
│
├── utils.py # 工具层:filter, merge, trim, convert_to_openai, ~1500 行
│
└── block_translators/ # ── 适配层 ──
├── __init__.py # 注册表 + 自动注册
├── anthropic.py # ~400 行
├── openai.py # ~700 行(Chat Completions + Responses 两套)
├── google_genai.py # ~500 行
├── google_vertexai.py # ~15 行(复用 genai)
├── bedrock.py # ~90 行(复用 anthropic)
├── bedrock_converse.py # ~300 行
├── groq.py # ~150 行
└── langchain_v0.py # ~250 行(旧格式兼容)
这个目录结构不是随意的。下面逐层讲为什么这么分。
为什么按角色拆文件
看一下各文件的行数差异:
human.py ~70 行
system.py ~70 行
chat.py ~60 行
function.py ~50 行
modifier.py ~30 行
tool.py ~300 行
ai.py ~500 行 ← 最复杂
human.py 的全部实质内容:
class HumanMessage(BaseMessage):
type: Literal["human"] = "human"
class HumanMessageChunk(HumanMessage, BaseMessageChunk):
type: Literal["HumanMessageChunk"] = "HumanMessageChunk"
就这么多——继承 BaseMessage,固定 type 值。system.py、chat.py 也类似。
而 ai.py 有 500 行,因为 AI 消息承载了:tool_calls 的 model_validator 自动解析、content_blocks 的 translator 调度、AIMessageChunk 的流式聚合、init_tool_calls 的 JSON 容错解析、pretty_repr 的美化输出、UsageMetadata 的 token 统计…
按角色拆,而不是按 Message/Chunk 拆。原因是 HumanMessage 和 HumanMessageChunk 紧密耦合(Chunk 继承 Message),而 HumanMessage 和 AIMessage 之间几乎没有交互。按角色拆让复杂度被限制在单个文件内——ai.py 再复杂也不会污染 human.py。
base.py 和 content.py 的分离
这两个文件解决的是不同层次的问题:
base.py → "消息"是什么?(容器)
content.py → "内容块"是什么?(容器里装的东西)
content.py 是纯数据定义——全是 TypedDict,没有任何业务逻辑,不 import 任何 messages 模块内的东西。
base.py 有逻辑——Pydantic 模型、merge_content 函数、content_blocks 属性、text 属性等。
分开的关键原因是避免循环依赖:
content.py(纯数据,零依赖)
↑
base.py(import content 做类型标注)
↑
ai.py / human.py / tool.py(import base 做继承)
↑
utils.py(import 所有角色类型)
依赖方向严格单向。如果把 content.py 和 base.py 合并,而 ai.py 又要 import BaseMessage 和各种 TypedDict,很容易产生循环引用。
tool.py 的特殊地位
tool.py 是整个目录里唯一同时包含消息类型和数据结构的文件:
# tool.py 同时定义了:
# 1. 消息类型
class ToolMessage(BaseMessage):
tool_call_id: str
status: Literal["success", "error"] = "success"
# 2. 数据结构
class ToolCall(TypedDict):
name: str
args: dict[str, Any]
id: str | None
class ToolCallChunk(TypedDict):
name: str | None
args: str | None
id: str | None
index: int | None
# 3. 工厂函数
def tool_call(*, name, args, id) -> ToolCall: ...
# 4. 解析函数
def default_tool_parser(raw_tool_calls) -> tuple[list[ToolCall], list[InvalidToolCall]]: ...
这样设计是因为 tool call 横跨两个维度——ToolMessage 是消息角色(工具返回结果),ToolCall 是数据结构(AI 请求调用工具),而 ToolMessage.tool_call_id 关联 ToolCall.id,语义上密不可分。
注意 content.py 里也有一套 ToolCall(v1 标准版,多了 index、extras 字段)。两套共存是历史过渡——tool.py 的是旧版供 Pydantic 字段用,content.py 的是新版供 content_blocks 输出用。
继承体系
Message 和 Chunk 的关系
每个 Message 都有一个对应的 Chunk 版本,用于流式场景。Chunk 继承 Message:
class BaseMessage(Serializable):
content: str | list[str | dict]
type: str
id: str | None
additional_kwargs: dict
response_metadata: dict
class BaseMessageChunk(BaseMessage):
def __add__(self, other): # ← 支持拼接
...
子类的继承关系(钻石继承):
Serializable
│
BaseMessage
╱ ╲
HumanMessage BaseMessageChunk
╲ ╱
HumanMessageChunk
class HumanMessage(BaseMessage):
type: Literal["human"] = "human"
class HumanMessageChunk(HumanMessage, BaseMessageChunk):
type: Literal["HumanMessageChunk"] = "HumanMessageChunk"
HumanMessageChunk 同时继承了 HumanMessage 的角色属性和 BaseMessageChunk 的拼接能力。
AIMessageChunk 最复杂,额外多了 tool_call_chunks、chunk_position、init_tool_calls 等:
class AIMessageChunk(AIMessage, BaseMessageChunk):
tool_call_chunks: list[ToolCallChunk] = []
chunk_position: Literal["last"] | None = None
@model_validator(mode="after")
def init_tool_calls(self) -> Self: ... # 自动解析
def __add__(self, other):
return add_ai_message_chunks(self, other) # 专门的聚合逻辑
Message ↔ Chunk 互转
utils.py 里维护了一张映射表:
_MSG_CHUNK_MAP = {
HumanMessage: HumanMessageChunk,
AIMessage: AIMessageChunk,
SystemMessage: SystemMessageChunk,
ToolMessage: ToolMessageChunk,
FunctionMessage: FunctionMessageChunk,
ChatMessage: ChatMessageChunk,
}
_CHUNK_MSG_MAP = {v: k for k, v in _MSG_CHUNK_MAP.items()}
def _msg_to_chunk(message):
chunk_cls = _MSG_CHUNK_MAP[message.__class__]
return chunk_cls(**message.model_dump(exclude={"type"}))
def _chunk_to_msg(chunk):
msg_cls = _CHUNK_MSG_MAP[chunk.__class__]
return msg_cls(**chunk.model_dump(exclude={"type", "tool_call_chunks", "chunk_position"}))
这在 merge_message_runs 中被使用——把 Message 转成 Chunk 来利用 __add__ 合并,再转回 Message。
流式聚合的实现
merge_content:内容拼接
def merge_content(first_content, *contents):
merged = first_content
for content in contents:
if isinstance(merged, str) and isinstance(content, str):
merged += content # "今天" + "天气" → "今天天气"
elif isinstance(merged, str) and isinstance(content, list):
merged = [merged, *content] # str + list → list
elif isinstance(merged, list) and isinstance(content, list):
merged = merge_lists(merged, content) # list + list → 按 index 智能合并
elif merged and isinstance(merged[-1], str):
merged[-1] += content # list 最后元素是 str,追加
elif content == "":
pass # 空字符串忽略
elif merged:
merged.append(content) # 其他情况追加
return merged
4 种组合的实际效果:
# str + str
merge_content("你好", "世界") # → "你好世界"
# str + list
merge_content("你好", [{"type": "text", "text": "世界"}])
# → ["你好", {"type": "text", "text": "世界"}]
# list + list
merge_content(
[{"type": "text", "text": "你好", "index": 0}],
[{"type": "text", "text": "世界", "index": 0}],
)
# → [{"type": "text", "text": "你好世界", "index": 0}] ← merge_lists 按 index 合并
merge_lists:按 index 对齐合并
merge_lists(来自 utils/_merge.py)是流式聚合的关键。它不是简单的 list.extend,而是按 dict 里的 index 字段对齐合并。
直觉上:
# 两个 list 里 index=0 的 dict 会被合并在一起
left = [{"text": "你好", "index": 0}]
right = [{"text": "世界", "index": 0}]
merge_lists(left, right)
# → [{"text": "你好世界", "index": 0}]
这就是为什么流式 tool_call_chunks 能正确合并——同一个工具调用的碎片有相同的 index。
add_ai_message_chunks:完整的聚合流程
def add_ai_message_chunks(left, *others):
# 1. 合并 content
content = merge_content(left.content, *(o.content for o in others))
# 2. 合并 additional_kwargs 和 response_metadata(递归合并 dict)
additional_kwargs = merge_dicts(left.additional_kwargs, *(o.additional_kwargs for o in others))
response_metadata = merge_dicts(left.response_metadata, *(o.response_metadata for o in others))
# 3. 合并 tool_call_chunks(按 index 合并)
raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))
# 4. 合并 usage_metadata(递归加法)
usage_metadata = add_usage(left.usage_metadata, other.usage_metadata)
# 5. 选最好的 ID
# 优先级:提供商原始 ID > lc_run-* > lc_*
for id_ in all_ids:
if not id_.startswith("lc_"):
chunk_id = id_ # 最高优先级,立即返回
break
rank = 1 if id_.startswith("lc_run-") else 0
if rank > best_rank:
chunk_id = id_
# 6. 传播 chunk_position
chunk_position = "last" if any(x.chunk_position == "last" for x in [left, *others]) else None
return AIMessageChunk(content=content, tool_call_chunks=..., usage_metadata=..., id=chunk_id, ...)
ID 的优先级设计很有意思:提供商分配的 ID(如 OpenAI 的 chatcmpl-xxx)优先级最高,LangChain 自动生成的 lc_run-* 次之,其他 lc_* 最低。这保证了如果提供商给了 ID,它会被保留。
add_usage:token 用量合并
def add_usage(left, right):
if not (left or right):
return UsageMetadata(input_tokens=0, output_tokens=0, total_tokens=0)
if not (left and right):
return left or right
# 递归加法合并(包括嵌套的 input_token_details、output_token_details)
return UsageMetadata(**_dict_int_op(left, right, operator.add))
_dict_int_op 递归遍历两个 dict,对所有 int 值执行加法。所以 input_tokens: 5 + input_tokens: 3 = input_tokens: 8,嵌套的 cache_read: 100 + cache_read: 50 = cache_read: 150。
工具函数层的双模式设计
utils.py 是最大的文件(~1500 行),包含 filter_messages、merge_message_runs、trim_messages、convert_to_openai_messages 等。
这些函数有一个共同的设计——双模式,通过 @_runnable_support 装饰器实现。
@_runnable_support 的实现
def _runnable_support(func):
@wraps(func)
def wrapped(messages=None, *args, **kwargs):
if messages is not None:
return func(messages, *args, **kwargs) # 直接执行
else:
return RunnableLambda(partial(func, **kwargs), name=func.__name__) # 返回 Runnable
return wrapped
效果:
@_runnable_support
def filter_messages(messages, *, include_types=None, ...):
...
# 传了 messages → 直接返回结果
result = filter_messages(messages, include_types=["ai"]) # → list[BaseMessage]
# 不传 messages → 返回一个 Runnable,可以放进 chain
chain = filter_messages(include_types=["ai"]) | llm
result = chain.invoke(messages)
merge_message_runs 的实现
这个函数利用了 Chunk 的 __add__ 来实现合并:
@_runnable_support
def merge_message_runs(messages, *, chunk_separator="\n"):
messages = convert_to_messages(messages)
merged = []
for msg in messages:
last = merged.pop() if merged else None
if not last:
merged.append(msg)
elif isinstance(msg, ToolMessage) or not isinstance(msg, last.__class__):
merged.extend([last, msg]) # 不同类型,不合并
else:
last_chunk = _msg_to_chunk(last) # Message → Chunk
curr_chunk = _msg_to_chunk(msg)
# 两个 str content 之间加分隔符
if isinstance(last_chunk.content, str) and isinstance(curr_chunk.content, str):
last_chunk.content += chunk_separator
merged.append(_chunk_to_msg(last_chunk + curr_chunk)) # Chunk.__add__ → 转回 Message
return merged
设计很巧妙——不是重新实现合并逻辑,而是复用已有的 Chunk 拼接能力。_msg_to_chunk 和 _chunk_to_msg 通过映射表互转。
trim_messages 的实现
trim_messages 支持 "first" 和 "last" 两种策略。"last" 策略的实现很有趣——反转消息列表,调用 “first” 策略,再反转回来:
def _last_max_tokens(messages, *, max_tokens, token_counter, ...):
# 处理 system 消息
system_message = None
if include_system and isinstance(messages[0], SystemMessage):
system_message = messages[0]
messages = messages[1:]
remaining_tokens = max_tokens - token_counter([system_message])
# 反转!
reversed_messages = messages[::-1]
# 用 "first" 策略处理反转后的列表
reversed_result = _first_max_tokens(
reversed_messages,
max_tokens=remaining_tokens,
token_counter=token_counter,
partial_strategy="last" if allow_partial else None,
end_on=start_on, # ← 注意:start_on 变成了 end_on
)
# 再反转回来
result = reversed_result[::-1]
if system_message:
result = [system_message, *result]
return result
_first_max_tokens 内部使用二分搜索来高效找到在 token 限制内能包含的最大消息数,而不是线性逐条添加:
def _first_max_tokens(messages, *, max_tokens, token_counter, ...):
# 二分搜索:找到最大的 mid 使得 token_counter(messages[:mid]) <= max_tokens
left, right = 0, len(messages)
for _ in range(len(messages).bit_length()):
if left >= right:
break
mid = (left + right + 1) // 2
if token_counter(messages[:mid]) <= max_tokens:
left = mid
else:
right = mid - 1
idx = left
# ...
懒加载入口
__init__.py 不直接 import 任何子模块,而是通过 __getattr__ 按需加载:
# __init__.py
_dynamic_imports = {
"AIMessage": "ai",
"HumanMessage": "human",
"ToolCall": "tool",
"convert_to_openai_messages": "utils",
"convert_to_openai_image_block": "block_translators.openai",
# ... 60+ 个映射
}
def __getattr__(attr_name):
module_name = _dynamic_imports.get(attr_name)
result = import_attr(attr_name, module_name, __spec__.parent)
globals()[attr_name] = result # 缓存,下次不触发 __getattr__
return result
好处:import langchain_core.messages 几乎零成本。不会触发所有子模块和 block_translators 的加载。只有你真正用到 HumanMessage 时才加载 human.py。
同时 TYPE_CHECKING 块里有完整的 import,让 IDE 能正确提供类型提示和自动补全:
if TYPE_CHECKING:
from langchain_core.messages.ai import AIMessage, AIMessageChunk, ...
from langchain_core.messages.human import HumanMessage, ...
# ...
block_translators/ 为什么是独立子目录
翻译逻辑放在哪里,有几个选择:
方案 A:放在 ai.py 里?
→ ai.py 已经 500 行,加上 7 个提供商的翻译逻辑会到 3000+
方案 B:放在 base.py 里?
→ base.py 是基类,不该知道具体提供商的存在
方案 C:放在各自的集成包里(langchain-openai、langchain-anthropic)?
→ content_blocks 是 core 的属性,不能依赖集成包
方案 D:block_translators/ 子目录,每个提供商一个文件 ← 实际选择
选择 D 的原因:标准化逻辑在 core 里,但要理解各提供商格式。这是一个无法避免的矛盾——只能通过在 core 里维护适配层来解决。
注册表模式让系统可扩展——新增提供商只需要:
- 在
block_translators/下新建一个.py文件 - 实现
translate_content和translate_content_chunk - 调用
register_translator注册 - 在
__init__.py的_register_translators中添加一行
依赖方向图
┌─────────────────────┐
│ __init__.py │ 懒加载入口
│ (按需 import) │
└────────┬────────────┘
│
┌────────▼────────────┐
│ utils.py │ 工具层(横切所有类型)
│ filter / merge / │
│ trim / convert │
└────────┬────────────┘
│ import all message types
┌────────────────┼────────────────┐
│ │ │
┌───────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
│ ai.py │ │ human.py │ │ tool.py │ 角色层
│ ~500 行 │ │ ~70 行 │ │ ~300 行 │
└───────┬──────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────┼────────────────┘
│ import BaseMessage
┌────────▼────────────┐
│ base.py │ 骨架层
│ BaseMessage │
│ BaseMessageChunk │
│ merge_content │
└────────┬────────────┘
│ import TypedDicts
┌────────▼────────────┐
│ content.py │ 数据定义层(零依赖)
│ TextContentBlock │
│ ImageContentBlock │
│ ToolCall │
│ ... │
└─────────────────────┘
block_translators/ 适配层(独立子目录)
├── __init__.py (注册表)
├── anthropic.py ←── bedrock.py (复用)
├── openai.py
├── google_genai.py ←── google_vertexai.py (复用)
├── bedrock_converse.py
├── groq.py
└── langchain_v0.py
依赖方向严格自下而上:content.py 不依赖任何人 → base.py 只依赖 content → 角色层依赖 base → utils 依赖所有角色 → block_translators 依赖 base + content + 角色层。
没有循环依赖。每一层的职责清晰。
5 个架构设计原则
回顾整个消息系统的设计,可以提炼出 5 个核心原则:
1. 按角色拆文件,复杂度隔离
简单的角色(human、system)保持简单,复杂的角色(ai)集中管理。避免"一个大文件"导致的认知负担。
2. 数据定义零依赖,依赖方向单向
content.py 是纯 TypedDict,不 import 任何 messages 内部模块。所有依赖关系从上到下,不成环。
3. 注册表模式实现可扩展
PROVIDER_TRANSLATORS 全局字典 + register_translator 函数。内置提供商在模块加载时自注册,外部提供商也能随时注册。新增提供商不需要修改核心代码。
4. 双模式统一接口
filter_messages、merge_message_runs、trim_messages 既能直接调用返回结果,又能不传参数返回 Runnable 放进 chain。一个 @_runnable_support 装饰器搞定。
5. 渐进式无损解析
- 不丢类型:不认识的 block →
NonStandardContentBlock(保留原始数据) - 不丢字段:不认识的字段 →
extras(保留在标准 block 上) - 多轮尝试:best-effort 管道依次尝试所有已知格式
- 容错解析:
parse_partial_json处理不完整的流式 JSON
这保证了无论什么提供商返回什么格式,数据永远不会在标准化过程中丢失。
写在最后
LangChain 的消息系统在表面上很简单——HumanMessage(content="你好") 就能用。但底层为了解决"多提供商格式不统一"这个核心问题,构建了一套完整的三层架构:
标准层:ContentBlock TypedDict(统一的数据表示)
翻译层:Block Translators 注册表(可插拔的格式转换)
解析层:渐进式无损解析(non_standard 兜底 + 多轮尝试 + extras 保留)
理解了这套架构,你就能:
- 在多个模型提供商之间无缝切换
- 正确处理流式 tool call 的聚合
- 在需要时写自己的 translator
- 读懂 LangChain 集成包的消息处理代码
更多推荐



所有评论(0)