LangChain 流式调用、并行处理与 Runnable 组件详解
本文详细介绍了LangChain框架中的流式调用、并行处理和Runnable组件三大核心技术。流式调用通过逐步返回结果提升用户体验,可使用stream()方法实现同步/异步调用。并行处理能同时运行多个任务,提高效率。Runnable组件(如RunnablePassthrough、RunnableLambda等)作为核心构建块,提供了灵活的链式组合能力。文章还对比了传统Chain与LCEL的差异,强
9-2. LangChain 流式调用、并行处理与 Runnable 组件详解
目录
1. 简介
LangChain 作为构建语言模型应用的强大框架,提供了多种机制来优化应用性能和用户体验。其中,流式调用和并行处理是两个重要的特性,它们能够显著提升应用的响应速度和用户体验。同时,Runnable 组件作为 LangChain 的核心构建块,为开发者提供了灵活且强大的组合能力。
本指南将深入探讨以下关键主题:
- 流式调用 - 实现实时输出,提升用户体验
- 并行处理 - 同时运行多个任务,提高处理效率
- Runnable 组件 - 核心构建块的使用方法
- 迁移指南 - 从传统 Chain 到 LCEL 的迁移
- 实际应用 - 通过代码示例展示各种技术的实际应用
通过本指南的学习,您将能够:
- 理解并实现流式调用,提升应用的响应性
- 掌握并行处理技术,优化应用性能
- 熟练使用各种 Runnable 组件构建复杂应用
- 顺利完成从传统 Chain 到 LCEL 的迁移
- 在实际项目中应用这些技术解决实际问题
2. 流式调用详解
2.1 流式调用的概念
流式调用(Streaming)是一种数据处理方式,它允许在数据生成的过程中逐步返回结果,而不是等待所有处理完成后再一次性返回。在 LangChain 中,流式调用特别适用于与大语言模型的交互,因为它可以让用户在模型生成响应的过程中实时看到输出内容。
传统的同步调用模式下,用户需要等待整个响应生成完毕后才能看到结果,这在处理长文本或复杂任务时可能导致较长的等待时间。而流式调用则可以逐步返回生成的内容,让用户能够立即看到部分结果,从而显著改善用户体验。
流式调用的工作原理如下:
- 用户发起请求
- 系统开始处理请求并生成响应
- 在生成过程中,系统逐步将已生成的部分内容返回给用户
- 用户可以实时看到内容的生成过程
- 当所有内容生成完毕后,流式调用结束
2.2 流式调用的实现
在 LangChain 中,流式调用通过 stream() 方法实现。让我们通过具体代码示例来演示:
# streaming_basics.py
import os
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
"请详细解释{topic}的概念,并提供实际应用示例。"
)
# 创建 Chain
chain = prompt | llm | StrOutputParser()
# 同步流式调用示例
def sync_streaming_example():
print("=== 同步流式调用示例 ===")
print("问题: 请详细解释人工智能的概念,并提供实际应用示例。")
print("回答: ", end="", flush=True)
# 使用 stream() 方法进行流式调用
for chunk in chain.stream({"topic": "人工智能"}):
print(chunk, end="", flush=True)
print("\n" + "="*50)
# 异步流式调用示例
async def async_streaming_example():
print("=== 异步流式调用示例 ===")
print("问题: 请详细解释机器学习的概念,并提供实际应用示例。")
print("回答: ", end="", flush=True)
# 使用 astream() 方法进行异步流式调用
async for chunk in chain.astream({"topic": "机器学习"}):
print(chunk, end="", flush=True)
print("\n" + "="*50)
# 流式调用与回调函数结合
def streaming_with_callback():
print("=== 流式调用与回调函数结合 ===")
# 定义回调函数
def on_token(token):
print(token, end="", flush=True)
def on_complete():
print("\n生成完成!")
print("问题: 请解释深度学习的基本原理。")
print("回答: ", end="", flush=True)
# 流式调用并处理每个 token
try:
for chunk in chain.stream({"topic": "深度学习的基本原理"}):
on_token(chunk)
except Exception as e:
print(f"\n发生错误: {e}")
finally:
on_complete()
# 复杂流式调用示例
async def complex_streaming_example():
print("=== 复杂流式调用示例 ===")
# 创建更复杂的 Chain
complex_chain = (
ChatPromptTemplate.from_messages([
("system", "你是一个专业的技术顾问,擅长解释复杂的技术概念。"),
("human", "请详细解释{topic},包括其历史发展、核心技术、应用场景和未来趋势。")
])
| llm
| StrOutputParser()
)
print("问题: 请详细解释自然语言处理,包括其历史发展、核心技术、应用场景和未来趋势。")
print("回答: ", end="", flush=True)
# 异步流式调用
async for chunk in complex_chain.astream({"topic": "自然语言处理"}):
print(chunk, end="", flush=True)
print("\n" + "="*50)
# 带有错误处理的流式调用
def streaming_with_error_handling():
print("=== 带有错误处理的流式调用 ===")
# 创建可能出错的 Chain(使用无效的模型)
error_chain = (
ChatPromptTemplate.from_template("请解释{topic}")
| ChatOpenAI(model="invalid-model-name", api_key="invalid-key")
| StrOutputParser()
)
print("问题: 请解释量子计算。")
print("回答: ", end="", flush=True)
try:
for chunk in error_chain.stream({"topic": "量子计算"}):
print(chunk, end="", flush=True)
except Exception as e:
print(f"\n发生错误: {e}")
print("="*50)
# 流式调用性能对比
def streaming_performance_comparison():
print("=== 流式调用性能对比 ===")
import time
# 同步调用
print("1. 同步调用(等待完整响应):")
start_time = time.time()
result = chain.invoke({"topic": "区块链技术"})
end_time = time.time()
print(f" 耗时: {end_time - start_time:.2f} 秒")
print(f" 响应长度: {len(result)} 字符")
# 流式调用
print("\n2. 流式调用(实时输出):")
start_time = time.time()
print(" 实时输出: ", end="", flush=True)
for chunk in chain.stream({"topic": "区块链技术"}):
print(chunk, end="", flush=True)
end_time = time.time()
print(f"\n 耗时: {end_time - start_time:.2f} 秒")
# 运行所有示例
if __name__ == "__main__":
# 同步流式调用
sync_streaming_example()
# 异步流式调用
asyncio.run(async_streaming_example())
# 流式调用与回调函数结合
streaming_with_callback()
# 复杂流式调用
asyncio.run(complex_streaming_example())
# 带错误处理的流式调用
streaming_with_error_handling()
# 性能对比
streaming_performance_comparison()
2.3 流式调用的优势
流式调用在实际应用中具有以下显著优势:
2.3.1 用户体验提升
# user_experience_demo.py
import time
from langchain_core.runnables import RunnableLambda
# 模拟长时间运行的任务
def long_running_task(input_text):
"""模拟一个需要较长时间完成的任务"""
time.sleep(3) # 模拟 3 秒处理时间
return f"处理完成: {input_text}"
# 模拟流式任务
def streaming_task(input_text):
"""模拟流式任务,逐步返回结果"""
words = input_text.split()
for word in words:
yield f"{word} "
time.sleep(0.5) # 模拟每个词的处理时间
# 创建 Runnable 组件
long_task = RunnableLambda(long_running_task)
stream_task = RunnableLambda(streaming_task)
def compare_user_experience():
print("=== 用户体验对比 ===")
# 传统方式 - 等待完整结果
print("1. 传统方式(等待完整结果):")
print(" 开始处理...")
start_time = time.time()
result = long_task.invoke("这是一个需要长时间处理的文本")
end_time = time.time()
print(f" 结果: {result}")
print(f" 总耗时: {end_time - start_time:.2f} 秒")
print(" 用户在整个过程中看不到任何进度")
print("\n" + "-"*40)
# 流式方式 - 实时反馈
print("2. 流式方式(实时反馈):")
print(" 开始处理...")
start_time = time.time()
print(" 实时结果: ", end="", flush=True)
for chunk in stream_task.stream("这是一个需要长时间处理的文本"):
print(chunk, end="", flush=True)
end_time = time.time()
print(f"\n 总耗时: {end_time - start_time:.2f} 秒")
print(" 用户可以看到处理进度,体验更好")
# 运行用户体验对比
compare_user_experience()
2.3.2 资源利用优化
# resource_optimization_demo.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# 模拟 CPU 密集型任务
def cpu_intensive_task(data):
"""模拟 CPU 密集型任务"""
# 模拟复杂计算
time.sleep(1)
return f"CPU 处理完成: {data}"
# 模拟 I/O 密集型任务
async def io_intensive_task(data):
"""模拟 I/O 密集型任务"""
# 模拟网络请求
await asyncio.sleep(1)
return f"I/O 处理完成: {data}"
# 流式处理多个任务
async def streaming_multiple_tasks():
print("=== 流式处理多个任务 ===")
# 创建任务列表
tasks = [
cpu_intensive_task("数据1"),
cpu_intensive_task("数据2"),
cpu_intensive_task("数据3")
]
print("1. 传统批量处理:")
start_time = time.time()
results = []
for task in tasks:
result = task # 实际中这里会调用函数
results.append(result)
end_time = time.time()
print(f" 批量处理完成,耗时: {end_time - start_time:.2f} 秒")
print("\n2. 流式处理:")
start_time = time.time()
print(" 流式处理结果:")
for i, task in enumerate(tasks, 1):
result = task # 实际中这里会调用函数
print(f" 任务 {i} 完成: {result}")
# 模拟任务完成后的处理时间
time.sleep(0.1)
end_time = time.time()
print(f" 流式处理完成,耗时: {end_time - start_time:.2f} 秒")
# 异步流式处理
async def async_streaming_demo():
print("\n=== 异步流式处理 ===")
# 创建异步任务
async_tasks = [
io_intensive_task("数据A"),
io_intensive_task("数据B"),
io_intensive_task("数据C")
]
print("异步流式处理结果:")
# 并发执行所有任务
for i, task in enumerate(async_tasks, 1):
result = await task
print(f" 任务 {i} 完成: {result}")
# 运行资源优化演示
async def run_resource_optimization_demo():
streaming_multiple_tasks()
await async_streaming_demo()
# asyncio.run(run_resource_optimization_demo())
2.3.3 实时交互能力
# real_time_interaction_demo.py
import asyncio
from langchain_core.runnables import RunnableLambda
# 模拟实时数据生成器
def data_generator():
"""模拟实时数据生成"""
import random
data_points = ["用户登录", "页面浏览", "商品搜索", "加入购物车", "完成购买"]
for point in data_points:
yield {
"event": point,
"timestamp": f"{random.randint(1, 12)}:{random.randint(0, 59):02d}",
"user_id": f"user_{random.randint(1000, 9999)}"
}
asyncio.sleep(0.5) # 模拟数据生成间隔
# 实时数据分析器
def real_time_analyzer(data_stream):
"""实时分析数据流"""
for data in data_stream:
# 模拟分析过程
analysis = f"[{data['timestamp']}] {data['user_id']} 执行了 {data['event']}"
yield analysis
# 创建 Runnable 组件
generator = RunnableLambda(data_generator)
analyzer = RunnableLambda(real_time_analyzer)
def real_time_interaction_example():
print("=== 实时交互能力演示 ===")
print("实时用户行为分析:")
# 模拟实时数据流处理
data_stream = data_generator()
analysis_stream = real_time_analyzer(data_stream)
for analysis in analysis_stream:
print(f" {analysis}")
# 实时聊天机器人模拟
async def chatbot_streaming_demo():
print("\n=== 实时聊天机器人模拟 ===")
# 模拟用户输入
user_messages = [
"你好!",
"你能告诉我天气怎么样吗?",
"谢谢你的帮助!"
]
# 模拟机器人的流式响应
bot_responses = [
"你好!很高兴见到你。 ",
"我是一个AI助手,无法获取实时天气信息,但你可以查询天气应用。 ",
"不客气!随时欢迎你提出问题。 "
]
for user_msg, bot_response in zip(user_messages, bot_responses):
print(f"用户: {user_msg}")
print("机器人: ", end="", flush=True)
# 流式输出机器人的响应
for char in bot_response:
print(char, end="", flush=True)
await asyncio.sleep(0.1) # 模拟打字效果
print() # 换行
# 运行实时交互演示
real_time_interaction_example()
# asyncio.run(chatbot_streaming_demo())
3. 并行运行多条链
3.1 并行处理的概念
并行处理是指同时执行多个任务以提高整体处理效率的技术。在 LangChain 中,并行处理可以通过 RunnableParallel 组件实现,它允许同时运行多个 Chain 并将结果合并。
并行处理的核心优势在于:
- 时间效率 - 多个任务同时执行,减少总处理时间
- 资源利用 - 充分利用系统资源,提高吞吐量
- 用户体验 - 减少用户等待时间,提升满意度
3.2 并行链的实现方法
LangChain 提供了多种实现并行处理的方法,其中最常用的是 RunnableParallel:
# parallel_processing_basics.py
import os
import asyncio
import time
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 基础并行处理示例
def basic_parallel_example():
print("=== 基础并行处理示例 ===")
# 创建多个不同的 Chain
translation_chain = (
ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
| llm
| StrOutputParser()
)
summary_chain = (
ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
| llm
| StrOutputParser()
)
sentiment_chain = (
ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
| llm
| StrOutputParser()
)
# 使用 RunnableParallel 并行执行多个 Chain
parallel_chain = RunnableParallel(
translation=translation_chain,
summary=summary_chain,
sentiment=sentiment_chain
)
# 输入文本
input_text = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。"
print(f"输入文本: {input_text}")
print("并行处理中...")
# 执行并行处理
start_time = time.time()
results = parallel_chain.invoke({"text": input_text})
end_time = time.time()
print(f"处理完成,耗时: {end_time - start_time:.2f} 秒")
print("处理结果:")
print(f" 翻译: {results['translation']}")
print(f" 摘要: {results['summary']}")
print(f" 情感: {results['sentiment']}")
# 异步并行处理示例
async def async_parallel_example():
print("\n=== 异步并行处理示例 ===")
# 创建异步 Chain
async def async_translation(input_dict):
# 模拟异步翻译
await asyncio.sleep(1)
return f"Async translation of: {input_dict['text']}"
async def async_summary(input_dict):
# 模拟异步摘要
await asyncio.sleep(1)
return f"Async summary of: {input_dict['text']}"
async def async_sentiment(input_dict):
# 模拟异步情感分析
await asyncio.sleep(1)
return f"Async sentiment of: {input_dict['text']}"
# 创建异步 Runnable
async_parallel_chain = RunnableParallel(
translation=RunnableLambda(async_translation),
summary=RunnableLambda(async_summary),
sentiment=RunnableLambda(async_sentiment)
)
input_text = "这是一个用于异步并行处理测试的文本。"
print(f"输入文本: {input_text}")
print("异步并行处理中...")
start_time = time.time()
results = await async_parallel_chain.ainvoke({"text": input_text})
end_time = time.time()
print(f"异步处理完成,耗时: {end_time - start_time:.2f} 秒")
print("处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
# 批量并行处理示例
def batch_parallel_example():
print("\n=== 批量并行处理示例 ===")
# 创建用于处理多个输入的 Chain
analysis_chain = (
ChatPromptTemplate.from_template("分析以下文本: {text}")
| llm
| StrOutputParser()
)
# 准备批量输入数据
batch_inputs = [
{"text": "人工智能在医疗领域的应用"},
{"text": "机器学习的基本原理"},
{"text": "自然语言处理的发展趋势"}
]
print("批量并行处理中...")
start_time = time.time()
# 使用 batch 方法并行处理多个输入
results = analysis_chain.batch(batch_inputs)
end_time = time.time()
print(f"批量处理完成,耗时: {end_time - start_time:.2f} 秒")
print("处理结果:")
for i, result in enumerate(results, 1):
print(f" 输入 {i}: {batch_inputs[i-1]['text']}")
print(f" 结果 {i}: {result}")
print()
# 复杂并行处理示例
def complex_parallel_example():
print("=== 复杂并行处理示例 ===")
# 创建多个复杂的 Chain
technical_analysis_chain = (
ChatPromptTemplate.from_template("从技术角度分析: {topic}")
| llm
| StrOutputParser()
)
business_analysis_chain = (
ChatPromptTemplate.from_template("从业务角度分析: {topic}")
| llm
| StrOutputParser()
)
market_analysis_chain = (
ChatPromptTemplate.from_template("从市场角度分析: {topic}")
| llm
| StrOutputParser()
)
# 创建嵌套的并行处理结构
multi_dimension_analysis = RunnableParallel(
technical=technical_analysis_chain,
business=business_analysis_chain,
market=market_analysis_chain
)
# 创建最终报告生成 Chain
report_chain = (
ChatPromptTemplate.from_template("""
基于以下分析结果生成综合报告:
技术分析: {technical}
业务分析: {business}
市场分析: {market}
""")
| llm
| StrOutputParser()
)
# 组合完整的并行处理流程
complete_analysis_chain = multi_dimension_analysis | report_chain
topic = "区块链技术在金融领域的应用"
print(f"分析主题: {topic}")
print("多维度并行分析中...")
start_time = time.time()
final_report = complete_analysis_chain.invoke({"topic": topic})
end_time = time.time()
print(f"分析完成,耗时: {end_time - start_time:.2f} 秒")
print("最终报告:")
print(final_report)
# 带错误处理的并行处理
def parallel_with_error_handling():
print("\n=== 带错误处理的并行处理 ===")
# 创建可能出错的 Chain
def risky_operation(input_dict):
if "error" in input_dict.get("text", ""):
raise Exception("模拟的处理错误")
return f"处理成功: {input_dict['text']}"
# 创建安全的 Chain
def safe_operation(input_dict):
return f"安全处理: {input_dict['text']}"
# 创建并行处理 Chain
error_handling_parallel = RunnableParallel(
risky=RunnableLambda(risky_operation),
safe=RunnableLambda(safe_operation)
)
# 测试正常情况
print("1. 正常处理:")
try:
results = error_handling_parallel.invoke({"text": "正常文本"})
print(" 处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
except Exception as e:
print(f" 发生错误: {e}")
# 测试错误情况
print("\n2. 错误处理:")
try:
results = error_handling_parallel.invoke({"text": "包含error的文本"})
print(" 处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
except Exception as e:
print(f" 捕获到错误: {e}")
print(" 注意: 在实际应用中,应该为每个 Chain 实现独立的错误处理")
# 运行所有并行处理示例
if __name__ == "__main__":
basic_parallel_example()
# asyncio.run(async_parallel_example())
batch_parallel_example()
complex_parallel_example()
parallel_with_error_handling()
3.3 适用场景与示例
并行处理在多种场景下都非常有用,以下是一些典型的应用场景:
3.3.1 文档多维度分析
# document_multi_analysis.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def document_multi_dimensional_analysis():
print("=== 文档多维度分析 ===")
# 创建不同维度的分析 Chain
content_summary_chain = (
ChatPromptTemplate.from_template("为以下文档生成内容摘要:\n\n{document}")
| llm
| StrOutputParser()
)
key_points_chain = (
ChatPromptTemplate.from_template("提取以下文档的5个关键要点:\n\n{document}")
| llm
| StrOutputParser()
)
sentiment_analysis_chain = (
ChatPromptTemplate.from_template("分析以下文档的整体情感倾向:\n\n{document}")
| llm
| StrOutputParser()
)
technical_terms_chain = (
ChatPromptTemplate.from_template("提取以下文档中的专业技术术语:\n\n{document}")
| llm
| StrOutputParser()
)
readability_analysis_chain = (
ChatPromptTemplate.from_template("分析以下文档的可读性水平:\n\n{document}")
| llm
| StrOutputParser()
)
# 并行执行所有分析
comprehensive_analysis = RunnableParallel(
summary=content_summary_chain,
key_points=key_points_chain,
sentiment=sentiment_analysis_chain,
technical_terms=technical_terms_chain,
readability=readability_analysis_chain
)
# 示例文档
sample_document = """
人工智能(Artificial Intelligence,AI)是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、
语言识别、图像识别、自然语言处理和专家系统等。
人工智能从诞生以来,理论和技术日益成熟,应用领域也不断扩大。可以设想,未来人工智能
带来的科技产品,将会是人类智慧的"容器"。人工智能可以对人的意识、思维的信息过程的模拟。
人工智能不是人的智能,但能像人那样思考、也可能超过人的智能。
近年来,随着深度学习技术的发展,人工智能在图像识别、语音识别、自然语言处理等领域
取得了突破性进展。特别是在大语言模型方面,如GPT系列、BERT等模型的出现,使得机器在
理解和生成自然语言方面的能力大幅提升。
"""
print("开始对文档进行多维度分析...")
# 执行并行分析
results = comprehensive_analysis.invoke({"document": sample_document})
print("分析结果:")
print("="*50)
print(f"内容摘要:\n{results['summary']}\n")
print(f"关键要点:\n{results['key_points']}\n")
print(f"情感倾向:\n{results['sentiment']}\n")
print(f"技术术语:\n{results['technical_terms']}\n")
print(f"可读性分析:\n{results['readability']}\n")
# 运行文档多维度分析
document_multi_dimensional_analysis()
3.3.2 多语言翻译系统
# multi_language_translation.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def multi_language_translation_system():
print("=== 多语言翻译系统 ===")
# 创建针对不同语言的翻译 Chain
english_translation_chain = (
ChatPromptTemplate.from_template("将以下中文文本翻译成英文:\n\n{text}")
| llm
| StrOutputParser()
)
japanese_translation_chain = (
ChatPromptTemplate.from_template("将以下中文文本翻译成日文:\n\n{text}")
| llm
| StrOutputParser()
)
korean_translation_chain = (
ChatPromptTemplate.from_template("将以下中文文本翻译成韩文:\n\n{text}")
| llm
| StrOutputParser()
)
french_translation_chain = (
ChatPromptTemplate.from_template("将以下中文文本翻译成法文:\n\n{text}")
| llm
| StrOutputParser()
)
german_translation_chain = (
ChatPromptTemplate.from_template("将以下中文文本翻译成德文:\n\n{text}")
| llm
| StrOutputParser()
)
# 并行执行所有翻译
multi_language_translator = RunnableParallel(
english=english_translation_chain,
japanese=japanese_translation_chain,
korean=korean_translation_chain,
french=french_translation_chain,
german=german_translation_chain
)
# 待翻译文本
chinese_text = "人工智能是现代科技发展的重要方向,它正在改变我们的生活方式和工作方式。"
print(f"原文: {chinese_text}")
print("正在进行多语言翻译...")
# 执行并行翻译
translations = multi_language_translator.invoke({"text": chinese_text})
print("翻译结果:")
print("="*50)
for language, translation in translations.items():
print(f"{language.upper()}: {translation}")
# 运行多语言翻译系统
multi_language_translation_system()
3.3.3 股票多维度分析
# stock_multi_analysis.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def stock_comprehensive_analysis():
print("=== 股票多维度分析系统 ===")
# 技术分析 Chain
technical_analysis_chain = (
ChatPromptTemplate.from_template("""
基于以下股票数据进行技术分析:
股票代码: {symbol}
当前价格: {price}
技术指标: {technical_indicators}
请分析技术面趋势和交易信号。
""")
| llm
| StrOutputParser()
)
# 基本面分析 Chain
fundamental_analysis_chain = (
ChatPromptTemplate.from_template("""
基于以下股票数据进行基本面分析:
股票代码: {symbol}
财务数据: {financial_data}
行业信息: {industry_info}
请分析公司基本面和投资价值。
""")
| llm
| StrOutputParser()
)
# 市场情绪分析 Chain
sentiment_analysis_chain = (
ChatPromptTemplate.from_template("""
基于以下信息分析市场对该股票的情绪:
股票代码: {symbol}
新闻舆情: {news_sentiment}
社交媒体讨论: {social_media_trend}
请分析市场情绪和潜在影响。
""")
| llm
| StrOutputParser()
)
# 风险评估 Chain
risk_assessment_chain = (
ChatPromptTemplate.from_template("""
基于以下信息评估该股票的投资风险:
股票代码: {symbol}
市场风险: {market_risk}
公司风险: {company_risk}
行业风险: {industry_risk}
请评估整体风险水平和注意事项。
""")
| llm
| StrOutputParser()
)
# 投资建议 Chain
investment_advice_chain = (
ChatPromptTemplate.from_template("""
基于以下综合分析为投资者提供建议:
股票代码: {symbol}
技术分析: {technical_analysis}
基本面分析: {fundamental_analysis}
市场情绪: {sentiment_analysis}
风险评估: {risk_assessment}
请提供投资建议和策略。
""")
| llm
| StrOutputParser()
)
# 创建并行分析 Chain
stock_analysis_parallel = RunnableParallel(
technical=technical_analysis_chain,
fundamental=fundamental_analysis_chain,
sentiment=sentiment_analysis_chain,
risk=risk_assessment_chain
)
# 创建完整分析流程
complete_stock_analysis = stock_analysis_parallel | investment_advice_chain
# 示例股票数据
stock_data = {
"symbol": "AAPL",
"price": "$150.00",
"technical_indicators": {
"RSI": 65.5,
"MACD": 2.3,
"Moving_Average_20": 145.2,
"Moving_Average_50": 140.7
},
"financial_data": {
"P/E_Ratio": 28.5,
"EPS": 6.12,
"Revenue_Growth": 0.08,
"Debt_to_Equity": 1.52
},
"industry_info": "科技硬件与设备",
"news_sentiment": "积极",
"social_media_trend": "关注度上升",
"market_risk": "中等",
"company_risk": "低",
"industry_risk": "中等"
}
print(f"正在对股票 {stock_data['symbol']} 进行多维度分析...")
# 准备分析输入
analysis_input = {
"symbol": stock_data["symbol"],
"price": stock_data["price"],
"technical_indicators": str(stock_data["technical_indicators"]),
"financial_data": str(stock_data["financial_data"]),
"industry_info": stock_data["industry_info"],
"news_sentiment": stock_data["news_sentiment"],
"social_media_trend": stock_data["social_media_trend"],
"market_risk": stock_data["market_risk"],
"company_risk": stock_data["company_risk"],
"industry_risk": stock_data["industry_risk"]
}
# 执行并行分析
intermediate_results = stock_analysis_parallel.invoke(analysis_input)
# 准备投资建议输入
advice_input = {
"symbol": stock_data["symbol"],
"technical_analysis": intermediate_results["technical"],
"fundamental_analysis": intermediate_results["fundamental"],
"sentiment_analysis": intermediate_results["sentiment"],
"risk_assessment": intermediate_results["risk"]
}
# 生成最终投资建议
final_advice = investment_advice_chain.invoke(advice_input)
print("分析结果:")
print("="*50)
print(f"技术分析:\n{intermediate_results['technical']}\n")
print(f"基本面分析:\n{intermediate_results['fundamental']}\n")
print(f"市场情绪:\n{intermediate_results['sentiment']}\n")
print(f"风险评估:\n{intermediate_results['risk']}\n")
print(f"投资建议:\n{final_advice}\n")
# 运行股票多维度分析
stock_comprehensive_analysis()
4. Runnable 组件详解
4.1 RunnablePassthrough 详解
RunnablePassthrough 是 LangChain 中一个非常有用的组件,它直接传递输入数据而不进行任何修改。这在构建复杂 Chain 时特别有用,特别是在需要保留原始输入数据的同时添加额外处理步骤的场景中。
# runnable_passthrough_detailed.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def runnable_passthrough_basics():
print("=== RunnablePassthrough 基础用法 ===")
# 创建简单的 Passthrough
passthrough = RunnablePassthrough()
# 测试数据传递
test_data = {"message": "Hello, World!", "number": 42}
result = passthrough.invoke(test_data)
print(f"输入数据: {test_data}")
print(f"输出数据: {result}")
print(f"数据是否相同: {test_data == result}")
def passthrough_in_complex_chains():
print("\n=== 在复杂 Chain 中使用 Passthrough ===")
# 创建分析 Chain
analysis_chain = (
ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
| llm
| StrOutputParser()
)
# 使用 RunnableParallel 结合 Passthrough
complex_chain = RunnableParallel(
original=RunnablePassthrough(), # 保留原始输入
analysis=analysis_chain # 添加分析结果
)
input_data = {"text": "今天天气真好,我很开心!"}
print(f"输入文本: {input_data['text']}")
result = complex_chain.invoke(input_data)
print("处理结果:")
print(f" 原始输入: {result['original']}")
print(f" 情感分析: {result['analysis']}")
def passthrough_with_additional_processing():
print("\n=== Passthrough 与额外处理结合 ===")
# 创建多个处理 Chain
summary_chain = (
ChatPromptTemplate.from_template("为以下文本生成摘要: {content}")
| llm
| StrOutputParser()
)
keyword_chain = (
ChatPromptTemplate.from_template("提取以下文本的关键词: {content}")
| llm
| StrOutputParser()
)
# 使用 RunnableParallel 结合 Passthrough 和处理 Chain
processing_chain = RunnableParallel(
original=RunnablePassthrough(), # 保留原始内容
summary=summary_chain, # 添加摘要
keywords=keyword_chain # 添加关键词
)
# 创建最终报告 Chain
report_chain = (
ChatPromptTemplate.from_template("""
基于以下分析结果生成完整报告:
原始内容: {original}
内容摘要: {summary}
关键词: {keywords}
""")
| llm
| StrOutputParser()
)
# 组合完整 Chain
complete_chain = processing_chain | report_chain
content = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。近年来,随着深度学习技术的发展,人工智能在图像识别、语音识别、自然语言处理等领域取得了突破性进展。"
print(f"原始内容: {content}")
print("生成完整报告中...")
final_report = complete_chain.invoke({"content": content})
print("最终报告:")
print(final_report)
def passthrough_assign_example():
print("\n=== RunnablePassthrough.assign() 用法 ===")
from langchain_core.runnables import RunnablePassthrough
# 创建处理函数
def add_timestamp(input_data):
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def add_word_count(input_data):
return len(input_data.get("text", "").split())
# 使用 assign 方法添加新字段
enhanced_chain = RunnablePassthrough.assign(
timestamp=lambda x: add_timestamp(x),
word_count=lambda x: add_word_count(x)
)
input_data = {"text": "这是一个用于测试的示例文本,包含多个词汇。"}
print(f"原始数据: {input_data}")
enhanced_data = enhanced_chain.invoke(input_data)
print(f"增强后数据: {enhanced_data}")
def passthrough_in_data_pipeline():
print("\n=== 在数据管道中使用 Passthrough ===")
# 模拟数据处理管道
def data_validator(input_data):
"""数据验证器"""
if not input_data.get("text"):
raise ValueError("文本内容不能为空")
return input_data
def data_enricher(input_data):
"""数据增强器"""
import hashlib
text = input_data.get("text", "")
input_data["text_hash"] = hashlib.md5(text.encode()).hexdigest()
return input_data
# 创建数据处理管道
data_pipeline = (
RunnablePassthrough() | # 传递原始数据
RunnablePassthrough().assign(validated=data_validator) | # 验证数据
RunnablePassthrough().assign(enriched=data_enricher) # 增强数据
)
test_data = {"text": "测试数据管道", "source": "user_input"}
print(f"输入数据: {test_data}")
try:
processed_data = data_pipeline.invoke(test_data)
print(f"处理后数据: {processed_data}")
except Exception as e:
print(f"处理出错: {e}")
# 运行所有 Passthrough 示例
if __name__ == "__main__":
runnable_passthrough_basics()
passthrough_in_complex_chains()
passthrough_with_additional_processing()
passthrough_assign_example()
passthrough_in_data_pipeline()
4.2 RunnableLambda 详解
RunnableLambda 允许将普通的 Python 函数包装为 Runnable 组件,从而可以在 Chain 中使用。
# runnable_lambda_detailed.py
from langchain_core.runnables import RunnableLambda
import asyncio
import time
def runnable_lambda_basics():
print("=== RunnableLambda 基础用法 ===")
# 创建简单的处理函数
def text_processor(text):
return text.upper()
# 包装为 RunnableLambda
processor = RunnableLambda(text_processor)
# 测试处理
input_text = "hello world"
result = processor.invoke(input_text)
print(f"输入: {input_text}")
print(f"输出: {result}")
def lambda_with_complex_processing():
print("\n=== 复杂处理的 RunnableLambda ===")
# 创建复杂处理函数
def complex_processor(input_data):
if isinstance(input_data, str):
# 处理字符串
return {
"original": input_data,
"processed": input_data.upper(),
"length": len(input_data),
"word_count": len(input_data.split())
}
elif isinstance(input_data, dict):
# 处理字典
text = input_data.get("text", "")
return {
"original": text,
"processed": text.lower(),
"length": len(text),
"word_count": len(text.split()),
"metadata": input_data.get("metadata", {})
}
else:
return {"error": "不支持的输入类型"}
# 包装为 RunnableLambda
complex_lambda = RunnableLambda(complex_processor)
# 测试字符串输入
string_input = "Hello World from LangChain"
result1 = complex_lambda.invoke(string_input)
print(f"字符串输入处理结果: {result1}")
# 测试字典输入
dict_input = {
"text": "Hello World from LangChain",
"metadata": {"source": "user", "priority": "high"}
}
result2 = complex_lambda.invoke(dict_input)
print(f"字典输入处理结果: {result2}")
async def async_lambda_example():
print("\n=== 异步 RunnableLambda ===")
# 创建异步处理函数
async def async_processor(input_data):
# 模拟异步操作
await asyncio.sleep(1)
return f"异步处理完成: {input_data}"
# 包装为 RunnableLambda
async_lambda = RunnableLambda(async_processor)
# 测试异步处理
input_data = "异步测试数据"
print(f"开始异步处理: {input_data}")
start_time = time.time()
result = await async_lambda.ainvoke(input_data)
end_time = time.time()
print(f"异步处理结果: {result}")
print(f"处理耗时: {end_time - start_time:.2f} 秒")
def lambda_with_error_handling():
print("\n=== 带错误处理的 RunnableLambda ===")
# 创建带错误处理的函数
def safe_processor(input_data):
try:
if isinstance(input_data, str):
return input_data * 2
elif isinstance(input_data, int):
return input_data ** 2
else:
raise ValueError(f"不支持的数据类型: {type(input_data)}")
except Exception as e:
return {"error": str(e), "input": input_data}
# 包装为 RunnableLambda
safe_lambda = RunnableLambda(safe_processor)
# 测试各种输入
test_inputs = ["test", 5, [1, 2, 3], {"key": "value"}]
for input_data in test_inputs:
result = safe_lambda.invoke(input_data)
print(f"输入: {input_data} -> 输出: {result}")
def lambda_in_chain_example():
print("\n=== 在 Chain 中使用 RunnableLambda ===")
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
# 创建预处理函数
def preprocess_input(input_data):
"""预处理输入数据"""
if isinstance(input_data, str):
return {"text": input_data.strip(), "word_count": len(input_data.split())}
return input_data
# 创建后处理函数
def postprocess_output(output_data):
"""后处理输出数据"""
return {
"original_output": output_data,
"character_count": len(output_data),
"line_count": len(output_data.split('\n'))
}
# 创建 Chain
chain = (
RunnableLambda(preprocess_input) |
ChatPromptTemplate.from_template("总结以下文本: {text}") |
llm |
StrOutputParser() |
RunnableLambda(postprocess_output)
)
input_text = " 人工智能是计算机科学的一个重要分支。它致力于创建能够执行通常需要人类智能的任务的系统。 "
print(f"原始输入: '{input_text}'")
result = chain.invoke(input_text)
print("处理结果:")
for key, value in result.items():
print(f" {key}: {value}")
# 运行所有 Lambda 示例
if __name__ == "__main__":
runnable_lambda_basics()
lambda_with_complex_processing()
# asyncio.run(async_lambda_example())
lambda_with_error_handling()
lambda_in_chain_example()
4.3 RunnableParallel 详解
RunnableParallel 允许并行执行多个 Runnable 组件,并将结果合并为一个字典。
# runnable_parallel_detailed.py
from langchain_core.runnables import RunnableParallel, RunnableLambda
import time
import asyncio
def runnable_parallel_basics():
print("=== RunnableParallel 基础用法 ===")
# 创建简单的处理函数
def processor_a(input_data):
time.sleep(0.5) # 模拟处理时间
return f"A 处理结果: {input_data}"
def processor_b(input_data):
time.sleep(0.5) # 模拟处理时间
return f"B 处理结果: {input_data}"
def processor_c(input_data):
time.sleep(0.5) # 模拟处理时间
return f"C 处理结果: {input_data}"
# 创建并行处理
parallel_processors = RunnableParallel(
a=RunnableLambda(processor_a),
b=RunnableLambda(processor_b),
c=RunnableLambda(processor_c)
)
input_data = "测试数据"
print(f"输入数据: {input_data}")
print("开始并行处理...")
start_time = time.time()
results = parallel_processors.invoke(input_data)
end_time = time.time()
print(f"并行处理完成,耗时: {end_time - start_time:.2f} 秒")
print("处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
async def async_parallel_example():
print("\n=== 异步 RunnableParallel ===")
# 创建异步处理函数
async def async_processor_a(input_data):
await asyncio.sleep(0.5) # 模拟异步处理
return f"异步 A 处理结果: {input_data}"
async def async_processor_b(input_data):
await asyncio.sleep(0.5) # 模拟异步处理
return f"异步 B 处理结果: {input_data}"
async def async_processor_c(input_data):
await asyncio.sleep(0.5) # 模拟异步处理
return f"异步 C 处理结果: {input_data}"
# 创建异步并行处理
async_parallel = RunnableParallel(
a=RunnableLambda(async_processor_a),
b=RunnableLambda(async_processor_b),
c=RunnableLambda(async_processor_c)
)
input_data = "异步测试数据"
print(f"输入数据: {input_data}")
print("开始异步并行处理...")
start_time = time.time()
results = await async_parallel.ainvoke(input_data)
end_time = time.time()
print(f"异步并行处理完成,耗时: {end_time - start_time:.2f} 秒")
print("处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
def parallel_with_different_input_types():
print("\n=== 处理不同类型输入的并行处理 ===")
# 创建处理不同输入类型的函数
def text_analyzer(input_data):
if isinstance(input_data, dict) and "text" in input_data:
text = input_data["text"]
return {
"length": len(text),
"word_count": len(text.split()),
"char_count": len(text.replace(" ", ""))
}
return {"error": "无效输入"}
def sentiment_analyzer(input_data):
if isinstance(input_data, dict) and "text" in input_data:
# 简单的情感分析模拟
text = input_data["text"].lower()
positive_words = ["好", "棒", "优秀", "开心", "高兴"]
negative_words = ["坏", "差", "糟糕", "难过", "沮丧"]
positive_count = sum(1 for word in positive_words if word in text)
negative_count = sum(1 for word in negative_words if word in text)
if positive_count > negative_count:
return "积极"
elif negative_count > positive_count:
return "消极"
else:
return "中性"
return "无法分析"
def keyword_extractor(input_data):
if isinstance(input_data, dict) and "text" in input_data:
text = input_data["text"]
# 简单的关键词提取模拟
words = text.split()
return words[:5] # 返回前5个词
return []
# 创建并行分析器
text_analysis_parallel = RunnableParallel(
statistics=RunnableLambda(text_analyzer),
sentiment=RunnableLambda(sentiment_analyzer),
keywords=RunnableLambda(keyword_extractor)
)
# 测试数据
test_input = {
"text": "今天天气真好,我很开心!这是一个很棒的日子,让我感到非常高兴。",
"source": "user_input"
}
print(f"输入文本: {test_input['text']}")
print("开始并行文本分析...")
results = text_analysis_parallel.invoke(test_input)
print("分析结果:")
print(f" 统计信息: {results['statistics']}")
print(f" 情感分析: {results['sentiment']}")
print(f" 关键词提取: {results['keywords']}")
def nested_parallel_example():
print("\n=== 嵌套并行处理 ===")
# 创建第一层处理函数
def level_one_a(input_data):
return f"Level 1-A: {input_data}"
def level_one_b(input_data):
return f"Level 1-B: {input_data}"
# 创建第二层处理函数
def level_two_a(input_data):
return f"Level 2-A: {input_data}"
def level_two_b(input_data):
return f"Level 2-B: {input_data}"
# 创建嵌套并行结构
nested_parallel = RunnableParallel(
first_level=RunnableParallel(
a=RunnableLambda(level_one_a),
b=RunnableLambda(level_one_b)
),
second_level=RunnableParallel(
a=RunnableLambda(level_two_a),
b=RunnableLambda(level_two_b)
)
)
input_data = "嵌套测试"
print(f"输入数据: {input_data}")
results = nested_parallel.invoke(input_data)
print("嵌套并行处理结果:")
print(f" 第一层:")
print(f" A: {results['first_level']['a']}")
print(f" B: {results['first_level']['b']}")
print(f" 第二层:")
print(f" A: {results['second_level']['a']}")
print(f" B: {results['second_level']['b']}")
def parallel_with_error_handling():
print("\n=== 带错误处理的并行处理 ===")
# 创建可能出错的处理函数
def risky_processor(input_data):
if "error" in str(input_data).lower():
raise Exception("模拟处理错误")
return f"成功处理: {input_data}"
def safe_processor(input_data):
return f"安全处理: {input_data}"
# 创建带错误处理的并行处理
safe_parallel = RunnableParallel(
risky=RunnableLambda(risky_processor),
safe=RunnableLambda(safe_processor)
)
# 测试正常情况
print("1. 正常处理:")
try:
results = safe_parallel.invoke("正常数据")
print(" 处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
except Exception as e:
print(f" 发生错误: {e}")
# 测试错误情况
print("\n2. 错误处理:")
try:
results = safe_parallel.invoke("包含 ERROR 的数据")
print(" 处理结果:")
for key, value in results.items():
print(f" {key}: {value}")
except Exception as e:
print(f" 捕获到错误: {e}")
print(" 注意: 在实际应用中,应该为每个处理器实现独立的错误处理")
# 运行所有并行处理示例
if __name__ == "__main__":
runnable_parallel_basics()
# asyncio.run(async_parallel_example())
parallel_with_different_input_types()
nested_parallel_example()
parallel_with_error_handling()
5. 迁移与核心组件对比
5.1 传统 Chain 与 LCEL 对比
随着 LangChain 的发展,新的 LangChain Expression Language (LCEL) 提供了更简洁、更强大的方式来构建 Chain。让我们通过具体示例来对比传统方式和新方式:
# chain_migration_comparison.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def traditional_chain_example():
print("=== 传统 Chain 方式 ===")
# 传统方式:使用 LLMChain
from langchain.chains import LLMChain
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建传统 Chain
traditional_chain = LLMChain(
llm=llm,
prompt=prompt,
output_parser=StrOutputParser()
)
# 使用传统 Chain
result = traditional_chain.invoke({"concept": "人工智能"})
print(f"传统 Chain 结果: {result}")
def lcel_example():
print("\n=== LCEL 方式 ===")
# LCEL 方式:使用管道符
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建 LCEL Chain
lcel_chain = prompt | llm | StrOutputParser()
# 使用 LCEL Chain
result = lcel_chain.invoke({"concept": "人工智能"})
print(f"LCEL Chain 结果: {result}")
def complex_traditional_chain():
print("\n=== 复杂传统 Chain ===")
from langchain.chains import SequentialChain, LLMChain
# 创建多个传统 Chain
prompt1 = ChatPromptTemplate.from_template("列出{topic}的5个要点")
chain1 = LLMChain(llm=llm, prompt=prompt1, output_key="points")
prompt2 = ChatPromptTemplate.from_template("详细解释以下要点:\n{points}")
chain2 = LLMChain(llm=llm, prompt=prompt2, output_key="explanation")
# 组合传统 Chain
complex_traditional = SequentialChain(
chains=[chain1, chain2],
input_variables=["topic"],
output_variables=["explanation"]
)
result = complex_traditional.invoke({"topic": "机器学习"})
print(f"复杂传统 Chain 结果长度: {len(result['explanation'])} 字符")
def complex_lcel_chain():
print("\n=== 复杂 LCEL Chain ===")
# 创建多个 LCEL 组件
prompt1 = ChatPromptTemplate.from_template("列出{topic}的5个要点")
prompt2 = ChatPromptTemplate.from_template("详细解释以下要点:\n{points}")
# 使用 LCEL 组合
complex_lcel = (
{"points": prompt1 | llm | StrOutputParser()} |
prompt2 |
llm |
StrOutputParser()
)
result = complex_lcel.invoke({"topic": "机器学习"})
print(f"复杂 LCEL Chain 结果长度: {len(result)} 字符")
def parallel_traditional_chain():
print("\n=== 传统并行 Chain ===")
from langchain.chains import SequentialChain, LLMChain
# 创建多个传统 Chain
translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
translation_chain = LLMChain(llm=llm, prompt=translation_prompt, output_key="translation")
summary_prompt = ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
summary_chain = LLMChain(llm=llm, prompt=summary_prompt, output_key="summary")
# 传统方式实现并行较复杂,通常需要手动实现
def traditional_parallel(input_data):
text = input_data["text"]
# 手动并行执行
translation_result = translation_chain.invoke({"text": text})
summary_result = summary_chain.invoke({"text": text})
return {
"translation": translation_result["translation"],
"summary": summary_result["summary"]
}
input_text = "人工智能是计算机科学的一个重要分支。"
result = traditional_parallel({"text": input_text})
print("传统并行 Chain 结果:")
print(f" 翻译: {result['translation']}")
print(f" 摘要: {result['summary']}")
def parallel_lcel_chain():
print("\n=== LCEL 并行 Chain ===")
from langchain_core.runnables import RunnableParallel
# 创建 LCEL 组件
translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
summary_prompt = ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
# 使用 LCEL 实现并行非常简单
parallel_lcel = RunnableParallel(
translation=translation_prompt | llm | StrOutputParser(),
summary=summary_prompt | llm | StrOutputParser()
)
input_text = "人工智能是计算机科学的一个重要分支。"
result = parallel_lcel.invoke({"text": input_text})
print("LCEL 并行 Chain 结果:")
print(f" 翻译: {result['translation']}")
print(f" 摘要: {result['summary']}")
# 运行对比示例
if __name__ == "__main__":
traditional_chain_example()
lcel_example()
complex_traditional_chain()
complex_lcel_chain()
parallel_traditional_chain()
parallel_lcel_chain()
5.2 LCEL 的主要优势
LCEL (LangChain Expression Language) 相比传统 Chain 方式具有以下三个主要优势:
5.2.1 语法简洁性
# lcel_advantages.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
def syntax_simplicity_demo():
print("=== 语法简洁性对比 ===")
# 传统方式 - 冗长的代码
print("1. 传统方式:")
print(" ```python")
print(" from langchain.chains import LLMChain")
print(" from langchain.prompts import ChatPromptTemplate")
print(" from langchain.output_parsers import StrOutputParser")
print(" ")
print(" prompt = ChatPromptTemplate.from_template('解释{concept}')")
print(" chain = LLMChain(")
print(" llm=llm,")
print(" prompt=prompt,")
print(" output_parser=StrOutputParser()")
print(" )")
print(" result = chain.invoke({'concept': 'AI'})")
print(" ```")
# LCEL 方式 - 简洁的代码
print("\n2. LCEL 方式:")
print(" ```python")
print(" prompt = ChatPromptTemplate.from_template('解释{concept}')")
print(" chain = prompt | llm | StrOutputParser()")
print(" result = chain.invoke({'concept': 'AI'})")
print(" ```")
print("\n优势: LCEL 使用管道符(|)连接组件,代码更简洁易读")
def composability_demo():
print("\n=== 组合性优势 ===")
# 创建基础组件
prompt1 = ChatPromptTemplate.from_template("列出{topic}的要点")
prompt2 = ChatPromptTemplate.from_template("详细解释: {points}")
prompt3 = ChatPromptTemplate.from_template("总结: {explanation}")
# LCEL 的强大组合能力
complex_chain = (
{"points": prompt1 | llm | StrOutputParser()} |
prompt2 |
llm |
StrOutputParser() |
(lambda x: {"explanation": x}) |
prompt3 |
llm |
StrOutputParser()
)
print("LCEL 支持复杂的组合:")
print("1. 顺序组合: component1 | component2 | component3")
print("2. 并行组合: RunnableParallel(a=chain1, b=chain2)")
print("3. 数据转换: (lambda x: {'key': x})")
print("4. 条件分支: RunnableBranch(condition, chain1, chain2)")
print("5. 循环处理: 自定义迭代逻辑")
# 演示复杂 Chain 的执行
print("\n执行复杂 Chain 示例:")
try:
result = complex_chain.invoke({"topic": "深度学习"})
print(f"执行成功,结果长度: {len(result)} 字符")
except Exception as e:
print(f"执行出错: {e}")
def uniform_interface_demo():
print("\n=== 统一接口优势 ===")
from langchain_core.runnables import RunnableLambda
# 所有 Runnable 组件都有统一的接口
def custom_processor(input_data):
return f"处理结果: {input_data}"
# 不同类型的 Runnable 组件
llm_runnable = llm
prompt_runnable = ChatPromptTemplate.from_template("处理: {input}")
lambda_runnable = RunnableLambda(custom_processor)
# 它们都支持相同的方法
runnables = [
("LLM", llm_runnable),
("Prompt", prompt_runnable),
("Lambda", lambda_runnable)
]
print("所有 Runnable 组件支持统一接口:")
print("1. invoke(input) - 单次调用")
print("2. batch(inputs) - 批量处理")
print("3. stream(input) - 流式处理")
print("4. ainvoke(input) - 异步单次调用")
print("5. abatch(inputs) - 异步批量处理")
print("6. astream(input) - 异步流式处理")
# 演示统一接口
test_input = "测试数据"
print(f"\n使用相同输入 '{test_input}' 测试不同组件:")
for name, runnable in runnables:
try:
if name == "LLM":
# LLM 需要消息格式
from langchain_core.messages import HumanMessage
result = runnable.invoke([HumanMessage(content=test_input)])
print(f" {name}: {type(result).__name__}")
elif name == "Prompt":
result = runnable.invoke({"input": test_input})
print(f" {name}: {type(result).__name__}")
else:
result = runnable.invoke(test_input)
print(f" {name}: {result}")
except Exception as e:
print(f" {name}: 执行出错 - {e}")
# 运行 LCEL 优势演示
if __name__ == "__main__":
syntax_simplicity_demo()
composability_demo()
uniform_interface_demo()
6. 总结
在本指南中,我们深入探讨了 LangChain 中的流式调用、并行处理和 Runnable 组件等核心概念和实践方法。
主要学习内容回顾
-
流式调用
- 理解了流式调用的概念和工作原理
- 掌握了同步和异步流式调用的实现方法
- 认识到流式调用在提升用户体验方面的重要作用
-
并行处理
- 学会了使用
RunnableParallel实现并行处理 - 了解了并行处理在提高效率方面的优势
- 通过多个实际场景示例加深了理解
- 学会了使用
-
Runnable 组件
- 详细了解了
RunnablePassthrough、RunnableLambda和RunnableParallel的使用 - 掌握了这些组件在构建复杂应用中的重要作用
- 详细了解了
-
迁移与对比
- 对比了传统 Chain 方式和 LCEL 方式的差异
- 理解了 LCEL 在语法简洁性、组合性和统一接口方面的优势
实践建议
在实际项目中应用这些技术时,建议:
-
合理使用流式调用
- 在需要实时反馈的场景中优先考虑流式调用
- 注意处理流式调用中的错误和异常情况
-
优化并行处理
- 识别可以并行执行的任务以提高效率
- 注意控制并行度以避免资源竞争
-
充分利用 Runnable 组件
- 使用
RunnablePassthrough保留原始数据 - 通过
RunnableLambda集成自定义处理逻辑 - 利用
RunnableParallel实现并行处理
- 使用
-
采用 LCEL 方式
- 优先使用 LCEL 构建 Chain 以获得更好的可读性和可维护性
- 充分利用 LCEL 强大的组合能力
更多推荐
所有评论(0)