【LangChain 源码解析三:Chat Model】
本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的 ChatModel 组件。不需要 API key,不需要网络。 和是官方测试用的假模型,行为和真模型完全一致。ChatModel 接受三种输入,统一输出 :这个类型定义在 :三种输入怎么统一?看 ():流程图:三种调用方式 + async 变体方法输入输出同步异步单个值单个值看的实现():并不是直接调用 ,而是走了一条长链
本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的 ChatModel 组件。
- 第 1 部分:5 分钟上手 ChatModel(本文)
- 第 2 部分:流式原理——从
_stream到AIMessageChunk - 第 3 部分:Tool Calling 与结构化输出
- 第 4 部分:源码架构——缓存、限流与 Partner 集成
LangChain ChatModel 深度解析(一):5 分钟上手 ChatModel
一个能跑的例子
from langchain_core.language_models.fake_chat_models import (
FakeMessagesListChatModel,
FakeListChatModel,
)
from langchain_core.messages import AIMessage, HumanMessage
# 方式一:FakeMessagesListChatModel —— 直接返回预设的 BaseMessage
chat = FakeMessagesListChatModel(responses=[AIMessage(content="你好!我是假模型")])
result = chat.invoke([HumanMessage(content="你好")])
print(type(result)) # <class 'langchain_core.messages.ai.AIMessage'>
print(result.content) # 你好!我是假模型
# 方式二:FakeListChatModel —— 返回字符串,自动包装成 AIMessage,且支持流式
chat2 = FakeListChatModel(responses=["I", "am", "fake"])
# invoke:取第一个响应
print(chat2.invoke("hi").content) # I
# stream:逐字符流式输出
for chunk in chat2.stream("hello"):
print(chunk.content, end="") # a(逐字符:a, m)
print()
# batch:批量调用
results = chat2.batch(["a", "b"])
print([r.content for r in results]) # ['fake', 'I'] — 循环取响应
不需要 API key,不需要网络。FakeMessagesListChatModel 和 FakeListChatModel 是官方测试用的假模型,行为和真模型完全一致。
ChatModel 的输入输出类型
ChatModel 接受三种输入,统一输出 AIMessage:
| 输入类型 | 示例 | 说明 |
|---|---|---|
str |
"你好" |
自动包装成 HumanMessage |
list[BaseMessage] |
[HumanMessage(...)] |
最常用的方式 |
PromptValue |
ChatPromptValue(...) |
模板渲染后的结果 |
这个类型定义在 base.py:122:
# language_models/base.py:122
LanguageModelInput = PromptValue | str | Sequence[MessageLikeRepresentation]
三种输入怎么统一?看 _convert_input(chat_models.py:375-386):
# chat_models.py:375-386
def _convert_input(self, model_input: LanguageModelInput) -> PromptValue:
if isinstance(model_input, PromptValue):
return model_input # 已经是 PromptValue,直接返回
if isinstance(model_input, str):
return StringPromptValue(text=model_input) # 字符串 → StringPromptValue
if isinstance(model_input, Sequence):
return ChatPromptValue(messages=convert_to_messages(model_input))
# list[Message] → ChatPromptValue
raise ValueError(f"Invalid input type {type(model_input)}. ...")
流程图:
用户输入(str / list[Message] / PromptValue)
│
▼
_convert_input() → 统一转为 PromptValue
│
▼
PromptValue.to_messages() → list[BaseMessage]
│
▼
_generate(messages) → ChatResult
│
▼
ChatResult.generations[0].message → AIMessage(返回给用户)
三种调用方式 + async 变体
| 方法 | 输入 | 输出 | 同步 | 异步 |
|---|---|---|---|---|
invoke |
单个值 | AIMessage |
invoke() |
ainvoke() |
stream |
单个值 | Iterator[AIMessageChunk] |
stream() |
astream() |
batch |
list[Input] |
list[AIMessage] |
batch() |
abatch() |
看 invoke 的实现(chat_models.py:388-413):
# chat_models.py:388-413
def invoke(
self,
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AIMessage:
config = ensure_config(config)
return cast(
"AIMessage",
cast(
"ChatGeneration",
self.generate_prompt( # ← 调用 generate_prompt
[self._convert_input(input)], # ← 先统一输入格式
stop=stop,
callbacks=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.pop("run_id", None),
**kwargs,
).generations[0][0], # ← 取第一个结果
).message, # ← 取出 AIMessage
)
invoke 并不是直接调用 _generate,而是走了一条长链路:
invoke
→ generate_prompt (chat_models.py:1112)
→ generate (chat_models.py:842)
→ _generate_with_cache (chat_models.py:1136)
→ [检查缓存]
→ [限流器]
→ _generate (子类实现)
这条链路在第 4 篇会详细分析。现在只需要知道:invoke 最终会调用子类的 _generate 方法。
继承链
Runnable ← 7 种方法:invoke/stream/batch + async 变体
│
RunnableSerializable ← 序列化支持(JSON dump/load)
│
BaseLanguageModel[AIMessage] ← 语言模型通用接口:cache, callbacks, tags, metadata
│
BaseChatModel ← Chat 模型核心:_generate, _stream, bind_tools, with_structured_output
│
├── SimpleChatModel ← 简化基类:只需实现 _call(messages) → str
│ │
│ ├── FakeChatModel ← 总是返回 "fake response"
│ └── FakeListChatModel ← 循环返回预设字符串列表
│
├── FakeMessagesListChatModel ← 循环返回预设 BaseMessage 列表
├── GenericFakeChatModel ← 支持流式的假模型
│
├── ChatOpenAI ← OpenAI 集成(partner 包)
└── ChatAnthropic ← Anthropic 集成(partner 包)
用代码验证:
from langchain_core.language_models.chat_models import BaseChatModel, SimpleChatModel
from langchain_core.language_models.base import BaseLanguageModel
from langchain_core.runnables import RunnableSerializable, Runnable
# 继承链验证
assert issubclass(BaseChatModel, BaseLanguageModel)
assert issubclass(BaseLanguageModel, RunnableSerializable)
assert issubclass(RunnableSerializable, Runnable)
# SimpleChatModel 是 BaseChatModel 的子类
assert issubclass(SimpleChatModel, BaseChatModel)
print("所有断言通过!")
每一层负责什么?
| 层 | 类 | 职责 |
|---|---|---|
| 1 | Runnable |
invoke/stream/batch 统一接口 |
| 2 | RunnableSerializable |
JSON 序列化/反序列化 |
| 3 | BaseLanguageModel |
cache/callbacks/tags/metadata 字段 |
| 4 | BaseChatModel |
_generate/_stream/bind_tools/with_structured_output |
| 5 | 具体子类 | 实现 _generate,对接真正的 LLM API |
核心抽象方法:最小实现合约
要实现一个自定义 ChatModel,最少只需要两样东西:
_generate方法 —— 接收消息列表,返回ChatResult_llm_type属性 —— 返回模型类型字符串
看源码中的定义(chat_models.py:1388-1406):
# chat_models.py:1388-1406
@abstractmethod
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
"""Generate the result."""
# chat_models.py:1509-1512
@property
@abstractmethod
def _llm_type(self) -> str:
"""Return type of chat model."""
最小实现示例:
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatResult, ChatGeneration
from langchain_core.callbacks import CallbackManagerForLLMRun
from typing import Any
class MyChatModel(BaseChatModel):
"""最小 ChatModel 实现。"""
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
# 取最后一条消息内容,回显
last = messages[-1].content if messages else ""
return ChatResult(
generations=[ChatGeneration(message=AIMessage(content=f"Echo: {last}"))]
)
@property
def _llm_type(self) -> str:
return "my-chat-model"
# 测试
model = MyChatModel()
print(model.invoke("hello").content) # Echo: hello
print(model.invoke("world").content) # Echo: world
这就是一个完整的 ChatModel!它自动获得了 invoke/stream/batch/ainvoke/astream/abatch 全部 6 种调用方式。
SimpleChatModel:只需实现 _call 的简化基类
如果你觉得返回 ChatResult 太麻烦,可以继承 SimpleChatModel——只需返回一个字符串:
# chat_models.py:1724-1753
class SimpleChatModel(BaseChatModel):
"""Simplified implementation for a chat model to inherit from."""
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
output_str = self._call(messages, stop=stop, run_manager=run_manager, **kwargs)
message = AIMessage(content=output_str) # ← 自动包装成 AIMessage
generation = ChatGeneration(message=message) # ← 自动包装成 ChatGeneration
return ChatResult(generations=[generation]) # ← 自动包装成 ChatResult
@abstractmethod
def _call(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> str:
"""Simpler interface."""
SimpleChatModel 帮你做了 str → AIMessage → ChatGeneration → ChatResult 的包装。你只需要:
from langchain_core.language_models.chat_models import SimpleChatModel
class MySimpleChat(SimpleChatModel):
@property
def _llm_type(self) -> str:
return "my-simple-chat"
def _call(self, messages, stop=None, run_manager=None, **kwargs) -> str:
return f"收到 {len(messages)} 条消息"
model = MySimpleChat()
print(model.invoke("hi").content) # 收到 1 条消息
对比两种基类:
BaseChatModel |
SimpleChatModel |
|
|---|---|---|
| 需实现 | _generate → ChatResult |
_call → str |
| 灵活度 | 高(可控制 generation_info, llm_output 等) | 低(只能返回纯文本) |
| 适用场景 | 生产级集成 | 快速原型、测试 |
小结
本篇覆盖了 ChatModel 的核心概念:
- 输入统一:
str/list[Message]/PromptValue三种输入通过_convert_input统一为PromptValue - 输出固定:始终返回
AIMessage(invoke)或AIMessageChunk(stream) - 继承链:
Runnable → RunnableSerializable → BaseLanguageModel → BaseChatModel - 最小合约:实现
_generate+_llm_type即可获得完整的 Runnable 能力 - 简化基类:
SimpleChatModel进一步简化为只需实现_call → str
下一篇预告:
stream方法是怎么把一条完整回复拆成一个个AIMessageChunk的?_should_stream的决策逻辑是什么?AIMessageChunk.__add__又是怎么把碎片拼回完整消息的?
LangChain ChatModel 深度解析(二):流式原理——从 _stream 到 AIMessageChunk
一个能跑的例子
from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.messages import AIMessage
# GenericFakeChatModel 会把回复按空白符拆分成 chunk
chat = GenericFakeChatModel(messages=iter([AIMessage(content="Hello World LangChain")]))
# 逐 chunk 流式输出
chunks = []
for chunk in chat.stream("hi"):
print(repr(chunk.content), end=" ")
chunks.append(chunk)
# 输出: 'Hello' ' ' 'World' ' ' 'LangChain'
print()
# 把 chunks 拼回完整消息
full = chunks[0]
for c in chunks[1:]:
full = full + c # ← AIMessageChunk.__add__
print(full.content) # Hello World LangChain
print(type(full)) # <class 'langchain_core.messages.ai.AIMessageChunk'>
流式输出的核心:模型把回复拆成多个 AIMessageChunk,通过 __add__ 可以拼回完整消息。
stream 方法的完整执行路径
BaseChatModel.stream(chat_models.py:479-603)做了以下事情:
stream(input)
│
├─ _should_stream() == False?
│ └─ 是 → 直接调用 invoke(),yield 一次整个 AIMessage,结束
│
└─ _should_stream() == True?
│
├─ 1. _convert_input(input).to_messages() 转换输入
├─ 2. CallbackManager.configure(...) 配置回调
├─ 3. on_chat_model_start(...) 触发开始回调
├─ 4. rate_limiter.acquire() 限流(如果配置了)
│
├─ 5. for chunk in self._stream(messages): ← 调用子类的 _stream
│ ├─ 设置 chunk.message.id
│ ├─ 设置 response_metadata
│ ├─ on_llm_new_token(chunk) 触发 token 回调
│ └─ yield chunk.message ← 返回给用户
│
├─ 6. yield 最终空 chunk (chunk_position="last")
│
└─ 7. merge_chat_generation_chunks(chunks) 合并所有 chunk
└─ on_llm_end(result) 触发结束回调
看关键代码段(chat_models.py:479-603,简化后):
# chat_models.py:479-603(简化)
def stream(self, input, config=None, *, stop=None, **kwargs):
if not self._should_stream(async_api=False, **{**kwargs, "stream": True}):
# 不支持流式 → 退化为 invoke
yield cast("AIMessageChunk", self.invoke(input, config=config, stop=stop, **kwargs))
else:
config = ensure_config(config)
messages = self._convert_input(input).to_messages()
# ... 配置回调 ...
(run_manager,) = callback_manager.on_chat_model_start(...) # ← 触发 start
if self.rate_limiter:
self.rate_limiter.acquire(blocking=True) # ← 限流
try:
for chunk in self._stream(messages, stop=stop, **kwargs): # ← 核心循环
chunk.message.id = run_id
run_manager.on_llm_new_token(chunk.message.content, chunk=chunk)
yield chunk.message # ← 返回给用户
# 确保最后一个 chunk 标记为 "last"
if not chunk.message.chunk_position:
yield AIMessageChunk(content="", chunk_position="last", id=run_id)
except BaseException as e:
run_manager.on_llm_error(e, response=...) # ← 错误回调
raise
generation = merge_chat_generation_chunks(chunks)
run_manager.on_llm_end(LLMResult(generations=[[generation]])) # ← 结束回调
注意:stream 方法是 BaseChatModel 自己完全重写的,不是用 Runnable.stream 的默认实现(默认是 yield self.invoke(...))。
_should_stream 决策逻辑
_should_stream(chat_models.py:439-477)决定是否走流式路径:
# chat_models.py:439-477
def _should_stream(self, *, async_api: bool, run_manager=None, **kwargs) -> bool:
# 1. 检查子类是否实现了 _stream/_astream
sync_not_implemented = type(self)._stream == BaseChatModel._stream
async_not_implemented = type(self)._astream == BaseChatModel._astream
if (not async_api) and sync_not_implemented:
return False
if async_api and async_not_implemented and sync_not_implemented:
return False
# 2. 检查 disable_streaming 字段
if self.disable_streaming is True:
return False
if self.disable_streaming == "tool_calling" and kwargs.get("tools"):
return False # ← tool_calling 模式时不流式
# 3. 显式传入 stream=True/False
if "stream" in kwargs:
return bool(kwargs["stream"])
# 4. 检查 streaming 字段(部分模型有此字段)
if "streaming" in self.model_fields_set:
streaming_value = getattr(self, "streaming", None)
if isinstance(streaming_value, bool):
return streaming_value
# 5. 检查是否有流式回调处理器(如 astream_events)
handlers = run_manager.handlers if run_manager else []
return any(isinstance(h, _StreamingCallbackHandler) for h in handlers)
决策优先级:
子类未实现 _stream? ──→ False(不可能流式)
│
disable_streaming=True? ──→ False(用户禁用)
│
disable_streaming="tool_calling" 且有 tools? ──→ False
│
kwargs 中有 stream=True/False? ──→ 用它
│
模型字段 streaming=True/False? ──→ 用它
│
有 StreamingCallbackHandler? ──→ True(隐式流式)
│
└──→ False(默认不流式)
disable_streaming 字段有三个值:
| 值 | 含义 |
|---|---|
False(默认) |
正常流式 |
True |
完全禁用流式,stream() 退化为 invoke() |
"tool_calling" |
仅在传入 tools 参数时禁用流式 |
_stream 返回 Iterator[ChatGenerationChunk]
子类要实现的流式方法(chat_models.py:1435-1453):
# chat_models.py:1435-1453
def _stream(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
raise NotImplementedError # ← 基类默认不实现
每个 ChatGenerationChunk 内部包含一个 AIMessageChunk。看 FakeListChatModel 的流式实现(fake_chat_models.py:94-121):
# fake_chat_models.py:94-121
def _stream(self, messages, stop=None, run_manager=None, **kwargs):
response = self.responses[self.i] # 取预设回复(如 "hello")
# ... 更新索引 ...
for i_c, c in enumerate(response): # 逐字符遍历
chunk_position = "last" if i_c == len(response) - 1 else None
yield ChatGenerationChunk(
message=AIMessageChunk(
content=c, # ← 每个字符一个 chunk
chunk_position=chunk_position # ← 最后一个标记为 "last"
)
)
关键点:最后一个 chunk 必须设置 chunk_position="last"。这个标记告诉框架"流结束了",触发 tool_call_chunks 到 tool_calls 的解析。
AIMessageChunk.__add__ 的拼接机制
流式输出的 chunk 需要能拼接回完整消息。这由 AIMessageChunk.__add__ 实现(ai.py:627-718):
# ai.py:627-635
def __add__(self, other: Any) -> BaseMessageChunk:
if isinstance(other, AIMessageChunk):
return add_ai_message_chunks(self, other)
if isinstance(other, (list, tuple)) and all(isinstance(o, AIMessageChunk) for o in other):
return add_ai_message_chunks(self, *other)
return super().__add__(other)
核心逻辑在 add_ai_message_chunks(ai.py:638-718):
# ai.py:638-718(简化)
def add_ai_message_chunks(left, *others):
# 1. 合并 content(字符串拼接 or 列表合并)
content = merge_content(left.content, *(o.content for o in others))
# 2. 合并 additional_kwargs(深度合并字典)
additional_kwargs = merge_dicts(left.additional_kwargs, *(o.additional_kwargs for o in others))
# 3. 合并 response_metadata
response_metadata = merge_dicts(left.response_metadata, *(o.response_metadata for o in others))
# 4. 合并 tool_call_chunks(按 index 合并列表)
raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))
# 5. 合并 usage_metadata(数值相加)
usage_metadata = add_usage(left.usage_metadata, *(o.usage_metadata for o in others))
# 6. 选择最佳 ID(优先用 provider 分配的,其次 lc_run-*)
chunk_id = ... # 优先级:provider ID > lc_run-* > lc_* > None
# 7. chunk_position:任一为 "last" 则结果为 "last"
chunk_position = "last" if any(x.chunk_position == "last" for x in [left, *others]) else None
return left.__class__(
content=content,
additional_kwargs=additional_kwargs,
tool_call_chunks=tool_call_chunks,
response_metadata=response_metadata,
usage_metadata=usage_metadata,
id=chunk_id,
chunk_position=chunk_position,
)
各字段的合并策略:
| 字段 | 合并方式 | 示例 |
|---|---|---|
content(str) |
字符串拼接 | "Hel" + "lo" → "Hello" |
content(list) |
按 index 合并 | 见 merge_content |
additional_kwargs |
深度合并字典 | merge_dicts |
tool_call_chunks |
按 index 合并列表 | merge_lists |
usage_metadata |
数值相加 | input_tokens: 5 + 0 = 5 |
id |
取优先级最高的 | provider ID > lc_run-* |
chunk_position |
任一 “last” 则 “last” | OR 逻辑 |
用代码验证:
from langchain_core.messages import AIMessageChunk
from langchain_core.messages.ai import UsageMetadata
c1 = AIMessageChunk(content="Hello", usage_metadata=UsageMetadata(input_tokens=10, output_tokens=1, total_tokens=11))
c2 = AIMessageChunk(content=" World", usage_metadata=UsageMetadata(input_tokens=0, output_tokens=2, total_tokens=2))
c3 = AIMessageChunk(content="!", chunk_position="last")
full = c1 + c2 + c3
print(full.content) # Hello World!
print(full.usage_metadata["input_tokens"]) # 10
print(full.usage_metadata["output_tokens"])# 3
print(full.chunk_position) # last
generate_from_stream:流 → 结果转换
流式路径最终需要把所有 chunk 合并成一个 ChatResult。这由 generate_from_stream(chat_models.py:182-208)完成:
# chat_models.py:182-208
def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
generation = next(stream, None) # 取第一个 chunk
if generation:
generation += list(stream) # ← 用 __add__ 逐个合并后续 chunk
if generation is None:
raise ValueError("No generations found in stream.")
return ChatResult(
generations=[
ChatGeneration(
message=message_chunk_to_message(generation.message), # Chunk → Message
generation_info=generation.generation_info,
)
]
)
注意 generation += list(stream) 这行——ChatGenerationChunk.__add__ 内部会调用 AIMessageChunk.__add__ 来合并消息。
_generate_with_cache 中的流式路径
在 _generate_with_cache(chat_models.py:1136-1260)中,流式并不是用户显式调用 stream 才触发。如果有 StreamingCallbackHandler,invoke 内部也会走流式路径:
# chat_models.py:1176-1231(简化)
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
# ... 缓存检查 ...
# ... 限流 ...
if self._should_stream(async_api=False, run_manager=run_manager, **kwargs):
# 即使是 invoke 调用,也走流式路径!
chunks = []
for chunk in self._stream(messages, stop=stop, **kwargs):
if run_manager:
run_manager.on_llm_new_token(chunk.message.content, chunk=chunk)
chunks.append(chunk)
result = generate_from_stream(iter(chunks)) # 合并成 ChatResult
else:
# 正常的非流式路径
result = self._generate(messages, stop=stop, run_manager=run_manager, **kwargs)
return result
这意味着:当使用 astream_events 时,即使你调用的是 invoke,底层也会走 _stream 来获得逐 token 的回调。
on_llm_new_token 回调
每个 chunk 都会触发 on_llm_new_token 回调。看 stream 方法中的调用(chat_models.py:557-558):
# chat_models.py:557-558
run_manager.on_llm_new_token(
cast("str", chunk.message.content), # token 文本
chunk=chunk # 完整的 ChatGenerationChunk
)
回调的完整生命周期:
on_chat_model_start(serialized, messages) ← stream/invoke 开始
│
├─ on_llm_new_token(token, chunk=chunk) ← 每个 chunk(仅流式)
├─ on_llm_new_token(token, chunk=chunk)
├─ ...
│
├─ on_llm_end(result) ← 正常结束
└─ on_llm_error(error) ← 异常结束
小结
本篇拆解了 ChatModel 的流式机制:
stream方法自己管理回调生命周期,不依赖Runnable.stream的默认实现_should_stream按优先级判断:子类实现 → disable_streaming → 显式参数 → 模型字段 → 回调处理器_stream返回Iterator[ChatGenerationChunk],每个包含一个AIMessageChunkAIMessageChunk.__add__逐字段合并:content 拼接、usage 相加、tool_call_chunks 按 index 合并_generate_with_cache中,如果检测到流式回调处理器,invoke也会走_stream路径chunk_position="last"标记流结束,触发 tool_call_chunks 解析
下一篇预告:
bind_tools是怎么把 Pydantic 模型变成 tool schema 的?with_structured_output又是怎么把 tool call 结果解析成结构化对象的?
LangChain ChatModel 深度解析(三):Tool Calling 与结构化输出
一个能跑的例子
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from pydantic import BaseModel, Field
# 定义结构化输出 schema
class Weather(BaseModel):
"""天气查询结果。"""
city: str = Field(description="城市名")
temperature: float = Field(description="温度(摄氏度)")
# 模拟一个返回 tool call 的模型
fake_response = AIMessage(
content="",
tool_calls=[{
"name": "Weather",
"args": {"city": "北京", "temperature": 25.0},
"id": "call_001",
}]
)
chat = FakeMessagesListChatModel(responses=[fake_response])
# 方式一:手动 bind_tools + PydanticToolsParser
# (FakeMessagesListChatModel 没实现 bind_tools,所以我们直接演示 parser)
parser = PydanticToolsParser(tools=[Weather], first_tool_only=True)
from langchain_core.outputs import ChatGeneration
result = parser.parse_result([ChatGeneration(message=fake_response)])
print(result) # city='北京' temperature=25.0
print(type(result)) # <class '__main__.Weather'>
# 方式二:直接从 AIMessage 读取 tool_calls
msg = chat.invoke([HumanMessage(content="北京天气")])
print(msg.tool_calls) # [{'name': 'Weather', 'args': {'city': '北京', 'temperature': 25.0}, 'id': 'call_001', 'type': 'tool_call'}]
Tool Calling 的本质:模型返回的 AIMessage.tool_calls 是一个结构化的字典列表,描述"要调用什么函数、传什么参数"。
bind_tools 的实现
BaseChatModel.bind_tools(chat_models.py:1521-1538)在基类中直接抛异常:
# chat_models.py:1521-1538
def bind_tools(
self,
tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: str | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
raise NotImplementedError
每个 partner 包必须自己覆盖 bind_tools。看 OpenAI 的实现(openai/base.py:1819):
# openai/base.py:1819-1828(签名)
def bind_tools(
self,
tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: dict | str | bool | None = None,
strict: bool | None = None, # ← OpenAI 特有:严格模式
parallel_tool_calls: bool | None = None, # ← OpenAI 特有:并行调用
response_format: _DictOrPydanticClass | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
# 核心逻辑:把 tools 转成 OpenAI 格式,然后 self.bind(tools=formatted_tools)
...
而 Anthropic 的实现(anthropic/chat_models.py:1451):
# anthropic/chat_models.py:1451(签名)
def bind_tools(
self,
tools: Sequence[Mapping[str, Any] | type | Callable | BaseTool],
*,
tool_choice: dict[str, str] | str | None = None,
parallel_tool_calls: bool | None = None,
strict: bool | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
...
bind_tools 的本质是 self.bind(**kwargs) —— 即创建一个 RunnableBinding,把 tools 参数固定到后续的每次调用中。
convert_to_openai_tool:统一的 tool schema 转换
无论你传入什么格式的 tool,最终都要转成 OpenAI 格式。这由 convert_to_openai_tool(function_calling.py:498-557)完成:
# function_calling.py:498-557(简化)
def convert_to_openai_tool(
tool: Mapping[str, Any] | type[BaseModel] | Callable | BaseTool,
*,
strict: bool | None = None,
) -> dict[str, Any]:
# 1. 已经是 OpenAI Responses API 格式?直接返回
if isinstance(tool, dict) and tool.get("type") in ("function", "file_search", ...):
return tool
# 2. 其他格式 → 先转成 OpenAI function
oai_function = convert_to_openai_function(tool, strict=strict)
# 3. 包装成 {"type": "function", "function": {...}}
return {"type": "function", "function": oai_function}
而 convert_to_openai_function(function_calling.py:361-480)处理各种输入:
# function_calling.py:361-480(简化)
def convert_to_openai_function(function, *, strict=None):
# Anthropic 格式 {"name": ..., "input_schema": ...}
if isinstance(function, dict) and "input_schema" in function:
oai_function = {"name": function["name"], "parameters": function["input_schema"]}
# Amazon Bedrock 格式 {"toolSpec": {...}}
elif isinstance(function, dict) and "toolSpec" in function:
oai_function = {"name": ..., "parameters": ...}
# 已经是 OpenAI 格式 {"name": ...}
elif isinstance(function, dict) and "name" in function:
oai_function = {k: v for k, v in function.items() if k in {"name", "description", "parameters", "strict"}}
# JSON Schema {"title": ...}
elif isinstance(function, dict) and "title" in function:
oai_function = {"name": function.pop("title"), ...}
# Pydantic BaseModel
elif isinstance(function, type) and is_basemodel_subclass(function):
oai_function = _convert_pydantic_to_openai_function(function)
# TypedDict
elif is_typeddict(function):
oai_function = _convert_typed_dict_to_openai_function(function)
# BaseTool
elif isinstance(function, BaseTool):
oai_function = _format_tool_to_openai_function(function)
# 普通 Python 函数
elif callable(function):
oai_function = _convert_python_function_to_openai_function(function)
else:
raise ValueError(...)
return oai_function
输入类型 → OpenAI tool 格式对照表:
| 输入类型 | 转换函数 | 输出格式 |
|---|---|---|
Pydantic BaseModel |
_convert_pydantic_to_openai_function |
model.model_json_schema() → 清理 |
Python function |
_convert_python_function_to_openai_function |
从 type hints + docstring 提取 |
TypedDict |
_convert_typed_dict_to_openai_function |
先转 Pydantic,再转 schema |
BaseTool |
_format_tool_to_openai_function |
用 tool.tool_call_schema |
dict(Anthropic 格式) |
直接转换 | input_schema → parameters |
dict(OpenAI 格式) |
透传 | 保留 name/description/parameters |
Pydantic → OpenAI 的转换中有两个重要的清理步骤:
_rm_titles(function_calling.py:88-119):递归删除 JSON schema 中的title字段(Pydantic 自动生成的)dereference_refs:展开$ref引用,把嵌套的$defs内联
OpenAI vs Anthropic 的 tool 格式差异
| 字段 | OpenAI 格式 | Anthropic 格式 |
|---|---|---|
| 外层结构 | {"type": "function", "function": {...}} |
{"name": ..., "input_schema": {...}} |
| 名称 | function.name |
name |
| 描述 | function.description |
description |
| 参数 | function.parameters |
input_schema |
| 严格模式 | function.strict: true |
不支持 |
| 并行调用 | parallel_tool_calls: true(请求级) |
parallel_tool_calls: true |
convert_to_openai_tool 统一把所有格式转成 OpenAI 格式。Anthropic 的 bind_tools 再把 OpenAI 格式转回 Anthropic 格式。
with_structured_output:bind_tools + Parser 的组合
with_structured_output(chat_models.py:1540-1721)是最常用的结构化输出方法。它的实现出人意料地简单:
# chat_models.py:1685-1721(简化)
def with_structured_output(self, schema, *, include_raw=False, **kwargs):
# 0. 检查 bind_tools 是否实现
if type(self).bind_tools is BaseChatModel.bind_tools:
raise NotImplementedError("with_structured_output is not implemented for this model.")
# 1. 绑定工具
llm = self.bind_tools(
[schema],
tool_choice="any", # ← 强制模型调用工具
ls_structured_output_format={...}, # ← LangSmith 追踪信息
)
# 2. 选择 Output Parser
if isinstance(schema, type) and is_basemodel_subclass(schema):
# Pydantic 类 → PydanticToolsParser
output_parser = PydanticToolsParser(tools=[schema], first_tool_only=True)
else:
# dict/JSON Schema → JsonOutputKeyToolsParser
key_name = convert_to_openai_tool(schema)["function"]["name"]
output_parser = JsonOutputKeyToolsParser(key_name=key_name, first_tool_only=True)
# 3. 组合
if include_raw:
# include_raw=True → 返回 {"raw": AIMessage, "parsed": Pydantic, "parsing_error": None}
parser_assign = RunnablePassthrough.assign(
parsed=itemgetter("raw") | output_parser,
parsing_error=lambda _: None
)
parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
parser_with_fallback = parser_assign.with_fallbacks(
[parser_none], exception_key="parsing_error"
)
return RunnableMap(raw=llm) | parser_with_fallback
else:
# include_raw=False → 直接返回解析结果
return llm | output_parser
流程图:
include_raw=False:
input → llm.bind_tools([schema]) → AIMessage → output_parser → Pydantic/dict
╰─ tool_choice="any" ╰─ 从 tool_calls 提取
include_raw=True:
input → RunnableMap(raw=llm) → {"raw": AIMessage}
│
▼
parser_assign
├─ parsed = raw | output_parser
└─ parsing_error = None
│
▼(如果解析失败,fallback)
parser_none
├─ parsed = None
└─ parsing_error = Exception
│
▼
{"raw": AIMessage, "parsed": ..., "parsing_error": ...}
Output Parser 体系
PydanticToolsParser(openai_tools.py:306-384)
用于把 AIMessage.tool_calls 解析为 Pydantic 对象:
# openai_tools.py:306-384(简化)
class PydanticToolsParser(JsonOutputToolsParser):
tools: list[TypeBaseModel] # 可能的 Pydantic 类列表
def parse_result(self, result, *, partial=False):
json_results = super().parse_result(result, partial=partial)
# super() 从 AIMessage.tool_calls 提取 [{"type": "Weather", "args": {...}}]
# 构建 name → class 映射
name_dict = {tool.__name__: tool for tool in self.tools}
pydantic_objects = []
for res in json_results:
tool = name_dict[res["type"]] # 找到对应的 Pydantic 类
pydantic_objects.append(tool(**res["args"])) # 实例化
if self.first_tool_only:
return pydantic_objects[0]
return pydantic_objects
JsonOutputKeyToolsParser(openai_tools.py:225-295)
用于提取指定工具名的 args 字典:
# openai_tools.py:225-295(简化)
class JsonOutputKeyToolsParser(JsonOutputToolsParser):
key_name: str # 要提取的工具名
def parse_result(self, result, *, partial=False):
# 从 tool_calls 中筛选 name == key_name 的
# 返回 args dict(如果 first_tool_only)或 args dict 列表
...
解析链路:
AIMessage.tool_calls
= [{"name": "Weather", "args": {"city": "北京", ...}, "id": "call_001"}]
│
▼
JsonOutputToolsParser.parse_result()
→ [{"type": "Weather", "args": {"city": "北京", ...}}] # name → type
│
┌───────┴───────┐
▼ ▼
PydanticToolsParser JsonOutputKeyToolsParser
→ Weather(...) → {"city": "北京", ...}
小结
本篇拆解了 ChatModel 的 Tool Calling 和结构化输出机制:
bind_tools在基类抛NotImplementedError,各 partner 包自行实现,本质是self.bind(tools=...)convert_to_openai_tool统一转换 Pydantic/函数/dict/BaseTool → OpenAI tool schema- Schema 清理:
_rm_titles删除 Pydantic 自动生成的 title,dereference_refs展开$ref with_structured_output=bind_tools([schema], tool_choice="any")+OutputParser- 两种 Parser:
PydanticToolsParser(→ Pydantic 对象)、JsonOutputKeyToolsParser(→ dict) include_raw=True模式用RunnablePassthrough.assign+with_fallbacks包装
下一篇预告:
invoke完整调用栈是什么?缓存和限流怎么工作?OpenAI 和 Anthropic 的 partner 实现有哪些差异?
LangChain ChatModel 深度解析(四):源码架构——缓存、限流与 Partner 集成
一个能跑的例子
from langchain_core.language_models.chat_models import BaseChatModel, SimpleChatModel
from langchain_core.language_models.base import BaseLanguageModel
from langchain_core.runnables import RunnableSerializable, Runnable
# 5 个 assert 验证继承关系
assert issubclass(BaseChatModel, BaseLanguageModel) # ChatModel IS-A LanguageModel
assert issubclass(BaseLanguageModel, RunnableSerializable) # LanguageModel IS-A RunnableSerializable
assert issubclass(RunnableSerializable, Runnable) # RunnableSerializable IS-A Runnable
assert issubclass(SimpleChatModel, BaseChatModel) # SimpleChatModel IS-A BaseChatModel
# BaseChatModel 的输出类型是 AIMessage
from langchain_core.messages import AnyMessage
model = SimpleChatModel.__dict__ # 不能直接实例化抽象类
# 通过 OutputType 属性验证
from langchain_core.language_models.fake_chat_models import FakeChatModel
assert FakeChatModel().OutputType == AnyMessage
print("所有断言通过!继承链完整。")
完整调用栈
用户调用 model.invoke("hello")
│
▼
BaseChatModel.invoke() (chat_models.py:388)
│ config = ensure_config(config)
│ input → _convert_input() → PromptValue
│
▼
BaseChatModel.generate_prompt(prompts) (chat_models.py:1112)
│ prompts → [p.to_messages() for p in prompts]
│
▼
BaseChatModel.generate(messages) (chat_models.py:842)
│ 配置 CallbackManager
│ on_chat_model_start(serialized, messages)
│
▼
BaseChatModel._generate_with_cache(messages) (chat_models.py:1136)
│
├─ 1. 缓存查找
│ llm_cache.lookup(prompt, llm_string)
│ 命中?→ 直接返回 ChatResult
│
├─ 2. 限流
│ rate_limiter.acquire(blocking=True)
│
├─ 3. 流式判断
│ _should_stream() == True?
│ ├─ Yes → _stream(messages) → generate_from_stream()
│ └─ No → _generate(messages) ← 子类实现
│
├─ 4. 后处理
│ 设置 response_metadata, message.id
│
└─ 5. 缓存写入
llm_cache.update(prompt, llm_string, generations)
│
▼
返回 ChatResult → .generations[0].message → AIMessage
│
▼
BaseChatModel.generate()
│ on_llm_end(result)
│
▼
用户拿到 AIMessage
每一层的职责非常明确:
| 方法 | 职责 |
|---|---|
invoke |
输入转换 + config 处理 |
generate_prompt |
PromptValue → list[BaseMessage] |
generate |
回调管理(start/end/error) |
_generate_with_cache |
缓存 + 限流 + 流式判断 |
_generate(子类) |
实际调用 LLM API |
缓存机制
BaseCache 接口(caches.py:32)
# caches.py:32-46
class BaseCache(ABC):
@abstractmethod
def lookup(self, prompt: str, llm_string: str) -> RETURN_VAL_TYPE | None:
"""缓存查找。返回 None 表示未命中。"""
@abstractmethod
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""写入缓存。"""
@abstractmethod
def clear(self, **kwargs: Any) -> None:
"""清空缓存。"""
缓存的 key 是 (prompt, llm_string) 二元组:
prompt:消息列表的 JSON 序列化(dumps(normalized_messages))llm_string:模型配置的序列化(模型名、温度、stop 等参数)
InMemoryCache(caches.py:155)
# caches.py:155-272(简化)
class InMemoryCache(BaseCache):
def __init__(self, *, maxsize: int | None = None):
self._cache: dict[tuple[str, str], RETURN_VAL_TYPE] = {} # ← 就是一个字典
self._maxsize = maxsize
def lookup(self, prompt, llm_string):
return self._cache.get((prompt, llm_string), None)
def update(self, prompt, llm_string, return_val):
if self._maxsize and len(self._cache) == self._maxsize:
del self._cache[next(iter(self._cache))] # ← FIFO 淘汰
self._cache[prompt, llm_string] = return_val
def clear(self, **kwargs):
self._cache = {}
缓存在 _generate_with_cache 中的使用(chat_models.py:1136-1260)
# chat_models.py:1136-1260(简化)
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
check_cache = self.cache or self.cache is None # None = 用全局缓存(如果有)
if check_cache and llm_cache:
llm_string = self._get_llm_string(stop=stop, **kwargs)
# 消息 ID 归零,确保相同内容的消息命中同一缓存
normalized_messages = [
msg.model_copy(update={"id": None}) if msg.id else msg
for msg in messages
]
prompt = dumps(normalized_messages)
# 查找缓存
cache_val = llm_cache.lookup(prompt, llm_string)
if isinstance(cache_val, list):
return ChatResult(generations=self._convert_cached_generations(cache_val))
# 缓存未命中 → 限流 → 调用模型
if self.rate_limiter:
self.rate_limiter.acquire(blocking=True)
result = self._generate(messages, stop=stop, ...)
# 写入缓存
if check_cache and llm_cache:
llm_cache.update(prompt, llm_string, result.generations)
return result
缓存流程图:
_generate_with_cache(messages)
│
├─ cache = self.cache or get_llm_cache()
│
├─ check_cache?
│ ├─ self.cache = True → 用全局缓存,没有则报错
│ ├─ self.cache = False → 不检查
│ ├─ self.cache = None → 用全局缓存(如果有)
│ └─ self.cache = BaseCache → 用这个实例
│
├─ lookup(prompt, llm_string)
│ ├─ 命中 → 返回缓存结果(跳过限流和 API 调用)
│ └─ 未命中 → 继续
│
├─ rate_limiter.acquire() ← 限流在缓存之后!
│
├─ _generate(messages) ← 调用模型
│
└─ update(prompt, llm_string, generations) ← 写入缓存
重要细节:限流在缓存检查之后。缓存命中时不会消耗限流配额。
限流机制
BaseRateLimiter 接口(rate_limiters.py:11)
# rate_limiters.py:11-64
class BaseRateLimiter(ABC):
@abstractmethod
def acquire(self, *, blocking: bool = True) -> bool:
"""获取令牌。blocking=True 时阻塞等待。"""
@abstractmethod
async def aacquire(self, *, blocking: bool = True) -> bool:
"""异步获取令牌。"""
InMemoryRateLimiter:Token Bucket 算法(rate_limiters.py:67)
# rate_limiters.py:67-250(简化)
class InMemoryRateLimiter(BaseRateLimiter):
def __init__(self, *, requests_per_second=1, check_every_n_seconds=0.1, max_bucket_size=1):
self.requests_per_second = requests_per_second
self.available_tokens = 0.0 # 当前可用令牌数
self.max_bucket_size = max_bucket_size
self._consume_lock = threading.Lock()
self.last = None # 上次消费时间
def _consume(self) -> bool:
with self._consume_lock:
now = time.monotonic()
if self.last is None:
self.last = now # 首次调用初始化
elapsed = now - self.last
if elapsed * self.requests_per_second >= 1:
self.available_tokens += elapsed * self.requests_per_second
self.last = now
# 限制最大突发
self.available_tokens = min(self.available_tokens, self.max_bucket_size)
if self.available_tokens >= 1:
self.available_tokens -= 1
return True # 获取成功
return False # 令牌不足
def acquire(self, *, blocking=True):
if not blocking:
return self._consume()
while not self._consume(): # 阻塞轮询
time.sleep(self.check_every_n_seconds)
return True
Token Bucket 算法图解:
┌──────────────┐
requests_per_second ──→│ 令 牌 桶 │←── max_bucket_size(上限)
(每秒补充速率) │ │
│ ○ ○ ○ ○ ○ │ available_tokens(当前令牌数)
│ │
└──────┬───────┘
│
_consume()
available >= 1?
╱ ╲
Yes No
tokens -= 1 blocking?
return True ╱ ╲
Yes No
sleep() return False
retry
使用示例:
from langchain_core.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(
requests_per_second=0.5, # 每 2 秒 1 次请求
check_every_n_seconds=0.1, # 每 100ms 检查一次
max_bucket_size=2, # 最多攒 2 个令牌(允许小突发)
)
# 传给 ChatModel
from langchain_core.language_models.fake_chat_models import FakeChatModel
model = FakeChatModel(rate_limiter=rate_limiter)
import time
for i in range(3):
start = time.time()
model.invoke("hello")
print(f"请求 {i}: {time.time() - start:.1f}s")
# 请求 0: 0.0s(首次立即通过 — 初始化不给令牌,但首次 elapsed 够用)
# 请求 1: 2.0s(等待 2 秒)
# 请求 2: 2.0s(等待 2 秒)
回调层级
ChatModel 的回调遵循统一的生命周期:
on_chat_model_start(serialized, messages, ...)
│
│ ┌─────── 流式路径 ───────┐
│ │ │
│ │ on_llm_new_token(token, chunk=chunk) ← 每个 chunk
│ │ on_llm_new_token(token, chunk=chunk)
│ │ ... │
│ └─────────────────────────┘
│
├─ on_llm_end(result) ← 成功结束
└─ on_llm_error(error) ← 异常结束
回调在 generate 方法中配置(chat_models.py:903-923):
# chat_models.py:903-923
callback_manager = CallbackManager.configure(
callbacks, # 用户传入的回调
self.callbacks, # 模型实例上的回调
self.verbose,
tags,
self.tags,
inheritable_metadata,
self.metadata,
)
run_managers = callback_manager.on_chat_model_start(
self._serialized,
messages_to_trace,
invocation_params=params,
options=options,
name=run_name,
run_id=run_id,
batch_size=len(messages),
)
Partner 集成模式对比
OpenAI 和 Anthropic 是两个最重要的 partner 包。它们的核心差异:
| 方面 | OpenAI(BaseChatOpenAI) |
Anthropic(ChatAnthropic) |
|---|---|---|
| 文件 | openai/base.py |
anthropic/chat_models.py |
| 总行数 | 4609 行 | 2152 行 |
| 类定义 | 第 488 行 | 第 733 行 |
_generate |
第 1338 行 | 第 1394 行 |
_stream |
第 1267 行 | 第 1264 行 |
bind_tools |
第 1819 行 | 第 1451 行 |
| API 客户端 | openai.OpenAI |
anthropic.Anthropic |
| Tool 格式 | OpenAI 原生 | Anthropic 原生(内部转换) |
OpenAI 实现特点
# openai/base.py:488
class BaseChatOpenAI(BaseChatModel):
client: Any # openai.OpenAI
async_client: Any # openai.AsyncOpenAI
model_name: str = Field(alias="model") # gpt-4o, gpt-4o-mini 等
temperature: float | None = None
stream_usage: bool | None = None # 流式时是否返回 usage
OpenAI 的 _generate(base.py:1338)有两条路径:
response_format存在 → 用client.beta.chat.completions.parse()- 其他情况 → 用
client.chat.completions.create()
OpenAI 的 bind_tools(base.py:1819)有额外参数:
strict: bool—— JSON Schema 严格模式parallel_tool_calls: bool—— 允许一次调用多个工具response_format—— 结构化输出格式
Anthropic 实现特点
# anthropic/chat_models.py:733
class ChatAnthropic(BaseChatModel):
model: str = Field(alias="model_name") # claude-sonnet-4-5-20250929 等
max_tokens: int = 1024 # Anthropic 必须指定
temperature: float | None = None
default_headers: dict[str, str] | None = None
Anthropic 的 tool 格式不同于 OpenAI:
# OpenAI 格式
{"type": "function", "function": {"name": "weather", "parameters": {...}}}
# Anthropic 格式
{"name": "weather", "input_schema": {...}}
ChatAnthropic.bind_tools 内部会把 tools 转成 Anthropic 格式。
_get_ls_params:LangSmith 追踪参数提取
_get_ls_params(chat_models.py:788-826)为 LangSmith 追踪提取标准参数:
# chat_models.py:788-826(简化)
def _get_ls_params(self, stop=None, **kwargs) -> LangSmithParams:
# 从类名推断 provider(ChatOpenAI → openai, ChatAnthropic → anthropic)
default_provider = self.__class__.__name__
if default_provider.startswith("Chat"):
default_provider = default_provider[4:].lower()
ls_params = LangSmithParams(ls_provider=default_provider, ls_model_type="chat")
# 从 kwargs 或 self 提取 model, temperature, max_tokens, stop
if hasattr(self, "model"):
ls_params["ls_model_name"] = self.model
if hasattr(self, "temperature"):
ls_params["ls_temperature"] = self.temperature
if hasattr(self, "max_tokens"):
ls_params["ls_max_tokens"] = self.max_tokens
if stop:
ls_params["ls_stop"] = stop
return ls_params
这些参数会传入 inheritable_metadata,最终出现在 LangSmith 追踪中。
自定义 ChatModel 实现清单
基于全部 4 篇的分析,这是实现一个生产级 ChatModel 的完整清单:
必须实现(最小合约)
| 方法/属性 | 签名 | 说明 |
|---|---|---|
_generate |
(messages, stop, run_manager, **kwargs) → ChatResult |
核心生成逻辑 |
_llm_type |
@property → str |
模型类型标识 |
建议实现(提升体验)
| 方法 | 签名 | 说明 |
|---|---|---|
_stream |
(messages, stop, run_manager, **kwargs) → Iterator[ChatGenerationChunk] |
流式输出 |
_agenerate |
async (messages, stop, run_manager, **kwargs) → ChatResult |
原生异步 |
_astream |
async (messages, stop, run_manager, **kwargs) → AsyncIterator[ChatGenerationChunk] |
异步流式 |
bind_tools |
(tools, tool_choice, **kwargs) → Runnable |
工具绑定 |
_get_ls_params |
(stop, **kwargs) → LangSmithParams |
LangSmith 追踪 |
可选实现(高级功能)
| 方法/属性 | 说明 |
|---|---|
with_structured_output |
覆盖基类默认实现(如支持 json_schema method) |
_identifying_params |
模型参数(用于缓存 key、追踪) |
_get_llm_string |
自定义缓存 key 生成 |
profile: ModelProfile |
声明模型能力(上下文窗口、支持的功能等) |
最小实现模板
from typing import Any, Iterator
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, AIMessageChunk, BaseMessage
from langchain_core.outputs import ChatResult, ChatGeneration, ChatGenerationChunk
from langchain_core.callbacks import CallbackManagerForLLMRun
class MyProductionChat(BaseChatModel):
"""自定义 ChatModel 模板。"""
model: str = "my-model-v1"
temperature: float = 0.7
api_key: str = ""
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
# TODO: 调用你的 LLM API
response_text = self._call_api(messages, stop, **kwargs)
return ChatResult(
generations=[ChatGeneration(message=AIMessage(content=response_text))]
)
def _stream(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
# TODO: 调用你的流式 API
for token in self._call_api_stream(messages, stop, **kwargs):
chunk = ChatGenerationChunk(
message=AIMessageChunk(content=token)
)
if run_manager:
run_manager.on_llm_new_token(token, chunk=chunk)
yield chunk
@property
def _llm_type(self) -> str:
return "my-production-chat"
def _call_api(self, messages, stop, **kwargs) -> str:
"""实际 API 调用(占位)。"""
return "This is a placeholder response."
def _call_api_stream(self, messages, stop, **kwargs):
"""实际流式 API 调用(占位)。"""
for word in "This is a streaming response".split():
yield word + " "
小结
本篇拆解了 ChatModel 的底层架构:
- 完整调用栈:
invoke → generate_prompt → generate → _generate_with_cache → _generate - 缓存:
BaseCache接口,key =(prompt_json, llm_string),在限流之前检查 - 限流:
InMemoryRateLimiter基于 Token Bucket 算法,线程安全 - 回调:
on_chat_model_start → on_llm_new_token → on_llm_end/on_llm_error - Partner 差异:OpenAI 4609 行 vs Anthropic 2152 行,tool 格式不同但通过
convert_to_openai_tool统一 - LangSmith 追踪:
_get_ls_params从类名和字段自动提取 provider/model/temperature
4 篇系列回顾
| 篇 | 主题 | 核心源码 |
|---|---|---|
| 第 1 篇 | 5 分钟上手 | _convert_input、继承链、_generate/_llm_type 最小合约 |
| 第 2 篇 | 流式原理 | stream、_should_stream、AIMessageChunk.__add__、generate_from_stream |
| 第 3 篇 | Tool Calling | bind_tools、convert_to_openai_tool、with_structured_output、PydanticToolsParser |
| 第 4 篇 | 源码架构 | _generate_with_cache、InMemoryCache、InMemoryRateLimiter、Partner 对比 |
ChatModel 是连接消息系统和 Runnable 体系的核心枢纽:
- 输入端:接收
list[BaseMessage](消息系统,见"消息深度解析"系列) - 自身:作为
Runnable子类,拥有invoke/stream/batch全套能力(见"Runnables 深度解析"系列) - 输出端:返回
AIMessage,可能包含tool_calls,驱动 Agent 循环
更多推荐



所有评论(0)