LangChain LCEL and Chains: A Deep Dive into Chained Invocation
About the Author
- Focus areas: large language model development / RAG knowledge bases / AI Agent delivery / model fine-tuning
- Tech stack: Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
- Engineering: production deployment of models, knowledge base construction and optimization, end-to-end solutions
"Making AI interaction smarter and technology delivery more efficient."
Technical discussion and project collaboration are welcome.
Abstract
LCEL (LangChain Expression Language) is LangChain's declarative, composable way to build AI application chains. This article takes a close look at LCEL's core concepts: the Runnable interface, chain composition, parallel execution, conditional routing, and other advanced features, to help developers build flexible, maintainable AI applications.
1. LCEL Overview
1.1 What Is LCEL
LCEL (LangChain Expression Language) is a declarative language for composing chains: it lets developers define complex data-processing pipelines with concise syntax.
1.2 Core Advantages of LCEL
| Advantage | Description |
|---|---|
| Declarative syntax | Components are chained with the pipe operator, keeping code concise |
| Automatic data flow | Inputs and outputs are passed between components automatically |
| Unified interface | Every component implements the Runnable interface |
| Streaming support | Streaming output works out of the box |
| Async-native | Sync and async APIs are derived automatically |
| Observability | Built-in tracing and callback support |
1.3 Hello LCEL
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define the components
model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
parser = StrOutputParser()

# Compose them with LCEL
chain = prompt | model | parser

# Invoke
result = chain.invoke({"topic": "programmers"})
print(result)
```
2. The Runnable Interface
2.1 The Runnable Protocol
Every LCEL component implements the Runnable interface, which gives all components the same set of invocation methods:
2.2 Core Methods
```python
from typing import Any, AsyncIterator, Iterator, Optional, Protocol

from langchain_core.runnables import RunnableConfig


class RunnableProtocol(Protocol):
    """Sketch of the Runnable protocol (simplified signatures)."""

    # Synchronous invocation
    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any: ...

    # Asynchronous invocation
    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any: ...

    # Streaming output
    def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]: ...

    # Async streaming
    async def astream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]: ...

    # Batch invocation
    def batch(self, inputs: list[Any], config: Optional[RunnableConfig] = None) -> list[Any]: ...

    # Async batch
    async def abatch(self, inputs: list[Any], config: Optional[RunnableConfig] = None) -> list[Any]: ...
```
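Because these methods are all derived from a single implementation, anything you lift into a Runnable gets the whole set for free. A minimal sketch that needs no API key, using only `langchain_core`:
```python
from langchain_core.runnables import RunnableLambda

# Wrap a plain function; invoke/ainvoke/stream/astream/batch/abatch
# are all derived from it automatically
double = RunnableLambda(lambda x: x * 2)

print(double.invoke(21))        # 42
print(double.batch([1, 2, 3]))  # [2, 4, 6]
for chunk in double.stream(5):  # a plain function streams as a single chunk
    print(chunk)                # 10
```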
2.3 Usage Examples
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Translate into English: {text}")
chain = prompt | model

# 1. Synchronous invocation
result = chain.invoke({"text": "你好世界"})
print(result.content)

# 2. Asynchronous invocation
import asyncio

async def async_example():
    result = await chain.ainvoke({"text": "你好世界"})
    print(result.content)

asyncio.run(async_example())

# 3. Streaming output
for chunk in chain.stream({"text": "你好世界"}):
    print(chunk.content, end="", flush=True)

# 4. Batch invocation
results = chain.batch([
    {"text": "你好"},
    {"text": "世界"},
    {"text": "Python"},
])
for r in results:
    print(r.content)

# 5. Async streaming
async def async_stream():
    async for chunk in chain.astream({"text": "你好世界"}):
        print(chunk.content, end="", flush=True)

asyncio.run(async_stream())
```
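`batch` runs its inputs concurrently (the sync API uses a thread pool by default), and the concurrency ceiling can be capped through the config. A small sketch:
```python
# Cap batch concurrency at 5 simultaneous requests
results = chain.batch(
    [{"text": t} for t in ["你好", "世界", "Python"]],
    config={"max_concurrency": 5},
)
```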
3. Chain Composition
3.1 The Pipe Operator
LCEL chains components with the pipe operator; data flows from one component to the next automatically:
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define the components
prompt = ChatPromptTemplate.from_template(
    "Explain {concept} in one sentence for a {audience} audience"
)
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

# Compose the chain
chain = prompt | model | parser

# Invoke
result = chain.invoke({
    "concept": "machine learning",
    "audience": "elementary school",
})
print(result)
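```
Under the hood the pipe operator builds a `RunnableSequence`. A quick sanity check of the wiring (the ASCII rendering needs the optional `grandalf` package installed; it is not required for execution):
```python
# prompt | model | parser is a RunnableSequence
print(type(chain).__name__)  # RunnableSequence

# Visualize the chain topology (requires: pip install grandalf)
chain.get_graph().print_ascii()
```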
3.2 Multi-Step Chains
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# Step 1: generate an outline
outline_prompt = ChatPromptTemplate.from_template(
    "Generate an outline for an article about {topic}"
)
outline_chain = outline_prompt | model | StrOutputParser()

# Step 2: write the article from the outline
article_prompt = ChatPromptTemplate.from_template(
    "Write a detailed article based on the following outline:\n\n{outline}"
)
article_chain = article_prompt | model | StrOutputParser()

# Step 3: summarize the article
summary_prompt = ChatPromptTemplate.from_template(
    "Summarize the following article in 3 sentences:\n\n{article}"
)
# The previous step outputs a plain string, so map it onto the
# prompt's {article} variable with RunnablePassthrough
summary_chain = (
    {"article": RunnablePassthrough()}
    | summary_prompt
    | model
    | StrOutputParser()
)

# Compose the full chain
full_chain = (
    {"outline": outline_chain}
    | article_chain
    | {"summary": summary_chain}
)

# Invoke
result = full_chain.invoke({"topic": "the future of artificial intelligence"})
print(result["summary"])
```
3.3 RunnableLambda: Custom Functions
```python
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Custom processing functions
def to_uppercase(text: str) -> str:
    return text.upper()

def add_prefix(text: str) -> str:
    return f"[AI Answer] {text}"

def count_words(text: str) -> dict:
    return {
        "text": text,
        "word_count": len(text.split()),
        "char_count": len(text),
    }

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Explain {concept} in about 50 words")

# Wrap the functions with RunnableLambda
chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(to_uppercase)
    | RunnableLambda(add_prefix)
    | RunnableLambda(count_words)
)

result = chain.invoke({"concept": "quantum computing"})
print(result)
# Illustrative output:
# {'text': '[AI Answer] QUANTUM COMPUTING USES ...', 'word_count': 49, 'char_count': 290}
```
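Inside a pipe composition, plain Python functions (and dicts) are coerced to Runnables automatically, so the explicit `RunnableLambda(...)` wrappers above are optional:
```python
# Plain functions are auto-coerced to RunnableLambda when piped with a Runnable
chain = prompt | model | StrOutputParser() | to_uppercase | add_prefix | count_words
```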
4. RunnableParallel: Parallel Execution
4.1 Running Multiple Chains in Parallel
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4")

# Define several chains to run in parallel
translate_chain = (
    ChatPromptTemplate.from_template("Translate the following into English: {text}")
    | model
    | StrOutputParser()
)
summarize_chain = (
    ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
    | model
    | StrOutputParser()
)
keywords_chain = (
    ChatPromptTemplate.from_template("Extract keywords (comma-separated) from: {text}")
    | model
    | StrOutputParser()
)

# Run them in parallel
parallel_chain = RunnableParallel(
    translation=translate_chain,
    summary=summarize_chain,
    keywords=keywords_chain,
)

# Invoke
result = parallel_chain.invoke({
    "text": "人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。"
})
print(result["translation"])  # English translation
print(result["summary"])      # one-sentence summary
print(result["keywords"])     # keywords
```
4.2 Dictionary Syntactic Sugar
A plain dict is coerced into a `RunnableParallel` automatically when it appears inside a pipe composition. At the top level there is no pipe to trigger the coercion, so wrap the dict explicitly to get a `.invoke()` method:
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4")

# Dict form (equivalent to keyword-argument RunnableParallel)
chain = RunnableParallel({
    "joke": (
        ChatPromptTemplate.from_template("Tell a joke about {topic}")
        | model
        | StrOutputParser()
    ),
    "poem": (
        ChatPromptTemplate.from_template("Write a poem about {topic}")
        | model
        | StrOutputParser()
    ),
    "story": (
        ChatPromptTemplate.from_template("Write a short story about {topic}")
        | model
        | StrOutputParser()
    ),
})

result = chain.invoke({"topic": "programmers"})
print(result["joke"])
print(result["poem"])
print(result["story"])
```
4.3 Nested Parallelism
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4")

# Nested parallel structure (the inner dicts are coerced to RunnableParallel)
chain = RunnableParallel({
    "analysis": {
        "sentiment": (
            ChatPromptTemplate.from_template("Analyze the sentiment of: {text}")
            | model | StrOutputParser()
        ),
        "topics": (
            ChatPromptTemplate.from_template("Extract the topics of: {text}")
            | model | StrOutputParser()
        ),
    },
    "generation": {
        "summary": (
            ChatPromptTemplate.from_template("Summarize: {text}")
            | model | StrOutputParser()
        ),
        "questions": (
            ChatPromptTemplate.from_template("Generate 3 related questions about: {text}")
            | model | StrOutputParser()
        ),
    },
})

result = chain.invoke({"text": "今天天气真好,心情特别愉快!"})
print(result["analysis"]["sentiment"])
print(result["generation"]["summary"])
```
5. RunnablePassthrough: Passing Data Through
5.1 Basic Usage
```python
from langchain_core.runnables import RunnablePassthrough

# RunnablePassthrough forwards its input unchanged
chain = RunnablePassthrough() | (lambda x: f"Received: {x}")
result = chain.invoke("你好")
print(result)  # Received: 你好
```
5.2 Preserving the Original Input
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# Keep the original input while adding a processed result
chain = RunnableParallel(
    original=RunnablePassthrough(),  # forward the raw input
    analysis=(
        ChatPromptTemplate.from_template("Analyze this text: {text}")
        | model
        | StrOutputParser()
    ),
)

result = chain.invoke({"text": "人工智能正在改变世界"})
print(result["original"])  # {"text": "人工智能正在改变世界"}
print(result["analysis"])  # the analysis result
```
5.3 RunnablePassthrough.assign
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# assign adds new fields to the input dict
chain = (
    RunnablePassthrough.assign(
        # add a word_count field (split on whitespace)
        word_count=lambda x: len(x["text"].split()),
    )
    .assign(
        # add a summary field (it can reference word_count from the previous step)
        summary=(
            ChatPromptTemplate.from_template(
                "Summarize in about {word_count} words: {text}"
            )
            | model
            | StrOutputParser()
        )
    )
)

result = chain.invoke({"text": "Artificial intelligence is a branch of computer science"})
print(result)
# {'text': 'Artificial intelligence is a branch of computer science', 'word_count': 8, 'summary': '...'}
```
6. Conditional Routing
6.1 RunnableBranch
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch

model = ChatOpenAI(model="gpt-4")

# Chains for different question types
tech_chain = (
    ChatPromptTemplate.from_template(
        "As a technical expert, answer the following question: {question}"
    )
    | model | StrOutputParser()
)
life_chain = (
    ChatPromptTemplate.from_template(
        "As a lifestyle advisor, answer the following question: {question}"
    )
    | model | StrOutputParser()
)
general_chain = (
    ChatPromptTemplate.from_template(
        "Answer the following question: {question}"
    )
    | model | StrOutputParser()
)

# Classifier function
def classify_question(input_dict: dict) -> str:
    question = input_dict["question"].lower()
    tech_keywords = ["code", "programming", "python", "algorithm", "database", "api"]
    life_keywords = ["health", "food", "travel", "exercise", "lifestyle"]
    for keyword in tech_keywords:
        if keyword in question:
            return "tech"
    for keyword in life_keywords:
        if keyword in question:
            return "life"
    return "general"

# Conditional routing
branch = RunnableBranch(
    (lambda x: classify_question(x) == "tech", tech_chain),
    (lambda x: classify_question(x) == "life", life_chain),
    general_chain,  # default branch
)

# Invoke
print(branch.invoke({"question": "How do I implement a singleton in Python?"}))
print(branch.invoke({"question": "How do I maintain a healthy lifestyle?"}))
print(branch.invoke({"question": "What's the weather like today?"}))
```
6.2 Classifying with an LLM
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

model = ChatOpenAI(model="gpt-4")

# LLM classification chain
classify_prompt = ChatPromptTemplate.from_template(
    """Decide which category the following question belongs to; return only the category name (tech/life/other):
Question: {question}
Category:"""
)
classify_chain = classify_prompt | model | StrOutputParser()

# Answer chains
tech_chain = (
    ChatPromptTemplate.from_template("Technical answer: {question}")
    | model | StrOutputParser()
)
life_chain = (
    ChatPromptTemplate.from_template("Lifestyle advice: {question}")
    | model | StrOutputParser()
)
other_chain = (
    ChatPromptTemplate.from_template("General answer: {question}")
    | model | StrOutputParser()
)

# Full routing chain
def route_by_llm(input_dict: dict) -> dict:
    category = classify_chain.invoke(input_dict).strip().lower()
    return {**input_dict, "category": category}

full_chain = (
    RunnableLambda(route_by_llm)
    | RunnableBranch(
        (lambda x: x["category"] == "tech", tech_chain),
        (lambda x: x["category"] == "life", life_chain),
        other_chain,
    )
)

result = full_chain.invoke({"question": "How do I read a file in Python?"})
print(result)
```
7. Retries and Fallbacks
7.1 Retries with with_retry
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

chain = (
    ChatPromptTemplate.from_template("Explain: {concept}")
    | model
    | StrOutputParser()
)

# Add retries; with_retry() wraps the chain in a RunnableRetry
retry_chain = chain.with_retry(
    retry_if_exception_type=(Exception,),  # which exceptions trigger a retry
    wait_exponential_jitter=True,          # exponential backoff with jitter
    stop_after_attempt=3,                  # maximum number of attempts
)

result = retry_chain.invoke({"concept": "quantum entanglement"})
```
7.2 with_fallbacks: Degradation Strategy
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Several models, in order of preference
primary_model = ChatOpenAI(model="gpt-4")
fallback_model = ChatAnthropic(model="claude-3-5-sonnet-latest")
last_resort_model = ChatOpenAI(model="gpt-3.5-turbo")

prompt = ChatPromptTemplate.from_template("Answer: {question}")
parser = StrOutputParser()

# One chain per model
primary_chain = prompt | primary_model | parser
fallback_chain = prompt | fallback_model | parser
last_resort_chain = prompt | last_resort_model | parser

# Configure the fallbacks
chain_with_fallbacks = primary_chain.with_fallbacks(
    [fallback_chain, last_resort_chain],
    exceptions_to_handle=(Exception,),  # catch all exceptions
)

# Invoke (falls back automatically on failure)
result = chain_with_fallbacks.invoke({"question": "Hello"})
```
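Fallbacks can also be attached to the model alone rather than the whole chain, so the same prompt and parser are reused while only the LLM is swapped on failure. A sketch reusing the models above:
```python
# Swap only the model on failure; prompt and parser are shared
model_with_fallbacks = primary_model.with_fallbacks(
    [fallback_model, last_resort_model]
)
chain = prompt | model_with_fallbacks | parser
result = chain.invoke({"question": "Hello"})
```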
8. Configuration and Parameter Binding
8.1 The bind Method
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

# Bind model parameters
creative_model = model.bind(temperature=1.2, max_tokens=500)
precise_model = model.bind(temperature=0.1, max_tokens=1000)

prompt = ChatPromptTemplate.from_template("Write a passage about {topic}")
creative_chain = prompt | creative_model | StrOutputParser()
precise_chain = prompt | precise_model | StrOutputParser()

print(creative_chain.invoke({"topic": "spring"}))  # creative output
print(precise_chain.invoke({"topic": "spring"}))   # precise output
```
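Reusing `model` and `prompt` from above, `bind` can also fix invocation-time arguments such as stop sequences. A minimal sketch:
```python
# Bind a stop sequence: generation halts at the first newline
one_line_model = model.bind(stop=["\n"])
one_line_chain = prompt | one_line_model | StrOutputParser()
print(one_line_chain.invoke({"topic": "spring"}))  # at most one line of output
```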
8.2 The with_config Method
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Explain: {concept}")
chain = prompt | model | StrOutputParser()

# Attach tags and metadata
configured_chain = chain.with_config(
    tags=["production", "v1"],
    metadata={"version": "1.0", "environment": "prod"},
    callbacks=[...],    # callback handlers (placeholder)
    max_concurrency=5,  # maximum concurrency
)

result = configured_chain.invoke({"concept": "machine learning"})
```
8.3 configurable_fields: Dynamic Configuration
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import ConfigurableField

# Declare which fields may be overridden at runtime
model = ChatOpenAI(model="gpt-4", temperature=0.7).configurable_fields(
    temperature=ConfigurableField(
        id="llm_temperature",
        name="LLM Temperature",
        description="Sampling temperature for the model",
    ),
)
prompt = ChatPromptTemplate.from_template("Explain: {concept}")
chain = prompt | model | StrOutputParser()

# Override the configurable field at invocation time
result = chain.invoke(
    {"concept": "machine learning"},
    config={"configurable": {"llm_temperature": 0.3}},
)
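```
To swap out the model itself at runtime rather than a single field, `configurable_alternatives` registers named alternatives. A sketch reusing the prompt above, assuming `langchain_anthropic` is installed:
```python
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4").configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="openai",
    anthropic=ChatAnthropic(model="claude-3-5-sonnet-latest"),
)
chain = prompt | model | StrOutputParser()

# Select the alternative by key at invocation time
result = chain.invoke(
    {"concept": "machine learning"},
    config={"configurable": {"llm": "anthropic"}},
)
```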
9. Streaming in Depth
9.1 Streaming Output
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Write a short essay about {topic}")
chain = prompt | model | StrOutputParser()

# Synchronous streaming
for chunk in chain.stream({"topic": "artificial intelligence"}):
    print(chunk, end="", flush=True)

# Asynchronous streaming
import asyncio

async def async_stream():
    async for chunk in chain.astream({"topic": "artificial intelligence"}):
        print(chunk, end="", flush=True)

asyncio.run(async_stream())
```
9.2 Streaming Events
```python
import asyncio

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Explain: {concept}")
chain = prompt | model | StrOutputParser()

async def stream_with_events():
    async for event in chain.astream_events(
        {"concept": "quantum computing"},
        version="v2",
    ):
        # Dispatch on the event type
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        elif event["event"] == "on_prompt_start":
            print("\n[Prompt started]")
        elif event["event"] == "on_chat_model_start":
            print("\n[Chat model started]")
        elif event["event"] == "on_parser_start":
            print("\n[Parser started]")

asyncio.run(stream_with_events())
```
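`astream_events` also accepts filters (`include_types`, `include_names`, `include_tags`) so you only receive the events you care about. A sketch that keeps only chat-model events:
```python
async def stream_model_events_only():
    # Only receive events emitted by chat model components
    async for event in chain.astream_events(
        {"concept": "quantum computing"},
        version="v2",
        include_types=["chat_model"],
    ):
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(stream_model_events_only())
```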
9.3 Streaming Output Parsing
```python
import asyncio

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int
    hobbies: list[str]

model = ChatOpenAI(model="gpt-4")
parser = JsonOutputParser(pydantic_object=Person)
prompt = ChatPromptTemplate.from_template(
    "Generate a person profile:\n{format_instructions}"
).partial(format_instructions=parser.get_format_instructions())

chain = prompt | model | parser

# Stream progressively parsed JSON
async def stream_json():
    async for partial in chain.astream({}):
        print(partial)  # the JSON object, built up incrementally

asyncio.run(stream_json())
```
10. Best Practices
10.1 Chain Design Principles
| Principle | Description |
|---|---|
| Single responsibility | Each component does one thing |
| Testability | Components can be tested in isolation |
| Reusability | Components can be reused across chains |
| Observability | Tags and metadata make chains traceable |
10.2 Error-Handling Best Practices
```python
import json

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

def safe_parse(output: str) -> dict:
    """Parse JSON defensively instead of raising."""
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        return {"error": "parse failed", "raw": output}

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Generate JSON for: {request}")

# Chain with error handling
chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(safe_parse)
    | RunnableLambda(lambda x: x if "error" not in x else {"result": None})
)
```
10.3 Performance Optimization
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 1. Use batch processing
model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Translate: {text}")
chain = prompt | model | StrOutputParser()

# Handle many requests in a single batch call
texts = ["hello", "world", "python"]
results = chain.batch([{"text": t} for t in texts])

# 2. Run independent tasks in parallel
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    translation=chain,
    sentiment=(
        ChatPromptTemplate.from_template("Sentiment analysis: {text}")
        | model
        | StrOutputParser()
    ),
)

# 3. Enable caching
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

set_llm_cache(InMemoryCache())
```
11. Summary
11.1 LCEL Core Concepts
- Runnable: the unified interface (invoke / ainvoke / stream / astream / batch / abatch)
- Pipe composition: `prompt | model | parser` builds a RunnableSequence
- RunnableParallel: fan one input out to several chains
- RunnablePassthrough / assign: forward or enrich the input
- RunnableBranch: conditional routing
- with_retry / with_fallbacks: resilience
- bind / with_config / configurable_fields: configuration
11.2 LCEL vs. the Traditional Approach
| Aspect | Traditional approach | LCEL |
|---|---|---|
| Code volume | More | Less |
| Readability | Moderate | High |
| Streaming support | Implemented manually | Built-in |
| Async support | Written separately | Derived automatically |
| Composability | Low | High |
| Observability | Added manually | Built-in |