1 概述

1.1 LangChain 是什么?

在人工智能领域,Agent(智能体) 是指一个能够感知环境、自主决策、执行动作以达成目标的软件实体。LangChain 是一个用于构建基于大语言模型(LLM)应用的开发框架,原生支持 Agent 构建。LangChain 不提供自己的 LLM,而是帮助开发者完成以下工作。

  • 连接各种 LLM(OpenAI、Anthropic、Ollama、本地模型等)
  • 管理上下文(如对话历史、文档检索)
  • 编排复杂工作流(链式调用、条件分支、循环)
  • 集成外部工具(搜索、数据库、API)
  • 构建可观察、可测试、可部署的应用

LangChain 提供了包括模型、提示词、工具、能力在内的很多组件,这些组件通过管道操作符可以组装成流水线,完成复杂的工作。

需要注意的是,LangChain 不是“万能胶水”,过度依赖可能导致系统复杂、调试困难,应根据需求权衡是否使用。

1.2 快速体验

LangChain 采用分层架构,由若干个互不依赖、可独立安装的包组成,因此 LangChain 有别于传统的 python 模块,更像是一个生态圈。倘若使用 pip 命令像下面这样安装的话,只会安装 langchain、langchain-core、langgraph 和 langsmith 等核心包。

pip install langchain

若要连接 LLM,还需要安装 langchain-community 包,或者安装针对特定模型的包,比如 langchain-openai(适用于 OpenAI 模型)或 langchain-dashscope(适用于Qwen 模型,经测试暂不兼容),这些安装包均有 LangChain 官方维护。我使用 Qwen 模型,所以选择这样安装:

pip install langchain langchain-community

如果你使用 ChatGPT 模型,建议这样安装:

pip install langchain langchain-community langchain-openai

安装过程可能会遇到一些小问题,基本上都属于版本兼容性问题,通常只要升级或降级就可以解决。我遇到的问题是已安装的torch版本过高,我不想降级(实际上我尝试过降级,但仍未解决问题),求助AI,告诉我问题的根源在于 transformers 版本太旧了,于是升级transformers,问题迎刃而解。

pip install --upgrade transformers

安装完成后,检查一下 langchain 版本。如果你的版本号和我的相差太多,我无法保证本文提供的代码能够正确运行在你的电脑上。

>>> import langchain
>>> langchain.__version__
    '1.2.10'

下面这段代码展示了一个最简单的 LangChain 应用。

#!/usr/bin/env python

import os
from dotenv import load_dotenv

from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 0. 加载环境变量文件.env
load_dotenv()

# 1. 初始化模型(使用DashScope的Qwen模型)
llm = ChatTongyi(
    model_name = "qwen-max",
    dashscope_api_key = os.getenv("DASHSCOPE_API_KEY"),
    temperature = 0.7,
    max_retries = 3
)

# 2. 定义 Prompt
prompt = ChatPromptTemplate.from_template("请讲一个关于{topic}的笑话")

# 3. 组合成链(LCEL - LangChain Expression Language)
chain = prompt | llm | StrOutputParser() # 使用管道操作符

# 4. 运行
response = chain.invoke({"topic": "程序员"})

# 5. 显示运行结果
print(response)

StrOutputParser 也是核心组件,其功能是输出解析。这段代码运行前,需要先创建一个名为.env的文件,保存 API Key,参考格式如下。如果不担心 API Key 泄露,亦可直接将其写入代码中。

DASHSCOPE_API_KEY=your_api_key

2 设计理念

2.1 成长历史:从 Chain 到 LCEL

在快速演进的过程中,LangChain 的设计理念也发生了剧烈改变。初期的 LangChain 使用 chain 的概念(正所谓人如其名),Chain 是一个抽象基类(BaseChain),用户通常通过继承或使用内置子类(如 LLMChain, SequentialChain, RetrievalQA 等)来构建应用。

示例(旧式 Chain):

from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI

prompt = PromptTemplate.from_template("解释 {concept} 是什么")
llm = OpenAI()
chain = LLMChain(llm=llm, prompt=prompt)

result = chain.run(concept="量子纠缠")

这种写法:

  • 依赖具体的 Chain 子类
  • 接口不统一(有的用 .run(),有的用 .invoke())
  • 组合能力弱(难以嵌套、分支、流式处理)
  • 扩展性差(需继承或包装)

自 V0.2 开始,LCEL 全面取代 Chain 成为 LangChain 的核心理念。LCEL(LangChain Expression Language)是一套基于 Runnable 抽象和操作符(如 |)的声明式 DSL(领域特定语言),用于以组合式、函数式风格构建 LLM 应用流水线。其核心思想是:万物皆为 Runnable,一切皆可组合。

和 Chain 相比,LCEL 的具有以下优势:

  1. 真正的端到端流式(Streaming)
for chunk in (prompt | llm | output_parser).stream({"concept": "AI"}):
    print(chunk, end="", flush=True) 
  1. 天然支持异步
await (prompt | llm).ainvoke({"concept": "blockchain"})  
  1. 灵活的并行与分支
from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# 并行生成摘要和关键词
map_chain = RunnableParallel(
    summary=prompt1 | llm | parser1,
    keywords=prompt2 | llm | parser2
)

# 输入透传 + 并行处理
full_chain = {"input": RunnablePassthrough()} | map_chain
  1. 中间状态可访问(用于 RAG 等场景)
# 在 RAG 中保留检索结果
retriever = ...
prompt = ...

# 将检索结果和原始问题同时送入 prompt
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
) 
  1. 与 LangSmith 深度集成

    LCEL 链在 LangSmith 中自动记录每一步的输入/输出、耗时、token 使用等,便于调试和监控。

2.2 统一接口:Runnable

LangChain 的所有组件都可以像乐高一样组合,都实现 Runnable 接口。

Runnable 的本质是一个抽象基类(ABC),定义在 langchain_core.runnables 模块中,它为所有可“运行”的组件提供统一的接口规范,包括:

  • .invoke(input, config=None):同步单次调用
  • .ainvoke(input, config=None):异步单次调用
  • .batch(inputs, config=None):批量同步调用
  • .abatch(inputs, config=None):批量异步调用
  • .stream(input, config=None):流式输出(适用于支持流的组件,如 LLM)
  • .astream(input, config=None):异步流式输出

所有 LangChain 组件(如 LLM, ChatModel, Retriever, Tool, Chain, PromptTemplate 等)都继承自 Runnable 或被包装为 Runnable。

LangChain 提供了装饰器函数 RunnableLambda 用于将普通函数包装为 Runnable 对象。

2.3 链式操作:组合 Runnable 成流水线

管道操作符(|)可以将多个 Runnable 组合成流水线。

  • a | b 表示:将 a 的输出作为 b 的输入
  • 返回一个新的 Runnable(通常是 RunnableSequence),仍可继续用 | 连接
  • 自动处理输入/输出适配,如果类型不匹配,可通过 .with_types(…) 或自定义 Runnable 进行适配
  • 支持任意长度的链式调用,即使异步调用(ainvoke)也会遵循前后顺序关系
  • 对于 stream 模式,只有最后一个组件支持流时才能流式输出(除非中间组件也支持并正确传递流)

LangChain 目前不原生支持并行分支(如 a | [b, c] | d),但可通过自定义 Runnable(如 RunnableParallel)实现。下面的代码实现了输入同时送入两个分支,输出为 dict: {“branch1”: …, “branch2”: …}

from langchain_core.runnables import RunnableParallel

parallel = RunnableParallel(
    branch1=prompt1 | llm1,
    branch2=prompt2 | llm2
)

2.4 模块解耦:分层架构

LangChain 采用清晰的分层结构,建议导入时仅导入需要的包,避免引入冗余依赖。

包名 作用
langchain 主包,组合 core + community + 常用接口,提供高层 API
langchain-core 核心抽象(Runnable、Message、BaseLanguageModel、Document 等)
langchain-community 社区维护的集成(向量库、文档加载器、部分 LLM)
langsmith LangChain 官方调试/监控平台客户端(非必需,但推荐)
langgraph 构建状态机/图状工作流(用于 Agent、多轮对话)
langchain-openai 接入 OpenAI(GPT-4/3.5)、Azure OpenAI
langchain-anthropic 接入 Anthropic Claude 系列
langchain-google-genai 接入 Google Gemini
langchain-ollama 本地运行 Ollama 支持的开源模型(如 Llama3、Qwen)
langchain-dashscope 接入阿里 Qwen

3 核心组件

3.1 Models(模型)

模型分为纯文本模型和对话模型两类。

  • LLMs:纯文本输入输出(逐渐被 Chat Models 取代)
  • Chat Models:以对话消息(HumanMessage, AIMessage)为输入输出(推荐)

推荐使用 ChatOpenAI, ChatAnthropic, ChatOllama, ChatTongyi 等。

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_anthropic import ChatAnthropic
from langchain_community.chat_models import ChatTongyi
from langchain_dashscope import ChatTongyi
from langchain_ollama import ChatOllama

前面已经提到,因 langchain-dashscope 模块存在兼容性问题,千问模型暂时只能像下面这样使用。

from langchain_community.chat_models import ChatTongyi

llm = ChatTongyi(
    model_name="qwen-max",
    dashscope_api_key="your_api_key",
    temperature=0.7,
    max_retries=3
)

3.2 Messages(消息)

langchain_core.messages 是 LangChain 核心库中用于表示对话消息的标准数据结构模块。它定义了一套统一的消息类型,便于在不同大语言模型(LLM)、聊天模型(Chat Models)和工具调用之间传递上下文。这些消息类型共同构成了“消息历史”(message history)或“聊天记录”,是构建多轮对话、Agent、RAG 等应用的基础。以下是 langchain_core.messages 中常见的几种消息类型及其用途。

3.2.1 HumanMessage

  • 含义:代表由人类用户(或外部输入)发送的消息。
  • 典型用途:用户提问、指令、输入文本。
  • 字段:
    • content: str | List[Union[str, dict]]:消息内容,可以是纯文本,也可以是多模态内容(如图片、音频等,以字典形式表示)。
    • name: Optional[str]:可选的发送者名称(用于多用户场景)。
  • 示例:
from langchain_core.messages import HumanMessage

msg = HumanMessage(content="你好,请介绍一下 LangChain。")

3.2.2 AIMessage

  • 含义:代表由 AI 模型(如 LLM 或 Chat Model)生成的回复。
  • 典型用途:模型输出、回答、推理结果。
  • 字段:
    • content: str | List[Union[str, dict]]:消息内容,可以是纯文本,也可以是多模态内容(如图片、音频等,以字典形式表示)。
    • tool_calls: Optional[List[ToolCall]]:如果模型调用了工具(function calling),会在此列出调用详情(包括工具名、参数等)。
  • 示例:
from langchain_core.messages import AIMessage

# 无工具调用时的样例
msg = AIMessage(content="LangChain 是一个用于构建 LLM 应用的框架。")

# 包含工具调用时的样例
msg = AIMessage(
    content="",
    tool_calls=[{"name": "search", "args": {"query": "LangChain"}, "id": "call_123"}]
)

3.2.3 SystemMessage

  • 含义:代表系统级别的指令或上下文,用于引导模型行为。
  • 典型用途:设置角色(如“你是一个 helpful assistant”)、提供背景知识、约束输出格式等。
  • 注意:并非所有模型都支持 SystemMessage(例如某些开源模型可能忽略它,或需拼接到 prompt 开头)。
  • 示例:
from langchain_core.messages import SystemMessage

msg = SystemMessage(content="你是一个专业的 Python 编程助手。")

3.2.4 ToolMessage

  • 含义:代表工具(函数)执行后的结果,作为对 AIMessage 中 tool_calls 的响应。
  • 典型用途:在函数调用(Function Calling)流程中,将工具执行结果返回给模型,供其继续推理。
  • 字段:
    • content: str :工具执行的返回结果(通常为 JSON 字符串或自然语言描述)。
    • tool_call_id: str:必须与触发该工具调用的 AIMessage.tool_calls[i].id 匹配。
  • 示例:
from langchain_core.messages import ToolMessage

msg = ToolMessage(
    content='{"result": "LangChain 是由 Harrison Chase 创建的..."}',
    tool_call_id="call_123"
)

3.2.5 典型对话流程示例(含工具调用)

from langchain_core.messages import HumanMessage, AIMessage, ToolMessage

# 用户提问
user_msg = HumanMessage(content="今天北京天气如何?")

# 模型决定调用工具
ai_msg = AIMessage(
    content="",
    tool_calls=[{"name": "get_weather", "args": {"location": "北京"}, "id": "tool_001"}]
)

# 工具执行后返回结果
tool_msg = ToolMessage(content="晴,气温 5°C", tool_call_id="tool_001")

# 模型基于工具结果生成最终回答
final_ai_msg = AIMessage(content="今天北京天气晴,气温 5°C。")

3.3 Prompts(提示)

Chat Prompts(聊天提示)本质是 List[BaseMessage],可直接传给支持聊天接口的 LLM(如 ChatOpenAI),或者作为作为 LangChain 表达式链(LCEL)的一部分。

3.3.1 ChatPromptTemplate(聊天提示模版)

ChatPromptTemplate 是 LangChain 中用于构建聊天式提示(chat prompts)的核心组件之一,属于 langchain_core.prompts 模块。它特别适用于与支持对话格式的模型(如 OpenAI 的 GPT 系列、Anthropic Claude、Llama 3 聊天版等)交互。

  • ChatPromptTemplate 用于构造由多个消息(Message)组成的提示。
  • 每条消息有角色(如 system、human、ai、tool 等)和内容。
  • 支持变量插值(通过 {variable} 占位符),便于动态生成提示。

用法示例:

from langchain_core.prompts import ChatPromptTemplate

# 创建模板
template = ChatPromptTemplate.from_messages([
    ("system", "你是一个乐于助人的助手。"),
    ("human", "你好!我的名字是 {name}。"),
    ("ai", "你好,{name}!有什么我可以帮你的吗?"),
    ("human", "{user_input}")
])

# 格式化提示
prompt = template.format_messages(name="小明", user_input="今天天气怎么样?")

有些情况下,可能需要部分填充变量:

partial_template = template.partial(name="小明")

3.3.2 MessagesPlaceholder(聊天消息占位)

在 ChatPromptTemplate 中占位,用于动态插入消息列表(如历史对话),常用于带记忆的聊天机器人。

from langchain_core.prompts import MessagesPlaceholder

template = ChatPromptTemplate.from_messages([
    ("system", "你是一个助手。"),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}")
])
# chat_history 是 List[BaseMessage]

3.4 Tools(工具)

定义工具函数时,需要使用 langchain_core.tools 中的装饰器函数 tool 将其封装起来。将所有工具函数集中到一个数组(通常该数组命名为tools)中,并绑定到 llm 上。若 LLM 决定调用工具时,它不会在 content 中生成文本回复,而是在 tool_calls 中生成工具调用信息。这表示 LLM 认为需要调用工具来获取信息,获得工具执行结果后再生成最终回复。

from langchain_core.tools import tool

@tool
def multiply(a: int, b: int) -> int:
    """将两个整数相乘"""
    return a * b

# 3. 将工具函数集绑定到模型上 
tools = [multiply]
llm_with_tools = llm.bind_tools(tools)

3.5 Output Parsers(输出解析)

在 LangChain 中,langchain_core.output_parsers 模块提供了多种输出解析器(Output Parsers),用于将大语言模型(LLM)的原始文本输出结构化为程序可处理的数据格式。这些解析器可直接绑定到 Prompt 或 Chain 上。

3.5.1 StrOutputParser

最简单的解析器,直接返回 LLM 输出的原始字符串(去除前后空白)。

from langchain_core.output_parsers import StrOutputParser

parser = StrOutputParser()
raw_output = "  The answer is 42.  \n"
parsed = parser.parse(raw_output)
print(repr(parsed))  # 'The answer is 42.'

上面代码中 StrOutputParser 的用法仅是为了演示,实际应用时通常使用下面的写法。

from langchain_core.output_parsers import StrOutputParser

chain = prompt | llm | StrOutputParser()

3.5.2 JsonOutputParser

将 LLM 输出解析为 JSON 对象(Python 字典)。要求 LLM 的输出是合法的 JSON 格式(通常通过提示词强制模型输出 JSON)。如果 LLM 输出不是合法 JSON(如包含额外解释文字),会抛出 JSONDecodeError。因此通常配合提示模板使用,例如:“请仅输出 JSON,不要任何其他文字。”

from langchain_core.output_parsers import JsonOutputParser

parser = JsonOutputParser()
raw_output = '{"name": "Bob", "score": 95}'
parsed = parser.parse(raw_output)
print(parsed)  # {'name': 'Bob', 'score': 95}
print(type(parsed))  # <class 'dict'>

3.5.3 PydanticOutputParser

将 LLM 输出解析为 Pydantic 模型实例,兼具类型校验与结构化能力。比 JsonOutputParser 更强大,支持字段验证、默认值、类型转换等。在这种应用场景下,要求 LLM 的原始输出通常是 JSON 格式字符串,但也可以是“看起来像 JSON”的文本(部分容错)。

from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser

class Article(BaseModel):
    title: str = Field(description="文章标题")
    author: str = Field(description="作者名")
    views: int = Field(description="阅读量")

parser = PydanticOutputParser(pydantic_object=Article)

raw_output = '{"title": "Using LangChain", "author": "Alice", "views": "2500"}'
# 注意 views 是字符串,但会被转为 int

article = parser.parse(raw_output)
print(article)
# Article(title='Using LangChain', author='Alice', views=2500)

print(type(article))        # <class '__main__.Article'>
print(article.views)        # 2500 (int)
print(article.model_dump()) # {'title': 'Using LangChain', 'author': 'Alice', 'views': 2500}

3.6 Retrieval(检索)

用于 RAG(检索增强生成):

  • DocumentLoader:加载 PDF、网页、数据库等
  • TextSplitter:切分长文本
  • Embeddings:生成向量(如 OpenAIEmbeddings)
  • VectorStore:存储与检索(如 Chroma, FAISS, Pinecone)
  • Retriever:封装检索逻辑

典型 RAG 链:

retriever = vectorstore.as_retriever()
context = retriever.invoke("用户问题")
# 将 context 注入 prompt

3.7 Chains(链)

最新的 LangChain 使用 LCEL(LangChain Expression Language) 直接组合 Runnable,而非预定义 Chain 类。

例如,手动构建 RAG 链:

from operator import itemgetter

rag_chain = (
    {"context": retriever, "question": itemgetter("question")}
    | prompt
    | llm
    | StrOutputParser()
)

3.8 Agents(智能体)

Agent = LLM + Tools + ReAct/Plan-and-Execute 等推理策略。

当前推荐方式:使用 LangGraph 构建自定义 Agent 循环,因为:

  • 更透明(可观察每一步)
  • 支持中断、人工审核、循环控制
  • 避免旧版 AgentExecutor 的黑盒问题

基本结构:

from langgraph.graph import StateGraph, END

# 定义状态 schema
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

# 定义节点(如 call_model, should_continue)
# 构建图:add_node, add_conditional_edges, set_entry_point
# compile() 得到可运行的 graph

3.9 Memory(记忆)

  • 对话历史通过 messages 列表管理
  • 在 LangGraph 中,状态天然包含历史
  • 可使用 MessageHistory 类配合 RunnableWithMessageHistory 实现会话隔离

4 基本技能

4.1 RAG 入门 —— 检索增强生成

RAG 是 Retrieval-Augmented Generation 的缩写,中文通常译为 “检索增强生成”。它是一种结合信息检索(Retrieval) 与大语言模型生成(Generation) 的技术架构,旨在让 LLM(大语言模型)在回答问题时,能够基于外部知识库中的真实、最新、特定领域的内容进行生成,从而克服 LLM 本身固有的局限性。

#!/usr/bin/env python

"""
使用 LangChain LCEL 管道操作符构建 RAG 应用
解决 Embeddings 兼容性问题,同时保留 LCEL 的优雅链式调用
"""

import os
from operator import itemgetter
from typing import List, Dict, Any
from dotenv import load_dotenv
import numpy as np

from langchain_community.chat_models import ChatTongyi
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

import dashscope
from dashscope import TextEmbedding

# 加载环境变量
load_dotenv()
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
dashscope.api_key = DASHSCOPE_API_KEY

template = """根据以下上下文回答问题:

上下文:
{context}

问题:{question}

请用简洁、准确的语言回答。"""


class DashScopeEmbeddings(Embeddings):
    """
    继承 langchain_core.embeddings.Embeddings
    但内部使用 DashScope SDK 实现,解决兼容性问题
    """

    def __init__(self, model: str = "text-embedding-v2"):
        self.model = model

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """批量嵌入文档"""
        if not texts:
            return []
        
        # 确保输入是字符串
        texts = [str(t) for t in texts]
        
        # DashScope 支持批量输入,最多 25 条
        batch_size = 25
        all_embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            resp = TextEmbedding.call(model=self.model, input=batch)
            
            if resp.status_code != 200:
                raise ValueError(f"Embedding failed: {resp.code} - {resp.message}")
            
            # 按原始顺序提取 embeddings
            batch_embeddings = resp.output["embeddings"]
            sorted_embeddings = sorted(batch_embeddings, key=lambda x: x["text_index"])
            all_embeddings.extend([item["embedding"] for item in sorted_embeddings])
        
        return all_embeddings

    def embed_query(self, text: str) -> List[float]:
        """嵌入单个查询"""
        return self.embed_documents([text])[0]


class SimpleVectorStore:
    """
    简单的向量存储,使用余弦相似度进行检索
    兼容 LangChain 的 Retriever 接口
    """

    def __init__(self, embeddings: DashScopeEmbeddings):
        self.embeddings = embeddings
        self.documents: List[Document] = []
        self.vectors: List[np.ndarray] = []

    def add_documents(self, documents: List[Document]):
        """添加文档到向量存储"""
        texts = [doc.page_content for doc in documents]
        vectors = self.embeddings.embed_documents(texts)
        
        self.documents.extend(documents)
        self.vectors.extend([np.array(v) for v in vectors])

    def similarity_search(self, query: str, k: int = 3) -> List[Document]:
        """基于相似度搜索返回最相关的文档"""
        if not self.documents:
            return []
        
        # 获取查询向量
        query_vector = np.array(self.embeddings.embed_query(query))
        
        # 计算余弦相似度
        similarities = []
        for vec in self.vectors:
            dot_product = np.dot(query_vector, vec)
            norm_a = np.linalg.norm(query_vector)
            norm_b = np.linalg.norm(vec)
            similarity = dot_product / (norm_a * norm_b) if norm_a > 0 and norm_b > 0 else 0
            similarities.append(similarity)
        
        # 获取 top-k 最相似的文档
        top_indices = np.argsort(similarities)[::-1][:k]
        return [self.documents[i] for i in top_indices]

    def as_retriever(self):
        """返回一个可调用的检索函数,用于 LCEL 管道"""
        return RunnableLambda(self._retrieve)

    def _retrieve(self, inputs: Dict[str, Any]) -> str:
        """检索函数,适配 LCEL 管道"""
        # 支持两种输入:直接字符串或包含 question 的字典
        if isinstance(inputs, str):
            query = inputs
            k = 3
        else:
            query = inputs.get("question", inputs.get("query", ""))
            k = inputs.get("k", 3)
        
        docs = self.similarity_search(query, k=k)
        # 返回格式化的上下文文本
        return "\n\n".join([doc.page_content for doc in docs])


def main():
    # 1. 创建文档
    docs = [
        Document(page_content="苹果公司由史蒂夫·乔布斯、史蒂夫·沃兹尼亚克和罗纳德·韦恩于 1976 年 4 月 1 日创立。"),
        Document(page_content="香蕉是一种富含钾的热带水果,每 100 克香蕉约含 358 毫克钾。"),
        Document(page_content="Python 是一种高级编程语言,由吉多·范罗苏姆于 1991 年首次发布。"),
        Document(page_content="人工智能(AI)是计算机科学的一个分支,致力于创建能够执行需要人类智能的任务的系统。"),
    ]

    # 2. 初始化嵌入模型和向量存储
    print("正在初始化嵌入模型...")
    embeddings = DashScopeEmbeddings(model="text-embedding-v2")
    
    print("正在构建向量存储...")
    vectorstore = SimpleVectorStore(embeddings)
    vectorstore.add_documents(docs)

    # 3. 获取检索器(RunnableLambda)
    retriever = vectorstore.as_retriever()

    # 4. 初始化 ChatTongyi 模型
    model = ChatTongyi(
        model_name="qwen-max",
        dashscope_api_key=DASHSCOPE_API_KEY,
        temperature=0.7,
        max_retries=3
    )

    # 5. 定义 Prompt
    prompt = ChatPromptTemplate.from_template(template)

    # 6. 使用 LCEL 管道操作符构建 RAG 链
    rag_chain = (
        {
            "context": retriever,  # 检索器返回格式化的上下文
            "question": itemgetter("question")  # 从输入中提取问题
        }
        | prompt
        | model
        | StrOutputParser()
    )

    # 7. 测试
    print("\n" + "=" * 50)
    print("测试 1: 谁创立了苹果公司?")
    print("=" * 50)
    answer = rag_chain.invoke({"question": "谁创立了苹果公司?"})
    print(f"答案:{answer}")

    print("\n" + "=" * 50)
    print("测试 2: 香蕉富含什么矿物质?")
    print("=" * 50)
    answer = rag_chain.invoke({"question": "香蕉富含什么矿物质?"})
    print(f"答案:{answer}")

    print("\n" + "=" * 50)
    print("测试 3: Python 是谁创建的?")
    print("=" * 50)
    answer = rag_chain.invoke({"question": "Python 是谁创建的?"})
    print(f"答案:{answer}")


if __name__ == "__main__":
    main()

4.2 结构化输出 —— Pydantic 与 JSON 解析

通过将 LLM 输出解析为 Pydantic 模型实例,演示如何使用输出解析器。

#!/usr/bin/env python

import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser

load_dotenv()

# 1. 定义输出 schema
class Joke(BaseModel):
    setup: str = Field(description="笑话的铺垫")
    punchline: str = Field(description="笑点")

# 2. 创建解析器
parser = PydanticOutputParser(pydantic_object=Joke)

# 3. 构建带格式提示的模板
prompt = ChatPromptTemplate.from_template(
    "生成一个关于{topic}的笑话。\n{format_instructions}"
).partial(format_instructions=parser.get_format_instructions())

llm = ChatTongyi(
    model_name = "qwen-max",
    dashscope_api_key = os.getenv("DASHSCOPE_API_KEY"),
    temperature = 0.7,
    max_retries = 3
)

chain = prompt | llm | parser

# 4. 调用
joke = chain.invoke({"topic": "程序员"})
print(f"铺垫: {joke.setup}\n笑点: {joke.punchline}")

4.3 多轮对话记忆 —— MessageHistory

目标:实现带历史记录的聊天机器人。


#!/usr/bin/env python

"""
多轮对话记忆 —— MessageHistory
"""

import os
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage
from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory

load_dotenv()

# 1. 模型 + Prompt(含历史占位符)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个 helpful AI 助手。"),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])
model = ChatTongyi(
    model_name = "qwen-max",
    dashscope_api_key = os.getenv("DASHSCOPE_API_KEY"),
    temperature = 0.7,
    max_retries = 3
)

chain = prompt | model

# 2. 添加记忆
store = {}  # 简单内存存储(生产环境用 Redis/DynamoDB)

def get_session_history(session_id: str):
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

# 3. 对话
config = {"configurable": {"session_id": "abc123"}}
response1 = with_history.invoke({"input": "我叫小明"}, config=config)
response2 = with_history.invoke({"input": "我叫什么?"}, config=config)

print(response2.content)  # 应该回答“小明”

4.4 工具调用 —— 自定义 Tool 与简单 Agent

目标:让 LLM 调用 Python 函数(工具)。

使用 langchain_core.tools 中的装饰器函数 tool

总结:当 LLM 决定调用工具时,它不会在 content 中生成文本回复,而是在 tool_calls 中生成工具调用信息。这表示 LLM 认为需要调用工具来获取信息,获得工具执行结果后再生成最终回复。

#!/usr/bin/env python

import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool

# 0. 加载环境变量文件.env
load_dotenv()

# 1. 初始化模型(使用DashScope的Qwen模型)
llm = ChatTongyi(
    model_name = "qwen-max",
    dashscope_api_key = os.getenv("DASHSCOPE_API_KEY"),
    temperature = 0.7,
    max_retries = 3
)

# 2. 定义工具函数
class MultiplyParams(BaseModel):
    """乘法参数模型"""
    a: int = Field(description="被乘数")
    b: int = Field(description="乘数")

@tool("multiply", args_schema=MultiplyParams)
def multiply(a: int, b: int) -> int:
    """将两个整数相乘"""
    return a * b

# 3. 将工具函数集绑定到模型上 
tools = [multiply]
llm_with_tools = llm.bind_tools(tools)

# 4. 调用模型
messages = [HumanMessage(content="5 乘以 7 等于多少?")]
ai_msg = llm_with_tools.invoke(messages)

# 5. 执行工具调用(简化版)
if ai_msg.tool_calls:
    for tool_call in ai_msg.tool_calls:
        if tool_call["name"] == "multiply":
            result = multiply.invoke(tool_call["args"])
            print(f"计算结果: {result}")
            break

5 进阶技能

5.1 LangGraph 简介

LangChain 的 LangGraph 模块是一个用于构建状态驱动的、可循环的工作流(workflow)或智能体(agent)系统的工具。它基于有向图(Directed Graph)来建模应用逻辑,特别适合需要多步骤、条件分支、循环迭代或协作式智能体交互的复杂任务。
LangGraph 的核心思想是:将整个流程表示为一个状态机(State Machine),其中:

  • 节点(Nodes) 表示执行某些操作的函数(如调用 LLM、检索文档、做决策等);
  • 边(Edges) 表示状态之间的转移条件;
  • 状态(State) 是一个共享的、可变的数据结构(通常是一个 Pydantic 模型或字典),所有节点都可以读取和更新它;
  • 图(Graph) 控制执行流程,支持条件跳转和循环,不像普通链式(Chain)那样只能线性执行。

LangGraph 的本质是 “用图结构 + 共享状态” 构建可控、可调试、可循环的智能体工作流。它把传统的“链式调用”升级为“状态机驱动的智能流程”,特别适合:

  • 需要反思/重试的任务;
  • 多角色协作;
  • 动态决策路径。

如果你正在构建复杂的 AI 应用(如自主 Agent、客服机器人、研究助手),LangGraph 是 LangChain 生态中不可或缺的模块。

5.2 状态机和图

在传统 LangChain Chain 中,流程是线性的:A → B → C。但很多真实场景需要:

  • 根据 LLM 输出决定下一步(比如“是否需要更多工具调用?”)
  • 多个智能体协作(如“研究员”和“写作者”来回讨论)
  • 循环直到满足某个终止条件(如“最多尝试3次”或“答案置信度 > 0.9”)

这时候,状态机 + 图结构就可以大显身手了。

#!/usr/bin/env python

import os
from dotenv import load_dotenv

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage, ToolMessage

load_dotenv()

# 定义状态
class State(TypedDict):
    messages: Annotated[list, add_messages]

# 节点函数
def call_model(state: State):
    model = ChatTongyi(
        model_name = "qwen-max",
        dashscope_api_key = os.getenv("DASHSCOPE_API_KEY"),
        temperature = 0.7,
        max_retries = 3
    )
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
graph = StateGraph(State)
graph.add_node("call_model", call_model)
graph.add_edge(START, "call_model")
graph.add_edge("call_model", END)

# 编译
app = graph.compile()

# 调用
result = app.invoke({"messages": [HumanMessage(content="你好!")]})
print(result["messages"][-1].content)

5.3 完整 Agent —— 带重试机制的问答系统

这是一个可运行的“带重试机制的问答系统”的 Python 文件的代码。该代码使用 LangGraph + LangChain 构建了一个支持最多两次重试(当模型回答“不确定”或“不知道”时触发检索)的状态机问答系统。

#!/usr/bin/env python

"""
使用 LangGraph + Qwen 构建多轮对话状态图
支持自动重试机制:当模型表示不确定时,会模拟检索后重新回答
"""

import os
from typing import Annotated, Literal
from dotenv import load_dotenv

from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.chat_models import ChatTongyi
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# 加载环境变量
load_dotenv()
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")


# ======================
# 1. 定义状态(State)
# ======================
class State(TypedDict):
    """定义图的状态结构"""
    messages: Annotated[list, add_messages]  # 消息历史,自动合并
    retry_count: int                         # 当前重试次数


# ======================
# 2. 初始化 Qwen 模型
# ======================
def create_llm():
    """创建并返回 Qwen 模型实例"""
    return ChatTongyi(
        model_name="qwen-max",
        dashscope_api_key=DASHSCOPE_API_KEY,
        temperature=0,
        streaming=False
    )


# ======================
# 3. 定义节点函数
# ======================
def answer_directly(state: State) -> dict:
    """直接让模型回答问题"""
    llm = create_llm()
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "retry_count": state["retry_count"]
    }


def retrieve_and_answer(state: State) -> dict:
    """
    模拟检索后再次回答
    将检索信息作为 system 消息放在消息列表开头(DashScope 要求)
    """
    llm = create_llm()
    
    # 找到最后一条用户消息
    last_user_query = None
    for msg in reversed(state["messages"]):
        if msg.type == "human":
            last_user_query = msg.content
            break
    
    # 模拟检索结果
    retrieved_info = f"(模拟检索结果:关于 '{last_user_query}' 的补充说明。)"
    
    # 构造新上下文:system 消息必须在最前面
    new_messages = [SystemMessage(content=retrieved_info)] + state["messages"]
    response = llm.invoke(new_messages)
    
    return {
        "messages": [response],
        "retry_count": state["retry_count"] + 1
    }


# ======================
# 4. 定义条件路由函数
# ======================
def should_retry(state: State) -> Literal["retrieve", "end"]:
    """
    判断是否需要重试
    检查模型回答中是否包含不确定表达或信息不足的暗示
    """
    last_message = state["messages"][-1].content.lower()
    retry_count = state["retry_count"]
    
    # 更全面的不确定/信息不足表达
    uncertain_phrases = [
        # 直接不确定
        "不确定", "不知道", "无法回答", "不清楚", "暂无", 
        "没有相关信息", "无法确定", "抱歉", "不好意思",
        "据我所知", "据我了解",
        # 时间相关
        "尚未公布", "还未公布", "还没公布", "未公布",
        "尚未揭晓", "还未揭晓",
        "未来", "将来", "以后",
        "截至", "截止目前", "最后更新",
        "无法提供", "不能提供", "难以预测",
        # 建议查询官方
        "官方网站", "权威", "关注", "推测",
        # 模糊表达
        "以上", "仅供参考", "具体情况",
        "建议", "最好", "可以", "可能",
        # 英文表达
        "i'm not sure", "i don't know", "unable to", "sorry",
        "as far as i know", "to the best of my knowledge",
        "not yet announced", "future", "latest update"
    ]
    
    has_uncertainty = any(phrase in last_message for phrase in uncertain_phrases)
    should_retry_flag = has_uncertainty and retry_count < 2
    
    return "retrieve" if should_retry_flag else "end"


# ======================
# 5. 构建状态图
# ======================
def build_graph() -> StateGraph:
    """构建并编译状态图"""
    builder = StateGraph(State)
    
    # 添加节点
    builder.add_node("answer", answer_directly)
    builder.add_node("retrieve", retrieve_and_answer)
    
    # 添加边
    builder.add_edge(START, "answer")
    
    # 添加条件边
    builder.add_conditional_edges(
        "answer",
        should_retry,
        {
            "retrieve": "retrieve",
            "end": END
        }
    )
    
    # 形成循环:检索后重新回答
    builder.add_edge("retrieve", "answer")
    
    return builder.compile()


# ======================
# 6. 主函数
# ======================
def main():
    """运行示例"""
    # 使用更容易触发重试的问题:
    # 虚构/不存在的事件 - 模型无法确认具体细节
    
    user_question = "3+2=?"
    #user_question = "宇宙的终极答案是什么?"
    #user_question = "2026 年诺贝尔物理学奖得主是谁?他们的主要贡献是什么?"
    
    print(f"用户提问:{user_question}\n")
    print("系统正在处理(使用 Qwen 模型)...\n")
    
    graph = build_graph()
    result = graph.invoke({
        "messages": [HumanMessage(content=user_question)],
        "retry_count": 0
    })
    
    final_answer = result["messages"][-1].content
    retry_used = result["retry_count"]
    
    print("\n✅ 最终答案:")
    print(final_answer)
    print(f"\n📊 重试次数:{retry_used}")


if __name__ == "__main__":
    main()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐