本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的消息系统。

  • 第 1 部分:5 分钟上手(本文)
  • 第 2 部分:Tool Call 全流程
  • 第 3 部分:Content Block 标准化
  • 第 4 部分:源码架构与设计哲学

LangChain 消息系统深度解析(一):5 分钟上手

一个能跑的例子

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

messages = [
    SystemMessage(content="你是一个天气助手"),
    HumanMessage(content="北京今天天气怎么样?"),
    AIMessage(content="今天北京晴天,气温25°C,适合出行。"),
]

# 看看 AIMessage 内部长什么样
print(messages[2].model_dump())

输出:

{
    "content": "今天北京晴天,气温25°C,适合出行。",
    "type": "ai",
    "id": None,
    "name": None,
    "additional_kwargs": {},
    "response_metadata": {},
    "tool_calls": [],
    "invalid_tool_calls": [],
    "usage_metadata": None,
}

content 是消息内容,type 标识消息类型——就这么简单。其他字段后面会用到,先不管。


6 种消息类型

LangChain 把聊天中的每条消息都封装成一个对象。不同角色,不同类型:

1. SystemMessage —— 系统指令

SystemMessage(content="你是一个专业的翻译助手,只翻译,不解释。")

给模型的"规则手册",通常放在对话最开头。

2. HumanMessage —— 用户输入

HumanMessage(content="把这句话翻译成英文:今天天气真好")

用户说的话。

3. AIMessage —— 模型回复

AIMessage(content="The weather is really nice today.")

模型的回复。这是 6 种消息里最复杂的一种——它除了文本内容,还可能包含工具调用 (tool_calls)、token 用量 (usage_metadata)、推理过程等。

4. ToolMessage —— 工具返回结果

from langchain_core.messages import ToolMessage

ToolMessage(
    content="北京 25°C 晴天",
    tool_call_id="call_abc123",  # ← 必须填,关联到 AI 的哪次工具调用
)

工具执行完后,把结果包成 ToolMessage 返回给模型。tool_call_id 是必填的——因为 AI 可能同时调用多个工具,需要靠这个 ID 对应"哪个结果是哪个调用的"。

5. ChatMessage —— 自定义角色

from langchain_core.messages import ChatMessage

ChatMessage(role="moderator", content="请保持对话友善。")

当内置的 human/ai/system 不够用时,可以自定义角色名。

6. RemoveMessage —— 删除消息

from langchain_core.messages import RemoveMessage

RemoveMessage(id="msg_to_delete")

不是真正的聊天消息,而是一个"指令"——告诉 LangGraph 等框架删除 ID 对应的消息。在需要管理对话历史时使用。


content 的两种写法

纯文本

HumanMessage(content="你好")

最简单的方式,content 就是一个字符串。

多模态(文本 + 图片 + 音频…)

当你需要发送图片、音频等内容时,content 变成一个列表,每个元素是一个 dict:

HumanMessage(content=[
    {"type": "text", "text": "这张图片里是什么?"},
    {
        "type": "image",
        "url": "https://example.com/cat.jpg",
    },
])

也可以用 base64:

HumanMessage(content=[
    {"type": "text", "text": "描述这张图片"},
    {
        "type": "image",
        "base64": "iVBORw0KGgo...",  # 图片的 base64 编码
        "mime_type": "image/png",
    },
])

还可以用工厂函数,获得类型提示和自动 ID 生成:

from langchain_core.messages.content import create_text_block, create_image_block

HumanMessage(content=[
    create_text_block("描述这张图片"),
    create_image_block(url="https://example.com/cat.jpg"),
])

这些 dict 在 LangChain 内部叫 Content Block,后续第 3 部分会详细讲。


3 个最实用的工具函数

filter_messages —— 按条件过滤

from langchain_core.messages import filter_messages

messages = [
    SystemMessage(content="你是助手"),
    HumanMessage(content="你好", name="alice"),
    AIMessage(content="你好!", id="ai_1"),
    HumanMessage(content="天气呢?", name="bob"),
    AIMessage(content="今天晴天", id="ai_2"),
]

# 只要 AI 消息
filter_messages(messages, include_types=["ai"])
# → [AIMessage("你好!"), AIMessage("今天晴天")]

# 排除特定 ID
filter_messages(messages, exclude_ids=["ai_1"])
# → [SystemMessage("你是助手"), HumanMessage("你好"), HumanMessage("天气呢?"), AIMessage("今天晴天")]

# 只要特定用户
filter_messages(messages, include_names=["alice"])
# → [HumanMessage("你好")]

merge_message_runs —— 合并连续同类型消息

有些场景下,连续多条同类型消息需要合并成一条(比如模型要求 human/ai 必须交替出现):

from langchain_core.messages import merge_message_runs

messages = [
    SystemMessage(content="你是助手"),
    HumanMessage(content="你好"),
    HumanMessage(content="天气呢?"),       # 连续两条 human
    AIMessage(content="让我查查"),
    AIMessage(content="北京 25°C 晴"),      # 连续两条 ai
]

merged = merge_message_runs(messages)
# → [
#     SystemMessage("你是助手"),
#     HumanMessage("你好\n天气呢?"),      ← 合并了
#     AIMessage("让我查查\n北京 25°C 晴"), ← 合并了
#   ]

注意:ToolMessage 永远不会被合并,因为每条都有独立的 tool_call_id

trim_messages —— 按 token 裁剪

对话历史太长时,需要裁剪到模型的 token 限制以内:

from langchain_core.messages import trim_messages

messages = [
    SystemMessage(content="你是助手"),
    HumanMessage(content="第一个问题..."),
    AIMessage(content="第一个回答..."),
    HumanMessage(content="第二个问题..."),
    AIMessage(content="第二个回答..."),
    HumanMessage(content="最新的问题"),
]

# 保留最近的消息,总 token 不超过 100
# 同时保留 SystemMessage 并确保从 human 消息开始
trimmed = trim_messages(
    messages,
    max_tokens=100,
    token_counter="approximate",  # 用近似计数,速度快
    strategy="last",              # 保留最近的
    include_system=True,          # 保留 system 消息
    start_on="human",             # 确保从 human 开始
)
# → [SystemMessage("你是助手"), HumanMessage("最新的问题")]

放进 Chain 里用

这三个函数都支持双模式——传入消息直接执行,不传消息则返回一个 Runnable

# 模式一:直接调用
result = filter_messages(messages, include_types=["ai"])

# 模式二:放进 chain(不传 messages 参数)
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
chain = filter_messages(include_types=["human", "ai"]) | llm
# 后续 chain.invoke(messages) 时,先过滤再送入模型

两个重要的属性

.text —— 总是给你纯文本

不管 content 是字符串还是多模态列表,.text 都只提取文本部分:

msg1 = AIMessage(content="你好世界")
print(msg1.text)  # → "你好世界"

msg2 = AIMessage(content=[
    {"type": "text", "text": "这是文字"},
    {"type": "image", "url": "https://..."},
    {"type": "text", "text": "还有文字"},
])
print(msg2.text)  # → "这是文字还有文字"  (只提取 text 类型的块)

.content_blocks —— 标准化的内容块

不同模型提供商返回的 content 格式天差地别,但 .content_blocks 把它们统一成标准格式:

# Anthropic 返回的格式
msg = AIMessage(
    content=[
        {"type": "text", "text": "让我想想..."},
        {"type": "thinking", "thinking": "用户问的是...", "signature": "EpoWCpc..."},
    ],
    response_metadata={"model_provider": "anthropic"},
)

# 不管原始格式是什么,content_blocks 输出统一格式
blocks = msg.content_blocks
# → [
#     {"type": "text", "text": "让我想想..."},
#     {"type": "reasoning", "reasoning": "用户问的是...", "extras": {"signature": "EpoWCpc..."}},
#   ]

注意看:Anthropic 的 thinking 变成了标准的 reasoning,而 Anthropic 特有的 signature 字段被放进了 extras 中,不会丢失。

这个标准化机制是 LangChain 消息系统最精妙的设计之一,第 3 篇会深入拆解。


所有消息的公共字段

所有消息类型都继承自 BaseMessage,共享这些字段:

字段 类型 说明
content `str list[str
type str 消息类型标识("human"
, "ai"
, "system"
等)
id `str None`
name `str None`
additional_kwargs dict 提供商原始附加数据(如 OpenAI 的原始 tool_calls)
response_metadata dict 响应元数据(如 token 计数、模型名称、model_provider)

AIMessage 额外多出的字段:

字段 类型 说明
tool_calls list[ToolCall] 已解析的工具调用请求
invalid_tool_calls list[InvalidToolCall] 解析失败的工具调用
usage_metadata `UsageMetadata None`

小结

本篇覆盖了日常使用中最常接触的部分:

  • 6 种消息类型,各有角色分工
  • content 支持纯文本和多模态列表
  • 3 个工具函数(filter / merge / trim),既能直接调用也能放进 chain
  • .text 取纯文本,.content_blocks 取标准化内容块

但还有两个关键问题没回答:

  1. AI 的 tool_calls 是怎么从不同提供商的原始格式自动解析出来的?
  2. content_blocks 是怎么做到不同提供商返回统一格式的?

下一部分讲 Tool Call 的完整流程。

LangChain 消息系统深度解析(二):Tool Call 全流程

本系列共 4 部分。本文是第 2 部分,拆解工具调用从请求到返回的完整链路。


一个完整的工具调用往返

先看全貌——AI 请求调用工具,工具返回结果,再送回模型:

from langchain_core.messages import AIMessage, ToolMessage, HumanMessage

# 1. 用户提问
user = HumanMessage(content="北京今天天气怎么样?")

# 2. AI 决定调用工具(模型返回的)
ai = AIMessage(
    content="让我查一下天气",
    tool_calls=[
        {
            "name": "get_weather",
            "args": {"city": "北京"},
            "id": "call_abc123",
            "type": "tool_call",
        }
    ],
)

# 3. 你执行工具,把结果包成 ToolMessage
tool_result = ToolMessage(
    content="北京 25°C 晴天,湿度40%",
    tool_call_id="call_abc123",  # ← 必须和上面的 id 对应
)

# 4. 把整个对话送回模型
messages = [user, ai, tool_result]
# response = model.invoke(messages)
# → AI 拿到工具结果后生成最终回答

关键在第 3 步:tool_call_id="call_abc123" 必须和 AIMessage.tool_calls 里的 id 对应,否则模型无法将结果和请求关联。


两种构造 Tool Call 的方式

方式一:直接写 tool_calls(推荐)

ai = AIMessage(
    content="让我查一下天气",
    tool_calls=[
        {
            "name": "get_weather",
            "args": {"city": "北京"},
            "id": "call_abc123",
            "type": "tool_call",
        }
    ],
)

print(ai.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]

方式二:放在 additional_kwargs(OpenAI 原始格式)

OpenAI API 返回的工具调用是这种格式,arguments 是 JSON 字符串:

ai2 = AIMessage(
    content="",
    additional_kwargs={
        "tool_calls": [
            {
                "id": "call_abc123",
                "function": {
                    "name": "get_weather",
                    "arguments": '{"city": "北京"}',  # ← JSON 字符串,不是 dict
                },
            }
        ]
    },
)

# 但你访问 tool_calls,发现已经自动解析好了!
print(ai2.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]

两种方式的 .tool_calls 输出完全一样。方式二的 JSON 字符串被自动解析成了 dict。


自动解析的秘密:model_validator

这个"自动解析"不是魔法,是 Pydantic 的 model_validator 在对象创建时触发的。

AIMessage 的源码(ai.py)里有这段逻辑:

class AIMessage(BaseMessage):
    tool_calls: list[ToolCall] = []
    invalid_tool_calls: list[InvalidToolCall] = []

    @model_validator(mode="before")
    @classmethod
    def _backwards_compat_tool_calls(cls, values: dict) -> Any:
        # 如果 tool_calls 和 invalid_tool_calls 都是空的
        check = not any(
            values.get(k) for k in ("tool_calls", "invalid_tool_calls", "tool_call_chunks")
        )

        if check and (raw := values.get("additional_kwargs", {}).get("tool_calls")):
            # 从 additional_kwargs 里取出原始数据,自动解析!
            parsed_tool_calls, parsed_invalid_tool_calls = default_tool_parser(raw)
            values["tool_calls"] = parsed_tool_calls
            values["invalid_tool_calls"] = parsed_invalid_tool_calls

        return values

执行流程:

创建 AIMessage
    ↓
model_validator 触发(mode="before",在字段赋值之前)
    ↓
检查 tool_calls 是否为空?
    ↓ 是
检查 additional_kwargs["tool_calls"] 有数据吗?
    ↓ 有
调用 default_tool_parser 解析
    ↓
自动填充 tool_calls 和 invalid_tool_calls

default_tool_parsertool.py 里,核心就是把 OpenAI 格式的 function.arguments JSON 字符串解析出来:

def default_tool_parser(raw_tool_calls):
    tool_calls = []
    invalid_tool_calls = []
    for raw in raw_tool_calls:
        if "function" not in raw:
            continue
        name = raw["function"]["name"]
        try:
            args = json.loads(raw["function"]["arguments"])  # 解析 JSON 字符串
            tool_calls.append(tool_call(name=name, args=args, id=raw.get("id")))
        except json.JSONDecodeError:
            # JSON 解析失败 → 放进 invalid_tool_calls
            invalid_tool_calls.append(
                invalid_tool_call(name=name, args=raw["function"]["arguments"], id=raw.get("id"), error=None)
            )
    return tool_calls, invalid_tool_calls

解析失败时会发生什么

模型有时候会生成不合法的 JSON(尤其是小模型或复杂参数时):

ai_bad = AIMessage(
    content="",
    additional_kwargs={
        "tool_calls": [
            {
                "id": "call_xyz",
                "function": {
                    "name": "search",
                    "arguments": '{"query": "天气"',  # ← 缺少右花括号!
                },
            }
        ]
    },
)

print(ai_bad.tool_calls)
# []  ← 空的!

print(ai_bad.invalid_tool_calls)
# [{'name': 'search', 'args': '{"query": "天气"', 'id': 'call_xyz', 'error': None, 'type': 'invalid_tool_call'}]

解析失败的调用被放进了 invalid_tool_callsargs 保留原始字符串。你可以据此做错误处理或重试。


ToolCall 的两套数据结构

LangChain 内部有两套 ToolCall 定义,这是历史遗留:

旧版:tool.py 中的 ToolCall

# tool.py —— 用于 AIMessage.tool_calls 字段类型
class ToolCall(TypedDict):
    name: str
    args: dict[str, Any]
    id: str | None
    type: NotRequired[Literal["tool_call"]]

新版:content.py 中的 ToolCall

# content.py —— 用于 content_blocks 输出的 v1 标准格式
class ToolCall(TypedDict):
    type: Literal["tool_call"]
    id: str | None
    name: str
    args: dict[str, Any]
    index: NotRequired[int | str]     # ← 多了这个,用于流式
    extras: NotRequired[dict[str, Any]]  # ← 多了这个,存提供商特有数据
旧版 (tool.py) 新版 (content.py)
用途 AIMessage.tool_calls
字段
.content_blocks
输出
type
字段
NotRequired
,可选
Required
,固定为 "tool_call"
index 有,流式聚合用
extras 有,存提供商特有数据

日常使用中你不需要关心这个区别——直接用 AIMessage.tool_calls 就好。


流式场景:Tool Call 是碎片化的

流式(streaming)时,模型不会一次性返回完整的 tool call。JSON 参数是一块一块吐出来的:

from langchain_core.messages import AIMessageChunk

# 第 1 个 chunk:拿到了函数名和 JSON 的开头
chunk1 = AIMessageChunk(
    content="",
    tool_call_chunks=[
        {"name": "get_weather", "args": '{"ci', "id": "call_abc123", "index": 0}
    ],
)

# 第 2 个 chunk:拿到了 JSON 的剩余部分
chunk2 = AIMessageChunk(
    content="",
    tool_call_chunks=[
        {"name": None, "args": 'ty": "北京"}', "id": None, "index": 0}
    ],
)

注意 index=0——两个 chunk 的 index 相同,表示它们属于同一个工具调用。

拼接

full = chunk1 + chunk2

# tool_call_chunks 被按 index 合并
print(full.tool_call_chunks)
# [{'name': 'get_weather', 'args': '{"city": "北京"}', 'id': 'call_abc123', 'index': 0, 'type': 'tool_call_chunk'}]

# tool_calls 自动尝试解析完整的 JSON
print(full.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_abc123', 'type': 'tool_call'}]

碎片化的 '{"ci' + 'ty": "北京"}' 被拼成了完整的 '{"city": "北京"}',然后自动 json.loads 成 dict。

底层实现

拼接时调用的是 add_ai_message_chunks 函数(ai.py),其中关键一步:

# 按 index 合并 tool_call_chunks
raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))

merge_lists(来自 _merge.py)的逻辑:两个 dict 如果 index 相同且不为 None,就合并(字符串拼接、非空值优先)。这就是为什么 index=0 的两个 chunk 会被合并到一起。

合并完成后,init_tool_calls 这个 model_validator 自动触发:

class AIMessageChunk(AIMessage, BaseMessageChunk):

    @model_validator(mode="after")
    def init_tool_calls(self) -> Self:
        tool_calls = []
        invalid_tool_calls = []

        for chunk in self.tool_call_chunks:
            # 用 parse_partial_json 做容错解析
            args_ = parse_partial_json(chunk["args"]) if chunk["args"] else {}
            if isinstance(args_, dict):
                tool_calls.append(create_tool_call(name=chunk["name"] or "", args=args_, id=chunk["id"]))
            else:
                invalid_tool_calls.append(...)

        self.tool_calls = tool_calls
        self.invalid_tool_calls = invalid_tool_calls
        return self

parse_partial_json 是一个容错 JSON 解析器——即使 JSON 不完整(比如只收到了 '{"ci'),也不会报错,而是尽量解析。当所有 chunk 拼完后,JSON 变完整了,就能正确解析成 dict。

chunk_position=“last” 的特殊处理

当流结束时,最后一个 chunk 会带 chunk_position="last" 标记。此时如果 content 里有 tool_call_chunk 类型的 block,会被替换成完整的 tool_call

# 流式聚合的最后一步
if self.chunk_position == "last" and self.tool_call_chunks:
    # 把 content 里的 tool_call_chunk 替换成完整的 tool_call
    for idx, block in enumerate(self.content):
        if block.get("type") == "tool_call_chunk" and call_id in id_to_tc:
            self.content[idx] = id_to_tc[call_id]  # 替换!

多个工具并发调用

AI 可以在一条消息里同时调用多个工具:

ai = AIMessage(
    content="让我同时查天气和汇率",
    tool_calls=[
        {"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"},
        {"name": "get_exchange_rate", "args": {"from": "USD", "to": "CNY"}, "id": "call_2", "type": "tool_call"},
    ],
)

# 每个工具结果都要有对应的 tool_call_id
tool1 = ToolMessage(content="北京 25°C 晴", tool_call_id="call_1")
tool2 = ToolMessage(content="1 USD = 7.24 CNY", tool_call_id="call_2")

# 注意顺序:AI → Tool1 → Tool2 → 再送回模型
messages = [user, ai, tool1, tool2]

在流式场景下,多个并发调用的 chunk 通过不同的 index 值来区分:

# 两个工具调用同时流式返回
chunk = AIMessageChunk(
    content="",
    tool_call_chunks=[
        {"name": "get_weather", "args": '{"city":', "id": "call_1", "index": 0},       # 第 1 个工具
        {"name": "get_exchange_rate", "args": '{"from":', "id": "call_2", "index": 1},  # 第 2 个工具
    ],
)

merge_lists 会按 index 分别合并——index=0 的归一堆,index=1 的归另一堆。


ToolMessage 的数据结构

class ToolMessage(BaseMessage):
    tool_call_id: str                             # 必填,关联 AIMessage 的 tool_call id
    type: Literal["tool"] = "tool"
    artifact: Any = None                          # 完整工具输出(不送给模型)
    status: Literal["success", "error"] = "success"

几个重要的细节:

artifact —— 保存完整输出但不送给模型

tool_output = {
    "summary": "北京 25°C 晴天",
    "raw_data": {"temp": 25, "humidity": 40, "wind_speed": 12, ...},  # 模型不需要这些
}

ToolMessage(
    content=tool_output["summary"],         # 只把摘要送给模型
    artifact=tool_output,                    # 完整数据保存在 artifact 里
    tool_call_id="call_abc123",
)

status —— 标记执行成功或失败

# 工具执行失败
ToolMessage(
    content="API 请求超时,请稍后重试",
    tool_call_id="call_abc123",
    status="error",
)

tool_calls vs content_blocks 里的 tool_call

AIMessage 上有两个地方可以看到 tool call 的信息:

  1. **message.tool_calls** —— 顶层字段,已解析的结构体列表
  2. **message.content_blocks** —— content 里 type="tool_call" 的 block

它们的关系:

ai = AIMessage(
    content="让我查一下",
    tool_calls=[
        {"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
    ],
)

# 方式一:直接访问
print(ai.tool_calls)
# [{'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_1', 'type': 'tool_call'}]

# 方式二:通过 content_blocks
print(ai.content_blocks)
# [
#   {'type': 'text', 'text': '让我查一下'},
#   {'type': 'tool_call', 'name': 'get_weather', 'args': {'city': '北京'}, 'id': 'call_1'},
# ]

**.tool_calls**** 是"快捷方式"**——直接拿到已解析的工具调用。

**.content_blocks**** 是"完整视图"**——把文本、推理、工具调用等所有内容按顺序排列。如果 tool_calls 里的内容在 content 中没有对应的 block,content_blocks 属性会自动补上。

源码中的逻辑(ai.pycontent_blocks 属性):

@property
def content_blocks(self):
    blocks = super().content_blocks  # 先从 content 解析

    if self.tool_calls:
        # 检查 content 里已有的 tool_call id
        content_tool_call_ids = {
            block.get("id") for block in self.content
            if isinstance(block, dict) and block.get("type") == "tool_call"
        }
        # content 里没有的,从 tool_calls 补上
        for tool_call in self.tool_calls:
            if tool_call.get("id") not in content_tool_call_ids:
                blocks.append({
                    "type": "tool_call",
                    "id": tool_call.get("id"),
                    "name": tool_call["name"],
                    "args": tool_call["args"],
                })

    return blocks

完整数据流总结

提供商原始返回(OpenAI / Anthropic / ...)
    ↓
AIMessage 创建
    ↓
model_validator 触发:
  ├── additional_kwargs["tool_calls"] 有数据?
  │     → default_tool_parser 解析
  │     → JSON 成功 → tool_calls
  │     → JSON 失败 → invalid_tool_calls
  │
  └── 已经有 tool_calls?→ 跳过
    ↓
访问 .tool_calls     → 直接拿到已解析的结构体
访问 .content_blocks → 文本 + 推理 + 工具调用的完整有序列表
    ↓
流式场景额外步骤:
  chunk1 + chunk2 + ... + chunkN
    ↓
  merge_lists 按 index 合并 tool_call_chunks
    ↓
  init_tool_calls validator:parse_partial_json 解析
    ↓
  chunk_position="last":tool_call_chunk → tool_call

小结

  • AIMessage.tool_calls 是标准化的工具调用列表,推荐使用
  • additional_kwargs["tool_calls"] 存原始格式,model_validator 自动解析
  • JSON 解析失败不会报错,而是放进 invalid_tool_calls
  • 流式场景下,tool_call_chunksindex 合并,parse_partial_json 容错解析
  • ToolMessage 通过 tool_call_id 关联请求和结果

下一部分讲 Content Block 标准化——不同提供商的格式是如何被统一的。

LangChain 消息系统深度解析(三):Content Block 标准化

本系列共 4 部分。本文是第 3 部分,拆解 LangChain 如何用一套标准格式统一所有模型提供商的输出。


问题:同一张图片,4 种格式

同一张 base64 图片,不同提供商返回的格式完全不同:

# OpenAI
{"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBOR..."}}

# Anthropic
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "iVBOR..."}}

# Bedrock Converse
{"image": {"format": "png", "source": {"bytes": b"\x89PNG..."}}}

# Google GenAI
{"type": "image_url", "image_url": {"url": "iVBOR..."}}  # 裸 base64,没有 data: 前缀

如果你要写代码处理模型返回的图片,就得针对每个提供商写一套解析逻辑。

LangChain 的解决方案:**.content_blocks**** 属性把所有格式统一成一种标准**。

# 不管原始格式是什么,content_blocks 输出都是:
{"type": "image", "base64": "iVBOR...", "mime_type": "image/png"}

标准 ContentBlock 类型一览

所有标准类型定义在 content.py 中,都是 TypedDict

type 值 TypedDict 名 用途 关键字段
"text" TextContentBlock 文本输出 text
, annotations
"reasoning" ReasoningContentBlock 推理过程 reasoning
"image" ImageContentBlock 图片 url
/ base64
/ file_id
, mime_type
"audio" AudioContentBlock 音频 url
/ base64
/ file_id
, mime_type
"video" VideoContentBlock 视频 url
/ base64
/ file_id
, mime_type
"file" FileContentBlock 文件(PDF 等) url
/ base64
/ file_id
, mime_type
"text-plain" PlainTextContentBlock 纯文本文档 text
, title
, context
"tool_call" ToolCall 工具调用请求 name
, args
, id
"tool_call_chunk" ToolCallChunk 流式工具调用片段 name
, args
, id
, index
"invalid_tool_call" InvalidToolCall 解析失败的工具调用 name
, args
, error
"server_tool_call" ServerToolCall 服务端工具调用 name
, args
, id
"server_tool_result" ServerToolResult 服务端工具结果 tool_call_id
, status
, output
"non_standard" NonStandardContentBlock 兜底(未知格式) value

每个 block 都有 extras

除了 NonStandardContentBlock 外,所有 block 都有一个可选的 extras: dict 字段,用来存放提供商特有的元数据。

这是一个核心设计决策:标准化不等于丢数据

# Anthropic 返回的 thinking block 带有 signature
{"type": "thinking", "thinking": "让我想想...", "signature": "EpoWCpc..."}

# 翻译成标准格式后:
{
    "type": "reasoning",
    "reasoning": "让我想想...",
    "extras": {"signature": "EpoWCpc..."},  # ← signature 不丢,放进 extras
}

NonStandardContentBlock —— 兜底

当一个 block 完全无法识别时,不会报错,而是包成 NonStandardContentBlock

# 某个提供商返回了 LangChain 不认识的类型
{"type": "some_new_thing", "data": "..."}

# 变成:
{"type": "non_standard", "value": {"type": "some_new_thing", "data": "..."}}

原始数据完整保留在 value 里,不会丢失。


翻译是怎么发生的:三条路径

当你访问 message.content_blocks 时,内部按以下优先级选择翻译路径:

┌─────────────────────────────────────────────────┐
│ 检查 response_metadata["output_version"] == "v1" │
│     → 是:直接返回 content(已经是标准格式)       │
└────────────────────┬────────────────────────────┘
                     │ 否
                     ▼
┌─────────────────────────────────────────────────┐
│ 检查 response_metadata["model_provider"]         │
│     → 有值:查注册表,找到对应的 translator       │
│     → 调用 translator["translate_content"](msg)  │
└────────────────────┬────────────────────────────┘
                     │ 没有 provider 或没有 translator
                     ▼
┌─────────────────────────────────────────────────┐
│ Best-effort 多轮管道                              │
│   第一轮:快速分类                                 │
│     已知类型 → 保留                               │
│     未知类型 → 包成 non_standard                   │
│   第二轮:依次尝试 5 个提供商的输入格式解析器       │
│     v0 旧格式 → OpenAI → Anthropic → Google → Bedrock │
│     每步只动 non_standard block                    │
└─────────────────────────────────────────────────┘

路径一:已经是标准格式

# 如果集成包已经把 content 转成了 v1 格式,会设置 output_version
ai = AIMessage(
    content=[{"type": "text", "text": "你好"}],
    response_metadata={"output_version": "v1"},
)
ai.content_blocks  # → 直接返回 content,零开销

路径二:有 model_provider,用专门的 translator

# 集成包设置了 model_provider
ai = AIMessage(
    content=[
        {"type": "text", "text": "让我想想..."},
        {"type": "thinking", "thinking": "用户问的是...", "signature": "abc"},
    ],
    response_metadata={"model_provider": "anthropic"},
)

ai.content_blocks
# → 调用 Anthropic 的 translator,输出标准格式
# [
#   {"type": "text", "text": "让我想想..."},
#   {"type": "reasoning", "reasoning": "用户问的是...", "extras": {"signature": "abc"}},
# ]

路径三:没有 provider,best-effort 猜测

# 没有设置 model_provider(比如手动构造的消息)
msg = HumanMessage(content=[
    {"type": "text", "text": "看看这个图"},
    {"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBOR..."}},
])

msg.content_blocks
# 第一轮:text 是已知类型保留,image_url 不是已知类型 → 包成 non_standard
# 第二轮:依次尝试各提供商的解析器
#   _convert_v0_multimodal_input_to_v1 → 不认识,跳过
#   _convert_to_v1_from_chat_completions_input → 认识 image_url!转换!
# → [
#     {"type": "text", "text": "看看这个图"},
#     {"type": "image", "base64": "iVBOR...", "mime_type": "image/png"},
#   ]

源码在 base.pycontent_blocks 属性里:

@property
def content_blocks(self):
    blocks = []
    content = [self.content] if isinstance(self.content, str) else self.content

    # 第一轮:快速分类
    for item in content:
        if isinstance(item, str):
            blocks.append({"type": "text", "text": item})
        elif isinstance(item, dict):
            if item.get("type") in KNOWN_BLOCK_TYPES:
                blocks.append(item)                              # 已知类型
            else:
                blocks.append({"type": "non_standard", "value": item})  # 未知类型
        # ...

    # 第二轮:依次尝试各提供商的输入格式解析器
    for parsing_step in [
        _convert_v0_multimodal_input_to_v1,           # LangChain v0 旧格式
        _convert_to_v1_from_chat_completions_input,   # OpenAI
        _convert_to_v1_from_anthropic_input,          # Anthropic
        _convert_to_v1_from_genai_input,              # Google
        _convert_to_v1_from_converse_input,           # Bedrock
    ]:
        blocks = parsing_step(blocks)
        # 每一步只处理 type="non_standard" 的 block
        # 能认识 → 转换成标准 block
        # 不认识 → 保持 non_standard,交给下一步

    return blocks

每个解析步骤的套路都一样——拆开 non_standardvalue,看能不能识别,能就转换,不能就保持原样。


注册表机制

PROVIDER_TRANSLATORS

所有 translator 都注册在一个全局字典里(block_translators/__init__.py):

PROVIDER_TRANSLATORS: dict[str, dict[str, Callable]] = {}

def register_translator(provider, translate_content, translate_content_chunk):
    PROVIDER_TRANSLATORS[provider] = {
        "translate_content": translate_content,           # 处理 AIMessage
        "translate_content_chunk": translate_content_chunk, # 处理 AIMessageChunk
    }

def get_translator(provider):
    return PROVIDER_TRANSLATORS.get(provider)

自动注册

模块加载时,所有内置 translator 自动注册:

# block_translators/__init__.py 底部
def _register_translators():
    _register_anthropic_translator()
    _register_bedrock_translator()
    _register_bedrock_converse_translator()
    _register_google_genai_translator()
    _register_google_vertexai_translator()
    _register_groq_translator()
    _register_openai_translator()

_register_translators()  # ← 模块 import 时自动执行

每个提供商文件底部也有自注册:

# anthropic.py 底部
def _register_anthropic_translator():
    register_translator("anthropic", translate_content, translate_content_chunk)

_register_anthropic_translator()

提供商之间的复用

不是每个提供商都需要从头写 translator:

# bedrock.py —— Claude on Bedrock,格式和 Anthropic 一样
from langchain_core.messages.block_translators.anthropic import _convert_to_v1_from_anthropic

def _convert_to_v1_from_bedrock(message):
    out = _convert_to_v1_from_anthropic(message)  # ← 直接复用 Anthropic 的
    # ... 补充 Bedrock 特有的逻辑
    return out
# google_vertexai.py —— 直接复用 GenAI 的 translator
from langchain_core.messages.block_translators.google_genai import (
    translate_content,
    translate_content_chunk,
)

def _register_google_vertexai_translator():
    register_translator("google_vertexai", translate_content, translate_content_chunk)

可扩展性

如果你有自己的提供商,也能注册:

from langchain_core.messages.block_translators import register_translator

def my_translate(message):
    # 你的翻译逻辑
    ...

def my_translate_chunk(message):
    # 你的流式翻译逻辑
    ...

register_translator("my_provider", my_translate, my_translate_chunk)

拆解一个具体的 translator:Anthropic

以 Anthropic 为例,看看 _convert_to_v1_from_anthropicanthropic.py)是怎么逐个 block 翻译的。

text → text(带 citations)

# 输入
{"type": "text", "text": "西班牙赢了", "citations": [{"type": "web_search_result_location", ...}]}

# 处理逻辑
if block_type == "text":
    if citations := block.get("citations"):
        text_block = {
            "type": "text",
            "text": block.get("text", ""),
            "annotations": [_convert_citation_to_v1(a) for a in citations],
        }
    else:
        text_block = {"type": "text", "text": block["text"]}

# 输出
{"type": "text", "text": "西班牙赢了", "annotations": [{"type": "citation", "url": "...", ...}]}

Anthropic 的 citations 被转成了标准的 annotations,每个 citation 的类型也从 web_search_result_location 变成了通用的 citation

thinking → reasoning

# 输入
{"type": "thinking", "thinking": "用户问的是天气...", "signature": "EpoWCpc..."}

# 处理逻辑
elif block_type == "thinking":
    reasoning_block = {
        "type": "reasoning",                     # thinking → reasoning
        "reasoning": block.get("thinking", ""),   # thinking → reasoning
    }
    # 未知字段塞进 extras
    known_fields = {"type", "thinking", "index", "extras"}
    for key in block:
        if key not in known_fields:
            reasoning_block.setdefault("extras", {})[key] = block[key]
            # signature → extras["signature"]

# 输出
{"type": "reasoning", "reasoning": "用户问的是天气...", "extras": {"signature": "EpoWCpc..."}}

两个关键操作:

  1. 字段重命名thinkingreasoning(语义统一)
  2. 未知字段保留signatureextras["signature"](不丢数据)

tool_use → tool_call

这个分支比较复杂,因为要区分流式和非流式:

elif block_type == "tool_use":
    if isinstance(message, AIMessageChunk) and len(message.tool_call_chunks) == 1 and message.chunk_position != "last":
        # 流式单 chunk → 输出 tool_call_chunk
        chunk = message.tool_call_chunks[0]
        yield {
            "type": "tool_call_chunk",
            "name": chunk.get("name"),
            "id": chunk.get("id"),
            "args": chunk.get("args"),
        }
    else:
        # 非流式或已聚合 → 输出 tool_call
        # 优先从 message.tool_calls 取已解析的数据
        for tc in message.tool_calls:
            if tc.get("id") == block.get("id"):
                yield {
                    "type": "tool_call",
                    "name": tc["name"],
                    "args": tc["args"],    # ← 已经是 dict,不是 JSON 字符串
                    "id": tc.get("id"),
                }
                break

注意它优先从 message.tool_calls 取数据——因为 tool_calls 已经经过了 model_validator 的解析,args 是 dict 而不是 JSON 字符串。

server_tool_use → server_tool_call

Anthropic 的服务端工具(如 code execution、web search)走单独的分支:

elif block_type == "server_tool_use":
    # 特殊命名映射
    if block.get("name") == "code_execution":
        name = "code_interpreter"       # Anthropic 叫 code_execution,标准化叫 code_interpreter
    else:
        name = block.get("name", "")

    server_tool_call = {
        "type": "server_tool_call",
        "name": name,
        "args": block.get("input", {}),
        "id": block.get("id", ""),
    }
    # ... extras 处理

兜底

不认识的 block 统一包成 non_standard

else:
    yield {"type": "non_standard", "value": block}

_populate_extras:不丢数据的公共函数

多个 translator 都用到了这个函数:

def _populate_extras(standard_block, block, known_fields):
    if standard_block.get("type") == "non_standard":
        return standard_block  # non_standard 不需要 extras,数据在 value 里

    for key, value in block.items():
        if key not in known_fields:
            standard_block.setdefault("extras", {})[key] = value

    return standard_block

它的逻辑很简单:遍历原始 block 的所有字段,不在 known_fields 里的,全塞进 extras

这保证了即使提供商加了新字段,标准化过程也不会丢掉它们。


完整翻译示例

Anthropic 消息翻译

ai = AIMessage(
    content=[
        {"type": "text", "text": "让我想想..."},
        {"type": "thinking", "thinking": "用户问天气...", "signature": "abc123"},
        {"type": "tool_use", "id": "call_1", "name": "get_weather", "input": {"city": "北京"}},
    ],
    tool_calls=[
        {"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
    ],
    response_metadata={"model_provider": "anthropic"},
)

print(ai.content_blocks)
# [
#   {"type": "text", "text": "让我想想..."},
#   {"type": "reasoning", "reasoning": "用户问天气...", "extras": {"signature": "abc123"}},
#   {"type": "tool_call", "name": "get_weather", "args": {"city": "北京"}, "id": "call_1"},
# ]

OpenAI Chat Completions 消息翻译

ai = AIMessage(
    content="今天天气不错",
    tool_calls=[
        {"name": "get_weather", "args": {"city": "北京"}, "id": "call_1", "type": "tool_call"}
    ],
    response_metadata={"model_provider": "openai"},
)

print(ai.content_blocks)
# [
#   {"type": "text", "text": "今天天气不错"},
#   {"type": "tool_call", "name": "get_weather", "args": {"city": "北京"}, "id": "call_1"},
# ]

OpenAI Chat Completions 格式比较简单——content 就是字符串,tool calls 从 message.tool_calls 取。

OpenAI Responses API 消息翻译

OpenAI 的 Responses API 返回更复杂的结构,包含 web_search、file_search、code_interpreter 等服务端工具:

ai = AIMessage(
    content=[
        {"type": "web_search_call", "id": "ws_1", "action": {"type": "search", "query": "北京天气"}, "status": "completed"},
        {"type": "text", "text": "北京今天 25°C 晴天", "annotations": [{"type": "url_citation", "url": "https://...", "title": "天气预报"}]},
    ],
    response_metadata={"model_provider": "openai"},
)

print(ai.content_blocks)
# [
#   {"type": "server_tool_call", "name": "web_search", "args": {"type": "search", "query": "北京天气"}, "id": "ws_1"},
#   {"type": "server_tool_result", "tool_call_id": "ws_1", "status": "success"},
#   {"type": "text", "text": "北京今天 25°C 晴天", "annotations": [{"type": "citation", "url": "https://...", "title": "天气预报"}]},
# ]

web_search_call 被拆成了 server_tool_call + server_tool_result 两个标准 block。url_citation 被转成了通用的 citation

Best-effort 猜测

# 没有 model_provider,手动构造的包含 Anthropic 格式图片的消息
msg = HumanMessage(content=[
    {"type": "text", "text": "看看这个"},
    {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "iVBOR..."}},
])

print(msg.content_blocks)
# 第一轮:text 保留,image 是已知类型但有 source 子结构...
#   实际上 base.py 会因为有 "source" 而把它标为 non_standard
# 第二轮:
#   _convert_v0_multimodal_input_to_v1 → 没有 source_type,不认识
#   _convert_to_v1_from_chat_completions_input → 不是 image_url,不认识
#   _convert_to_v1_from_anthropic_input → 认识!source.type == "base64"
# → [
#     {"type": "text", "text": "看看这个"},
#     {"type": "image", "base64": "iVBOR...", "mime_type": "image/png"},
#   ]

标准化的全景图

提供商原始格式                         标准 ContentBlock
─────────────                         ──────────────

Anthropic:
  thinking           ──→   reasoning  (+ extras)
  tool_use           ──→   tool_call
  server_tool_use    ──→   server_tool_call
  *_tool_result      ──→   server_tool_result
  image.source       ──→   image (base64/url/file_id)
  document.source    ──→   file / text-plain
  citations          ──→   annotations[citation]

OpenAI Chat Completions:
  str content        ──→   text
  image_url          ──→   image
  input_audio        ──→   audio
  file               ──→   file

OpenAI Responses:
  reasoning          ──→   reasoning (explode summary)
  function_call      ──→   tool_call
  web_search_call    ──→   server_tool_call + server_tool_result
  file_search_call   ──→   server_tool_call + server_tool_result
  code_interpreter   ──→   server_tool_call + server_tool_result
  mcp_call           ──→   server_tool_call + server_tool_result
  image_gen_call     ──→   image (from result base64)
  url_citation       ──→   citation
  file_citation      ──→   citation

Google GenAI:
  thinking           ──→   reasoning
  function_call      ──→   tool_call
  executable_code    ──→   server_tool_call (code_interpreter)
  code_exec_result   ──→   server_tool_result
  file_data          ──→   file (url)
  grounding_metadata ──→   annotations[citation]

Bedrock Converse:
  reasoning_content  ──→   reasoning (+ extras.signature)
  tool_use           ──→   tool_call
  {image: {...}}     ──→   image (bytes → base64)
  {document: {...}}  ──→   file / text-plain

Groq:
  executed_tools     ──→   server_tool_call + server_tool_result
  reasoning_content  ──→   reasoning (from additional_kwargs)

所有提供商:
  不认识的类型       ──→   non_standard (value 保留原始数据)
  不认识的字段       ──→   extras (保留在标准 block 上)

小结

  • Content Block 是 TypedDict,定义在 content.py,纯数据结构,零依赖
  • 三条翻译路径:output_version=“v1” → model_provider translator → best-effort 管道
  • 注册表模式PROVIDER_TRANSLATORS 全局 dict,模块加载时自动注册,可扩展
  • 不丢数据:未知字段 → extras,未知类型 → non_standard
  • 提供商间复用:Bedrock 复用 Anthropic,VertexAI 复用 GenAI

下一部分讲源码架构——目录为什么这么分、流式聚合怎么实现、工具函数层的双模式设计。

LangChain 消息系统深度解析(四):源码架构与设计哲学

本系列共 4 部分。本文是第 4 部分,从目录结构、继承体系、流式聚合到工具函数层,拆解 LangChain 消息系统的架构设计。


完整目录结构

langchain_core/messages/
│
├── __init__.py                     # 入口:懒加载 + 统一导出
│
├── base.py                         # 基类:BaseMessage, BaseMessageChunk, merge_content
├── content.py                      # 数据定义:所有 ContentBlock TypedDict
│
├── ai.py                           # ── 角色层 ──
├── human.py                        #    ~70 行
├── system.py                       #    ~70 行
├── tool.py                         #    ~300 行(含 ToolCall 数据结构 + 解析函数)
├── chat.py                         #    ~60 行
├── function.py                     #    ~50 行(旧版,保留兼容)
├── modifier.py                     #    ~30 行(RemoveMessage)
│
├── utils.py                        # 工具层:filter, merge, trim, convert_to_openai, ~1500 行
│
└── block_translators/              # ── 适配层 ──
    ├── __init__.py                 #   注册表 + 自动注册
    ├── anthropic.py                #   ~400 行
    ├── openai.py                   #   ~700 行(Chat Completions + Responses 两套)
    ├── google_genai.py             #   ~500 行
    ├── google_vertexai.py          #   ~15 行(复用 genai)
    ├── bedrock.py                  #   ~90 行(复用 anthropic)
    ├── bedrock_converse.py         #   ~300 行
    ├── groq.py                     #   ~150 行
    └── langchain_v0.py             #   ~250 行(旧格式兼容)

这个目录结构不是随意的。下面逐层讲为什么这么分。


为什么按角色拆文件

看一下各文件的行数差异:

human.py     ~70 行
system.py    ~70 行
chat.py      ~60 行
function.py  ~50 行
modifier.py  ~30 行
tool.py      ~300 行
ai.py        ~500 行   ← 最复杂

human.py 的全部实质内容:

class HumanMessage(BaseMessage):
    type: Literal["human"] = "human"

class HumanMessageChunk(HumanMessage, BaseMessageChunk):
    type: Literal["HumanMessageChunk"] = "HumanMessageChunk"

就这么多——继承 BaseMessage,固定 type 值。system.pychat.py 也类似。

ai.py 有 500 行,因为 AI 消息承载了:tool_callsmodel_validator 自动解析、content_blocks 的 translator 调度、AIMessageChunk 的流式聚合、init_tool_calls 的 JSON 容错解析、pretty_repr 的美化输出、UsageMetadata 的 token 统计…

按角色拆,而不是按 Message/Chunk 拆。原因是 HumanMessageHumanMessageChunk 紧密耦合(Chunk 继承 Message),而 HumanMessageAIMessage 之间几乎没有交互。按角色拆让复杂度被限制在单个文件内——ai.py 再复杂也不会污染 human.py


base.py 和 content.py 的分离

这两个文件解决的是不同层次的问题:

base.py    → "消息"是什么?(容器)
content.py → "内容块"是什么?(容器里装的东西)

content.py 是纯数据定义——全是 TypedDict,没有任何业务逻辑,不 import 任何 messages 模块内的东西。

base.py 有逻辑——Pydantic 模型、merge_content 函数、content_blocks 属性、text 属性等。

分开的关键原因是避免循环依赖

content.py(纯数据,零依赖)
    ↑
base.py(import content 做类型标注)
    ↑
ai.py / human.py / tool.py(import base 做继承)
    ↑
utils.py(import 所有角色类型)

依赖方向严格单向。如果把 content.py 和 base.py 合并,而 ai.py 又要 import BaseMessage 和各种 TypedDict,很容易产生循环引用。


tool.py 的特殊地位

tool.py 是整个目录里唯一同时包含消息类型和数据结构的文件

# tool.py 同时定义了:

# 1. 消息类型
class ToolMessage(BaseMessage):
    tool_call_id: str
    status: Literal["success", "error"] = "success"

# 2. 数据结构
class ToolCall(TypedDict):
    name: str
    args: dict[str, Any]
    id: str | None

class ToolCallChunk(TypedDict):
    name: str | None
    args: str | None
    id: str | None
    index: int | None

# 3. 工厂函数
def tool_call(*, name, args, id) -> ToolCall: ...

# 4. 解析函数
def default_tool_parser(raw_tool_calls) -> tuple[list[ToolCall], list[InvalidToolCall]]: ...

这样设计是因为 tool call 横跨两个维度——ToolMessage 是消息角色(工具返回结果),ToolCall 是数据结构(AI 请求调用工具),而 ToolMessage.tool_call_id 关联 ToolCall.id,语义上密不可分。

注意 content.py 里也有一套 ToolCall(v1 标准版,多了 indexextras 字段)。两套共存是历史过渡——tool.py 的是旧版供 Pydantic 字段用,content.py 的是新版供 content_blocks 输出用。


继承体系

Message 和 Chunk 的关系

每个 Message 都有一个对应的 Chunk 版本,用于流式场景。Chunk 继承 Message:

class BaseMessage(Serializable):
    content: str | list[str | dict]
    type: str
    id: str | None
    additional_kwargs: dict
    response_metadata: dict

class BaseMessageChunk(BaseMessage):
    def __add__(self, other):  # ← 支持拼接
        ...

子类的继承关系(钻石继承):

Serializable
                    │
                BaseMessage
               ╱           ╲
    HumanMessage      BaseMessageChunk
               ╲           ╱
           HumanMessageChunk
class HumanMessage(BaseMessage):
    type: Literal["human"] = "human"

class HumanMessageChunk(HumanMessage, BaseMessageChunk):
    type: Literal["HumanMessageChunk"] = "HumanMessageChunk"

HumanMessageChunk 同时继承了 HumanMessage 的角色属性和 BaseMessageChunk 的拼接能力。

AIMessageChunk 最复杂,额外多了 tool_call_chunkschunk_positioninit_tool_calls 等:

class AIMessageChunk(AIMessage, BaseMessageChunk):
    tool_call_chunks: list[ToolCallChunk] = []
    chunk_position: Literal["last"] | None = None

    @model_validator(mode="after")
    def init_tool_calls(self) -> Self: ...  # 自动解析

    def __add__(self, other):
        return add_ai_message_chunks(self, other)  # 专门的聚合逻辑

Message ↔ Chunk 互转

utils.py 里维护了一张映射表:

_MSG_CHUNK_MAP = {
    HumanMessage: HumanMessageChunk,
    AIMessage: AIMessageChunk,
    SystemMessage: SystemMessageChunk,
    ToolMessage: ToolMessageChunk,
    FunctionMessage: FunctionMessageChunk,
    ChatMessage: ChatMessageChunk,
}
_CHUNK_MSG_MAP = {v: k for k, v in _MSG_CHUNK_MAP.items()}

def _msg_to_chunk(message):
    chunk_cls = _MSG_CHUNK_MAP[message.__class__]
    return chunk_cls(**message.model_dump(exclude={"type"}))

def _chunk_to_msg(chunk):
    msg_cls = _CHUNK_MSG_MAP[chunk.__class__]
    return msg_cls(**chunk.model_dump(exclude={"type", "tool_call_chunks", "chunk_position"}))

这在 merge_message_runs 中被使用——把 Message 转成 Chunk 来利用 __add__ 合并,再转回 Message。


流式聚合的实现

merge_content:内容拼接

def merge_content(first_content, *contents):
    merged = first_content
    for content in contents:
        if isinstance(merged, str) and isinstance(content, str):
            merged += content                       # "今天" + "天气" → "今天天气"
        elif isinstance(merged, str) and isinstance(content, list):
            merged = [merged, *content]             # str + list → list
        elif isinstance(merged, list) and isinstance(content, list):
            merged = merge_lists(merged, content)   # list + list → 按 index 智能合并
        elif merged and isinstance(merged[-1], str):
            merged[-1] += content                   # list 最后元素是 str,追加
        elif content == "":
            pass                                    # 空字符串忽略
        elif merged:
            merged.append(content)                  # 其他情况追加
    return merged

4 种组合的实际效果:

# str + str
merge_content("你好", "世界")           # → "你好世界"

# str + list
merge_content("你好", [{"type": "text", "text": "世界"}])
# → ["你好", {"type": "text", "text": "世界"}]

# list + list
merge_content(
    [{"type": "text", "text": "你好", "index": 0}],
    [{"type": "text", "text": "世界", "index": 0}],
)
# → [{"type": "text", "text": "你好世界", "index": 0}]  ← merge_lists 按 index 合并

merge_lists:按 index 对齐合并

merge_lists(来自 utils/_merge.py)是流式聚合的关键。它不是简单的 list.extend,而是按 dict 里的 index 字段对齐合并。

直觉上:

# 两个 list 里 index=0 的 dict 会被合并在一起
left  = [{"text": "你好", "index": 0}]
right = [{"text": "世界", "index": 0}]

merge_lists(left, right)
# → [{"text": "你好世界", "index": 0}]

这就是为什么流式 tool_call_chunks 能正确合并——同一个工具调用的碎片有相同的 index

add_ai_message_chunks:完整的聚合流程

def add_ai_message_chunks(left, *others):
    # 1. 合并 content
    content = merge_content(left.content, *(o.content for o in others))

    # 2. 合并 additional_kwargs 和 response_metadata(递归合并 dict)
    additional_kwargs = merge_dicts(left.additional_kwargs, *(o.additional_kwargs for o in others))
    response_metadata = merge_dicts(left.response_metadata, *(o.response_metadata for o in others))

    # 3. 合并 tool_call_chunks(按 index 合并)
    raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))

    # 4. 合并 usage_metadata(递归加法)
    usage_metadata = add_usage(left.usage_metadata, other.usage_metadata)

    # 5. 选最好的 ID
    #    优先级:提供商原始 ID > lc_run-* > lc_*
    for id_ in all_ids:
        if not id_.startswith("lc_"):
            chunk_id = id_    # 最高优先级,立即返回
            break
        rank = 1 if id_.startswith("lc_run-") else 0
        if rank > best_rank:
            chunk_id = id_

    # 6. 传播 chunk_position
    chunk_position = "last" if any(x.chunk_position == "last" for x in [left, *others]) else None

    return AIMessageChunk(content=content, tool_call_chunks=..., usage_metadata=..., id=chunk_id, ...)

ID 的优先级设计很有意思:提供商分配的 ID(如 OpenAI 的 chatcmpl-xxx)优先级最高,LangChain 自动生成的 lc_run-* 次之,其他 lc_* 最低。这保证了如果提供商给了 ID,它会被保留。

add_usage:token 用量合并

def add_usage(left, right):
    if not (left or right):
        return UsageMetadata(input_tokens=0, output_tokens=0, total_tokens=0)
    if not (left and right):
        return left or right
    # 递归加法合并(包括嵌套的 input_token_details、output_token_details)
    return UsageMetadata(**_dict_int_op(left, right, operator.add))

_dict_int_op 递归遍历两个 dict,对所有 int 值执行加法。所以 input_tokens: 5 + input_tokens: 3 = input_tokens: 8,嵌套的 cache_read: 100 + cache_read: 50 = cache_read: 150


工具函数层的双模式设计

utils.py 是最大的文件(~1500 行),包含 filter_messagesmerge_message_runstrim_messagesconvert_to_openai_messages 等。

这些函数有一个共同的设计——双模式,通过 @_runnable_support 装饰器实现。

@_runnable_support 的实现

def _runnable_support(func):
    @wraps(func)
    def wrapped(messages=None, *args, **kwargs):
        if messages is not None:
            return func(messages, *args, **kwargs)     # 直接执行
        else:
            return RunnableLambda(partial(func, **kwargs), name=func.__name__)  # 返回 Runnable
    return wrapped

效果:

@_runnable_support
def filter_messages(messages, *, include_types=None, ...):
    ...

# 传了 messages → 直接返回结果
result = filter_messages(messages, include_types=["ai"])  # → list[BaseMessage]

# 不传 messages → 返回一个 Runnable,可以放进 chain
chain = filter_messages(include_types=["ai"]) | llm
result = chain.invoke(messages)

merge_message_runs 的实现

这个函数利用了 Chunk 的 __add__ 来实现合并:

@_runnable_support
def merge_message_runs(messages, *, chunk_separator="\n"):
    messages = convert_to_messages(messages)
    merged = []
    for msg in messages:
        last = merged.pop() if merged else None
        if not last:
            merged.append(msg)
        elif isinstance(msg, ToolMessage) or not isinstance(msg, last.__class__):
            merged.extend([last, msg])  # 不同类型,不合并
        else:
            last_chunk = _msg_to_chunk(last)    # Message → Chunk
            curr_chunk = _msg_to_chunk(msg)
            # 两个 str content 之间加分隔符
            if isinstance(last_chunk.content, str) and isinstance(curr_chunk.content, str):
                last_chunk.content += chunk_separator
            merged.append(_chunk_to_msg(last_chunk + curr_chunk))  # Chunk.__add__ → 转回 Message
    return merged

设计很巧妙——不是重新实现合并逻辑,而是复用已有的 Chunk 拼接能力_msg_to_chunk_chunk_to_msg 通过映射表互转。

trim_messages 的实现

trim_messages 支持 "first""last" 两种策略。"last" 策略的实现很有趣——反转消息列表,调用 “first” 策略,再反转回来

def _last_max_tokens(messages, *, max_tokens, token_counter, ...):
    # 处理 system 消息
    system_message = None
    if include_system and isinstance(messages[0], SystemMessage):
        system_message = messages[0]
        messages = messages[1:]
        remaining_tokens = max_tokens - token_counter([system_message])

    # 反转!
    reversed_messages = messages[::-1]

    # 用 "first" 策略处理反转后的列表
    reversed_result = _first_max_tokens(
        reversed_messages,
        max_tokens=remaining_tokens,
        token_counter=token_counter,
        partial_strategy="last" if allow_partial else None,
        end_on=start_on,  # ← 注意:start_on 变成了 end_on
    )

    # 再反转回来
    result = reversed_result[::-1]
    if system_message:
        result = [system_message, *result]
    return result

_first_max_tokens 内部使用二分搜索来高效找到在 token 限制内能包含的最大消息数,而不是线性逐条添加:

def _first_max_tokens(messages, *, max_tokens, token_counter, ...):
    # 二分搜索:找到最大的 mid 使得 token_counter(messages[:mid]) <= max_tokens
    left, right = 0, len(messages)
    for _ in range(len(messages).bit_length()):
        if left >= right:
            break
        mid = (left + right + 1) // 2
        if token_counter(messages[:mid]) <= max_tokens:
            left = mid
        else:
            right = mid - 1
    idx = left
    # ...

懒加载入口

__init__.py 不直接 import 任何子模块,而是通过 __getattr__ 按需加载:

# __init__.py

_dynamic_imports = {
    "AIMessage": "ai",
    "HumanMessage": "human",
    "ToolCall": "tool",
    "convert_to_openai_messages": "utils",
    "convert_to_openai_image_block": "block_translators.openai",
    # ... 60+ 个映射
}

def __getattr__(attr_name):
    module_name = _dynamic_imports.get(attr_name)
    result = import_attr(attr_name, module_name, __spec__.parent)
    globals()[attr_name] = result  # 缓存,下次不触发 __getattr__
    return result

好处:import langchain_core.messages 几乎零成本。不会触发所有子模块和 block_translators 的加载。只有你真正用到 HumanMessage 时才加载 human.py

同时 TYPE_CHECKING 块里有完整的 import,让 IDE 能正确提供类型提示和自动补全:

if TYPE_CHECKING:
    from langchain_core.messages.ai import AIMessage, AIMessageChunk, ...
    from langchain_core.messages.human import HumanMessage, ...
    # ...

block_translators/ 为什么是独立子目录

翻译逻辑放在哪里,有几个选择:

方案 A:放在 ai.py 里?
  → ai.py 已经 500 行,加上 7 个提供商的翻译逻辑会到 3000+

方案 B:放在 base.py 里?
  → base.py 是基类,不该知道具体提供商的存在

方案 C:放在各自的集成包里(langchain-openai、langchain-anthropic)?
  → content_blocks 是 core 的属性,不能依赖集成包

方案 D:block_translators/ 子目录,每个提供商一个文件  ← 实际选择

选择 D 的原因:标准化逻辑在 core 里,但要理解各提供商格式。这是一个无法避免的矛盾——只能通过在 core 里维护适配层来解决。

注册表模式让系统可扩展——新增提供商只需要:

  1. block_translators/ 下新建一个 .py 文件
  2. 实现 translate_contenttranslate_content_chunk
  3. 调用 register_translator 注册
  4. __init__.py_register_translators 中添加一行

依赖方向图

┌─────────────────────┐
                    │  __init__.py         │  懒加载入口
                    │  (按需 import)        │
                    └────────┬────────────┘
                             │
                    ┌────────▼────────────┐
                    │  utils.py            │  工具层(横切所有类型)
                    │  filter / merge /    │
                    │  trim / convert      │
                    └────────┬────────────┘
                             │ import all message types
            ┌────────────────┼────────────────┐
            │                │                │
    ┌───────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
    │   ai.py      │ │  human.py    │ │  tool.py     │  角色层
    │  ~500 行     │ │  ~70 行      │ │  ~300 行     │
    └───────┬──────┘ └──────┬───────┘ └──────┬───────┘
            │                │                │
            └────────────────┼────────────────┘
                             │ import BaseMessage
                    ┌────────▼────────────┐
                    │  base.py             │  骨架层
                    │  BaseMessage         │
                    │  BaseMessageChunk    │
                    │  merge_content       │
                    └────────┬────────────┘
                             │ import TypedDicts
                    ┌────────▼────────────┐
                    │  content.py          │  数据定义层(零依赖)
                    │  TextContentBlock    │
                    │  ImageContentBlock   │
                    │  ToolCall            │
                    │  ...                 │
                    └─────────────────────┘

    block_translators/                       适配层(独立子目录)
    ├── __init__.py  (注册表)
    ├── anthropic.py ←── bedrock.py (复用)
    ├── openai.py
    ├── google_genai.py ←── google_vertexai.py (复用)
    ├── bedrock_converse.py
    ├── groq.py
    └── langchain_v0.py

依赖方向严格自下而上content.py 不依赖任何人 → base.py 只依赖 content → 角色层依赖 baseutils 依赖所有角色 → block_translators 依赖 base + content + 角色层。

没有循环依赖。每一层的职责清晰。


5 个架构设计原则

回顾整个消息系统的设计,可以提炼出 5 个核心原则:

1. 按角色拆文件,复杂度隔离

简单的角色(human、system)保持简单,复杂的角色(ai)集中管理。避免"一个大文件"导致的认知负担。

2. 数据定义零依赖,依赖方向单向

content.py 是纯 TypedDict,不 import 任何 messages 内部模块。所有依赖关系从上到下,不成环。

3. 注册表模式实现可扩展

PROVIDER_TRANSLATORS 全局字典 + register_translator 函数。内置提供商在模块加载时自注册,外部提供商也能随时注册。新增提供商不需要修改核心代码。

4. 双模式统一接口

filter_messagesmerge_message_runstrim_messages 既能直接调用返回结果,又能不传参数返回 Runnable 放进 chain。一个 @_runnable_support 装饰器搞定。

5. 渐进式无损解析

  • 不丢类型:不认识的 block → NonStandardContentBlock(保留原始数据)
  • 不丢字段:不认识的字段 → extras(保留在标准 block 上)
  • 多轮尝试:best-effort 管道依次尝试所有已知格式
  • 容错解析parse_partial_json 处理不完整的流式 JSON

这保证了无论什么提供商返回什么格式,数据永远不会在标准化过程中丢失。


写在最后

LangChain 的消息系统在表面上很简单——HumanMessage(content="你好") 就能用。但底层为了解决"多提供商格式不统一"这个核心问题,构建了一套完整的三层架构:

标准层:ContentBlock TypedDict(统一的数据表示)
翻译层:Block Translators 注册表(可插拔的格式转换)
解析层:渐进式无损解析(non_standard 兜底 + 多轮尝试 + extras 保留)

理解了这套架构,你就能:

  • 在多个模型提供商之间无缝切换
  • 正确处理流式 tool call 的聚合
  • 在需要时写自己的 translator
  • 读懂 LangChain 集成包的消息处理代码
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐