9. LangChain Chains 基础概念详解

目录

  1. 简介
  2. 环境准备
  3. 什么是 Chains
  4. Chains 与 Runnable 的关系
  5. 基础 Chain 类型
  6. Chains 的组合
  7. 实际应用示例
  8. Chains 的高级特性
  9. Chains 的性能优化
  10. 调试和监控 Chains
  11. Chains 的最佳实践
  12. Chains 与微服务架构
  13. Chains 在企业级应用中的部署
  14. Chains 的未来发展
  15. 总结

1. 简介

1.1 什么是 LangChain Chains?

LangChain Chains 是 LangChain 框架中的核心概念之一,它代表了一系列组件的有序组合,这些组件可以依次处理输入并产生输出。Chains 提供了一种简单而强大的方式来构建复杂的 AI 应用程序,将不同的组件(如语言模型、提示词模板、工具等)连接在一起。

可以把 Chains 想象成一条流水线,数据从一端进入,经过多个处理步骤,最终从另一端输出结果。每个处理步骤都可以是一个简单的组件,也可以是另一个复杂的 Chain。

LangChain 中的 Chains 设计灵感来源于函数式编程的思想,它强调组件的可组合性和可重用性。这种设计使得开发者能够以模块化的方式构建 AI 应用,提高了代码的可维护性和可扩展性。

1.2 为什么需要学习 Chains?

  1. 模块化设计 - Chains 允许我们将复杂的应用程序分解为可重用的模块
  2. 易于组合 - 不同的 Chains 可以轻松组合,构建更强大的应用程序
  3. 标准化接口 - 所有 Chains 都遵循相同的接口,便于学习和使用
  4. 灵活扩展 - 可以轻松添加新的处理步骤或替换现有组件

在传统的 AI 应用开发中,我们通常需要手动管理各个组件之间的数据流动和状态转换,这种方式容易出错且难以维护。而 LangChain 的 Chains 提供了一种声明式的编程方式,开发者只需要定义组件之间的连接关系,框架会自动处理数据的传递和转换。

1.3 本指南的学习目标

通过本指南,您将学会:

  • 理解 Chains 的基本概念和工作原理
  • 掌握不同类型的 Chains 及其使用方法
  • 学会如何组合和扩展 Chains
  • 构建简单的 AI 应用程序
  • 为后续学习高级 Chains 应用打下基础

本指南将从最基础的概念开始,逐步深入到高级特性和实际应用场景,确保您能够全面掌握 LangChain Chains 的使用方法。

1.4 Chains 在现代 AI 应用中的重要性

随着大语言模型(LLM)的快速发展,AI 应用的复杂性也在不断增加。传统的单一模型调用已经无法满足复杂业务场景的需求。Chains 的出现正是为了解决这一问题,它提供了一种结构化的方式来组织和管理复杂的 AI 工作流。

在实际应用中,一个典型的 AI 应用可能需要:

  1. 接收用户输入
  2. 对输入进行预处理和分析
  3. 调用一个或多个语言模型
  4. 对模型输出进行后处理
  5. 与其他系统或数据库进行交互
  6. 返回最终结果给用户

Chains 使得这些步骤可以被清晰地定义和组织,大大提高了开发效率和代码质量。

1.5 Chains 与其他 AI 框架的对比

与其他 AI 框架相比,LangChain 的 Chains 具有以下优势:

  1. 高度可组合性 - 组件可以像乐高积木一样自由组合
  2. 标准化接口 - 所有组件遵循统一的接口规范
  3. 丰富的生态系统 - 支持多种模型和工具集成
  4. 强大的调试能力 - 提供详细的执行跟踪和可视化功能

1.6 Chains 的发展历程

LangChain 的 Chains 概念经历了以下几个发展阶段:

  1. 初期版本 - 简单的 LLMChain 实现
  2. LCEL 引入 - LangChain Expression Language 的出现
  3. Runnable 接口 - 更加通用和灵活的接口设计
  4. 当前版本 - 高度可组合和可扩展的 Chains 系统

1.7 Chains 的核心价值

Chains 的核心价值体现在以下几个方面:

  1. 简化复杂性 - 将复杂的 AI 工作流分解为简单的组件
  2. 提高可维护性 - 模块化设计使得代码更易于维护
  3. 增强可测试性 - 每个组件都可以独立测试
  4. 促进重用 - 组件可以在不同项目中重复使用

1.8 学习 Chains 的建议

为了更好地学习和掌握 Chains,建议您:

  1. 从基础开始 - 先理解 Runnable 接口和基本概念
  2. 动手实践 - 通过实际代码练习加深理解
  3. 循序渐进 - 从简单 Chain 开始,逐步构建复杂应用
  4. 参考示例 - 学习官方和社区提供的示例代码
  5. 持续学习 - 关注 LangChain 的更新和发展

1.9 本章结构说明

本章将按照以下结构进行组织:

  1. 首先介绍环境准备,确保您能顺利运行示例代码
  2. 然后深入讲解 Chains 的基本概念和核心原理
  3. 接着介绍不同类型的 Chain 及其使用方法
  4. 通过实际应用示例展示 Chains 的强大功能
  5. 最后探讨高级特性和最佳实践

2. 环境准备

2.1 安装必要的库

首先,我们需要安装 LangChain 和相关的依赖库。打开终端并运行以下命令:

pip install langchain langchain-openai python-dotenv

如果您使用的是其他大语言模型提供商,比如 Anthropic 或 Google,您还需要安装相应的库:

# 对于 Anthropic 的 Claude 模型
pip install langchain-anthropic

# 对于 Google 的 Gemini 模型
pip install langchain-google-genai

# 对于 Hugging Face 模型
pip install langchain-huggingface

2.2 配置环境变量

创建一个 .env 文件来存储您的 API 密钥和其他配置信息:

OPENAI_API_KEY=your_openai_api_key_here
OPENAI_BASE_URL=https://api.openai-proxy.org/v1
OPENAI_MODEL=gpt-4o-mini

请将 your_openai_api_key_here 替换为您实际的 OpenAI API 密钥。

对于不同的模型提供商,您可能需要配置不同的环境变量:

``env

OpenAI 配置

OPENAI_API_KEY=your_openai_api_key
OPENAI_ORGANIZATION=your_organization_id

Anthropic 配置

ANTHROPIC_API_KEY=your_anthropic_api_key

Google 配置

GOOGLE_API_KEY=your_google_api_key

Hugging Face 配置

HUGGINGFACEHUB_API_TOKEN=your_huggingface_token


### 2.3 验证环境配置

创建一个简单的测试文件 `hello_chains.py` 来验证环境配置:

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

加载环境变量

load_dotenv()

获取环境变量

api_key = os.getenv(“OPENAI_API_KEY”)
base_url = os.getenv(“OPENAI_BASE_URL”)
model_name = os.getenv(“OPENAI_MODEL”)

验证环境变量

if not all([api_key, base_url, model_name]):
print(“请确保在 .env 文件中设置了所有必需的环境变量”)
exit(1)

print(“环境变量配置正确:”)
print(f"API Key 长度: {len(api_key)} 字符")
print(f"Base URL: {base_url}“)
print(f"Model Name: {model_name}”)

测试模型连接

try:
llm = ChatOpenAI(
model=model_name,
base_url=base_url,
api_key=api_key,
temperature=0.7
)

response = llm.invoke("你好,世界!")
print(f"模型响应: {response.content}")
print("环境配置成功!")

except Exception as e:
print(f"连接模型时出错: {e}")


运行这个文件来验证您的环境配置是否正确:

```bash
python hello_chains.py

如果一切正常,您应该看到类似以下的输出:

环境变量配置正确:
API Key 长度: 51 字符
Base URL: https://api.openai-proxy.org/v1
Model Name: gpt-4o-mini
模型响应: 你好!有什么我可以帮助你的吗?
环境配置成功!

2.4 多环境配置管理

在实际项目中,我们通常需要管理多个环境(开发、测试、生产)的配置。可以通过以下方式实现:

# config.py
import os
from dotenv import load_dotenv

# 根据环境变量加载不同的 .env 文件
env = os.getenv("ENVIRONMENT", "development")
dotenv_path = f".env.{env}"
load_dotenv(dotenv_path)

class Config:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")
    OPENAI_MODEL = os.getenv("OPENAI_MODEL")
    TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
    
    @classmethod
    def validate(cls):
        """验证配置是否完整"""
        required_vars = [
            cls.OPENAI_API_KEY,
            cls.OPENAI_BASE_URL,
            cls.OPENAI_MODEL
        ]
        
        if not all(required_vars):
            missing = [var for var in required_vars if not var]
            raise ValueError(f"缺少必要的环境变量: {missing}")

# 使用示例
try:
    Config.validate()
    print("配置验证通过")
except ValueError as e:
    print(f"配置验证失败: {e}")

2.5 依赖管理最佳实践

在生产环境中,建议使用 requirements.txt 文件来管理项目依赖:

# requirements.txt
langchain==0.1.0
langchain-openai==0.1.0
python-dotenv==1.0.0
pydantic==2.5.0

安装依赖:

pip install -r requirements.txt

2.6 虚拟环境管理

为了确保项目的依赖隔离,建议使用虚拟环境:

# 创建虚拟环境
python -m venv langchain_env

# 激活虚拟环境 (Windows)
langchain_env\Scripts\activate

# 激活虚拟环境 (macOS/Linux)
source langchain_env/bin/activate

# 安装依赖
pip install -r requirements.txt

3. 什么是 Chains

3.1 Chains 的定义

在 LangChain 中,Chain 是一个接口,它接受一些输入并产生一些输出。Chains 可以是简单的(单个组件),也可以是复杂的(多个组件的组合)。它们是 LangChain 中可组合性的核心体现。

Chains 的关键特性包括:

  1. 输入/输出 - 每个 Chain 都有明确定义的输入和输出
  2. 可组合性 - Chains 可以与其他 Chains 组合形成更复杂的 Chains
  3. 标准化接口 - 所有 Chains 都遵循相同的接口,便于使用
  4. 可配置性 - Chains 可以通过配置进行自定义

LangChain 中的 Chains 设计遵循了 Unix 哲学:“做一件事,并做好”。每个 Chain 都专注于完成特定的任务,通过组合不同的 Chains 来构建复杂的应用。

3.2 Chains 的核心方法

每个 Chain 都实现了以下核心方法:

  1. invoke() - 处理单个输入并返回单个输出
  2. batch() - 批量处理多个输入
  3. stream() - 流式处理输入并逐步返回输出
  4. ainvoke(), abatch(), astream() - 对应的异步版本

让我们详细看看这些方法的使用:

# chain_methods.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建 Chain
chain = (
    ChatPromptTemplate.from_template("告诉我关于{topic}的5个要点")
    | llm
    | StrOutputParser()
)

# 1. invoke() - 单个输入处理
print("=== invoke() 方法 ===")
result = chain.invoke({"topic": "人工智能"})
print(result)

# 2. batch() - 批量处理
print("\n=== batch() 方法 ===")
inputs = [
    {"topic": "机器学习"},
    {"topic": "深度学习"},
    {"topic": "自然语言处理"}
]
results = chain.batch(inputs)
for i, result in enumerate(results):
    print(f"结果 {i+1}: {result}")

# 3. stream() - 流式处理
print("\n=== stream() 方法 ===")
for chunk in chain.stream({"topic": "计算机视觉"}):
    print(chunk, end="", flush=True)
print()  # 换行

3.3 简单示例

让我们通过一个简单的例子来理解 Chains 的工作原理:

# simple_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("告诉我一个关于{topic}的有趣事实")

# 创建输出解析器
parser = StrOutputParser()

# 使用管道符(|)组合组件创建 Chain
chain = prompt | llm | parser

# 调用 Chain
result = chain.invoke({"topic": "人工智能"})
print("AI 生成的内容:")
print(result)

在这个例子中,我们创建了一个由三个组件组成的 Chain:

  1. prompt - 提示词模板
  2. llm - 语言模型
  3. parser - 输出解析器

这三个组件通过管道符 | 连接在一起,形成了一个完整的 Chain。

3.4 Chain 的执行流程

当我们调用 chain.invoke() 时,执行流程如下:

  1. 输入 {"topic": "人工智能"} 被传递给 prompt
  2. prompt 将输入格式化为实际的提示词文本
  3. 格式化后的提示词被传递给 llm
  4. llm 生成响应
  5. 响应被传递给 parser
  6. parser 处理响应并返回最终结果

这个流程可以用以下图示表示:

输入: {"topic": "人工智能"}
         ↓
[提示词模板] → "告诉我一个关于人工智能的有趣事实"
         ↓
[语言模型] → "人工智能的一个有趣事实是..."
         ↓
[输出解析器] → "人工智能的一个有趣事实是..."
         ↓
输出: 最终结果

3.5 Chain 的内部机制

让我们深入了解 Chain 的内部工作机制:

# chain_internals.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建组件
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
parser = StrOutputParser()

# 创建 Chain
chain = prompt | llm | parser

# 查看 Chain 的结构
print("Chain 类型:", type(chain))
print("Chain 结构:", chain)

# 查看 Chain 的输入和输出模式
print("\n输入模式:")
print(chain.input_schema.schema())
print("\n输出模式:")
print(chain.output_schema.schema())

# 查看 Chain 的图形表示
print("\nChain 图形表示:")
chain.get_graph().print_ascii()

通过这些方法,我们可以深入了解 Chain 的内部结构和工作原理。

3.6 Chain 的类型系统

LangChain 提供了丰富的类型系统来确保 Chains 的输入输出正确性:

# chain_types.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

# 定义输入模型
class QuestionInput(BaseModel):
    question: str = Field(description="用户提出的问题")
    context: str = Field(description="问题的上下文信息")

# 定义输出模型
class AnswerOutput(BaseModel):
    answer: str = Field(description="问题的答案")
    confidence: float = Field(description="答案的置信度", ge=0, le=1)
    references: List[str] = Field(description="参考信息")

# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
    "基于以下上下文回答问题:\n\n上下文: {context}\n\n问题: {question}"
)

# 查看输入输出类型
print("输入类型:")
print(prompt.input_schema.schema())

print("\n输出类型:")
# 注意:实际的输出类型取决于后续组件

3.7 Chain 的可组合性详解

Chain 的可组合性是其最强大的特性之一。这种特性使得我们可以将简单的组件组合成复杂的处理流程。让我们通过一个更复杂的示例来演示这一点:

# chain_composability.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 加载环境变量
load_dotenv()

# 创建语言模型实例
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建多个提示词模板
translation_prompt = ChatPromptTemplate.from_template(
    "将以下文本翻译成英文: {text}"
)

summary_prompt = ChatPromptTemplate.from_template(
    "为以下文本生成一个简短的摘要: {text}"
)

# 创建 Chain 组件
translator = translation_prompt | llm | StrOutputParser()
summarizer = summary_prompt | llm | StrOutputParser()

# 使用 RunnableParallel 并行执行多个 Chain
parallel_chain = RunnableParallel(
    translation=translator,
    summary=summarizer
)

# 使用示例
input_text = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。"
result = parallel_chain.invoke({"text": input_text})

print("并行处理结果:")
print(f"翻译结果: {result['translation']}")
print(f"摘要结果: {result['summary']}")

3.8 Chain 的输入输出验证

LangChain 提供了强大的输入输出验证机制,确保数据符合预期的格式:

# input_output_validation.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

# 定义严格的输入模型
class TranslationInput(BaseModel):
    text: str = Field(..., min_length=1, max_length=1000, description="要翻译的文本")
    target_language: str = Field(..., min_length=2, max_length=20, description="目标语言")

# 定义严格的输出模型
class TranslationOutput(BaseModel):
    translated_text: str = Field(..., description="翻译后的文本")
    source_language: str = Field(..., description="源语言")
    confidence: float = Field(..., ge=0, le=1, description="翻译置信度")

# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
    "将以下文本翻译成{target_language},并告诉我源语言是什么:\n\n{text}\n\n"
    "请以JSON格式返回结果,包含translated_text、source_language和confidence字段。"
)

print("输入模型结构:")
print(TranslationInput.schema())

print("\n输出模型结构:")
print(TranslationOutput.schema())

4. Chains 与 Runnable 的关系

4.1 Runnable 接口

在 LangChain 中,Chain 是 Runnable 接口的一个实现。Runnable 是 LangChain 中最基础的接口,它定义了所有可运行组件的行为规范。

Runnable 接口的核心特性:

  1. 统一接口 - 所有 Runnable 组件都有相同的调用方式
  2. 可组合性 - Runnable 组件可以轻松组合
  3. 链式调用 - 支持通过管道符进行链式调用

Runnable 接口定义了以下核心方法:

  • invoke(input, config) - 执行单个输入
  • batch(inputs, config) - 批量执行多个输入
  • stream(input, config) - 流式执行
  • async 版本的方法

4.2 Chain 作为 Runnable 的实现

Chain 继承了 Runnable 的所有特性,并添加了一些专门用于构建应用程序的功能:

# chain_as_runnable.py
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 创建一个简单的 RunnableLambda
def uppercase_transform(input_text):
    return input_text.upper()

transformer = RunnableLambda(uppercase_transform)

# 创建语言模型
llm = ChatOpenAI(temperature=0.7)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")

# 创建 Chain(实际上是 Runnable 的组合)
chain = prompt | llm

# 验证 Chain 是 Runnable 的实例
print(f"Chain 是 Runnable 的实例: {isinstance(chain, RunnableLambda)}")
print(f"LLM 是 Runnable 的实例: {isinstance(llm, RunnableLambda)}")
print(f"Prompt 是 Runnable 的实例: {isinstance(prompt, RunnableLambda)}")

4.3 Runnable 与 Chain 的区别

虽然 Chain 是 Runnable 的一种实现,但它们之间有一些细微的区别:

  1. 概念范围

    • Runnable 是一个通用接口,代表任何可运行的组件
    • Chain 是一个更具体的概念,通常指代多个组件的组合
  2. 使用场景

    • Runnable 更多用于底层组件和接口定义
    • Chain 更多用于应用程序级别的组件组合
  3. 功能特性

    • Runnable 提供基础的运行能力
    • Chain 提供更丰富的应用程序构建功能

4.4 Runnable 的其他实现

除了 Chain,LangChain 中还有其他 Runnable 的实现:

# other_runnables.py
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough

# RunnableLambda - 将普通函数包装为 Runnable
def add_prefix(text):
    return f"前缀: {text}"

prefix_adder = RunnableLambda(add_prefix)

# RunnableParallel - 并行执行多个 Runnable
parallel_runnable = RunnableParallel(
    upper=lambda x: x.upper(),
    lower=lambda x: x.lower(),
    length=lambda x: len(x)
)

# RunnablePassthrough - 直接传递输入
passthrough = RunnablePassthrough()

# 使用示例
print("RunnableLambda:")
print(prefix_adder.invoke("测试文本"))

print("\nRunnableParallel:")
print(parallel_runnable.invoke("Hello World"))

print("\nRunnablePassthrough:")
print(passthrough.invoke("直接传递的文本"))

4.5 Runnable 的组合能力

Runnable 的强大之处在于其组合能力,这使得我们可以构建复杂的处理流程:

# runnable_composition.py
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 创建处理函数
def text_analyzer(text):
    return {
        "length": len(text),
        "word_count": len(text.split()),
        "char_count": len(text.replace(" ", ""))
    }

def sentiment_classifier(analysis):
    # 模拟情感分类
    if analysis["word_count"] > 10:
        return "复杂文本"
    else:
        return "简单文本"

# 创建 Runnable 组件
analyzer = RunnableLambda(text_analyzer)
classifier = RunnableLambda(sentiment_classifier)
llm = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
parser = StrOutputParser()

# 复杂组合示例
complex_chain = (
    analyzer
    | RunnableParallel(
        analysis=lambda x: x,
        classification=classifier,
        summary=(prompt | llm | parser)
    )
)

# 使用示例
result = complex_chain.invoke("这是一个用于测试的示例文本,它包含了一些词汇。")
print("复杂组合结果:")
print(result)

5. 基础 Chain 类型

LLMChain

LLMChain 是最基础也是最常用的 Chain 类型之一。它将提示词模板与语言模型组合在一起。

# llm_chain_example.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
    "你是一个专业的{profession}。请回答以下问题: {question}"
)

# 创建输出解析器
parser = StrOutputParser()

# 创建 LLMChain(通过组合方式)
llm_chain = prompt | llm | parser

# 使用 Chain
result = llm_chain.invoke({
    "profession": "历史学家",
    "question": "请简述工业革命对人类社会的影响"
})

print("LLMChain 输出:")
print(result)

LLMChain 的工作原理:

  1. 接收包含模板变量的输入
  2. 使用输入数据格式化提示词模板
  3. 将格式化后的提示词发送给语言模型
  4. 获取模型响应
  5. 使用输出解析器处理响应
  6. 返回最终结果

PromptTemplate + LLM

这是构建 Chain 最基础的方式,通过将提示词模板和语言模型组合在一起:

# prompt_llm_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("写一首关于{subject}的{style}诗")

# 创建 Chain
chain = prompt | llm

# 使用 Chain
result = chain.invoke({
    "subject": "春天",
    "style": "五言绝句"
})

print("诗歌生成结果:")
print(result.content)

这种方式的优点:

  1. 简单直观,易于理解
  2. 灵活性高,可以根据需要组合不同的组件
  3. 符合函数式编程的思想

Transformation Chain

Transformation Chain 用于对数据进行转换处理:

# transformation_chain.py
from langchain_core.runnables import RunnableLambda

# 定义转换函数
def word_count(text):
    return {"text": text, "word_count": len(text.split())}

def uppercase_text(data):
    return {"text": data["text"].upper(), "word_count": data["word_count"]}

# 创建 Transformation Chain
word_counter = RunnableLambda(word_count)
uppercaser = RunnableLambda(uppercase_text)

# 组合 Chain
transformation_chain = word_counter | uppercaser

# 使用 Chain
result = transformation_chain.invoke("hello world this is a test")
print("转换结果:")
print(result)

Transformation Chain 的应用场景:

  1. 数据预处理
  2. 格式转换
  3. 特征提取
  4. 数据清洗

其他基础 Chain 类型

LangChain 还提供了其他一些基础的 Chain 类型:

# other_basic_chains.py
from langchain_core.runnables import RunnableLambda, RunnableParallel

# 条件 Chain
def conditional_processor(data):
    if data["score"] > 80:
        return "优秀"
    elif data["score"] > 60:
        return "良好"
    else:
        return "需要改进"

conditional_chain = RunnableLambda(conditional_processor)

# 并行 Chain
def get_word_count(text):
    return len(text.split())

def get_char_count(text):
    return len(text)

def get_line_count(text):
    return len(text.split('\n'))

parallel_chain = RunnableParallel(
    word_count=RunnableLambda(get_word_count),
    char_count=RunnableLambda(get_char_count),
    line_count=RunnableLambda(get_line_count)
)

# 使用示例
print("条件 Chain:")
print(conditional_chain.invoke({"score": 85}))

print("\n并行 Chain:")
print(parallel_chain.invoke("这是第一行\n这是第二行\n这是第三行"))

6. Chains 的组合

顺序组合

Chains 可以通过管道符 | 进行顺序组合:

# sequential_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建多个提示词模板
prompt1 = ChatPromptTemplate.from_template("将以下内容总结为一句话: {content}")
prompt2 = ChatPromptTemplate.from_template("将以下摘要翻译成英文: {summary}")

# 创建输出解析器
parser = StrOutputParser()

# 创建处理函数
def add_prefix(data):
    return f"摘要: {data}"

prefix_adder = RunnableLambda(add_prefix)

# 组合多个 Chain
chain = prompt1 | llm | parser | prefix_adder | prompt2 | llm | parser

# 使用 Chain
result = chain.invoke({
    "content": "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。",
    "summary": "一句话摘要"
})

print("最终结果:")
print(result)

顺序组合的特点:

  1. 数据流是线性的
  2. 每个组件的输出是下一个组件的输入
  3. 执行顺序是确定的
  4. 错误会沿着链条传播

并行组合

Chains 也可以并行执行:

# parallel_chain.py
from langchain_core.runnables import RunnableParallel

# 定义处理函数
def get_word_count(text):
    return len(text.split())

def get_char_count(text):
    return len(text)

def get_line_count(text):
    return len(text.split('\n'))

# 创建 Runnable 组件
word_counter = RunnableLambda(get_word_count)
char_counter = RunnableLambda(get_char_count)
line_counter = RunnableLambda(get_line_count)

# 并行组合
parallel_chain = RunnableParallel(
    word_count=word_counter,
    char_count=char_counter,
    line_count=line_counter
)

# 使用 Chain
text = """这是第一行文本。
这是第二行文本。
这是第三行文本。"""

result = parallel_chain.invoke(text)
print("并行处理结果:")
print(result)

并行组合的特点:

  1. 多个组件同时执行
  2. 各组件之间相互独立
  3. 结果以字典形式返回
  4. 可以提高处理效率

复杂组合

我们可以将顺序组合和并行组合结合起来,创建更复杂的 Chains:

# complex_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建分析 Chain
analysis_chain = (
    ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
    | llm
    | StrOutputParser()
)

# 创建摘要 Chain
summary_chain = (
    ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
    | llm
    | StrOutputParser()
)

# 创建关键词提取 Chain
keywords_chain = (
    ChatPromptTemplate.from_template("提取以下文本的关键词: {text}")
    | llm
    | StrOutputParser()
)

# 并行执行多个分析任务
parallel_analysis = RunnableParallel(
    sentiment=analysis_chain,
    summary=summary_chain,
    keywords=keywords_chain
)

# 创建报告生成 Chain
report_chain = (
    ChatPromptTemplate.from_template("""
    基于以下分析结果生成报告:
    
    情感分析: {sentiment}
    文本摘要: {summary}
    关键词: {keywords}
    """)
    | llm
    | StrOutputParser()
)

# 组合完整 Chain
complete_chain = parallel_analysis | report_chain

# 使用 Chain
text = "人工智能技术正在快速发展,它在医疗、教育、交通等领域都有广泛应用。虽然带来了很多便利,但也引发了一些关于就业和隐私的担忧。"

result = complete_chain.invoke({"text": text})
print("完整分析报告:")
print(result)

7. 实际应用示例

文档摘要生成器

# document_summarizer.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

# 创建文档摘要 Chain
document_summarizer = (
    ChatPromptTemplate.from_template(
        "请为以下文档生成一个简洁的摘要(不超过100字):\n\n{document}"
    )
    | llm
    | StrOutputParser()
)

# 示例文档
sample_document = """
人工智能(Artificial Intelligence,AI)是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、
语言识别、图像识别、自然语言处理和专家系统等。

人工智能从诞生以来,理论和技术日益成熟,应用领域也不断扩大。可以设想,未来人工智能
带来的科技产品,将会是人类智慧的"容器"。人工智能可以对人的意识、思维的信息过程的模拟。
人工智能不是人的智能,但能像人那样思考、也可能超过人的智能。
"""

# 生成摘要
summary = document_summarizer.invoke({"document": sample_document})
print("文档摘要:")
print(summary)

多步骤问答系统

# multi_step_qa.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

# 第一步:理解问题
question_analyzer = (
    ChatPromptTemplate.from_template(
        "分析以下问题并确定回答该问题需要哪些关键信息:\n\n问题: {question}"
    )
    | llm
    | StrOutputParser()
)

# 第二步:生成答案
answer_generator = (
    ChatPromptTemplate.from_template(
        "基于以下信息回答问题:\n\n问题: {question}\n\n相关信息: {info}\n\n请提供详细的回答:"
    )
    | llm
    | StrOutputParser()
)

# 组合 Chain
qa_chain = question_analyzer | (lambda info: {
    "question": "人工智能是什么?",
    "info": info
}) | answer_generator

# 使用 Chain
result = qa_chain.invoke({"question": "人工智能是什么?"})
print("问答结果:")
print(result)

智能邮件分类器

# email_classifier.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

# 加载环境变量
load_dotenv()

# 定义输出结构
class EmailClassification(BaseModel):
    category: str = Field(description="邮件分类: 工作、个人、促销、社交、垃圾邮件")
    priority: str = Field(description="优先级: 高、中、低")
    summary: str = Field(description="邮件摘要")
    action_items: List[str] = Field(description="需要采取的行动")

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.3
)

# 创建解析器
parser = JsonOutputParser(pydantic_object=EmailClassification)

# 创建分类 Chain
email_classifier = (
    ChatPromptTemplate.from_template("""
    请分析以下邮件内容并进行分类:
    
    邮件内容: {email_content}
    
    {format_instructions}
    """)
    | llm
    | parser
)

# 示例邮件
sample_email = """
主题: 项目进度更新会议

亲爱的团队成员,

希望这封邮件找到你们都安好。我想提醒大家,我们的季度项目进度更新会议定于本周五下午2点举行。
请确保你们已经准备好了各自的项目报告,并准备好回答可能的问题。

议程包括:
1. 各项目进度汇报
2. 遇到的挑战和解决方案
3. 下一季度的计划

请确认你们的出席情况。

谢谢,
项目经理
"""

# 分类邮件
result = email_classifier.invoke({
    "email_content": sample_email,
    "format_instructions": parser.get_format_instructions()
})

print("邮件分类结果:")
print(f"分类: {result['category']}")
print(f"优先级: {result['priority']}")
print(f"摘要: {result['summary']}")
print("行动项:")
for item in result['action_items']:
    print(f"  - {item}")

8. Chains 的高级特性

配置化 Chains

Chains 支持通过配置来定制其行为,这使得 Chains 更加灵活和可重用:

# configurable_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY")
)

# 创建可配置的提示词模板
configurable_prompt = ChatPromptTemplate.from_template(
    "你是一个{role},请用{tone}的语气回答关于{topic}的问题"
)

# 创建 Chain
configurable_chain = configurable_prompt | llm | StrOutputParser()

# 使用不同配置
configs = [
    {"role": "教师", "tone": "耐心", "topic": "数学"},
    {"role": "工程师", "tone": "专业", "topic": "编程"},
    {"role": "朋友", "tone": "轻松", "topic": "旅行"}
]

print("配置化 Chain 示例:")
for i, config in enumerate(configs, 1):
    result = configurable_chain.invoke(
        {"role": config["role"], "tone": config["tone"], "topic": config["topic"]},
        config={"configurable": config}
    )
    print(f"\n配置 {i}: {config}")
    print(f"结果: {result}")

错误处理与重试机制

在实际应用中,我们需要处理可能出现的错误并实现重试机制:

# error_handling_chain.py
import time
import random
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.utils import ConfigurableField

# 模拟可能失败的函数
def unreliable_function(text):
    # 模拟 30% 的失败率
    if random.random() < 0.3:
        raise Exception("模拟的网络错误")
    return f"处理结果: {text.upper()}"

# 创建带重试机制的 Runnable
def create_retryable_runnable(func, max_retries=3):
    def retry_wrapper(input_data):
        for attempt in range(max_retries):
            try:
                return func(input_data)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                print(f"尝试 {attempt + 1} 失败: {e},正在重试...")
                time.sleep(1)  # 等待 1 秒后重试
        return None
    
    return RunnableLambda(retry_wrapper)

# 创建 Chain
unreliable_chain = create_retryable_runnable(unreliable_function)

# 测试重试机制
print("错误处理与重试机制示例:")
try:
    result = unreliable_chain.invoke("测试文本")
    print(f"最终结果: {result}")
except Exception as e:
    print(f"所有重试都失败了: {e}")

缓存机制

为了提高性能,我们可以为 Chains 添加缓存机制:

# caching_chain.py
import hashlib
import json
from langchain_core.runnables import RunnableLambda

# 简单的内存缓存实现
class SimpleCache:
    def __init__(self):
        self.cache = {}
    
    def get(self, key):
        return self.cache.get(key)
    
    def set(self, key, value):
        self.cache[key] = value

# 创建缓存实例
cache = SimpleCache()

# 带缓存的处理函数
def cached_processing(input_data):
    # 生成缓存键
    cache_key = hashlib.md5(json.dumps(input_data, sort_keys=True).encode()).hexdigest()
    
    # 检查缓存
    cached_result = cache.get(cache_key)
    if cached_result:
        print("从缓存中获取结果")
        return cached_result
    
    # 模拟耗时处理
    import time
    time.sleep(2)  # 模拟 2 秒的处理时间
    result = f"处理结果: {input_data}"
    
    # 存储到缓存
    cache.set(cache_key, result)
    print("处理完成并存储到缓存")
    return result

# 创建 Chain
cached_chain = RunnableLambda(cached_processing)

# 测试缓存机制
print("缓存机制示例:")
print("第一次调用:")
result1 = cached_chain.invoke("测试数据")
print(f"结果: {result1}")

print("\n第二次调用(相同输入):")
result2 = cached_chain.invoke("测试数据")
print(f"结果: {result2}")

print("\n第三次调用(不同输入):")
result3 = cached_chain.invoke("不同数据")
print(f"结果: {result3}")

9. Chains 的性能优化

批处理优化

批处理可以显著提高 Chains 的处理效率:

# batch_processing.py
import time
from langchain_core.runnables import RunnableLambda

# 模拟处理函数
def process_item(item):
    # 模拟处理时间
    time.sleep(0.1)
    return f"处理完成: {item}"

# 创建处理 Runnable
processor = RunnableLambda(process_item)

# 单个处理
def single_processing(items):
    start_time = time.time()
    results = []
    for item in items:
        result = processor.invoke(item)
        results.append(result)
    end_time = time.time()
    return results, end_time - start_time

# 批处理
def batch_processing(items):
    start_time = time.time()
    results = processor.batch(items)
    end_time = time.time()
    return results, end_time - start_time

# 测试数据
test_items = [f"项目{i}" for i in range(10)]

print("性能对比测试:")
print("单个处理:")
single_results, single_time = single_processing(test_items)
print(f"耗时: {single_time:.2f} 秒")

print("\n批处理:")
batch_results, batch_time = batch_processing(test_items)
print(f"耗时: {batch_time:.2f} 秒")

print(f"\n性能提升: {single_time/batch_time:.2f} 倍")

异步处理

异步处理可以进一步提高 Chains 的并发性能:

# async_processing.py
import asyncio
import time
from langchain_core.runnables import RunnableLambda

# 模拟异步处理函数
async def async_process_item(item):
    # 模拟异步处理
    await asyncio.sleep(0.1)
    return f"异步处理完成: {item}"

# 创建异步处理 Runnable
async_processor = RunnableLambda(async_process_item)

# 异步批处理
async def async_batch_processing(items):
    start_time = time.time()
    results = await async_processor.abatch(items)
    end_time = time.time()
    return results, end_time - start_time

# 同步批处理对比
def sync_batch_processing(items):
    def sync_process_item(item):
        time.sleep(0.1)  # 模拟同步处理
        return f"同步处理完成: {item}"
    
    sync_processor = RunnableLambda(sync_process_item)
    start_time = time.time()
    results = sync_processor.batch(items)
    end_time = time.time()
    return results, end_time - start_time

# 测试
async def main():
    test_items = [f"项目{i}" for i in range(10)]
    
    print("异步处理测试:")
    async_results, async_time = await async_batch_processing(test_items)
    print(f"异步处理耗时: {async_time:.2f} 秒")
    
    print("\n同步处理对比:")
    sync_results, sync_time = sync_batch_processing(test_items)
    print(f"同步处理耗时: {sync_time:.2f} 秒")
    
    print(f"\n性能提升: {sync_time/async_time:.2f} 倍")

# 运行测试
# asyncio.run(main())

10. 调试和监控 Chains

调试 Chains

调试 Chains 是开发过程中的重要环节:

# chain_debugging.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 加载环境变量
load_dotenv()

# 创建带调试信息的处理函数
def debug_processor(input_data):
    print(f"调试信息 - 输入: {input_data}")
    result = f"处理结果: {input_data}"
    print(f"调试信息 - 输出: {result}")
    return result

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建组件
debug_runnable = RunnableLambda(debug_processor)
prompt = ChatPromptTemplate.from_template("总结以下内容: {content}")
parser = StrOutputParser()

# 创建 Chain
debug_chain = debug_runnable | prompt | llm | parser

# 使用回调函数进行调试
def debug_callback():
    def wrapper(func):
        def inner(*args, **kwargs):
            print(f"调用函数: {func.__name__}")
            print(f"参数: args={args}, kwargs={kwargs}")
            result = func(*args, **kwargs)
            print(f"返回值: {result}")
            return result
        return inner
    return wrapper

# 可视化 Chain 结构
print("Chain 结构可视化:")
debug_chain.get_graph().print_ascii()

监控 Chains

监控 Chains 的执行情况有助于优化性能和发现问题:

# chain_monitoring.py
import time
from langchain_core.runnables import RunnableLambda
from functools import wraps

# 性能监控装饰器
def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            end_time = time.time()
            execution_time = end_time - start_time
            print(f"函数 {func.__name__} 执行时间: {execution_time:.4f} 秒")
            return result
        except Exception as e:
            end_time = time.time()
            execution_time = end_time - start_time
            print(f"函数 {func.__name__} 执行失败,耗时: {execution_time:.4f} 秒,错误: {e}")
            raise
    return wrapper

# 带监控的处理函数
@monitor_performance
def monitored_processing(input_data):
    # 模拟处理时间
    time.sleep(0.1)
    return f"监控处理结果: {input_data}"

# 创建 Chain
monitored_chain = RunnableLambda(monitored_processing)

# 测试监控
print("监控 Chains 执行:")
result = monitored_chain.invoke("测试数据")
print(f"结果: {result}")

11. Chains 的最佳实践

11.1 代码组织

在编写 Chains 时,建议遵循以下代码组织原则:

  1. 模块化 - 将不同的功能模块化,便于管理和维护
  2. 可重用性 - 尽量复用现有的 Chains 和组件
  3. 清晰性 - 保持代码结构清晰,易于理解
# example_module.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")

# 创建输出解析器
parser = StrOutputParser()

# 创建语言模型
llm = ChatOpenAI(temperature=0.7)

# 创建 Chain
chain = prompt | llm | parser

# 导出 Chain
__all__ = ["chain"]

11.2 配置管理

使用配置文件来管理 Chains 的配置,便于在不同环境中使用:

# config.py
import os
from dotenv import load_dotenv

# 根据环境变量加载不同的 .env 文件
env = os.getenv("ENVIRONMENT", "development")
dotenv_path = f".env.{env}"
load_dotenv(dotenv_path)

class Config:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")
    OPENAI_MODEL = os.getenv("OPENAI_MODEL")
    TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
    
    @classmethod
    def validate(cls):
        """验证配置是否完整"""
        required_vars = [
            cls.OPENAI_API_KEY,
            cls.OPENAI_BASE_URL,
            cls.OPENAI_MODEL
        ]
        
        if not all(required_vars):
            missing = [var for var in required_vars if not var]
            raise ValueError(f"缺少必要的环境变量: {missing}")

# 使用示例
try:
    Config.validate()
    print("配置验证通过")
except ValueError as e:
    print(f"配置验证失败: {e}")

11.3 错误处理

在实际应用中,需要对 Chains 的错误进行适当的处理:

# error_handling.py
import time
import random
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.utils import ConfigurableField

# 模拟可能失败的函数
def unreliable_function(text):
    # 模拟 30% 的失败率
    if random.random() < 0.3:
        raise Exception("模拟的网络错误")
    return f"处理结果: {text.upper()}"

# 创建带重试机制的 Runnable
def create_retryable_runnable(func, max_retries=3):
    def retry_wrapper(input_data):
        for attempt in range(max_retries):
            try:
                return func(input_data)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                print(f"尝试 {attempt + 1} 失败: {e},正在重试...")
                time.sleep(1)  # 等待 1 秒后重试
        return None
    
    return RunnableLambda(retry_wrapper)

# 创建 Chain
unreliable_chain = create_retryable_runnable(unreliable_function)

# 测试重试机制
print("错误处理与重试机制示例:")
try:
    result = unreliable_chain.invoke("测试文本")
    print(f"最终结果: {result}")
except Exception as e:
    print(f"所有重试都失败了: {e}")

11.4 性能优化

通过批处理和异步处理来优化 Chains 的性能:

# performance_optimization.py
import time
from langchain_core.runnables import RunnableLambda

# 模拟处理函数
def process_item(item):
    # 模拟处理时间
    time.sleep(0.1)
    return f"处理完成: {item}"

# 创建处理 Runnable
processor = RunnableLambda(process_item)

# 单个处理
def single_processing(items):
    start_time = time.time()
    results = []
    for item in items:
        result = processor.invoke(item)
        results.append(result)
    end_time = time.time()
    return results, end_time - start_time

# 批处理
def batch_processing(items):
    start_time = time.time()
    results = processor.batch(items)
    end_time = time.time()
    return results, end_time - start_time

# 测试数据
test_items = [f"项目{i}" for i in range(10)]

print("性能对比测试:")
print("单个处理:")
single_results, single_time = single_processing(test_items)
print(f"耗时: {single_time:.2f} 秒")

print("\n批处理:")
batch_results, batch_time = batch_processing(test_items)
print(f"耗时: {batch_time:.2f} 秒")

print(f"\n性能提升: {single_time/batch_time:.2f} 倍")
# async_processing.py
import asyncio
import time
from langchain_core.runnables import RunnableLambda

# 模拟异步处理函数
async def async_process_item(item):
    # 模拟异步处理
    await asyncio.sleep(0.1)
    return f"异步处理完成: {item}"

# 创建异步处理 Runnable
async_processor = RunnableLambda(async_process_item)

# 异步批处理
async def async_batch_processing(items):
    start_time = time.time()
    results = await async_processor.abatch(items)
    end_time = time.time()
    return results, end_time - start_time

# 同步批处理对比
def sync_batch_processing(items):
    def sync_process_item(item):
        time.sleep(0.1)  # 模拟同步处理
        return f"同步处理完成: {item}"
    
    sync_processor = RunnableLambda(sync_process_item)
    start_time = time.time()
    results = sync_processor.batch(items)
    end_time = time.time()
    return results, end_time - start_time

# 测试
async def main():
    test_items = [f"项目{i}" for i in range(10)]
    
    print("异步处理测试:")
    async_results, async_time = await async_batch_processing(test_items)
    print(f"异步处理耗时: {async_time:.2f} 秒")
    
    print("\n同步处理对比:")
    sync_results, sync_time = sync_batch_processing(test_items)
    print(f"同步处理耗时: {sync_time:.2f} 秒")
    
    print(f"\n性能提升: {sync_time/async_time:.2f} 倍")

# 运行测试
# asyncio.run(main())

12. Chains 与微服务架构

12.1 微服务架构的优势

微服务架构具有以下优势:

  1. 可扩展性 - 可以独立扩展不同的服务
  2. 灵活性 - 可以使用不同的技术栈
  3. 可维护性 - 服务之间相互独立,便于管理和维护
  4. 容错性 - 单个服务的故障不会影响整个系统
  5. 技术多样性 - 不同的服务可以使用最适合的技术栈

12.2 Chains 与微服务的结合

Chains 可以与微服务架构很好地结合,提供以下好处:

  1. 模块化 - Chains 可以作为独立的服务运行
  2. 可组合性 - 可以通过 API 调用组合不同的 Chains
  3. 可扩展性 - 可以独立扩展不同的 Chains
  4. 重用性 - 同一个 Chain 服务可以被多个应用调用
# chain_microservice.py
from flask import Flask, request, jsonify
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = Flask(__name__)

# 初始化模型
llm = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
parser = StrOutputParser()

# 创建 Chain
summary_chain = prompt | llm | parser

@app.route('/summarize', methods=['POST'])
def summarize():
    try:
        data = request.json
        text = data.get('text', '')
        
        if not text:
            return jsonify({'error': '文本不能为空'}), 400
        
        # 调用 Chain
        result = summary_chain.invoke({'text': text})
        return jsonify({'summary': result})
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 5000

# 启动应用
CMD ["python", "chain_microservice.py"]
# docker-compose.yml
version: '3.8'

services:
  chain-service:
    build: .
    ports:
      - "5000:5000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=${OPENAI_BASE_URL}
      - OPENAI_MODEL=${OPENAI_MODEL}
    env_file:
      - .env

12.3 Chains 作为微服务的实现

通过将 Chains 封装为 RESTful API,可以轻松地将它们部署为独立的微服务:

# chain_service_example.py
from flask import Flask, request, jsonify
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 加载环境变量
load_dotenv()

app = Flask(__name__)

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('chain_service.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 配置模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建不同类型的 Chains
translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成{target_language}: {text}")
translation_chain = translation_prompt | llm | StrOutputParser()

sentiment_prompt = ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
sentiment_chain = sentiment_prompt | llm | StrOutputParser()

@app.route('/translate', methods=['POST'])
def translate():
    data = request.json
    text = data.get('text')
    target_language = data.get('target_language', '英文')
    
    try:
        result = translation_chain.invoke({
            'text': text,
            'target_language': target_language
        })
        return jsonify({'translated_text': result})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/sentiment', methods=['POST'])
def analyze_sentiment():
    data = request.json
    text = data.get('text')
    
    try:
        result = sentiment_chain.invoke({'text': text})
        return jsonify({'sentiment': result})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, port=5001)

12.4 Chains 与微服务的通信

在微服务架构中,不同的 Chains 服务可以通过 HTTP API 进行通信:

# chain_client.py
import requests
import json

class ChainServiceClient:
    def __init__(self, base_url):
        self.base_url = base_url
    
    def translate_text(self, text, target_language="英文"):
        """调用翻译服务"""
        url = f"{self.base_url}/translate"
        payload = {
            "text": text,
            "target_language": target_language
        }
        
        response = requests.post(url, json=payload)
        if response.status_code == 200:
            return response.json()['translated_text']
        else:
            raise Exception(f"翻译服务调用失败: {response.text}")
    
    def analyze_sentiment(self, text):
        """调用情感分析服务"""
        url = f"{self.base_url}/sentiment"
        payload = {"text": text}
        
        response = requests.post(url, json=payload)
        if response.status_code == 200:
            return response.json()['sentiment']
        else:
            raise Exception(f"情感分析服务调用失败: {response.text}")

# 使用示例
# client = ChainServiceClient("http://localhost:5001")
# translation = client.translate_text("你好,世界!", "English")
# sentiment = client.analyze_sentiment("今天天气真好!")

13. Chains 在企业级应用中的部署

13.1 部署环境

在企业级应用中部署 Chains 时,需要考虑以下环境因素:

  1. 计算资源 - 确保有足够的 CPU 和内存资源
  2. 网络环境 - 稳定的网络连接以访问外部 API
  3. 安全性 - 保护 API 密钥和敏感数据
  4. 监控系统 - 实时监控服务状态和性能指标
  5. 日志系统 - 记录详细的运行日志用于问题排查

13.2 部署工具

常用的部署工具包括:

  1. Docker - 容器化部署,确保环境一致性
  2. Kubernetes - 容器编排,实现自动扩缩容
  3. CI/CD 工具 - 自动化部署流程
  4. 云服务提供商 - AWS, Azure, GCP 等
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 5000

# 启动应用
CMD ["python", "chain_microservice.py"]
# docker-compose.yml
version: '3.8'

services:
  chain-service:
    build: .
    ports:
      - "5000:5000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=${OPENAI_BASE_URL}
      - OPENAI_MODEL=${OPENAI_MODEL}
    env_file:
      - .env

13.3 部署流程

企业级部署流程通常包括以下步骤:

  1. 代码打包 - 将应用代码和依赖打包
  2. 环境配置 - 配置生产环境变量
  3. 容器构建 - 构建 Docker 镜像
  4. 服务部署 - 部署到服务器或云平台
  5. 健康检查 - 验证服务是否正常运行
  6. 监控配置 - 配置监控和告警
# 部署脚本示例
#!/bin/bash

# 1. 代码打包
git clone https://github.com/your-repo/chain-app.git
cd chain-app

# 2. 环境配置
cp .env.example .env
# 编辑 .env 文件,设置必要的环境变量

# 3. 容器构建
docker build -t chain-service:latest .

# 4. 服务部署
docker stop chain-service || true
docker rm chain-service || true
docker run -d \
  --name chain-service \
  -p 5000:5000 \
  -e OPENAI_API_KEY=$OPENAI_API_KEY \
  -e OPENAI_BASE_URL=$OPENAI_BASE_URL \
  -e OPENAI_MODEL=$OPENAI_MODEL \
  chain-service:latest

# 5. 健康检查
sleep 5
curl -f http://localhost:5000/health || echo "服务启动失败"

13.4 监控与日志

在生产环境中,监控和日志是确保服务稳定运行的关键:

# monitored_chain_service.py
import logging
from flask import Flask, request, jsonify
import time
from datetime import datetime
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = Flask(__name__)

# 初始化日志
logging.basicConfig(level=logging.INFO)

# 加载环境变量
load_dotenv()

# 初始化模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建 Chain
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
chain = prompt | llm | StrOutputParser()

# 性能监控装饰器
def monitor_performance(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            end_time = time.time()
            execution_time = end_time - start_time
            
            # 记录性能日志
            logger.info(f"{func.__name__} 执行成功,耗时: {execution_time:.2f}秒")
            return result
        except Exception as e:
            end_time = time.time()
            execution_time = end_time - start_time
            logger.error(f"{func.__name__} 执行失败,耗时: {execution_time:.2f}秒,错误: {str(e)}")
            raise
    return wrapper

@app.route('/summarize', methods=['POST'])
@monitor_performance
def summarize():
    try:
        data = request.json
        text = data.get('text', '')
        
        logger.info(f"收到摘要请求,文本长度: {len(text)} 字符")
        
        if not text:
            logger.warning("收到空文本请求")
            return jsonify({'error': '文本不能为空'}), 400
        
        # 调用 Chain
        result = chain.invoke({'text': text})
        
        logger.info("摘要生成成功")
        return jsonify({'summary': result})
    
    except Exception as e:
        logger.error(f"摘要生成失败: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查端点"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'service': 'chain-service'
    })

if __name__ == '__main__':
    logger.info("Chain 服务启动中...")
    app.run(host='0.0.0.0', port=5000)

14. Chains 的未来发展

LangChain Chains 的未来发展将集中在以下几个方向:

14.1 更强的可组合性

未来的 Chains 将支持更复杂的组合模式,包括:

  1. 条件组合 - 根据输入或中间结果动态选择处理路径
  2. 循环组合 - 支持迭代处理直到满足特定条件
  3. 分支组合 - 同时执行多个处理路径并合并结果
# conditional_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")

# 创建输出解析器
parser = StrOutputParser()

# 创建 Chain
chain = prompt | llm | parser

# 创建条件处理函数
def conditional_processor(data):
    if data["concept"] == "人工智能":
        return chain.invoke({"concept": "AI"})
    elif data["concept"] == "机器学习":
        return chain.invoke({"concept": "ML"})
    else:
        return "未知概念"

# 创建 RunnableLambda
conditional_chain = RunnableLambda(conditional_processor)

# 使用 Chain
result = conditional_chain.invoke({"concept": "人工智能"})
print("条件 Chain 输出:")
print(result)

# loop_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")

# 创建输出解析器
parser = StrOutputParser()

# 创建 Chain
chain = prompt | llm | parser

# 创建循环处理函数
def loop_processor(data):
    results = []
    for concept in data["concepts"]:
        result = chain.invoke({"concept": concept})
        results.append(result)
    return results

# 创建 RunnableLambda
loop_chain = RunnableLambda(loop_processor)

# 使用 Chain
result = loop_chain.invoke({"concepts": ["人工智能", "机器学习", "深度学习"]})
print("循环 Chain 输出:")
print(result)

# branch_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 加载环境变量
load_dotenv()

# 创建语言模型
llm = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.7
)

# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")

# 创建输出解析器
parser = StrOutputParser()

# 创建 Chain
chain = prompt | llm | parser

# 创建分支处理函数
def branch_processor(data):
    if data["type"] == "AI":
        return chain.invoke({"concept": "人工智能"})
    elif data["type"] == "ML":
        return chain.invoke({"concept": "机器学习"})
    else:
        return "未知类型"

# 创建 RunnableLambda
branch_chain = RunnableLambda(branch_processor)

# 使用 Chain
result = branch_chain.invoke({"type": "AI"})
print("分支 Chain 输出:")
print(result)

# future_conditional_chain.py
from langchain_core.runnables import RunnableLambda, RunnableBranch

# 条件判断函数
def route_by_length(input_data):
    if len(input_data["text"]) < 100:
        return "short"
    elif len(input_data["text"]) < 1000:
        return "medium"
    else:
        return "long"

# 创建 RunnableLambda
short_chain = RunnableLambda(lambda x: f"短文本: {x}")
medium_chain = RunnableLambda(lambda x: f"中等文本: {x}")
long_chain = RunnableLambda(lambda x: f"长文本: {x}")

# 创建 RunnableBranch
conditional_chain = RunnableBranch(
    route_by_length,
    short=short_chain,
    medium=medium_chain,
    long=long_chain
)

# 使用 Chain
result = conditional_chain.invoke({"text": "这是一个简短的文本"})
print("条件 Chain 输出:")
print(result)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐