9-2. LangChain 流式调用、并行处理与 Runnable 组件详解

目录

  1. 简介
  2. 流式调用详解
  3. 并行运行多条链
  4. Runnable 组件详解
  5. 迁移与核心组件对比
  6. 总结

1. 简介

LangChain 作为构建语言模型应用的强大框架,提供了多种机制来优化应用性能和用户体验。其中,流式调用和并行处理是两个重要的特性,它们能够显著提升应用的响应速度和用户体验。同时,Runnable 组件作为 LangChain 的核心构建块,为开发者提供了灵活且强大的组合能力。

本指南将深入探讨以下关键主题:

  1. 流式调用 - 实现实时输出,提升用户体验
  2. 并行处理 - 同时运行多个任务,提高处理效率
  3. Runnable 组件 - 核心构建块的使用方法
  4. 迁移指南 - 从传统 Chain 到 LCEL 的迁移
  5. 实际应用 - 通过代码示例展示各种技术的实际应用

通过本指南的学习,您将能够:

  • 理解并实现流式调用,提升应用的响应性
  • 掌握并行处理技术,优化应用性能
  • 熟练使用各种 Runnable 组件构建复杂应用
  • 顺利完成从传统 Chain 到 LCEL 的迁移
  • 在实际项目中应用这些技术解决实际问题

2. 流式调用详解

2.1 流式调用的概念

流式调用(Streaming)是一种数据处理方式,它允许在数据生成的过程中逐步返回结果,而不是等待所有处理完成后再一次性返回。在 LangChain 中,流式调用特别适用于与大语言模型的交互,因为它可以让用户在模型生成响应的过程中实时看到输出内容。

传统的同步调用模式下,用户需要等待整个响应生成完毕后才能看到结果,这在处理长文本或复杂任务时可能导致较长的等待时间。而流式调用则可以逐步返回生成的内容,让用户能够立即看到部分结果,从而显著改善用户体验。

流式调用的工作原理如下:

  1. 用户发起请求
  2. 系统开始处理请求并生成响应
  3. 在生成过程中,系统逐步将已生成的部分内容返回给用户
  4. 用户可以实时看到内容的生成过程
  5. 当所有内容生成完毕后,流式调用结束

2.2 流式调用的实现

在 LangChain 中,流式调用通过 stream() 方法实现。让我们通过具体代码示例来演示:

# streaming_basics.py
import os
import asyncio
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
    "请详细解释{topic}的概念,并提供实际应用示例。"
)

# 创建 Chain
chain = prompt | llm | StrOutputParser()

# 同步流式调用示例
def sync_streaming_example():
    print("=== 同步流式调用示例 ===")
    print("问题: 请详细解释人工智能的概念,并提供实际应用示例。")
    print("回答: ", end="", flush=True)
    
    # 使用 stream() 方法进行流式调用
    for chunk in chain.stream({"topic": "人工智能"}):
        print(chunk, end="", flush=True)
    
    print("\n" + "="*50)

# 异步流式调用示例
async def async_streaming_example():
    print("=== 异步流式调用示例 ===")
    print("问题: 请详细解释机器学习的概念,并提供实际应用示例。")
    print("回答: ", end="", flush=True)
    
    # 使用 astream() 方法进行异步流式调用
    async for chunk in chain.astream({"topic": "机器学习"}):
        print(chunk, end="", flush=True)
    
    print("\n" + "="*50)

# 流式调用与回调函数结合
def streaming_with_callback():
    print("=== 流式调用与回调函数结合 ===")
    
    # 定义回调函数
    def on_token(token):
        print(token, end="", flush=True)
    
    def on_complete():
        print("\n生成完成!")
    
    print("问题: 请解释深度学习的基本原理。")
    print("回答: ", end="", flush=True)
    
    # 流式调用并处理每个 token
    try:
        for chunk in chain.stream({"topic": "深度学习的基本原理"}):
            on_token(chunk)
    except Exception as e:
        print(f"\n发生错误: {e}")
    finally:
        on_complete()

# 复杂流式调用示例
async def complex_streaming_example():
    print("=== 复杂流式调用示例 ===")
    
    # 创建更复杂的 Chain
    complex_chain = (
        ChatPromptTemplate.from_messages([
            ("system", "你是一个专业的技术顾问,擅长解释复杂的技术概念。"),
            ("human", "请详细解释{topic},包括其历史发展、核心技术、应用场景和未来趋势。")
        ])
        | llm
        | StrOutputParser()
    )
    
    print("问题: 请详细解释自然语言处理,包括其历史发展、核心技术、应用场景和未来趋势。")
    print("回答: ", end="", flush=True)
    
    # 异步流式调用
    async for chunk in complex_chain.astream({"topic": "自然语言处理"}):
        print(chunk, end="", flush=True)
    
    print("\n" + "="*50)

# 带有错误处理的流式调用
def streaming_with_error_handling():
    print("=== 带有错误处理的流式调用 ===")
    
    # 创建可能出错的 Chain(使用无效的模型)
    error_chain = (
        ChatPromptTemplate.from_template("请解释{topic}")
        | ChatOpenAI(model="invalid-model-name", api_key="invalid-key")
        | StrOutputParser()
    )
    
    print("问题: 请解释量子计算。")
    print("回答: ", end="", flush=True)
    
    try:
        for chunk in error_chain.stream({"topic": "量子计算"}):
            print(chunk, end="", flush=True)
    except Exception as e:
        print(f"\n发生错误: {e}")
    
    print("="*50)

# 流式调用性能对比
def streaming_performance_comparison():
    print("=== 流式调用性能对比 ===")
    
    import time
    
    # 同步调用
    print("1. 同步调用(等待完整响应):")
    start_time = time.time()
    result = chain.invoke({"topic": "区块链技术"})
    end_time = time.time()
    print(f"   耗时: {end_time - start_time:.2f} 秒")
    print(f"   响应长度: {len(result)} 字符")
    
    # 流式调用
    print("\n2. 流式调用(实时输出):")
    start_time = time.time()
    print("   实时输出: ", end="", flush=True)
    for chunk in chain.stream({"topic": "区块链技术"}):
        print(chunk, end="", flush=True)
    end_time = time.time()
    print(f"\n   耗时: {end_time - start_time:.2f} 秒")

# 运行所有示例
if __name__ == "__main__":
    # 同步流式调用
    sync_streaming_example()
    
    # 异步流式调用
    asyncio.run(async_streaming_example())
    
    # 流式调用与回调函数结合
    streaming_with_callback()
    
    # 复杂流式调用
    asyncio.run(complex_streaming_example())
    
    # 带错误处理的流式调用
    streaming_with_error_handling()
    
    # 性能对比
    streaming_performance_comparison()

2.3 流式调用的优势

流式调用在实际应用中具有以下显著优势:

2.3.1 用户体验提升
# user_experience_demo.py
import time
from langchain_core.runnables import RunnableLambda

# 模拟长时间运行的任务
def long_running_task(input_text):
    """模拟一个需要较长时间完成的任务"""
    time.sleep(3)  # 模拟 3 秒处理时间
    return f"处理完成: {input_text}"

# 模拟流式任务
def streaming_task(input_text):
    """模拟流式任务,逐步返回结果"""
    words = input_text.split()
    for word in words:
        yield f"{word} "
        time.sleep(0.5)  # 模拟每个词的处理时间

# 创建 Runnable 组件
long_task = RunnableLambda(long_running_task)
stream_task = RunnableLambda(streaming_task)

def compare_user_experience():
    print("=== 用户体验对比 ===")
    
    # 传统方式 - 等待完整结果
    print("1. 传统方式(等待完整结果):")
    print("   开始处理...")
    start_time = time.time()
    result = long_task.invoke("这是一个需要长时间处理的文本")
    end_time = time.time()
    print(f"   结果: {result}")
    print(f"   总耗时: {end_time - start_time:.2f} 秒")
    print("   用户在整个过程中看不到任何进度")
    
    print("\n" + "-"*40)
    
    # 流式方式 - 实时反馈
    print("2. 流式方式(实时反馈):")
    print("   开始处理...")
    start_time = time.time()
    print("   实时结果: ", end="", flush=True)
    for chunk in stream_task.stream("这是一个需要长时间处理的文本"):
        print(chunk, end="", flush=True)
    end_time = time.time()
    print(f"\n   总耗时: {end_time - start_time:.2f} 秒")
    print("   用户可以看到处理进度,体验更好")

# 运行用户体验对比
compare_user_experience()
2.3.2 资源利用优化
# resource_optimization_demo.py
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# 模拟 CPU 密集型任务
def cpu_intensive_task(data):
    """模拟 CPU 密集型任务"""
    # 模拟复杂计算
    time.sleep(1)
    return f"CPU 处理完成: {data}"

# 模拟 I/O 密集型任务
async def io_intensive_task(data):
    """模拟 I/O 密集型任务"""
    # 模拟网络请求
    await asyncio.sleep(1)
    return f"I/O 处理完成: {data}"

# 流式处理多个任务
async def streaming_multiple_tasks():
    print("=== 流式处理多个任务 ===")
    
    # 创建任务列表
    tasks = [
        cpu_intensive_task("数据1"),
        cpu_intensive_task("数据2"),
        cpu_intensive_task("数据3")
    ]
    
    print("1. 传统批量处理:")
    start_time = time.time()
    results = []
    for task in tasks:
        result = task  # 实际中这里会调用函数
        results.append(result)
    end_time = time.time()
    print(f"   批量处理完成,耗时: {end_time - start_time:.2f} 秒")
    
    print("\n2. 流式处理:")
    start_time = time.time()
    print("   流式处理结果:")
    for i, task in enumerate(tasks, 1):
        result = task  # 实际中这里会调用函数
        print(f"   任务 {i} 完成: {result}")
        # 模拟任务完成后的处理时间
        time.sleep(0.1)
    end_time = time.time()
    print(f"   流式处理完成,耗时: {end_time - start_time:.2f} 秒")

# 异步流式处理
async def async_streaming_demo():
    print("\n=== 异步流式处理 ===")
    
    # 创建异步任务
    async_tasks = [
        io_intensive_task("数据A"),
        io_intensive_task("数据B"),
        io_intensive_task("数据C")
    ]
    
    print("异步流式处理结果:")
    # 并发执行所有任务
    for i, task in enumerate(async_tasks, 1):
        result = await task
        print(f"   任务 {i} 完成: {result}")

# 运行资源优化演示
async def run_resource_optimization_demo():
    streaming_multiple_tasks()
    await async_streaming_demo()

# asyncio.run(run_resource_optimization_demo())
2.3.3 实时交互能力
# real_time_interaction_demo.py
import asyncio
from langchain_core.runnables import RunnableLambda

# 模拟实时数据生成器
def data_generator():
    """模拟实时数据生成"""
    import random
    data_points = ["用户登录", "页面浏览", "商品搜索", "加入购物车", "完成购买"]
    for point in data_points:
        yield {
            "event": point,
            "timestamp": f"{random.randint(1, 12)}:{random.randint(0, 59):02d}",
            "user_id": f"user_{random.randint(1000, 9999)}"
        }
        asyncio.sleep(0.5)  # 模拟数据生成间隔

# 实时数据分析器
def real_time_analyzer(data_stream):
    """实时分析数据流"""
    for data in data_stream:
        # 模拟分析过程
        analysis = f"[{data['timestamp']}] {data['user_id']} 执行了 {data['event']}"
        yield analysis

# 创建 Runnable 组件
generator = RunnableLambda(data_generator)
analyzer = RunnableLambda(real_time_analyzer)

def real_time_interaction_example():
    print("=== 实时交互能力演示 ===")
    print("实时用户行为分析:")
    
    # 模拟实时数据流处理
    data_stream = data_generator()
    analysis_stream = real_time_analyzer(data_stream)
    
    for analysis in analysis_stream:
        print(f"   {analysis}")

# 实时聊天机器人模拟
async def chatbot_streaming_demo():
    print("\n=== 实时聊天机器人模拟 ===")
    
    # 模拟用户输入
    user_messages = [
        "你好!",
        "你能告诉我天气怎么样吗?",
        "谢谢你的帮助!"
    ]
    
    # 模拟机器人的流式响应
    bot_responses = [
        "你好!很高兴见到你。 ",
        "我是一个AI助手,无法获取实时天气信息,但你可以查询天气应用。 ",
        "不客气!随时欢迎你提出问题。 "
    ]
    
    for user_msg, bot_response in zip(user_messages, bot_responses):
        print(f"用户: {user_msg}")
        print("机器人: ", end="", flush=True)
        
        # 流式输出机器人的响应
        for char in bot_response:
            print(char, end="", flush=True)
            await asyncio.sleep(0.1)  # 模拟打字效果
        
        print()  # 换行

# 运行实时交互演示
real_time_interaction_example()
# asyncio.run(chatbot_streaming_demo())

3. 并行运行多条链

3.1 并行处理的概念

并行处理是指同时执行多个任务以提高整体处理效率的技术。在 LangChain 中,并行处理可以通过 RunnableParallel 组件实现,它允许同时运行多个 Chain 并将结果合并。

并行处理的核心优势在于:

  1. 时间效率 - 多个任务同时执行,减少总处理时间
  2. 资源利用 - 充分利用系统资源,提高吞吐量
  3. 用户体验 - 减少用户等待时间,提升满意度

3.2 并行链的实现方法

LangChain 提供了多种实现并行处理的方法,其中最常用的是 RunnableParallel

# parallel_processing_basics.py
import os
import asyncio
import time
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 基础并行处理示例
def basic_parallel_example():
    print("=== 基础并行处理示例 ===")
    
    # 创建多个不同的 Chain
    translation_chain = (
        ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
        | llm
        | StrOutputParser()
    )
    
    summary_chain = (
        ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
        | llm
        | StrOutputParser()
    )
    
    sentiment_chain = (
        ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
        | llm
        | StrOutputParser()
    )
    
    # 使用 RunnableParallel 并行执行多个 Chain
    parallel_chain = RunnableParallel(
        translation=translation_chain,
        summary=summary_chain,
        sentiment=sentiment_chain
    )
    
    # 输入文本
    input_text = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。"
    
    print(f"输入文本: {input_text}")
    print("并行处理中...")
    
    # 执行并行处理
    start_time = time.time()
    results = parallel_chain.invoke({"text": input_text})
    end_time = time.time()
    
    print(f"处理完成,耗时: {end_time - start_time:.2f} 秒")
    print("处理结果:")
    print(f"  翻译: {results['translation']}")
    print(f"  摘要: {results['summary']}")
    print(f"  情感: {results['sentiment']}")

# 异步并行处理示例
async def async_parallel_example():
    print("\n=== 异步并行处理示例 ===")
    
    # 创建异步 Chain
    async def async_translation(input_dict):
        # 模拟异步翻译
        await asyncio.sleep(1)
        return f"Async translation of: {input_dict['text']}"
    
    async def async_summary(input_dict):
        # 模拟异步摘要
        await asyncio.sleep(1)
        return f"Async summary of: {input_dict['text']}"
    
    async def async_sentiment(input_dict):
        # 模拟异步情感分析
        await asyncio.sleep(1)
        return f"Async sentiment of: {input_dict['text']}"
    
    # 创建异步 Runnable
    async_parallel_chain = RunnableParallel(
        translation=RunnableLambda(async_translation),
        summary=RunnableLambda(async_summary),
        sentiment=RunnableLambda(async_sentiment)
    )
    
    input_text = "这是一个用于异步并行处理测试的文本。"
    
    print(f"输入文本: {input_text}")
    print("异步并行处理中...")
    
    start_time = time.time()
    results = await async_parallel_chain.ainvoke({"text": input_text})
    end_time = time.time()
    
    print(f"异步处理完成,耗时: {end_time - start_time:.2f} 秒")
    print("处理结果:")
    for key, value in results.items():
        print(f"  {key}: {value}")

# 批量并行处理示例
def batch_parallel_example():
    print("\n=== 批量并行处理示例 ===")
    
    # 创建用于处理多个输入的 Chain
    analysis_chain = (
        ChatPromptTemplate.from_template("分析以下文本: {text}")
        | llm
        | StrOutputParser()
    )
    
    # 准备批量输入数据
    batch_inputs = [
        {"text": "人工智能在医疗领域的应用"},
        {"text": "机器学习的基本原理"},
        {"text": "自然语言处理的发展趋势"}
    ]
    
    print("批量并行处理中...")
    
    start_time = time.time()
    # 使用 batch 方法并行处理多个输入
    results = analysis_chain.batch(batch_inputs)
    end_time = time.time()
    
    print(f"批量处理完成,耗时: {end_time - start_time:.2f} 秒")
    print("处理结果:")
    for i, result in enumerate(results, 1):
        print(f"  输入 {i}: {batch_inputs[i-1]['text']}")
        print(f"  结果 {i}: {result}")
        print()

# 复杂并行处理示例
def complex_parallel_example():
    print("=== 复杂并行处理示例 ===")
    
    # 创建多个复杂的 Chain
    technical_analysis_chain = (
        ChatPromptTemplate.from_template("从技术角度分析: {topic}")
        | llm
        | StrOutputParser()
    )
    
    business_analysis_chain = (
        ChatPromptTemplate.from_template("从业务角度分析: {topic}")
        | llm
        | StrOutputParser()
    )
    
    market_analysis_chain = (
        ChatPromptTemplate.from_template("从市场角度分析: {topic}")
        | llm
        | StrOutputParser()
    )
    
    # 创建嵌套的并行处理结构
    multi_dimension_analysis = RunnableParallel(
        technical=technical_analysis_chain,
        business=business_analysis_chain,
        market=market_analysis_chain
    )
    
    # 创建最终报告生成 Chain
    report_chain = (
        ChatPromptTemplate.from_template("""
        基于以下分析结果生成综合报告:
        
        技术分析: {technical}
        业务分析: {business}
        市场分析: {market}
        """)
        | llm
        | StrOutputParser()
    )
    
    # 组合完整的并行处理流程
    complete_analysis_chain = multi_dimension_analysis | report_chain
    
    topic = "区块链技术在金融领域的应用"
    
    print(f"分析主题: {topic}")
    print("多维度并行分析中...")
    
    start_time = time.time()
    final_report = complete_analysis_chain.invoke({"topic": topic})
    end_time = time.time()
    
    print(f"分析完成,耗时: {end_time - start_time:.2f} 秒")
    print("最终报告:")
    print(final_report)

# 带错误处理的并行处理
def parallel_with_error_handling():
    print("\n=== 带错误处理的并行处理 ===")
    
    # 创建可能出错的 Chain
    def risky_operation(input_dict):
        if "error" in input_dict.get("text", ""):
            raise Exception("模拟的处理错误")
        return f"处理成功: {input_dict['text']}"
    
    # 创建安全的 Chain
    def safe_operation(input_dict):
        return f"安全处理: {input_dict['text']}"
    
    # 创建并行处理 Chain
    error_handling_parallel = RunnableParallel(
        risky=RunnableLambda(risky_operation),
        safe=RunnableLambda(safe_operation)
    )
    
    # 测试正常情况
    print("1. 正常处理:")
    try:
        results = error_handling_parallel.invoke({"text": "正常文本"})
        print("   处理结果:")
        for key, value in results.items():
            print(f"     {key}: {value}")
    except Exception as e:
        print(f"   发生错误: {e}")
    
    # 测试错误情况
    print("\n2. 错误处理:")
    try:
        results = error_handling_parallel.invoke({"text": "包含error的文本"})
        print("   处理结果:")
        for key, value in results.items():
            print(f"     {key}: {value}")
    except Exception as e:
        print(f"   捕获到错误: {e}")
        print("   注意: 在实际应用中,应该为每个 Chain 实现独立的错误处理")

# 运行所有并行处理示例
if __name__ == "__main__":
    basic_parallel_example()
    # asyncio.run(async_parallel_example())
    batch_parallel_example()
    complex_parallel_example()
    parallel_with_error_handling()

3.3 适用场景与示例

并行处理在多种场景下都非常有用,以下是一些典型的应用场景:

3.3.1 文档多维度分析
# document_multi_analysis.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def document_multi_dimensional_analysis():
    print("=== 文档多维度分析 ===")
    
    # 创建不同维度的分析 Chain
    content_summary_chain = (
        ChatPromptTemplate.from_template("为以下文档生成内容摘要:\n\n{document}")
        | llm
        | StrOutputParser()
    )
    
    key_points_chain = (
        ChatPromptTemplate.from_template("提取以下文档的5个关键要点:\n\n{document}")
        | llm
        | StrOutputParser()
    )
    
    sentiment_analysis_chain = (
        ChatPromptTemplate.from_template("分析以下文档的整体情感倾向:\n\n{document}")
        | llm
        | StrOutputParser()
    )
    
    technical_terms_chain = (
        ChatPromptTemplate.from_template("提取以下文档中的专业技术术语:\n\n{document}")
        | llm
        | StrOutputParser()
    )
    
    readability_analysis_chain = (
        ChatPromptTemplate.from_template("分析以下文档的可读性水平:\n\n{document}")
        | llm
        | StrOutputParser()
    )
    
    # 并行执行所有分析
    comprehensive_analysis = RunnableParallel(
        summary=content_summary_chain,
        key_points=key_points_chain,
        sentiment=sentiment_analysis_chain,
        technical_terms=technical_terms_chain,
        readability=readability_analysis_chain
    )
    
    # 示例文档
    sample_document = """
    人工智能(Artificial Intelligence,AI)是计算机科学的一个分支,它企图了解智能的实质,
    并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、
    语言识别、图像识别、自然语言处理和专家系统等。

    人工智能从诞生以来,理论和技术日益成熟,应用领域也不断扩大。可以设想,未来人工智能
    带来的科技产品,将会是人类智慧的"容器"。人工智能可以对人的意识、思维的信息过程的模拟。
    人工智能不是人的智能,但能像人那样思考、也可能超过人的智能。

    近年来,随着深度学习技术的发展,人工智能在图像识别、语音识别、自然语言处理等领域
    取得了突破性进展。特别是在大语言模型方面,如GPT系列、BERT等模型的出现,使得机器在
    理解和生成自然语言方面的能力大幅提升。
    """
    
    print("开始对文档进行多维度分析...")
    
    # 执行并行分析
    results = comprehensive_analysis.invoke({"document": sample_document})
    
    print("分析结果:")
    print("="*50)
    print(f"内容摘要:\n{results['summary']}\n")
    print(f"关键要点:\n{results['key_points']}\n")
    print(f"情感倾向:\n{results['sentiment']}\n")
    print(f"技术术语:\n{results['technical_terms']}\n")
    print(f"可读性分析:\n{results['readability']}\n")

# 运行文档多维度分析
document_multi_dimensional_analysis()
3.3.2 多语言翻译系统
# multi_language_translation.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def multi_language_translation_system():
    print("=== 多语言翻译系统 ===")
    
    # 创建针对不同语言的翻译 Chain
    english_translation_chain = (
        ChatPromptTemplate.from_template("将以下中文文本翻译成英文:\n\n{text}")
        | llm
        | StrOutputParser()
    )
    
    japanese_translation_chain = (
        ChatPromptTemplate.from_template("将以下中文文本翻译成日文:\n\n{text}")
        | llm
        | StrOutputParser()
    )
    
    korean_translation_chain = (
        ChatPromptTemplate.from_template("将以下中文文本翻译成韩文:\n\n{text}")
        | llm
        | StrOutputParser()
    )
    
    french_translation_chain = (
        ChatPromptTemplate.from_template("将以下中文文本翻译成法文:\n\n{text}")
        | llm
        | StrOutputParser()
    )
    
    german_translation_chain = (
        ChatPromptTemplate.from_template("将以下中文文本翻译成德文:\n\n{text}")
        | llm
        | StrOutputParser()
    )
    
    # 并行执行所有翻译
    multi_language_translator = RunnableParallel(
        english=english_translation_chain,
        japanese=japanese_translation_chain,
        korean=korean_translation_chain,
        french=french_translation_chain,
        german=german_translation_chain
    )
    
    # 待翻译文本
    chinese_text = "人工智能是现代科技发展的重要方向,它正在改变我们的生活方式和工作方式。"
    
    print(f"原文: {chinese_text}")
    print("正在进行多语言翻译...")
    
    # 执行并行翻译
    translations = multi_language_translator.invoke({"text": chinese_text})
    
    print("翻译结果:")
    print("="*50)
    for language, translation in translations.items():
        print(f"{language.upper()}: {translation}")

# 运行多语言翻译系统
multi_language_translation_system()
3.3.3 股票多维度分析
# stock_multi_analysis.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def stock_comprehensive_analysis():
    print("=== 股票多维度分析系统 ===")
    
    # 技术分析 Chain
    technical_analysis_chain = (
        ChatPromptTemplate.from_template("""
        基于以下股票数据进行技术分析:
        股票代码: {symbol}
        当前价格: {price}
        技术指标: {technical_indicators}
        
        请分析技术面趋势和交易信号。
        """)
        | llm
        | StrOutputParser()
    )
    
    # 基本面分析 Chain
    fundamental_analysis_chain = (
        ChatPromptTemplate.from_template("""
        基于以下股票数据进行基本面分析:
        股票代码: {symbol}
        财务数据: {financial_data}
        行业信息: {industry_info}
        
        请分析公司基本面和投资价值。
        """)
        | llm
        | StrOutputParser()
    )
    
    # 市场情绪分析 Chain
    sentiment_analysis_chain = (
        ChatPromptTemplate.from_template("""
        基于以下信息分析市场对该股票的情绪:
        股票代码: {symbol}
        新闻舆情: {news_sentiment}
        社交媒体讨论: {social_media_trend}
        
        请分析市场情绪和潜在影响。
        """)
        | llm
        | StrOutputParser()
    )
    
    # 风险评估 Chain
    risk_assessment_chain = (
        ChatPromptTemplate.from_template("""
        基于以下信息评估该股票的投资风险:
        股票代码: {symbol}
        市场风险: {market_risk}
        公司风险: {company_risk}
        行业风险: {industry_risk}
        
        请评估整体风险水平和注意事项。
        """)
        | llm
        | StrOutputParser()
    )
    
    # 投资建议 Chain
    investment_advice_chain = (
        ChatPromptTemplate.from_template("""
        基于以下综合分析为投资者提供建议:
        股票代码: {symbol}
        技术分析: {technical_analysis}
        基本面分析: {fundamental_analysis}
        市场情绪: {sentiment_analysis}
        风险评估: {risk_assessment}
        
        请提供投资建议和策略。
        """)
        | llm
        | StrOutputParser()
    )
    
    # 创建并行分析 Chain
    stock_analysis_parallel = RunnableParallel(
        technical=technical_analysis_chain,
        fundamental=fundamental_analysis_chain,
        sentiment=sentiment_analysis_chain,
        risk=risk_assessment_chain
    )
    
    # 创建完整分析流程
    complete_stock_analysis = stock_analysis_parallel | investment_advice_chain
    
    # 示例股票数据
    stock_data = {
        "symbol": "AAPL",
        "price": "$150.00",
        "technical_indicators": {
            "RSI": 65.5,
            "MACD": 2.3,
            "Moving_Average_20": 145.2,
            "Moving_Average_50": 140.7
        },
        "financial_data": {
            "P/E_Ratio": 28.5,
            "EPS": 6.12,
            "Revenue_Growth": 0.08,
            "Debt_to_Equity": 1.52
        },
        "industry_info": "科技硬件与设备",
        "news_sentiment": "积极",
        "social_media_trend": "关注度上升",
        "market_risk": "中等",
        "company_risk": "低",
        "industry_risk": "中等"
    }
    
    print(f"正在对股票 {stock_data['symbol']} 进行多维度分析...")
    
    # 准备分析输入
    analysis_input = {
        "symbol": stock_data["symbol"],
        "price": stock_data["price"],
        "technical_indicators": str(stock_data["technical_indicators"]),
        "financial_data": str(stock_data["financial_data"]),
        "industry_info": stock_data["industry_info"],
        "news_sentiment": stock_data["news_sentiment"],
        "social_media_trend": stock_data["social_media_trend"],
        "market_risk": stock_data["market_risk"],
        "company_risk": stock_data["company_risk"],
        "industry_risk": stock_data["industry_risk"]
    }
    
    # 执行并行分析
    intermediate_results = stock_analysis_parallel.invoke(analysis_input)
    
    # 准备投资建议输入
    advice_input = {
        "symbol": stock_data["symbol"],
        "technical_analysis": intermediate_results["technical"],
        "fundamental_analysis": intermediate_results["fundamental"],
        "sentiment_analysis": intermediate_results["sentiment"],
        "risk_assessment": intermediate_results["risk"]
    }
    
    # 生成最终投资建议
    final_advice = investment_advice_chain.invoke(advice_input)
    
    print("分析结果:")
    print("="*50)
    print(f"技术分析:\n{intermediate_results['technical']}\n")
    print(f"基本面分析:\n{intermediate_results['fundamental']}\n")
    print(f"市场情绪:\n{intermediate_results['sentiment']}\n")
    print(f"风险评估:\n{intermediate_results['risk']}\n")
    print(f"投资建议:\n{final_advice}\n")

# 运行股票多维度分析
stock_comprehensive_analysis()

4. Runnable 组件详解

4.1 RunnablePassthrough 详解

RunnablePassthrough 是 LangChain 中一个非常有用的组件,它直接传递输入数据而不进行任何修改。这在构建复杂 Chain 时特别有用,特别是在需要保留原始输入数据的同时添加额外处理步骤的场景中。

# runnable_passthrough_detailed.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def runnable_passthrough_basics():
    print("=== RunnablePassthrough 基础用法 ===")
    
    # 创建简单的 Passthrough
    passthrough = RunnablePassthrough()
    
    # 测试数据传递
    test_data = {"message": "Hello, World!", "number": 42}
    result = passthrough.invoke(test_data)
    
    print(f"输入数据: {test_data}")
    print(f"输出数据: {result}")
    print(f"数据是否相同: {test_data == result}")

def passthrough_in_complex_chains():
    print("\n=== 在复杂 Chain 中使用 Passthrough ===")
    
    # 创建分析 Chain
    analysis_chain = (
        ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
        | llm
        | StrOutputParser()
    )
    
    # 使用 RunnableParallel 结合 Passthrough
    complex_chain = RunnableParallel(
        original=RunnablePassthrough(),  # 保留原始输入
        analysis=analysis_chain           # 添加分析结果
    )
    
    input_data = {"text": "今天天气真好,我很开心!"}
    
    print(f"输入文本: {input_data['text']}")
    
    result = complex_chain.invoke(input_data)
    
    print("处理结果:")
    print(f"  原始输入: {result['original']}")
    print(f"  情感分析: {result['analysis']}")

def passthrough_with_additional_processing():
    print("\n=== Passthrough 与额外处理结合 ===")
    
    # 创建多个处理 Chain
    summary_chain = (
        ChatPromptTemplate.from_template("为以下文本生成摘要: {content}")
        | llm
        | StrOutputParser()
    )
    
    keyword_chain = (
        ChatPromptTemplate.from_template("提取以下文本的关键词: {content}")
        | llm
        | StrOutputParser()
    )
    
    # 使用 RunnableParallel 结合 Passthrough 和处理 Chain
    processing_chain = RunnableParallel(
        original=RunnablePassthrough(),  # 保留原始内容
        summary=summary_chain,           # 添加摘要
        keywords=keyword_chain           # 添加关键词
    )
    
    # 创建最终报告 Chain
    report_chain = (
        ChatPromptTemplate.from_template("""
        基于以下分析结果生成完整报告:
        
        原始内容: {original}
        内容摘要: {summary}
        关键词: {keywords}
        """)
        | llm
        | StrOutputParser()
    )
    
    # 组合完整 Chain
    complete_chain = processing_chain | report_chain
    
    content = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。近年来,随着深度学习技术的发展,人工智能在图像识别、语音识别、自然语言处理等领域取得了突破性进展。"
    
    print(f"原始内容: {content}")
    print("生成完整报告中...")
    
    final_report = complete_chain.invoke({"content": content})
    
    print("最终报告:")
    print(final_report)

def passthrough_assign_example():
    print("\n=== RunnablePassthrough.assign() 用法 ===")
    
    from langchain_core.runnables import RunnablePassthrough
    
    # 创建处理函数
    def add_timestamp(input_data):
        from datetime import datetime
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    def add_word_count(input_data):
        return len(input_data.get("text", "").split())
    
    # 使用 assign 方法添加新字段
    enhanced_chain = RunnablePassthrough.assign(
        timestamp=lambda x: add_timestamp(x),
        word_count=lambda x: add_word_count(x)
    )
    
    input_data = {"text": "这是一个用于测试的示例文本,包含多个词汇。"}
    
    print(f"原始数据: {input_data}")
    
    enhanced_data = enhanced_chain.invoke(input_data)
    
    print(f"增强后数据: {enhanced_data}")

def passthrough_in_data_pipeline():
    print("\n=== 在数据管道中使用 Passthrough ===")
    
    # 模拟数据处理管道
    def data_validator(input_data):
        """数据验证器"""
        if not input_data.get("text"):
            raise ValueError("文本内容不能为空")
        return input_data
    
    def data_enricher(input_data):
        """数据增强器"""
        import hashlib
        text = input_data.get("text", "")
        input_data["text_hash"] = hashlib.md5(text.encode()).hexdigest()
        return input_data
    
    # 创建数据处理管道
    data_pipeline = (
        RunnablePassthrough() |  # 传递原始数据
        RunnablePassthrough().assign(validated=data_validator) |  # 验证数据
        RunnablePassthrough().assign(enriched=data_enricher)     # 增强数据
    )
    
    test_data = {"text": "测试数据管道", "source": "user_input"}
    
    print(f"输入数据: {test_data}")
    
    try:
        processed_data = data_pipeline.invoke(test_data)
        print(f"处理后数据: {processed_data}")
    except Exception as e:
        print(f"处理出错: {e}")

# 运行所有 Passthrough 示例
if __name__ == "__main__":
    runnable_passthrough_basics()
    passthrough_in_complex_chains()
    passthrough_with_additional_processing()
    passthrough_assign_example()
    passthrough_in_data_pipeline()

4.2 RunnableLambda 详解

RunnableLambda 允许将普通的 Python 函数包装为 Runnable 组件,从而可以在 Chain 中使用。

# runnable_lambda_detailed.py
from langchain_core.runnables import RunnableLambda
import asyncio
import time

def runnable_lambda_basics():
    print("=== RunnableLambda 基础用法 ===")
    
    # 创建简单的处理函数
    def text_processor(text):
        return text.upper()
    
    # 包装为 RunnableLambda
    processor = RunnableLambda(text_processor)
    
    # 测试处理
    input_text = "hello world"
    result = processor.invoke(input_text)
    
    print(f"输入: {input_text}")
    print(f"输出: {result}")

def lambda_with_complex_processing():
    print("\n=== 复杂处理的 RunnableLambda ===")
    
    # 创建复杂处理函数
    def complex_processor(input_data):
        if isinstance(input_data, str):
            # 处理字符串
            return {
                "original": input_data,
                "processed": input_data.upper(),
                "length": len(input_data),
                "word_count": len(input_data.split())
            }
        elif isinstance(input_data, dict):
            # 处理字典
            text = input_data.get("text", "")
            return {
                "original": text,
                "processed": text.lower(),
                "length": len(text),
                "word_count": len(text.split()),
                "metadata": input_data.get("metadata", {})
            }
        else:
            return {"error": "不支持的输入类型"}
    
    # 包装为 RunnableLambda
    complex_lambda = RunnableLambda(complex_processor)
    
    # 测试字符串输入
    string_input = "Hello World from LangChain"
    result1 = complex_lambda.invoke(string_input)
    print(f"字符串输入处理结果: {result1}")
    
    # 测试字典输入
    dict_input = {
        "text": "Hello World from LangChain",
        "metadata": {"source": "user", "priority": "high"}
    }
    result2 = complex_lambda.invoke(dict_input)
    print(f"字典输入处理结果: {result2}")

async def async_lambda_example():
    print("\n=== 异步 RunnableLambda ===")
    
    # 创建异步处理函数
    async def async_processor(input_data):
        # 模拟异步操作
        await asyncio.sleep(1)
        return f"异步处理完成: {input_data}"
    
    # 包装为 RunnableLambda
    async_lambda = RunnableLambda(async_processor)
    
    # 测试异步处理
    input_data = "异步测试数据"
    print(f"开始异步处理: {input_data}")
    
    start_time = time.time()
    result = await async_lambda.ainvoke(input_data)
    end_time = time.time()
    
    print(f"异步处理结果: {result}")
    print(f"处理耗时: {end_time - start_time:.2f} 秒")

def lambda_with_error_handling():
    print("\n=== 带错误处理的 RunnableLambda ===")
    
    # 创建带错误处理的函数
    def safe_processor(input_data):
        try:
            if isinstance(input_data, str):
                return input_data * 2
            elif isinstance(input_data, int):
                return input_data ** 2
            else:
                raise ValueError(f"不支持的数据类型: {type(input_data)}")
        except Exception as e:
            return {"error": str(e), "input": input_data}
    
    # 包装为 RunnableLambda
    safe_lambda = RunnableLambda(safe_processor)
    
    # 测试各种输入
    test_inputs = ["test", 5, [1, 2, 3], {"key": "value"}]
    
    for input_data in test_inputs:
        result = safe_lambda.invoke(input_data)
        print(f"输入: {input_data} -> 输出: {result}")

def lambda_in_chain_example():
    print("\n=== 在 Chain 中使用 RunnableLambda ===")
    
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.output_parsers import StrOutputParser
    import os
    from dotenv import load_dotenv
    
    # 加载环境变量
    load_dotenv()
    
    # 创建语言模型
    llm = ChatOpenAI(
        model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
        base_url=os.getenv("OPENAI_BASE_URL"),
        api_key=os.getenv("OPENAI_API_KEY"),
        temperature=0.3
    )
    
    # 创建预处理函数
    def preprocess_input(input_data):
        """预处理输入数据"""
        if isinstance(input_data, str):
            return {"text": input_data.strip(), "word_count": len(input_data.split())}
        return input_data
    
    # 创建后处理函数
    def postprocess_output(output_data):
        """后处理输出数据"""
        return {
            "original_output": output_data,
            "character_count": len(output_data),
            "line_count": len(output_data.split('\n'))
        }
    
    # 创建 Chain
    chain = (
        RunnableLambda(preprocess_input) |
        ChatPromptTemplate.from_template("总结以下文本: {text}") |
        llm |
        StrOutputParser() |
        RunnableLambda(postprocess_output)
    )
    
    input_text = "  人工智能是计算机科学的一个重要分支。它致力于创建能够执行通常需要人类智能的任务的系统。  "
    
    print(f"原始输入: '{input_text}'")
    
    result = chain.invoke(input_text)
    
    print("处理结果:")
    for key, value in result.items():
        print(f"  {key}: {value}")

# 运行所有 Lambda 示例
if __name__ == "__main__":
    runnable_lambda_basics()
    lambda_with_complex_processing()
    # asyncio.run(async_lambda_example())
    lambda_with_error_handling()
    lambda_in_chain_example()

4.3 RunnableParallel 详解

RunnableParallel 允许并行执行多个 Runnable 组件,并将结果合并为一个字典。

# runnable_parallel_detailed.py
from langchain_core.runnables import RunnableParallel, RunnableLambda
import time
import asyncio

def runnable_parallel_basics():
    print("=== RunnableParallel 基础用法 ===")
    
    # 创建简单的处理函数
    def processor_a(input_data):
        time.sleep(0.5)  # 模拟处理时间
        return f"A 处理结果: {input_data}"
    
    def processor_b(input_data):
        time.sleep(0.5)  # 模拟处理时间
        return f"B 处理结果: {input_data}"
    
    def processor_c(input_data):
        time.sleep(0.5)  # 模拟处理时间
        return f"C 处理结果: {input_data}"
    
    # 创建并行处理
    parallel_processors = RunnableParallel(
        a=RunnableLambda(processor_a),
        b=RunnableLambda(processor_b),
        c=RunnableLambda(processor_c)
    )
    
    input_data = "测试数据"
    
    print(f"输入数据: {input_data}")
    print("开始并行处理...")
    
    start_time = time.time()
    results = parallel_processors.invoke(input_data)
    end_time = time.time()
    
    print(f"并行处理完成,耗时: {end_time - start_time:.2f} 秒")
    print("处理结果:")
    for key, value in results.items():
        print(f"  {key}: {value}")

async def async_parallel_example():
    print("\n=== 异步 RunnableParallel ===")
    
    # 创建异步处理函数
    async def async_processor_a(input_data):
        await asyncio.sleep(0.5)  # 模拟异步处理
        return f"异步 A 处理结果: {input_data}"
    
    async def async_processor_b(input_data):
        await asyncio.sleep(0.5)  # 模拟异步处理
        return f"异步 B 处理结果: {input_data}"
    
    async def async_processor_c(input_data):
        await asyncio.sleep(0.5)  # 模拟异步处理
        return f"异步 C 处理结果: {input_data}"
    
    # 创建异步并行处理
    async_parallel = RunnableParallel(
        a=RunnableLambda(async_processor_a),
        b=RunnableLambda(async_processor_b),
        c=RunnableLambda(async_processor_c)
    )
    
    input_data = "异步测试数据"
    
    print(f"输入数据: {input_data}")
    print("开始异步并行处理...")
    
    start_time = time.time()
    results = await async_parallel.ainvoke(input_data)
    end_time = time.time()
    
    print(f"异步并行处理完成,耗时: {end_time - start_time:.2f} 秒")
    print("处理结果:")
    for key, value in results.items():
        print(f"  {key}: {value}")

def parallel_with_different_input_types():
    print("\n=== 处理不同类型输入的并行处理 ===")
    
    # 创建处理不同输入类型的函数
    def text_analyzer(input_data):
        if isinstance(input_data, dict) and "text" in input_data:
            text = input_data["text"]
            return {
                "length": len(text),
                "word_count": len(text.split()),
                "char_count": len(text.replace(" ", ""))
            }
        return {"error": "无效输入"}
    
    def sentiment_analyzer(input_data):
        if isinstance(input_data, dict) and "text" in input_data:
            # 简单的情感分析模拟
            text = input_data["text"].lower()
            positive_words = ["好", "棒", "优秀", "开心", "高兴"]
            negative_words = ["坏", "差", "糟糕", "难过", "沮丧"]
            
            positive_count = sum(1 for word in positive_words if word in text)
            negative_count = sum(1 for word in negative_words if word in text)
            
            if positive_count > negative_count:
                return "积极"
            elif negative_count > positive_count:
                return "消极"
            else:
                return "中性"
        return "无法分析"
    
    def keyword_extractor(input_data):
        if isinstance(input_data, dict) and "text" in input_data:
            text = input_data["text"]
            # 简单的关键词提取模拟
            words = text.split()
            return words[:5]  # 返回前5个词
        return []
    
    # 创建并行分析器
    text_analysis_parallel = RunnableParallel(
        statistics=RunnableLambda(text_analyzer),
        sentiment=RunnableLambda(sentiment_analyzer),
        keywords=RunnableLambda(keyword_extractor)
    )
    
    # 测试数据
    test_input = {
        "text": "今天天气真好,我很开心!这是一个很棒的日子,让我感到非常高兴。",
        "source": "user_input"
    }
    
    print(f"输入文本: {test_input['text']}")
    print("开始并行文本分析...")
    
    results = text_analysis_parallel.invoke(test_input)
    
    print("分析结果:")
    print(f"  统计信息: {results['statistics']}")
    print(f"  情感分析: {results['sentiment']}")
    print(f"  关键词提取: {results['keywords']}")

def nested_parallel_example():
    print("\n=== 嵌套并行处理 ===")
    
    # 创建第一层处理函数
    def level_one_a(input_data):
        return f"Level 1-A: {input_data}"
    
    def level_one_b(input_data):
        return f"Level 1-B: {input_data}"
    
    # 创建第二层处理函数
    def level_two_a(input_data):
        return f"Level 2-A: {input_data}"
    
    def level_two_b(input_data):
        return f"Level 2-B: {input_data}"
    
    # 创建嵌套并行结构
    nested_parallel = RunnableParallel(
        first_level=RunnableParallel(
            a=RunnableLambda(level_one_a),
            b=RunnableLambda(level_one_b)
        ),
        second_level=RunnableParallel(
            a=RunnableLambda(level_two_a),
            b=RunnableLambda(level_two_b)
        )
    )
    
    input_data = "嵌套测试"
    
    print(f"输入数据: {input_data}")
    
    results = nested_parallel.invoke(input_data)
    
    print("嵌套并行处理结果:")
    print(f"  第一层:")
    print(f"    A: {results['first_level']['a']}")
    print(f"    B: {results['first_level']['b']}")
    print(f"  第二层:")
    print(f"    A: {results['second_level']['a']}")
    print(f"    B: {results['second_level']['b']}")

def parallel_with_error_handling():
    print("\n=== 带错误处理的并行处理 ===")
    
    # 创建可能出错的处理函数
    def risky_processor(input_data):
        if "error" in str(input_data).lower():
            raise Exception("模拟处理错误")
        return f"成功处理: {input_data}"
    
    def safe_processor(input_data):
        return f"安全处理: {input_data}"
    
    # 创建带错误处理的并行处理
    safe_parallel = RunnableParallel(
        risky=RunnableLambda(risky_processor),
        safe=RunnableLambda(safe_processor)
    )
    
    # 测试正常情况
    print("1. 正常处理:")
    try:
        results = safe_parallel.invoke("正常数据")
        print("   处理结果:")
        for key, value in results.items():
            print(f"     {key}: {value}")
    except Exception as e:
        print(f"   发生错误: {e}")
    
    # 测试错误情况
    print("\n2. 错误处理:")
    try:
        results = safe_parallel.invoke("包含 ERROR 的数据")
        print("   处理结果:")
        for key, value in results.items():
            print(f"     {key}: {value}")
    except Exception as e:
        print(f"   捕获到错误: {e}")
        print("   注意: 在实际应用中,应该为每个处理器实现独立的错误处理")

# 运行所有并行处理示例
if __name__ == "__main__":
    runnable_parallel_basics()
    # asyncio.run(async_parallel_example())
    parallel_with_different_input_types()
    nested_parallel_example()
    parallel_with_error_handling()

5. 迁移与核心组件对比

5.1 传统 Chain 与 LCEL 对比

随着 LangChain 的发展,新的 LangChain Expression Language (LCEL) 提供了更简洁、更强大的方式来构建 Chain。让我们通过具体示例来对比传统方式和新方式:

# chain_migration_comparison.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def traditional_chain_example():
    print("=== 传统 Chain 方式 ===")
    
    # 传统方式:使用 LLMChain
    from langchain.chains import LLMChain
    
    # 创建提示词模板
    prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
    
    # 创建传统 Chain
    traditional_chain = LLMChain(
        llm=llm,
        prompt=prompt,
        output_parser=StrOutputParser()
    )
    
    # 使用传统 Chain
    result = traditional_chain.invoke({"concept": "人工智能"})
    print(f"传统 Chain 结果: {result}")

def lcel_example():
    print("\n=== LCEL 方式 ===")
    
    # LCEL 方式:使用管道符
    prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
    
    # 创建 LCEL Chain
    lcel_chain = prompt | llm | StrOutputParser()
    
    # 使用 LCEL Chain
    result = lcel_chain.invoke({"concept": "人工智能"})
    print(f"LCEL Chain 结果: {result}")

def complex_traditional_chain():
    print("\n=== 复杂传统 Chain ===")
    
    from langchain.chains import SequentialChain, LLMChain
    
    # 创建多个传统 Chain
    prompt1 = ChatPromptTemplate.from_template("列出{topic}的5个要点")
    chain1 = LLMChain(llm=llm, prompt=prompt1, output_key="points")
    
    prompt2 = ChatPromptTemplate.from_template("详细解释以下要点:\n{points}")
    chain2 = LLMChain(llm=llm, prompt=prompt2, output_key="explanation")
    
    # 组合传统 Chain
    complex_traditional = SequentialChain(
        chains=[chain1, chain2],
        input_variables=["topic"],
        output_variables=["explanation"]
    )
    
    result = complex_traditional.invoke({"topic": "机器学习"})
    print(f"复杂传统 Chain 结果长度: {len(result['explanation'])} 字符")

def complex_lcel_chain():
    print("\n=== 复杂 LCEL Chain ===")
    
    # 创建多个 LCEL 组件
    prompt1 = ChatPromptTemplate.from_template("列出{topic}的5个要点")
    prompt2 = ChatPromptTemplate.from_template("详细解释以下要点:\n{points}")
    
    # 使用 LCEL 组合
    complex_lcel = (
        {"points": prompt1 | llm | StrOutputParser()} |
        prompt2 |
        llm |
        StrOutputParser()
    )
    
    result = complex_lcel.invoke({"topic": "机器学习"})
    print(f"复杂 LCEL Chain 结果长度: {len(result)} 字符")

def parallel_traditional_chain():
    print("\n=== 传统并行 Chain ===")
    
    from langchain.chains import SequentialChain, LLMChain
    
    # 创建多个传统 Chain
    translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
    translation_chain = LLMChain(llm=llm, prompt=translation_prompt, output_key="translation")
    
    summary_prompt = ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
    summary_chain = LLMChain(llm=llm, prompt=summary_prompt, output_key="summary")
    
    # 传统方式实现并行较复杂,通常需要手动实现
    def traditional_parallel(input_data):
        text = input_data["text"]
        
        # 手动并行执行
        translation_result = translation_chain.invoke({"text": text})
        summary_result = summary_chain.invoke({"text": text})
        
        return {
            "translation": translation_result["translation"],
            "summary": summary_result["summary"]
        }
    
    input_text = "人工智能是计算机科学的一个重要分支。"
    result = traditional_parallel({"text": input_text})
    
    print("传统并行 Chain 结果:")
    print(f"  翻译: {result['translation']}")
    print(f"  摘要: {result['summary']}")

def parallel_lcel_chain():
    print("\n=== LCEL 并行 Chain ===")
    
    from langchain_core.runnables import RunnableParallel
    
    # 创建 LCEL 组件
    translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
    summary_prompt = ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
    
    # 使用 LCEL 实现并行非常简单
    parallel_lcel = RunnableParallel(
        translation=translation_prompt | llm | StrOutputParser(),
        summary=summary_prompt | llm | StrOutputParser()
    )
    
    input_text = "人工智能是计算机科学的一个重要分支。"
    result = parallel_lcel.invoke({"text": input_text})
    
    print("LCEL 并行 Chain 结果:")
    print(f"  翻译: {result['translation']}")
    print(f"  摘要: {result['summary']}")

# 运行对比示例
if __name__ == "__main__":
    traditional_chain_example()
    lcel_example()
    complex_traditional_chain()
    complex_lcel_chain()
    parallel_traditional_chain()
    parallel_lcel_chain()

5.2 LCEL 的主要优势

LCEL (LangChain Expression Language) 相比传统 Chain 方式具有以下三个主要优势:

5.2.1 语法简洁性
# lcel_advantages.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

def syntax_simplicity_demo():
    print("=== 语法简洁性对比 ===")
    
    # 传统方式 - 冗长的代码
    print("1. 传统方式:")
    print("   ```python")
    print("   from langchain.chains import LLMChain")
    print("   from langchain.prompts import ChatPromptTemplate")
    print("   from langchain.output_parsers import StrOutputParser")
    print("   ")
    print("   prompt = ChatPromptTemplate.from_template('解释{concept}')")
    print("   chain = LLMChain(")
    print("       llm=llm,")
    print("       prompt=prompt,")
    print("       output_parser=StrOutputParser()")
    print("   )")
    print("   result = chain.invoke({'concept': 'AI'})")
    print("   ```")
    
    # LCEL 方式 - 简洁的代码
    print("\n2. LCEL 方式:")
    print("   ```python")
    print("   prompt = ChatPromptTemplate.from_template('解释{concept}')")
    print("   chain = prompt | llm | StrOutputParser()")
    print("   result = chain.invoke({'concept': 'AI'})")
    print("   ```")
    
    print("\n优势: LCEL 使用管道符(|)连接组件,代码更简洁易读")

def composability_demo():
    print("\n=== 组合性优势 ===")
    
    # 创建基础组件
    prompt1 = ChatPromptTemplate.from_template("列出{topic}的要点")
    prompt2 = ChatPromptTemplate.from_template("详细解释: {points}")
    prompt3 = ChatPromptTemplate.from_template("总结: {explanation}")
    
    # LCEL 的强大组合能力
    complex_chain = (
        {"points": prompt1 | llm | StrOutputParser()} |
        prompt2 |
        llm |
        StrOutputParser() |
        (lambda x: {"explanation": x}) |
        prompt3 |
        llm |
        StrOutputParser()
    )
    
    print("LCEL 支持复杂的组合:")
    print("1. 顺序组合: component1 | component2 | component3")
    print("2. 并行组合: RunnableParallel(a=chain1, b=chain2)")
    print("3. 数据转换: (lambda x: {'key': x})")
    print("4. 条件分支: RunnableBranch(condition, chain1, chain2)")
    print("5. 循环处理: 自定义迭代逻辑")
    
    # 演示复杂 Chain 的执行
    print("\n执行复杂 Chain 示例:")
    try:
        result = complex_chain.invoke({"topic": "深度学习"})
        print(f"执行成功,结果长度: {len(result)} 字符")
    except Exception as e:
        print(f"执行出错: {e}")

def uniform_interface_demo():
    print("\n=== 统一接口优势 ===")
    
    from langchain_core.runnables import RunnableLambda
    
    # 所有 Runnable 组件都有统一的接口
    def custom_processor(input_data):
        return f"处理结果: {input_data}"
    
    # 不同类型的 Runnable 组件
    llm_runnable = llm
    prompt_runnable = ChatPromptTemplate.from_template("处理: {input}")
    lambda_runnable = RunnableLambda(custom_processor)
    
    # 它们都支持相同的方法
    runnables = [
        ("LLM", llm_runnable),
        ("Prompt", prompt_runnable),
        ("Lambda", lambda_runnable)
    ]
    
    print("所有 Runnable 组件支持统一接口:")
    print("1. invoke(input)      - 单次调用")
    print("2. batch(inputs)      - 批量处理")
    print("3. stream(input)      - 流式处理")
    print("4. ainvoke(input)     - 异步单次调用")
    print("5. abatch(inputs)     - 异步批量处理")
    print("6. astream(input)     - 异步流式处理")
    
    # 演示统一接口
    test_input = "测试数据"
    print(f"\n使用相同输入 '{test_input}' 测试不同组件:")
    
    for name, runnable in runnables:
        try:
            if name == "LLM":
                # LLM 需要消息格式
                from langchain_core.messages import HumanMessage
                result = runnable.invoke([HumanMessage(content=test_input)])
                print(f"  {name}: {type(result).__name__}")
            elif name == "Prompt":
                result = runnable.invoke({"input": test_input})
                print(f"  {name}: {type(result).__name__}")
            else:
                result = runnable.invoke(test_input)
                print(f"  {name}: {result}")
        except Exception as e:
            print(f"  {name}: 执行出错 - {e}")

# 运行 LCEL 优势演示
if __name__ == "__main__":
    syntax_simplicity_demo()
    composability_demo()
    uniform_interface_demo()

6. 总结

在本指南中,我们深入探讨了 LangChain 中的流式调用、并行处理和 Runnable 组件等核心概念和实践方法。

主要学习内容回顾

  1. 流式调用

    • 理解了流式调用的概念和工作原理
    • 掌握了同步和异步流式调用的实现方法
    • 认识到流式调用在提升用户体验方面的重要作用
  2. 并行处理

    • 学会了使用 RunnableParallel 实现并行处理
    • 了解了并行处理在提高效率方面的优势
    • 通过多个实际场景示例加深了理解
  3. Runnable 组件

    • 详细了解了 RunnablePassthroughRunnableLambdaRunnableParallel 的使用
    • 掌握了这些组件在构建复杂应用中的重要作用
  4. 迁移与对比

    • 对比了传统 Chain 方式和 LCEL 方式的差异
    • 理解了 LCEL 在语法简洁性、组合性和统一接口方面的优势

实践建议

在实际项目中应用这些技术时,建议:

  1. 合理使用流式调用

    • 在需要实时反馈的场景中优先考虑流式调用
    • 注意处理流式调用中的错误和异常情况
  2. 优化并行处理

    • 识别可以并行执行的任务以提高效率
    • 注意控制并行度以避免资源竞争
  3. 充分利用 Runnable 组件

    • 使用 RunnablePassthrough 保留原始数据
    • 通过 RunnableLambda 集成自定义处理逻辑
    • 利用 RunnableParallel 实现并行处理
  4. 采用 LCEL 方式

    • 优先使用 LCEL 构建 Chain 以获得更好的可读性和可维护性
    • 充分利用 LCEL 强大的组合能力
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐