008-LangChain Runnable 案例分析
本案例分析详细介绍了 LangChain 中的 Runnable 对象,这些对象提供了一种模块化和灵活的方法来设计工作流,通过启用链式调用、并行执行和数据转换,实现了高效的结构化输入和输出处理。

本案例分析详细介绍了 LangChain 中的 Runnable 对象,这些对象提供了一种模块化和灵活的方法来设计工作流,通过启用链式调用、并行执行和数据转换,实现了高效的结构化输入和输出处理。
1. 案例目标
本案例旨在展示如何使用 LangChain 的 Runnable 组件构建灵活的工作流,主要目标包括:
- 掌握 RunnablePassthrough 的使用,实现数据传递和增强
- 学习 RunnableParallel 的并行执行能力,提高处理效率
- 了解 RunnableLambda 如何通过自定义函数实现动态数据处理
- 掌握 itemgetter 工具在数据提取中的应用
- 构建端到端的数据处理管道,展示组件组合的强大功能
2. 技术栈与核心依赖
- LangChain - 核心框架,提供 Runnable 组件
- LangChain Core - 提供 RunnablePassthrough, RunnableParallel, RunnableLambda 等核心组件
- LangChain OpenAI - 提供 OpenAI 模型集成
- Python operator 模块 - 提供 itemgetter 工具
- LangSmith - 用于跟踪和监控工作流执行
- Jupyter Notebook - 用于交互式开发和演示
3. 环境配置
案例运行需要以下环境配置:
- 安装必要的依赖包:
pip install langchain-opentutorial
pip install langsmith
pip install langchain
pip install langchain_core
pip install langchain_openai
- 配置环境变量:
from dotenv import load_dotenv
load_dotenv(override=True)
from langchain_opentutorial import set_env
set_env(
{
"LANGCHAIN_TRACING_V2": "true",
"LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
"LANGCHAIN_PROJECT": "05-Runnable",
}
)
- 配置 OpenAI API 密钥:通过 .env 文件或环境变量设置 OPENAI_API_KEY
4. 案例实现
4.1 RunnablePassthrough 实现
RunnablePassthrough 是一个用于简化数据处理工作流的实用工具,它可以通过以下两种方式工作:
- 简单数据传递:直接传递输入数据,不做任何修改
- 动态数据增强:通过 assign() 方法添加额外属性
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough
# 创建提示和模型
prompt = PromptTemplate.from_template("What is 10 times {num}?")
llm = ChatOpenAI(temperature=0)
# 使用 RunnablePassthrough 创建链
runnable_chain = {"num": RunnablePassthrough()} | prompt | ChatOpenAI()
# 调用链
result = runnable_chain.invoke(10)
使用 RunnablePassthrough.assign() 的示例:
# 输入键: num, 分配键: new_num
result = (RunnablePassthrough.assign(new_num=lambda x: x["num"] * 3)).invoke({"num": 1})
# 结果: {'num': 1, 'new_num': 3}
4.2 RunnableParallel 实现
RunnableParallel 用于并发执行多个 Runnable 对象,优化工作流程:
from langchain_core.runnables import RunnableParallel
# 创建 RunnableParallel 实例
runnable = RunnableParallel(
# 使用 RunnablePassthrough 实例作为 'passed' 参数,直接传递输入数据
passed=RunnablePassthrough(),
# 使用 RunnablePassthrough.assign 作为 'extra' 参数,分配一个 lambda 函数 'mult'
# 该函数将输入字典中 'num' 键关联的值乘以 3
extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
# 将 lambda 函数作为 'modified' 参数传递
# 该函数将输入字典中 'num' 键关联的值加 1
modified=lambda x: x["num"] + 1,
)
# 调用 invoke 方法,传递字典 {'num': 1} 作为输入
result = runnable.invoke({"num": 1})
# 结果: {'passed': {'num': 1}, 'extra': {'num': 1, 'mult': 3}, 'modified': 2}
RunnableParallel 也可以应用于链:
chain1 = (
{"country": RunnablePassthrough()}
| PromptTemplate.from_template("What is the capital of {country}?")
| ChatOpenAI()
)
chain2 = (
{"country": RunnablePassthrough()}
| PromptTemplate.from_template("What is the area of {country}?")
| ChatOpenAI()
)
combined_chain = RunnableParallel(capital=chain1, area=chain2)
result = combined_chain.invoke("United States of America")
4.3 RunnableLambda 实现
RunnableLambda 允许开发者使用 lambda 函数定义自定义数据转换逻辑:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from datetime import datetime
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
def get_today(a):
# 获取今天的日期
return datetime.today().strftime("%b-%d")
# 创建提示和模型
prompt = PromptTemplate.from_template(
"List {n} famous people whose birthday is on {today}. Include their date of birth."
)
llm = ChatOpenAI(temperature=0, model_name="gpt-4o")
# 创建链
chain = (
{"today": RunnableLambda(get_today), "n": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 调用链
result = chain.invoke(3)
4.4 itemgetter 实现
itemgetter 是 Python operator 模块中的实用函数,用于高效地从字典、元组和列表中提取特定键或索引的值:
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
# 返回句子长度的函数
def length_function(text):
return len(text)
# 返回两个句子长度乘积的函数
def _multiple_length_function(text1, text2):
return len(text1) * len(text2)
# 使用 _multiple_length_function 返回两个句子长度乘积的函数
def multiple_length_function(_dict):
return _multiple_length_function(_dict["text1"], _dict["text2"])
prompt = ChatPromptTemplate.from_template("What is {a} + {b}?")
model = ChatOpenAI()
chain = (
{
"a": itemgetter("word1") | RunnableLambda(length_function),
"b": {"text1": itemgetter("word1"), "text2": itemgetter("word2")}
| RunnableLambda(multiple_length_function),
}
| prompt
| model
)
# 调用链
result = chain.invoke({"word1": "hello", "word2": "world"})
5. 案例效果
通过本案例的实现,可以达到以下效果:
- 数据流控制:精确控制数据在工作流中的传递和转换
- 并行处理:同时执行多个任务,提高整体处理效率
- 动态处理:根据输入数据动态调整处理逻辑
- 高效数据提取:从复杂结构中快速提取所需数据
- 模块化设计:通过组合不同组件构建复杂工作流
6. 案例实现思路
本案例的实现基于以下核心思路:
- 组件化思维:将复杂任务分解为独立的可重用组件
- 数据流设计:明确数据在各个组件间的流动路径和转换方式
- 函数式编程:利用 lambda 函数实现轻量级的数据转换
- 并行优化:识别可并行执行的任务,提高处理效率
- 链式组合:通过管道操作符(|)将组件串联成完整工作流
7. 扩展建议
- 错误处理:添加适当的错误处理机制,提高工作流的健壮性
- 性能监控:集成性能监控工具,跟踪各组件执行时间和资源消耗
- 动态路由:实现基于输入内容的动态路由,根据不同条件选择不同的处理路径
- 缓存机制:为频繁调用的组件添加缓存,减少重复计算
- 批处理支持:扩展组件以支持批量数据处理,提高大规模数据处理的效率
- 可视化工具:开发工作流可视化工具,帮助理解和调试复杂的数据管道
8. 总结
LangChain 的 Runnable 组件提供了一套强大而灵活的工具,用于构建高效的数据处理工作流。通过 RunnablePassthrough、RunnableParallel、RunnableLambda 和 itemgetter 等组件的组合使用,开发者可以创建出既简洁又功能强大的数据处理管道。这种模块化的方法不仅提高了代码的可读性和可维护性,还为构建复杂的 AI 应用提供了坚实的基础。掌握这些组件的使用,将大大提升开发者在 LangChain 生态系统中的工作效率和应用构建能力。
更多推荐


所有评论(0)