本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的 ChatModel 组件。

  • 第 1 部分:5 分钟上手 ChatModel(本文)
  • 第 2 部分:流式原理——从 _streamAIMessageChunk
  • 第 3 部分:Tool Calling 与结构化输出
  • 第 4 部分:源码架构——缓存、限流与 Partner 集成

LangChain ChatModel 深度解析(一):5 分钟上手 ChatModel

一个能跑的例子

from langchain_core.language_models.fake_chat_models import (
    FakeMessagesListChatModel,
    FakeListChatModel,
)
from langchain_core.messages import AIMessage, HumanMessage

# 方式一:FakeMessagesListChatModel —— 直接返回预设的 BaseMessage
chat = FakeMessagesListChatModel(responses=[AIMessage(content="你好!我是假模型")])

result = chat.invoke([HumanMessage(content="你好")])
print(type(result))    # <class 'langchain_core.messages.ai.AIMessage'>
print(result.content)  # 你好!我是假模型

# 方式二:FakeListChatModel —— 返回字符串,自动包装成 AIMessage,且支持流式
chat2 = FakeListChatModel(responses=["I", "am", "fake"])

# invoke:取第一个响应
print(chat2.invoke("hi").content)  # I

# stream:逐字符流式输出
for chunk in chat2.stream("hello"):
    print(chunk.content, end="")  # a(逐字符:a, m)
print()

# batch:批量调用
results = chat2.batch(["a", "b"])
print([r.content for r in results])  # ['fake', 'I']  — 循环取响应

不需要 API key,不需要网络。FakeMessagesListChatModelFakeListChatModel 是官方测试用的假模型,行为和真模型完全一致。


ChatModel 的输入输出类型

ChatModel 接受三种输入,统一输出 AIMessage

输入类型 示例 说明
str "你好" 自动包装成 HumanMessage
list[BaseMessage] [HumanMessage(...)] 最常用的方式
PromptValue ChatPromptValue(...) 模板渲染后的结果

这个类型定义在 base.py:122

# language_models/base.py:122
LanguageModelInput = PromptValue | str | Sequence[MessageLikeRepresentation]

三种输入怎么统一?看 _convert_inputchat_models.py:375-386):

# chat_models.py:375-386
def _convert_input(self, model_input: LanguageModelInput) -> PromptValue:
    if isinstance(model_input, PromptValue):
        return model_input                                    # 已经是 PromptValue,直接返回
    if isinstance(model_input, str):
        return StringPromptValue(text=model_input)            # 字符串 → StringPromptValue
    if isinstance(model_input, Sequence):
        return ChatPromptValue(messages=convert_to_messages(model_input))
        # list[Message] → ChatPromptValue
    raise ValueError(f"Invalid input type {type(model_input)}. ...")

流程图:

用户输入(str / list[Message] / PromptValue)
    │
    ▼
_convert_input()  →  统一转为 PromptValue
    │
    ▼
PromptValue.to_messages()  →  list[BaseMessage]
    │
    ▼
_generate(messages)  →  ChatResult
    │
    ▼
ChatResult.generations[0].message  →  AIMessage(返回给用户)

三种调用方式 + async 变体

方法 输入 输出 同步 异步
invoke 单个值 AIMessage invoke() ainvoke()
stream 单个值 Iterator[AIMessageChunk] stream() astream()
batch list[Input] list[AIMessage] batch() abatch()

invoke 的实现(chat_models.py:388-413):

# chat_models.py:388-413
def invoke(
    self,
    input: LanguageModelInput,
    config: RunnableConfig | None = None,
    *,
    stop: list[str] | None = None,
    **kwargs: Any,
) -> AIMessage:
    config = ensure_config(config)
    return cast(
        "AIMessage",
        cast(
            "ChatGeneration",
            self.generate_prompt(                # ← 调用 generate_prompt
                [self._convert_input(input)],    # ← 先统一输入格式
                stop=stop,
                callbacks=config.get("callbacks"),
                tags=config.get("tags"),
                metadata=config.get("metadata"),
                run_name=config.get("run_name"),
                run_id=config.pop("run_id", None),
                **kwargs,
            ).generations[0][0],                 # ← 取第一个结果
        ).message,                               # ← 取出 AIMessage
    )

invoke 并不是直接调用 _generate,而是走了一条长链路:

invoke
  → generate_prompt        (chat_models.py:1112)
    → generate             (chat_models.py:842)
      → _generate_with_cache  (chat_models.py:1136)
        → [检查缓存]
        → [限流器]
        → _generate          (子类实现)

这条链路在第 4 篇会详细分析。现在只需要知道:invoke 最终会调用子类的 _generate 方法。


继承链

Runnable                         ← 7 种方法:invoke/stream/batch + async 变体
  │
RunnableSerializable             ← 序列化支持(JSON dump/load)
  │
BaseLanguageModel[AIMessage]     ← 语言模型通用接口:cache, callbacks, tags, metadata
  │
BaseChatModel                    ← Chat 模型核心:_generate, _stream, bind_tools, with_structured_output
  │
├── SimpleChatModel              ← 简化基类:只需实现 _call(messages) → str
│     │
│     ├── FakeChatModel          ← 总是返回 "fake response"
│     └── FakeListChatModel      ← 循环返回预设字符串列表
│
├── FakeMessagesListChatModel    ← 循环返回预设 BaseMessage 列表
├── GenericFakeChatModel         ← 支持流式的假模型
│
├── ChatOpenAI                   ← OpenAI 集成(partner 包)
└── ChatAnthropic                ← Anthropic 集成(partner 包)

用代码验证:

from langchain_core.language_models.chat_models import BaseChatModel, SimpleChatModel
from langchain_core.language_models.base import BaseLanguageModel
from langchain_core.runnables import RunnableSerializable, Runnable

# 继承链验证
assert issubclass(BaseChatModel, BaseLanguageModel)
assert issubclass(BaseLanguageModel, RunnableSerializable)
assert issubclass(RunnableSerializable, Runnable)

# SimpleChatModel 是 BaseChatModel 的子类
assert issubclass(SimpleChatModel, BaseChatModel)
print("所有断言通过!")

每一层负责什么?

职责
1 Runnable invoke/stream/batch 统一接口
2 RunnableSerializable JSON 序列化/反序列化
3 BaseLanguageModel cache/callbacks/tags/metadata 字段
4 BaseChatModel _generate/_stream/bind_tools/with_structured_output
5 具体子类 实现 _generate,对接真正的 LLM API

核心抽象方法:最小实现合约

要实现一个自定义 ChatModel,最少只需要两样东西

  1. _generate 方法 —— 接收消息列表,返回 ChatResult
  2. _llm_type 属性 —— 返回模型类型字符串

看源码中的定义(chat_models.py:1388-1406):

# chat_models.py:1388-1406
@abstractmethod
def _generate(
    self,
    messages: list[BaseMessage],
    stop: list[str] | None = None,
    run_manager: CallbackManagerForLLMRun | None = None,
    **kwargs: Any,
) -> ChatResult:
    """Generate the result."""

# chat_models.py:1509-1512
@property
@abstractmethod
def _llm_type(self) -> str:
    """Return type of chat model."""

最小实现示例:

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatResult, ChatGeneration
from langchain_core.callbacks import CallbackManagerForLLMRun
from typing import Any

class MyChatModel(BaseChatModel):
    """最小 ChatModel 实现。"""

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        # 取最后一条消息内容,回显
        last = messages[-1].content if messages else ""
        return ChatResult(
            generations=[ChatGeneration(message=AIMessage(content=f"Echo: {last}"))]
        )

    @property
    def _llm_type(self) -> str:
        return "my-chat-model"

# 测试
model = MyChatModel()
print(model.invoke("hello").content)   # Echo: hello
print(model.invoke("world").content)   # Echo: world

这就是一个完整的 ChatModel!它自动获得了 invoke/stream/batch/ainvoke/astream/abatch 全部 6 种调用方式。


SimpleChatModel:只需实现 _call 的简化基类

如果你觉得返回 ChatResult 太麻烦,可以继承 SimpleChatModel——只需返回一个字符串:

# chat_models.py:1724-1753
class SimpleChatModel(BaseChatModel):
    """Simplified implementation for a chat model to inherit from."""

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        output_str = self._call(messages, stop=stop, run_manager=run_manager, **kwargs)
        message = AIMessage(content=output_str)            # ← 自动包装成 AIMessage
        generation = ChatGeneration(message=message)       # ← 自动包装成 ChatGeneration
        return ChatResult(generations=[generation])         # ← 自动包装成 ChatResult

    @abstractmethod
    def _call(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> str:
        """Simpler interface."""

SimpleChatModel 帮你做了 str → AIMessage → ChatGeneration → ChatResult 的包装。你只需要:

from langchain_core.language_models.chat_models import SimpleChatModel

class MySimpleChat(SimpleChatModel):
    @property
    def _llm_type(self) -> str:
        return "my-simple-chat"

    def _call(self, messages, stop=None, run_manager=None, **kwargs) -> str:
        return f"收到 {len(messages)} 条消息"

model = MySimpleChat()
print(model.invoke("hi").content)  # 收到 1 条消息

对比两种基类:

BaseChatModel SimpleChatModel
需实现 _generateChatResult _callstr
灵活度 高(可控制 generation_info, llm_output 等) 低(只能返回纯文本)
适用场景 生产级集成 快速原型、测试

小结

本篇覆盖了 ChatModel 的核心概念:

  1. 输入统一str/list[Message]/PromptValue 三种输入通过 _convert_input 统一为 PromptValue
  2. 输出固定:始终返回 AIMessage(invoke)或 AIMessageChunk(stream)
  3. 继承链Runnable → RunnableSerializable → BaseLanguageModel → BaseChatModel
  4. 最小合约:实现 _generate + _llm_type 即可获得完整的 Runnable 能力
  5. 简化基类SimpleChatModel 进一步简化为只需实现 _call → str

下一篇预告stream 方法是怎么把一条完整回复拆成一个个 AIMessageChunk 的?_should_stream 的决策逻辑是什么?AIMessageChunk.__add__ 又是怎么把碎片拼回完整消息的?


LangChain ChatModel 深度解析(二):流式原理——从 _streamAIMessageChunk

一个能跑的例子

from langchain_core.language_models.fake_chat_models import GenericFakeChatModel
from langchain_core.messages import AIMessage

# GenericFakeChatModel 会把回复按空白符拆分成 chunk
chat = GenericFakeChatModel(messages=iter([AIMessage(content="Hello World LangChain")]))

# 逐 chunk 流式输出
chunks = []
for chunk in chat.stream("hi"):
    print(repr(chunk.content), end=" ")
    chunks.append(chunk)

# 输出: 'Hello' ' ' 'World' ' ' 'LangChain'
print()

# 把 chunks 拼回完整消息
full = chunks[0]
for c in chunks[1:]:
    full = full + c           # ← AIMessageChunk.__add__
print(full.content)           # Hello World LangChain
print(type(full))             # <class 'langchain_core.messages.ai.AIMessageChunk'>

流式输出的核心:模型把回复拆成多个 AIMessageChunk,通过 __add__ 可以拼回完整消息。


stream 方法的完整执行路径

BaseChatModel.streamchat_models.py:479-603)做了以下事情:

stream(input)
  │
  ├─ _should_stream() == False?
  │     └─ 是 → 直接调用 invoke(),yield 一次整个 AIMessage,结束
  │
  └─ _should_stream() == True?
        │
        ├─ 1. _convert_input(input).to_messages()    转换输入
        ├─ 2. CallbackManager.configure(...)          配置回调
        ├─ 3. on_chat_model_start(...)                触发开始回调
        ├─ 4. rate_limiter.acquire()                  限流(如果配置了)
        │
        ├─ 5. for chunk in self._stream(messages):    ← 调用子类的 _stream
        │     ├─ 设置 chunk.message.id
        │     ├─ 设置 response_metadata
        │     ├─ on_llm_new_token(chunk)              触发 token 回调
        │     └─ yield chunk.message                  ← 返回给用户
        │
        ├─ 6. yield 最终空 chunk (chunk_position="last")
        │
        └─ 7. merge_chat_generation_chunks(chunks)    合并所有 chunk
              └─ on_llm_end(result)                   触发结束回调

看关键代码段(chat_models.py:479-603,简化后):

# chat_models.py:479-603(简化)
def stream(self, input, config=None, *, stop=None, **kwargs):
    if not self._should_stream(async_api=False, **{**kwargs, "stream": True}):
        # 不支持流式 → 退化为 invoke
        yield cast("AIMessageChunk", self.invoke(input, config=config, stop=stop, **kwargs))
    else:
        config = ensure_config(config)
        messages = self._convert_input(input).to_messages()
        # ... 配置回调 ...
        (run_manager,) = callback_manager.on_chat_model_start(...)  # ← 触发 start

        if self.rate_limiter:
            self.rate_limiter.acquire(blocking=True)              # ← 限流

        try:
            for chunk in self._stream(messages, stop=stop, **kwargs):  # ← 核心循环
                chunk.message.id = run_id
                run_manager.on_llm_new_token(chunk.message.content, chunk=chunk)
                yield chunk.message                                # ← 返回给用户

            # 确保最后一个 chunk 标记为 "last"
            if not chunk.message.chunk_position:
                yield AIMessageChunk(content="", chunk_position="last", id=run_id)

        except BaseException as e:
            run_manager.on_llm_error(e, response=...)              # ← 错误回调
            raise

        generation = merge_chat_generation_chunks(chunks)
        run_manager.on_llm_end(LLMResult(generations=[[generation]]))  # ← 结束回调

注意:stream 方法是 BaseChatModel 自己完全重写的,不是用 Runnable.stream 的默认实现(默认是 yield self.invoke(...))。


_should_stream 决策逻辑

_should_streamchat_models.py:439-477)决定是否走流式路径:

# chat_models.py:439-477
def _should_stream(self, *, async_api: bool, run_manager=None, **kwargs) -> bool:
    # 1. 检查子类是否实现了 _stream/_astream
    sync_not_implemented = type(self)._stream == BaseChatModel._stream
    async_not_implemented = type(self)._astream == BaseChatModel._astream

    if (not async_api) and sync_not_implemented:
        return False
    if async_api and async_not_implemented and sync_not_implemented:
        return False

    # 2. 检查 disable_streaming 字段
    if self.disable_streaming is True:
        return False
    if self.disable_streaming == "tool_calling" and kwargs.get("tools"):
        return False                          # ← tool_calling 模式时不流式

    # 3. 显式传入 stream=True/False
    if "stream" in kwargs:
        return bool(kwargs["stream"])

    # 4. 检查 streaming 字段(部分模型有此字段)
    if "streaming" in self.model_fields_set:
        streaming_value = getattr(self, "streaming", None)
        if isinstance(streaming_value, bool):
            return streaming_value

    # 5. 检查是否有流式回调处理器(如 astream_events)
    handlers = run_manager.handlers if run_manager else []
    return any(isinstance(h, _StreamingCallbackHandler) for h in handlers)

决策优先级:

子类未实现 _stream?  ──→ False(不可能流式)
     │
disable_streaming=True?  ──→ False(用户禁用)
     │
disable_streaming="tool_calling" 且有 tools?  ──→ False
     │
kwargs 中有 stream=True/False?  ──→ 用它
     │
模型字段 streaming=True/False?  ──→ 用它
     │
有 StreamingCallbackHandler?  ──→ True(隐式流式)
     │
     └──→ False(默认不流式)

disable_streaming 字段有三个值:

含义
False(默认) 正常流式
True 完全禁用流式,stream() 退化为 invoke()
"tool_calling" 仅在传入 tools 参数时禁用流式

_stream 返回 Iterator[ChatGenerationChunk]

子类要实现的流式方法(chat_models.py:1435-1453):

# chat_models.py:1435-1453
def _stream(
    self,
    messages: list[BaseMessage],
    stop: list[str] | None = None,
    run_manager: CallbackManagerForLLMRun | None = None,
    **kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
    raise NotImplementedError   # ← 基类默认不实现

每个 ChatGenerationChunk 内部包含一个 AIMessageChunk。看 FakeListChatModel 的流式实现(fake_chat_models.py:94-121):

# fake_chat_models.py:94-121
def _stream(self, messages, stop=None, run_manager=None, **kwargs):
    response = self.responses[self.i]       # 取预设回复(如 "hello")
    # ... 更新索引 ...
    for i_c, c in enumerate(response):      # 逐字符遍历
        chunk_position = "last" if i_c == len(response) - 1 else None
        yield ChatGenerationChunk(
            message=AIMessageChunk(
                content=c,                   # ← 每个字符一个 chunk
                chunk_position=chunk_position # ← 最后一个标记为 "last"
            )
        )

关键点:最后一个 chunk 必须设置 chunk_position="last"。这个标记告诉框架"流结束了",触发 tool_call_chunks 到 tool_calls 的解析。


AIMessageChunk.__add__ 的拼接机制

流式输出的 chunk 需要能拼接回完整消息。这由 AIMessageChunk.__add__ 实现(ai.py:627-718):

# ai.py:627-635
def __add__(self, other: Any) -> BaseMessageChunk:
    if isinstance(other, AIMessageChunk):
        return add_ai_message_chunks(self, other)
    if isinstance(other, (list, tuple)) and all(isinstance(o, AIMessageChunk) for o in other):
        return add_ai_message_chunks(self, *other)
    return super().__add__(other)

核心逻辑在 add_ai_message_chunksai.py:638-718):

# ai.py:638-718(简化)
def add_ai_message_chunks(left, *others):
    # 1. 合并 content(字符串拼接 or 列表合并)
    content = merge_content(left.content, *(o.content for o in others))

    # 2. 合并 additional_kwargs(深度合并字典)
    additional_kwargs = merge_dicts(left.additional_kwargs, *(o.additional_kwargs for o in others))

    # 3. 合并 response_metadata
    response_metadata = merge_dicts(left.response_metadata, *(o.response_metadata for o in others))

    # 4. 合并 tool_call_chunks(按 index 合并列表)
    raw_tool_calls = merge_lists(left.tool_call_chunks, *(o.tool_call_chunks for o in others))

    # 5. 合并 usage_metadata(数值相加)
    usage_metadata = add_usage(left.usage_metadata, *(o.usage_metadata for o in others))

    # 6. 选择最佳 ID(优先用 provider 分配的,其次 lc_run-*)
    chunk_id = ...  # 优先级:provider ID > lc_run-* > lc_* > None

    # 7. chunk_position:任一为 "last" 则结果为 "last"
    chunk_position = "last" if any(x.chunk_position == "last" for x in [left, *others]) else None

    return left.__class__(
        content=content,
        additional_kwargs=additional_kwargs,
        tool_call_chunks=tool_call_chunks,
        response_metadata=response_metadata,
        usage_metadata=usage_metadata,
        id=chunk_id,
        chunk_position=chunk_position,
    )

各字段的合并策略:

字段 合并方式 示例
content(str) 字符串拼接 "Hel" + "lo""Hello"
content(list) 按 index 合并 merge_content
additional_kwargs 深度合并字典 merge_dicts
tool_call_chunks 按 index 合并列表 merge_lists
usage_metadata 数值相加 input_tokens: 5 + 0 = 5
id 取优先级最高的 provider ID > lc_run-*
chunk_position 任一 “last” 则 “last” OR 逻辑

用代码验证:

from langchain_core.messages import AIMessageChunk
from langchain_core.messages.ai import UsageMetadata

c1 = AIMessageChunk(content="Hello", usage_metadata=UsageMetadata(input_tokens=10, output_tokens=1, total_tokens=11))
c2 = AIMessageChunk(content=" World", usage_metadata=UsageMetadata(input_tokens=0, output_tokens=2, total_tokens=2))
c3 = AIMessageChunk(content="!", chunk_position="last")

full = c1 + c2 + c3
print(full.content)                        # Hello World!
print(full.usage_metadata["input_tokens"]) # 10
print(full.usage_metadata["output_tokens"])# 3
print(full.chunk_position)                 # last

generate_from_stream:流 → 结果转换

流式路径最终需要把所有 chunk 合并成一个 ChatResult。这由 generate_from_streamchat_models.py:182-208)完成:

# chat_models.py:182-208
def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
    generation = next(stream, None)          # 取第一个 chunk
    if generation:
        generation += list(stream)           # ← 用 __add__ 逐个合并后续 chunk
    if generation is None:
        raise ValueError("No generations found in stream.")
    return ChatResult(
        generations=[
            ChatGeneration(
                message=message_chunk_to_message(generation.message),  # Chunk → Message
                generation_info=generation.generation_info,
            )
        ]
    )

注意 generation += list(stream) 这行——ChatGenerationChunk.__add__ 内部会调用 AIMessageChunk.__add__ 来合并消息。


_generate_with_cache 中的流式路径

_generate_with_cachechat_models.py:1136-1260)中,流式并不是用户显式调用 stream 才触发。如果有 StreamingCallbackHandlerinvoke 内部也会走流式路径:

# chat_models.py:1176-1231(简化)
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
    # ... 缓存检查 ...
    # ... 限流 ...

    if self._should_stream(async_api=False, run_manager=run_manager, **kwargs):
        # 即使是 invoke 调用,也走流式路径!
        chunks = []
        for chunk in self._stream(messages, stop=stop, **kwargs):
            if run_manager:
                run_manager.on_llm_new_token(chunk.message.content, chunk=chunk)
            chunks.append(chunk)
        result = generate_from_stream(iter(chunks))  # 合并成 ChatResult
    else:
        # 正常的非流式路径
        result = self._generate(messages, stop=stop, run_manager=run_manager, **kwargs)

    return result

这意味着:当使用 astream_events 时,即使你调用的是 invoke,底层也会走 _stream 来获得逐 token 的回调。


on_llm_new_token 回调

每个 chunk 都会触发 on_llm_new_token 回调。看 stream 方法中的调用(chat_models.py:557-558):

# chat_models.py:557-558
run_manager.on_llm_new_token(
    cast("str", chunk.message.content),   # token 文本
    chunk=chunk                            # 完整的 ChatGenerationChunk
)

回调的完整生命周期:

on_chat_model_start(serialized, messages)     ← stream/invoke 开始
    │
    ├─ on_llm_new_token(token, chunk=chunk)   ← 每个 chunk(仅流式)
    ├─ on_llm_new_token(token, chunk=chunk)
    ├─ ...
    │
    ├─ on_llm_end(result)                     ← 正常结束
    └─ on_llm_error(error)                    ← 异常结束

小结

本篇拆解了 ChatModel 的流式机制:

  1. stream 方法自己管理回调生命周期,不依赖 Runnable.stream 的默认实现
  2. _should_stream 按优先级判断:子类实现 → disable_streaming → 显式参数 → 模型字段 → 回调处理器
  3. _stream 返回 Iterator[ChatGenerationChunk],每个包含一个 AIMessageChunk
  4. AIMessageChunk.__add__ 逐字段合并:content 拼接、usage 相加、tool_call_chunks 按 index 合并
  5. _generate_with_cache 中,如果检测到流式回调处理器,invoke 也会走 _stream 路径
  6. chunk_position="last" 标记流结束,触发 tool_call_chunks 解析

下一篇预告bind_tools 是怎么把 Pydantic 模型变成 tool schema 的?with_structured_output 又是怎么把 tool call 结果解析成结构化对象的?


LangChain ChatModel 深度解析(三):Tool Calling 与结构化输出

一个能跑的例子

from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers.openai_tools import PydanticToolsParser
from pydantic import BaseModel, Field

# 定义结构化输出 schema
class Weather(BaseModel):
    """天气查询结果。"""
    city: str = Field(description="城市名")
    temperature: float = Field(description="温度(摄氏度)")

# 模拟一个返回 tool call 的模型
fake_response = AIMessage(
    content="",
    tool_calls=[{
        "name": "Weather",
        "args": {"city": "北京", "temperature": 25.0},
        "id": "call_001",
    }]
)

chat = FakeMessagesListChatModel(responses=[fake_response])

# 方式一:手动 bind_tools + PydanticToolsParser
# (FakeMessagesListChatModel 没实现 bind_tools,所以我们直接演示 parser)
parser = PydanticToolsParser(tools=[Weather], first_tool_only=True)

from langchain_core.outputs import ChatGeneration
result = parser.parse_result([ChatGeneration(message=fake_response)])
print(result)              # city='北京' temperature=25.0
print(type(result))        # <class '__main__.Weather'>

# 方式二:直接从 AIMessage 读取 tool_calls
msg = chat.invoke([HumanMessage(content="北京天气")])
print(msg.tool_calls)      # [{'name': 'Weather', 'args': {'city': '北京', 'temperature': 25.0}, 'id': 'call_001', 'type': 'tool_call'}]

Tool Calling 的本质:模型返回的 AIMessage.tool_calls 是一个结构化的字典列表,描述"要调用什么函数、传什么参数"。


bind_tools 的实现

BaseChatModel.bind_toolschat_models.py:1521-1538)在基类中直接抛异常:

# chat_models.py:1521-1538
def bind_tools(
    self,
    tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
    *,
    tool_choice: str | None = None,
    **kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
    raise NotImplementedError

每个 partner 包必须自己覆盖 bind_tools。看 OpenAI 的实现(openai/base.py:1819):

# openai/base.py:1819-1828(签名)
def bind_tools(
    self,
    tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
    *,
    tool_choice: dict | str | bool | None = None,
    strict: bool | None = None,              # ← OpenAI 特有:严格模式
    parallel_tool_calls: bool | None = None, # ← OpenAI 特有:并行调用
    response_format: _DictOrPydanticClass | None = None,
    **kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
    # 核心逻辑:把 tools 转成 OpenAI 格式,然后 self.bind(tools=formatted_tools)
    ...

而 Anthropic 的实现(anthropic/chat_models.py:1451):

# anthropic/chat_models.py:1451(签名)
def bind_tools(
    self,
    tools: Sequence[Mapping[str, Any] | type | Callable | BaseTool],
    *,
    tool_choice: dict[str, str] | str | None = None,
    parallel_tool_calls: bool | None = None,
    strict: bool | None = None,
    **kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
    ...

bind_tools 的本质是 self.bind(**kwargs) —— 即创建一个 RunnableBinding,把 tools 参数固定到后续的每次调用中。


convert_to_openai_tool:统一的 tool schema 转换

无论你传入什么格式的 tool,最终都要转成 OpenAI 格式。这由 convert_to_openai_toolfunction_calling.py:498-557)完成:

# function_calling.py:498-557(简化)
def convert_to_openai_tool(
    tool: Mapping[str, Any] | type[BaseModel] | Callable | BaseTool,
    *,
    strict: bool | None = None,
) -> dict[str, Any]:
    # 1. 已经是 OpenAI Responses API 格式?直接返回
    if isinstance(tool, dict) and tool.get("type") in ("function", "file_search", ...):
        return tool

    # 2. 其他格式 → 先转成 OpenAI function
    oai_function = convert_to_openai_function(tool, strict=strict)

    # 3. 包装成 {"type": "function", "function": {...}}
    return {"type": "function", "function": oai_function}

convert_to_openai_functionfunction_calling.py:361-480)处理各种输入:

# function_calling.py:361-480(简化)
def convert_to_openai_function(function, *, strict=None):
    # Anthropic 格式 {"name": ..., "input_schema": ...}
    if isinstance(function, dict) and "input_schema" in function:
        oai_function = {"name": function["name"], "parameters": function["input_schema"]}

    # Amazon Bedrock 格式 {"toolSpec": {...}}
    elif isinstance(function, dict) and "toolSpec" in function:
        oai_function = {"name": ..., "parameters": ...}

    # 已经是 OpenAI 格式 {"name": ...}
    elif isinstance(function, dict) and "name" in function:
        oai_function = {k: v for k, v in function.items() if k in {"name", "description", "parameters", "strict"}}

    # JSON Schema {"title": ...}
    elif isinstance(function, dict) and "title" in function:
        oai_function = {"name": function.pop("title"), ...}

    # Pydantic BaseModel
    elif isinstance(function, type) and is_basemodel_subclass(function):
        oai_function = _convert_pydantic_to_openai_function(function)

    # TypedDict
    elif is_typeddict(function):
        oai_function = _convert_typed_dict_to_openai_function(function)

    # BaseTool
    elif isinstance(function, BaseTool):
        oai_function = _format_tool_to_openai_function(function)

    # 普通 Python 函数
    elif callable(function):
        oai_function = _convert_python_function_to_openai_function(function)

    else:
        raise ValueError(...)

    return oai_function

输入类型 → OpenAI tool 格式对照表:

输入类型 转换函数 输出格式
Pydantic BaseModel _convert_pydantic_to_openai_function model.model_json_schema() → 清理
Python function _convert_python_function_to_openai_function 从 type hints + docstring 提取
TypedDict _convert_typed_dict_to_openai_function 先转 Pydantic,再转 schema
BaseTool _format_tool_to_openai_function tool.tool_call_schema
dict(Anthropic 格式) 直接转换 input_schemaparameters
dict(OpenAI 格式) 透传 保留 name/description/parameters

Pydantic → OpenAI 的转换中有两个重要的清理步骤:

  1. _rm_titlesfunction_calling.py:88-119):递归删除 JSON schema 中的 title 字段(Pydantic 自动生成的)
  2. dereference_refs:展开 $ref 引用,把嵌套的 $defs 内联

OpenAI vs Anthropic 的 tool 格式差异

字段 OpenAI 格式 Anthropic 格式
外层结构 {"type": "function", "function": {...}} {"name": ..., "input_schema": {...}}
名称 function.name name
描述 function.description description
参数 function.parameters input_schema
严格模式 function.strict: true 不支持
并行调用 parallel_tool_calls: true(请求级) parallel_tool_calls: true

convert_to_openai_tool 统一把所有格式转成 OpenAI 格式。Anthropic 的 bind_tools 再把 OpenAI 格式转回 Anthropic 格式。


with_structured_output:bind_tools + Parser 的组合

with_structured_outputchat_models.py:1540-1721)是最常用的结构化输出方法。它的实现出人意料地简单:

# chat_models.py:1685-1721(简化)
def with_structured_output(self, schema, *, include_raw=False, **kwargs):
    # 0. 检查 bind_tools 是否实现
    if type(self).bind_tools is BaseChatModel.bind_tools:
        raise NotImplementedError("with_structured_output is not implemented for this model.")

    # 1. 绑定工具
    llm = self.bind_tools(
        [schema],
        tool_choice="any",                   # ← 强制模型调用工具
        ls_structured_output_format={...},   # ← LangSmith 追踪信息
    )

    # 2. 选择 Output Parser
    if isinstance(schema, type) and is_basemodel_subclass(schema):
        # Pydantic 类 → PydanticToolsParser
        output_parser = PydanticToolsParser(tools=[schema], first_tool_only=True)
    else:
        # dict/JSON Schema → JsonOutputKeyToolsParser
        key_name = convert_to_openai_tool(schema)["function"]["name"]
        output_parser = JsonOutputKeyToolsParser(key_name=key_name, first_tool_only=True)

    # 3. 组合
    if include_raw:
        # include_raw=True → 返回 {"raw": AIMessage, "parsed": Pydantic, "parsing_error": None}
        parser_assign = RunnablePassthrough.assign(
            parsed=itemgetter("raw") | output_parser,
            parsing_error=lambda _: None
        )
        parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
        parser_with_fallback = parser_assign.with_fallbacks(
            [parser_none], exception_key="parsing_error"
        )
        return RunnableMap(raw=llm) | parser_with_fallback
    else:
        # include_raw=False → 直接返回解析结果
        return llm | output_parser

流程图:

include_raw=False:

    input → llm.bind_tools([schema]) → AIMessage → output_parser → Pydantic/dict
            ╰─ tool_choice="any"                    ╰─ 从 tool_calls 提取

include_raw=True:

    input → RunnableMap(raw=llm) → {"raw": AIMessage}
                                        │
                                        ▼
                                   parser_assign
                                   ├─ parsed = raw | output_parser
                                   └─ parsing_error = None
                                        │
                                        ▼(如果解析失败,fallback)
                                   parser_none
                                   ├─ parsed = None
                                   └─ parsing_error = Exception
                                        │
                                        ▼
                                   {"raw": AIMessage, "parsed": ..., "parsing_error": ...}

Output Parser 体系

PydanticToolsParseropenai_tools.py:306-384

用于把 AIMessage.tool_calls 解析为 Pydantic 对象:

# openai_tools.py:306-384(简化)
class PydanticToolsParser(JsonOutputToolsParser):
    tools: list[TypeBaseModel]               # 可能的 Pydantic 类列表

    def parse_result(self, result, *, partial=False):
        json_results = super().parse_result(result, partial=partial)
        # super() 从 AIMessage.tool_calls 提取 [{"type": "Weather", "args": {...}}]

        # 构建 name → class 映射
        name_dict = {tool.__name__: tool for tool in self.tools}

        pydantic_objects = []
        for res in json_results:
            tool = name_dict[res["type"]]    # 找到对应的 Pydantic 类
            pydantic_objects.append(tool(**res["args"]))  # 实例化

        if self.first_tool_only:
            return pydantic_objects[0]
        return pydantic_objects

JsonOutputKeyToolsParseropenai_tools.py:225-295

用于提取指定工具名的 args 字典:

# openai_tools.py:225-295(简化)
class JsonOutputKeyToolsParser(JsonOutputToolsParser):
    key_name: str                            # 要提取的工具名

    def parse_result(self, result, *, partial=False):
        # 从 tool_calls 中筛选 name == key_name 的
        # 返回 args dict(如果 first_tool_only)或 args dict 列表
        ...

解析链路:

AIMessage.tool_calls
  = [{"name": "Weather", "args": {"city": "北京", ...}, "id": "call_001"}]
                │
                ▼
JsonOutputToolsParser.parse_result()
  → [{"type": "Weather", "args": {"city": "北京", ...}}]   # name → type
                │
        ┌───────┴───────┐
        ▼               ▼
PydanticToolsParser   JsonOutputKeyToolsParser
  → Weather(...)        → {"city": "北京", ...}

小结

本篇拆解了 ChatModel 的 Tool Calling 和结构化输出机制:

  1. bind_tools 在基类抛 NotImplementedError,各 partner 包自行实现,本质是 self.bind(tools=...)
  2. convert_to_openai_tool 统一转换 Pydantic/函数/dict/BaseTool → OpenAI tool schema
  3. Schema 清理_rm_titles 删除 Pydantic 自动生成的 title,dereference_refs 展开 $ref
  4. with_structured_output = bind_tools([schema], tool_choice="any") + OutputParser
  5. 两种 ParserPydanticToolsParser(→ Pydantic 对象)、JsonOutputKeyToolsParser(→ dict)
  6. include_raw=True 模式用 RunnablePassthrough.assign + with_fallbacks 包装

下一篇预告invoke 完整调用栈是什么?缓存和限流怎么工作?OpenAI 和 Anthropic 的 partner 实现有哪些差异?


LangChain ChatModel 深度解析(四):源码架构——缓存、限流与 Partner 集成

一个能跑的例子

from langchain_core.language_models.chat_models import BaseChatModel, SimpleChatModel
from langchain_core.language_models.base import BaseLanguageModel
from langchain_core.runnables import RunnableSerializable, Runnable

# 5 个 assert 验证继承关系
assert issubclass(BaseChatModel, BaseLanguageModel)        # ChatModel IS-A LanguageModel
assert issubclass(BaseLanguageModel, RunnableSerializable)  # LanguageModel IS-A RunnableSerializable
assert issubclass(RunnableSerializable, Runnable)           # RunnableSerializable IS-A Runnable
assert issubclass(SimpleChatModel, BaseChatModel)           # SimpleChatModel IS-A BaseChatModel

# BaseChatModel 的输出类型是 AIMessage
from langchain_core.messages import AnyMessage
model = SimpleChatModel.__dict__  # 不能直接实例化抽象类
# 通过 OutputType 属性验证
from langchain_core.language_models.fake_chat_models import FakeChatModel
assert FakeChatModel().OutputType == AnyMessage

print("所有断言通过!继承链完整。")

完整调用栈

用户调用 model.invoke("hello")
  │
  ▼
BaseChatModel.invoke()                              (chat_models.py:388)
  │  config = ensure_config(config)
  │  input → _convert_input() → PromptValue
  │
  ▼
BaseChatModel.generate_prompt(prompts)              (chat_models.py:1112)
  │  prompts → [p.to_messages() for p in prompts]
  │
  ▼
BaseChatModel.generate(messages)                    (chat_models.py:842)
  │  配置 CallbackManager
  │  on_chat_model_start(serialized, messages)
  │
  ▼
BaseChatModel._generate_with_cache(messages)        (chat_models.py:1136)
  │
  ├─ 1. 缓存查找
  │     llm_cache.lookup(prompt, llm_string)
  │     命中?→ 直接返回 ChatResult
  │
  ├─ 2. 限流
  │     rate_limiter.acquire(blocking=True)
  │
  ├─ 3. 流式判断
  │     _should_stream() == True?
  │     ├─ Yes → _stream(messages) → generate_from_stream()
  │     └─ No  → _generate(messages)              ← 子类实现
  │
  ├─ 4. 后处理
  │     设置 response_metadata, message.id
  │
  └─ 5. 缓存写入
        llm_cache.update(prompt, llm_string, generations)
  │
  ▼
返回 ChatResult → .generations[0].message → AIMessage
  │
  ▼
BaseChatModel.generate()
  │  on_llm_end(result)
  │
  ▼
用户拿到 AIMessage

每一层的职责非常明确:

方法 职责
invoke 输入转换 + config 处理
generate_prompt PromptValue → list[BaseMessage]
generate 回调管理(start/end/error)
_generate_with_cache 缓存 + 限流 + 流式判断
_generate(子类) 实际调用 LLM API

缓存机制

BaseCache 接口(caches.py:32

# caches.py:32-46
class BaseCache(ABC):
    @abstractmethod
    def lookup(self, prompt: str, llm_string: str) -> RETURN_VAL_TYPE | None:
        """缓存查找。返回 None 表示未命中。"""

    @abstractmethod
    def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
        """写入缓存。"""

    @abstractmethod
    def clear(self, **kwargs: Any) -> None:
        """清空缓存。"""

缓存的 key 是 (prompt, llm_string) 二元组:

  • prompt:消息列表的 JSON 序列化(dumps(normalized_messages)
  • llm_string:模型配置的序列化(模型名、温度、stop 等参数)

InMemoryCachecaches.py:155

# caches.py:155-272(简化)
class InMemoryCache(BaseCache):
    def __init__(self, *, maxsize: int | None = None):
        self._cache: dict[tuple[str, str], RETURN_VAL_TYPE] = {}  # ← 就是一个字典
        self._maxsize = maxsize

    def lookup(self, prompt, llm_string):
        return self._cache.get((prompt, llm_string), None)

    def update(self, prompt, llm_string, return_val):
        if self._maxsize and len(self._cache) == self._maxsize:
            del self._cache[next(iter(self._cache))]   # ← FIFO 淘汰
        self._cache[prompt, llm_string] = return_val

    def clear(self, **kwargs):
        self._cache = {}

缓存在 _generate_with_cache 中的使用(chat_models.py:1136-1260

# chat_models.py:1136-1260(简化)
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
    llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
    check_cache = self.cache or self.cache is None   # None = 用全局缓存(如果有)

    if check_cache and llm_cache:
        llm_string = self._get_llm_string(stop=stop, **kwargs)
        # 消息 ID 归零,确保相同内容的消息命中同一缓存
        normalized_messages = [
            msg.model_copy(update={"id": None}) if msg.id else msg
            for msg in messages
        ]
        prompt = dumps(normalized_messages)

        # 查找缓存
        cache_val = llm_cache.lookup(prompt, llm_string)
        if isinstance(cache_val, list):
            return ChatResult(generations=self._convert_cached_generations(cache_val))

    # 缓存未命中 → 限流 → 调用模型
    if self.rate_limiter:
        self.rate_limiter.acquire(blocking=True)

    result = self._generate(messages, stop=stop, ...)

    # 写入缓存
    if check_cache and llm_cache:
        llm_cache.update(prompt, llm_string, result.generations)

    return result

缓存流程图:

_generate_with_cache(messages)
  │
  ├─ cache = self.cache or get_llm_cache()
  │
  ├─ check_cache?
  │   ├─ self.cache = True     → 用全局缓存,没有则报错
  │   ├─ self.cache = False    → 不检查
  │   ├─ self.cache = None     → 用全局缓存(如果有)
  │   └─ self.cache = BaseCache → 用这个实例
  │
  ├─ lookup(prompt, llm_string)
  │   ├─ 命中 → 返回缓存结果(跳过限流和 API 调用)
  │   └─ 未命中 → 继续
  │
  ├─ rate_limiter.acquire()     ← 限流在缓存之后!
  │
  ├─ _generate(messages)        ← 调用模型
  │
  └─ update(prompt, llm_string, generations)  ← 写入缓存

重要细节:限流在缓存检查之后。缓存命中时不会消耗限流配额。


限流机制

BaseRateLimiter 接口(rate_limiters.py:11

# rate_limiters.py:11-64
class BaseRateLimiter(ABC):
    @abstractmethod
    def acquire(self, *, blocking: bool = True) -> bool:
        """获取令牌。blocking=True 时阻塞等待。"""

    @abstractmethod
    async def aacquire(self, *, blocking: bool = True) -> bool:
        """异步获取令牌。"""

InMemoryRateLimiter:Token Bucket 算法(rate_limiters.py:67

# rate_limiters.py:67-250(简化)
class InMemoryRateLimiter(BaseRateLimiter):
    def __init__(self, *, requests_per_second=1, check_every_n_seconds=0.1, max_bucket_size=1):
        self.requests_per_second = requests_per_second
        self.available_tokens = 0.0         # 当前可用令牌数
        self.max_bucket_size = max_bucket_size
        self._consume_lock = threading.Lock()
        self.last = None                     # 上次消费时间

    def _consume(self) -> bool:
        with self._consume_lock:
            now = time.monotonic()
            if self.last is None:
                self.last = now              # 首次调用初始化

            elapsed = now - self.last
            if elapsed * self.requests_per_second >= 1:
                self.available_tokens += elapsed * self.requests_per_second
                self.last = now

            # 限制最大突发
            self.available_tokens = min(self.available_tokens, self.max_bucket_size)

            if self.available_tokens >= 1:
                self.available_tokens -= 1
                return True                  # 获取成功
            return False                     # 令牌不足

    def acquire(self, *, blocking=True):
        if not blocking:
            return self._consume()
        while not self._consume():           # 阻塞轮询
            time.sleep(self.check_every_n_seconds)
        return True

Token Bucket 算法图解:

                    ┌──────────────┐
  requests_per_second ──→│   令 牌 桶     │←── max_bucket_size(上限)
  (每秒补充速率)        │              │
                    │  ○ ○ ○ ○ ○   │    available_tokens(当前令牌数)
                    │              │
                    └──────┬───────┘
                           │
                      _consume()
                      available >= 1?
                     ╱              ╲
                  Yes               No
              tokens -= 1      blocking?
              return True      ╱        ╲
                           Yes          No
                         sleep()    return False
                         retry

使用示例:

from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.5,        # 每 2 秒 1 次请求
    check_every_n_seconds=0.1,      # 每 100ms 检查一次
    max_bucket_size=2,              # 最多攒 2 个令牌(允许小突发)
)

# 传给 ChatModel
from langchain_core.language_models.fake_chat_models import FakeChatModel
model = FakeChatModel(rate_limiter=rate_limiter)

import time
for i in range(3):
    start = time.time()
    model.invoke("hello")
    print(f"请求 {i}: {time.time() - start:.1f}s")
# 请求 0: 0.0s(首次立即通过 — 初始化不给令牌,但首次 elapsed 够用)
# 请求 1: 2.0s(等待 2 秒)
# 请求 2: 2.0s(等待 2 秒)

回调层级

ChatModel 的回调遵循统一的生命周期:

on_chat_model_start(serialized, messages, ...)
  │
  │  ┌─────── 流式路径 ───────┐
  │  │                         │
  │  │  on_llm_new_token(token, chunk=chunk)   ← 每个 chunk
  │  │  on_llm_new_token(token, chunk=chunk)
  │  │  ...                    │
  │  └─────────────────────────┘
  │
  ├─ on_llm_end(result)        ← 成功结束
  └─ on_llm_error(error)       ← 异常结束

回调在 generate 方法中配置(chat_models.py:903-923):

# chat_models.py:903-923
callback_manager = CallbackManager.configure(
    callbacks,        # 用户传入的回调
    self.callbacks,   # 模型实例上的回调
    self.verbose,
    tags,
    self.tags,
    inheritable_metadata,
    self.metadata,
)
run_managers = callback_manager.on_chat_model_start(
    self._serialized,
    messages_to_trace,
    invocation_params=params,
    options=options,
    name=run_name,
    run_id=run_id,
    batch_size=len(messages),
)

Partner 集成模式对比

OpenAI 和 Anthropic 是两个最重要的 partner 包。它们的核心差异:

方面 OpenAI(BaseChatOpenAI Anthropic(ChatAnthropic
文件 openai/base.py anthropic/chat_models.py
总行数 4609 行 2152 行
类定义 第 488 行 第 733 行
_generate 第 1338 行 第 1394 行
_stream 第 1267 行 第 1264 行
bind_tools 第 1819 行 第 1451 行
API 客户端 openai.OpenAI anthropic.Anthropic
Tool 格式 OpenAI 原生 Anthropic 原生(内部转换)

OpenAI 实现特点

# openai/base.py:488
class BaseChatOpenAI(BaseChatModel):
    client: Any                              # openai.OpenAI
    async_client: Any                        # openai.AsyncOpenAI
    model_name: str = Field(alias="model")   # gpt-4o, gpt-4o-mini 等
    temperature: float | None = None
    stream_usage: bool | None = None         # 流式时是否返回 usage

OpenAI 的 _generatebase.py:1338)有两条路径:

  1. response_format 存在 → 用 client.beta.chat.completions.parse()
  2. 其他情况 → 用 client.chat.completions.create()

OpenAI 的 bind_toolsbase.py:1819)有额外参数:

  • strict: bool —— JSON Schema 严格模式
  • parallel_tool_calls: bool —— 允许一次调用多个工具
  • response_format —— 结构化输出格式

Anthropic 实现特点

# anthropic/chat_models.py:733
class ChatAnthropic(BaseChatModel):
    model: str = Field(alias="model_name")   # claude-sonnet-4-5-20250929 等
    max_tokens: int = 1024                   # Anthropic 必须指定
    temperature: float | None = None
    default_headers: dict[str, str] | None = None

Anthropic 的 tool 格式不同于 OpenAI:

# OpenAI 格式
{"type": "function", "function": {"name": "weather", "parameters": {...}}}

# Anthropic 格式
{"name": "weather", "input_schema": {...}}

ChatAnthropic.bind_tools 内部会把 tools 转成 Anthropic 格式。


_get_ls_params:LangSmith 追踪参数提取

_get_ls_paramschat_models.py:788-826)为 LangSmith 追踪提取标准参数:

# chat_models.py:788-826(简化)
def _get_ls_params(self, stop=None, **kwargs) -> LangSmithParams:
    # 从类名推断 provider(ChatOpenAI → openai, ChatAnthropic → anthropic)
    default_provider = self.__class__.__name__
    if default_provider.startswith("Chat"):
        default_provider = default_provider[4:].lower()

    ls_params = LangSmithParams(ls_provider=default_provider, ls_model_type="chat")

    # 从 kwargs 或 self 提取 model, temperature, max_tokens, stop
    if hasattr(self, "model"):
        ls_params["ls_model_name"] = self.model
    if hasattr(self, "temperature"):
        ls_params["ls_temperature"] = self.temperature
    if hasattr(self, "max_tokens"):
        ls_params["ls_max_tokens"] = self.max_tokens
    if stop:
        ls_params["ls_stop"] = stop

    return ls_params

这些参数会传入 inheritable_metadata,最终出现在 LangSmith 追踪中。


自定义 ChatModel 实现清单

基于全部 4 篇的分析,这是实现一个生产级 ChatModel 的完整清单:

必须实现(最小合约)

方法/属性 签名 说明
_generate (messages, stop, run_manager, **kwargs) → ChatResult 核心生成逻辑
_llm_type @property → str 模型类型标识

建议实现(提升体验)

方法 签名 说明
_stream (messages, stop, run_manager, **kwargs) → Iterator[ChatGenerationChunk] 流式输出
_agenerate async (messages, stop, run_manager, **kwargs) → ChatResult 原生异步
_astream async (messages, stop, run_manager, **kwargs) → AsyncIterator[ChatGenerationChunk] 异步流式
bind_tools (tools, tool_choice, **kwargs) → Runnable 工具绑定
_get_ls_params (stop, **kwargs) → LangSmithParams LangSmith 追踪

可选实现(高级功能)

方法/属性 说明
with_structured_output 覆盖基类默认实现(如支持 json_schema method)
_identifying_params 模型参数(用于缓存 key、追踪)
_get_llm_string 自定义缓存 key 生成
profile: ModelProfile 声明模型能力(上下文窗口、支持的功能等)

最小实现模板

from typing import Any, Iterator
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, AIMessageChunk, BaseMessage
from langchain_core.outputs import ChatResult, ChatGeneration, ChatGenerationChunk
from langchain_core.callbacks import CallbackManagerForLLMRun

class MyProductionChat(BaseChatModel):
    """自定义 ChatModel 模板。"""

    model: str = "my-model-v1"
    temperature: float = 0.7
    api_key: str = ""

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        # TODO: 调用你的 LLM API
        response_text = self._call_api(messages, stop, **kwargs)
        return ChatResult(
            generations=[ChatGeneration(message=AIMessage(content=response_text))]
        )

    def _stream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        # TODO: 调用你的流式 API
        for token in self._call_api_stream(messages, stop, **kwargs):
            chunk = ChatGenerationChunk(
                message=AIMessageChunk(content=token)
            )
            if run_manager:
                run_manager.on_llm_new_token(token, chunk=chunk)
            yield chunk

    @property
    def _llm_type(self) -> str:
        return "my-production-chat"

    def _call_api(self, messages, stop, **kwargs) -> str:
        """实际 API 调用(占位)。"""
        return "This is a placeholder response."

    def _call_api_stream(self, messages, stop, **kwargs):
        """实际流式 API 调用(占位)。"""
        for word in "This is a streaming response".split():
            yield word + " "

小结

本篇拆解了 ChatModel 的底层架构:

  1. 完整调用栈invoke → generate_prompt → generate → _generate_with_cache → _generate
  2. 缓存BaseCache 接口,key = (prompt_json, llm_string),在限流之前检查
  3. 限流InMemoryRateLimiter 基于 Token Bucket 算法,线程安全
  4. 回调on_chat_model_start → on_llm_new_token → on_llm_end/on_llm_error
  5. Partner 差异:OpenAI 4609 行 vs Anthropic 2152 行,tool 格式不同但通过 convert_to_openai_tool 统一
  6. LangSmith 追踪_get_ls_params 从类名和字段自动提取 provider/model/temperature

4 篇系列回顾

主题 核心源码
第 1 篇 5 分钟上手 _convert_input、继承链、_generate/_llm_type 最小合约
第 2 篇 流式原理 stream_should_streamAIMessageChunk.__add__generate_from_stream
第 3 篇 Tool Calling bind_toolsconvert_to_openai_toolwith_structured_outputPydanticToolsParser
第 4 篇 源码架构 _generate_with_cacheInMemoryCacheInMemoryRateLimiter、Partner 对比

ChatModel 是连接消息系统和 Runnable 体系的核心枢纽:

  • 输入端:接收 list[BaseMessage](消息系统,见"消息深度解析"系列)
  • 自身:作为 Runnable 子类,拥有 invoke/stream/batch 全套能力(见"Runnables 深度解析"系列)
  • 输出端:返回 AIMessage,可能包含 tool_calls,驱动 Agent 循环
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐