本案例分析详细介绍了 LangChain 中的 Runnable 对象,这些对象提供了一种模块化和灵活的方法来设计工作流,通过启用链式调用、并行执行和数据转换,实现了高效的结构化输入和输出处理。

1. 案例目标

本案例旨在展示如何使用 LangChain 的 Runnable 组件构建灵活的工作流,主要目标包括:

  • 掌握 RunnablePassthrough 的使用,实现数据传递和增强
  • 学习 RunnableParallel 的并行执行能力,提高处理效率
  • 了解 RunnableLambda 如何通过自定义函数实现动态数据处理
  • 掌握 itemgetter 工具在数据提取中的应用
  • 构建端到端的数据处理管道,展示组件组合的强大功能

2. 技术栈与核心依赖

  • LangChain - 核心框架,提供 Runnable 组件
  • LangChain Core - 提供 RunnablePassthrough, RunnableParallel, RunnableLambda 等核心组件
  • LangChain OpenAI - 提供 OpenAI 模型集成
  • Python operator 模块 - 提供 itemgetter 工具
  • LangSmith - 用于跟踪和监控工作流执行
  • Jupyter Notebook - 用于交互式开发和演示

3. 环境配置

案例运行需要以下环境配置:

  1. 安装必要的依赖包
pip install langchain-opentutorial
pip install langsmith
pip install langchain
pip install langchain_core
pip install langchain_openai
  1. 配置环境变量
from dotenv import load_dotenv
load_dotenv(override=True)

from langchain_opentutorial import set_env
set_env(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "05-Runnable",
    }
)
  1. 配置 OpenAI API 密钥:通过 .env 文件或环境变量设置 OPENAI_API_KEY

4. 案例实现

4.1 RunnablePassthrough 实现

RunnablePassthrough 是一个用于简化数据处理工作流的实用工具,它可以通过以下两种方式工作:

  • 简单数据传递:直接传递输入数据,不做任何修改
  • 动态数据增强:通过 assign() 方法添加额外属性
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough

# 创建提示和模型
prompt = PromptTemplate.from_template("What is 10 times {num}?")
llm = ChatOpenAI(temperature=0)

# 使用 RunnablePassthrough 创建链
runnable_chain = {"num": RunnablePassthrough()} | prompt | ChatOpenAI()

# 调用链
result = runnable_chain.invoke(10)

使用 RunnablePassthrough.assign() 的示例:

# 输入键: num, 分配键: new_num
result = (RunnablePassthrough.assign(new_num=lambda x: x["num"] * 3)).invoke({"num": 1})
# 结果: {'num': 1, 'new_num': 3}

4.2 RunnableParallel 实现

RunnableParallel 用于并发执行多个 Runnable 对象,优化工作流程:

from langchain_core.runnables import RunnableParallel

# 创建 RunnableParallel 实例
runnable = RunnableParallel(
    # 使用 RunnablePassthrough 实例作为 'passed' 参数,直接传递输入数据
    passed=RunnablePassthrough(),
    # 使用 RunnablePassthrough.assign 作为 'extra' 参数,分配一个 lambda 函数 'mult'
    # 该函数将输入字典中 'num' 键关联的值乘以 3
    extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
    # 将 lambda 函数作为 'modified' 参数传递
    # 该函数将输入字典中 'num' 键关联的值加 1
    modified=lambda x: x["num"] + 1,
)

# 调用 invoke 方法,传递字典 {'num': 1} 作为输入
result = runnable.invoke({"num": 1})
# 结果: {'passed': {'num': 1}, 'extra': {'num': 1, 'mult': 3}, 'modified': 2}

RunnableParallel 也可以应用于链:

chain1 = (
    {"country": RunnablePassthrough()}
    | PromptTemplate.from_template("What is the capital of {country}?")
    | ChatOpenAI()
)
chain2 = (
    {"country": RunnablePassthrough()}
    | PromptTemplate.from_template("What is the area of {country}?")
    | ChatOpenAI()
)

combined_chain = RunnableParallel(capital=chain1, area=chain2)
result = combined_chain.invoke("United States of America")

4.3 RunnableLambda 实现

RunnableLambda 允许开发者使用 lambda 函数定义自定义数据转换逻辑:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from datetime import datetime
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

def get_today(a):
    # 获取今天的日期
    return datetime.today().strftime("%b-%d")

# 创建提示和模型
prompt = PromptTemplate.from_template(
    "List {n} famous people whose birthday is on {today}. Include their date of birth."
)
llm = ChatOpenAI(temperature=0, model_name="gpt-4o")

# 创建链
chain = (
    {"today": RunnableLambda(get_today), "n": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# 调用链
result = chain.invoke(3)

4.4 itemgetter 实现

itemgetter 是 Python operator 模块中的实用函数,用于高效地从字典、元组和列表中提取特定键或索引的值:

from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

# 返回句子长度的函数
def length_function(text):
    return len(text)

# 返回两个句子长度乘积的函数
def _multiple_length_function(text1, text2):
    return len(text1) * len(text2)

# 使用 _multiple_length_function 返回两个句子长度乘积的函数
def multiple_length_function(_dict):
    return _multiple_length_function(_dict["text1"], _dict["text2"])

prompt = ChatPromptTemplate.from_template("What is {a} + {b}?")
model = ChatOpenAI()

chain = (
    {
        "a": itemgetter("word1") | RunnableLambda(length_function),
        "b": {"text1": itemgetter("word1"), "text2": itemgetter("word2")}
        | RunnableLambda(multiple_length_function),
    }
    | prompt
    | model
)

# 调用链
result = chain.invoke({"word1": "hello", "word2": "world"})

5. 案例效果

通过本案例的实现,可以达到以下效果:

  • 数据流控制:精确控制数据在工作流中的传递和转换
  • 并行处理:同时执行多个任务,提高整体处理效率
  • 动态处理:根据输入数据动态调整处理逻辑
  • 高效数据提取:从复杂结构中快速提取所需数据
  • 模块化设计:通过组合不同组件构建复杂工作流

6. 案例实现思路

本案例的实现基于以下核心思路:

  1. 组件化思维:将复杂任务分解为独立的可重用组件
  2. 数据流设计:明确数据在各个组件间的流动路径和转换方式
  3. 函数式编程:利用 lambda 函数实现轻量级的数据转换
  4. 并行优化:识别可并行执行的任务,提高处理效率
  5. 链式组合:通过管道操作符(|)将组件串联成完整工作流

7. 扩展建议

  • 错误处理:添加适当的错误处理机制,提高工作流的健壮性
  • 性能监控:集成性能监控工具,跟踪各组件执行时间和资源消耗
  • 动态路由:实现基于输入内容的动态路由,根据不同条件选择不同的处理路径
  • 缓存机制:为频繁调用的组件添加缓存,减少重复计算
  • 批处理支持:扩展组件以支持批量数据处理,提高大规模数据处理的效率
  • 可视化工具:开发工作流可视化工具,帮助理解和调试复杂的数据管道

8. 总结

LangChain 的 Runnable 组件提供了一套强大而灵活的工具,用于构建高效的数据处理工作流。通过 RunnablePassthrough、RunnableParallel、RunnableLambda 和 itemgetter 等组件的组合使用,开发者可以创建出既简洁又功能强大的数据处理管道。这种模块化的方法不仅提高了代码的可读性和可维护性,还为构建复杂的 AI 应用提供了坚实的基础。掌握这些组件的使用,将大大提升开发者在 LangChain 生态系统中的工作效率和应用构建能力。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐