LangChain Chains 基础详解
LangChain Chains 是 LangChain 框架中的核心概念之一,它代表了一系列组件的有序组合,这些组件可以依次处理输入并产生输出。Chains 提供了一种简单而强大的方式来构建复杂的 AI 应用程序,将不同的组件(如语言模型、提示词模板、工具等)连接在一起。可以把 Chains 想象成一条流水线,数据从一端进入,经过多个处理步骤,最终从另一端输出结果。每个处理步骤都可以是一个简单的
9. LangChain Chains 基础概念详解
目录
- 简介
- 环境准备
- 什么是 Chains
- Chains 与 Runnable 的关系
- 基础 Chain 类型
- Chains 的组合
- 实际应用示例
- Chains 的高级特性
- Chains 的性能优化
- 调试和监控 Chains
- Chains 的最佳实践
- Chains 与微服务架构
- Chains 在企业级应用中的部署
- Chains 的未来发展
- 总结
1. 简介
1.1 什么是 LangChain Chains?
LangChain Chains 是 LangChain 框架中的核心概念之一,它代表了一系列组件的有序组合,这些组件可以依次处理输入并产生输出。Chains 提供了一种简单而强大的方式来构建复杂的 AI 应用程序,将不同的组件(如语言模型、提示词模板、工具等)连接在一起。
可以把 Chains 想象成一条流水线,数据从一端进入,经过多个处理步骤,最终从另一端输出结果。每个处理步骤都可以是一个简单的组件,也可以是另一个复杂的 Chain。
LangChain 中的 Chains 设计灵感来源于函数式编程的思想,它强调组件的可组合性和可重用性。这种设计使得开发者能够以模块化的方式构建 AI 应用,提高了代码的可维护性和可扩展性。
1.2 为什么需要学习 Chains?
- 模块化设计 - Chains 允许我们将复杂的应用程序分解为可重用的模块
- 易于组合 - 不同的 Chains 可以轻松组合,构建更强大的应用程序
- 标准化接口 - 所有 Chains 都遵循相同的接口,便于学习和使用
- 灵活扩展 - 可以轻松添加新的处理步骤或替换现有组件
在传统的 AI 应用开发中,我们通常需要手动管理各个组件之间的数据流动和状态转换,这种方式容易出错且难以维护。而 LangChain 的 Chains 提供了一种声明式的编程方式,开发者只需要定义组件之间的连接关系,框架会自动处理数据的传递和转换。
1.3 本指南的学习目标
通过本指南,您将学会:
- 理解 Chains 的基本概念和工作原理
- 掌握不同类型的 Chains 及其使用方法
- 学会如何组合和扩展 Chains
- 构建简单的 AI 应用程序
- 为后续学习高级 Chains 应用打下基础
本指南将从最基础的概念开始,逐步深入到高级特性和实际应用场景,确保您能够全面掌握 LangChain Chains 的使用方法。
1.4 Chains 在现代 AI 应用中的重要性
随着大语言模型(LLM)的快速发展,AI 应用的复杂性也在不断增加。传统的单一模型调用已经无法满足复杂业务场景的需求。Chains 的出现正是为了解决这一问题,它提供了一种结构化的方式来组织和管理复杂的 AI 工作流。
在实际应用中,一个典型的 AI 应用可能需要:
- 接收用户输入
- 对输入进行预处理和分析
- 调用一个或多个语言模型
- 对模型输出进行后处理
- 与其他系统或数据库进行交互
- 返回最终结果给用户
Chains 使得这些步骤可以被清晰地定义和组织,大大提高了开发效率和代码质量。
1.5 Chains 与其他 AI 框架的对比
与其他 AI 框架相比,LangChain 的 Chains 具有以下优势:
- 高度可组合性 - 组件可以像乐高积木一样自由组合
- 标准化接口 - 所有组件遵循统一的接口规范
- 丰富的生态系统 - 支持多种模型和工具集成
- 强大的调试能力 - 提供详细的执行跟踪和可视化功能
1.6 Chains 的发展历程
LangChain 的 Chains 概念经历了以下几个发展阶段:
- 初期版本 - 简单的 LLMChain 实现
- LCEL 引入 - LangChain Expression Language 的出现
- Runnable 接口 - 更加通用和灵活的接口设计
- 当前版本 - 高度可组合和可扩展的 Chains 系统
1.7 Chains 的核心价值
Chains 的核心价值体现在以下几个方面:
- 简化复杂性 - 将复杂的 AI 工作流分解为简单的组件
- 提高可维护性 - 模块化设计使得代码更易于维护
- 增强可测试性 - 每个组件都可以独立测试
- 促进重用 - 组件可以在不同项目中重复使用
1.8 学习 Chains 的建议
为了更好地学习和掌握 Chains,建议您:
- 从基础开始 - 先理解 Runnable 接口和基本概念
- 动手实践 - 通过实际代码练习加深理解
- 循序渐进 - 从简单 Chain 开始,逐步构建复杂应用
- 参考示例 - 学习官方和社区提供的示例代码
- 持续学习 - 关注 LangChain 的更新和发展
1.9 本章结构说明
本章将按照以下结构进行组织:
- 首先介绍环境准备,确保您能顺利运行示例代码
- 然后深入讲解 Chains 的基本概念和核心原理
- 接着介绍不同类型的 Chain 及其使用方法
- 通过实际应用示例展示 Chains 的强大功能
- 最后探讨高级特性和最佳实践
2. 环境准备
2.1 安装必要的库
首先,我们需要安装 LangChain 和相关的依赖库。打开终端并运行以下命令:
pip install langchain langchain-openai python-dotenv
如果您使用的是其他大语言模型提供商,比如 Anthropic 或 Google,您还需要安装相应的库:
# 对于 Anthropic 的 Claude 模型
pip install langchain-anthropic
# 对于 Google 的 Gemini 模型
pip install langchain-google-genai
# 对于 Hugging Face 模型
pip install langchain-huggingface
2.2 配置环境变量
创建一个 .env 文件来存储您的 API 密钥和其他配置信息:
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_BASE_URL=https://api.openai-proxy.org/v1
OPENAI_MODEL=gpt-4o-mini
请将 your_openai_api_key_here 替换为您实际的 OpenAI API 密钥。
对于不同的模型提供商,您可能需要配置不同的环境变量:
``env
OpenAI 配置
OPENAI_API_KEY=your_openai_api_key
OPENAI_ORGANIZATION=your_organization_id
Anthropic 配置
ANTHROPIC_API_KEY=your_anthropic_api_key
Google 配置
GOOGLE_API_KEY=your_google_api_key
Hugging Face 配置
HUGGINGFACEHUB_API_TOKEN=your_huggingface_token
### 2.3 验证环境配置
创建一个简单的测试文件 `hello_chains.py` 来验证环境配置:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
加载环境变量
load_dotenv()
获取环境变量
api_key = os.getenv(“OPENAI_API_KEY”)
base_url = os.getenv(“OPENAI_BASE_URL”)
model_name = os.getenv(“OPENAI_MODEL”)
验证环境变量
if not all([api_key, base_url, model_name]):
print(“请确保在 .env 文件中设置了所有必需的环境变量”)
exit(1)
print(“环境变量配置正确:”)
print(f"API Key 长度: {len(api_key)} 字符")
print(f"Base URL: {base_url}“)
print(f"Model Name: {model_name}”)
测试模型连接
try:
llm = ChatOpenAI(
model=model_name,
base_url=base_url,
api_key=api_key,
temperature=0.7
)
response = llm.invoke("你好,世界!")
print(f"模型响应: {response.content}")
print("环境配置成功!")
except Exception as e:
print(f"连接模型时出错: {e}")
运行这个文件来验证您的环境配置是否正确:
```bash
python hello_chains.py
如果一切正常,您应该看到类似以下的输出:
环境变量配置正确:
API Key 长度: 51 字符
Base URL: https://api.openai-proxy.org/v1
Model Name: gpt-4o-mini
模型响应: 你好!有什么我可以帮助你的吗?
环境配置成功!
2.4 多环境配置管理
在实际项目中,我们通常需要管理多个环境(开发、测试、生产)的配置。可以通过以下方式实现:
# config.py
import os
from dotenv import load_dotenv
# 根据环境变量加载不同的 .env 文件
env = os.getenv("ENVIRONMENT", "development")
dotenv_path = f".env.{env}"
load_dotenv(dotenv_path)
class Config:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
@classmethod
def validate(cls):
"""验证配置是否完整"""
required_vars = [
cls.OPENAI_API_KEY,
cls.OPENAI_BASE_URL,
cls.OPENAI_MODEL
]
if not all(required_vars):
missing = [var for var in required_vars if not var]
raise ValueError(f"缺少必要的环境变量: {missing}")
# 使用示例
try:
Config.validate()
print("配置验证通过")
except ValueError as e:
print(f"配置验证失败: {e}")
2.5 依赖管理最佳实践
在生产环境中,建议使用 requirements.txt 文件来管理项目依赖:
# requirements.txt
langchain==0.1.0
langchain-openai==0.1.0
python-dotenv==1.0.0
pydantic==2.5.0
安装依赖:
pip install -r requirements.txt
2.6 虚拟环境管理
为了确保项目的依赖隔离,建议使用虚拟环境:
# 创建虚拟环境
python -m venv langchain_env
# 激活虚拟环境 (Windows)
langchain_env\Scripts\activate
# 激活虚拟环境 (macOS/Linux)
source langchain_env/bin/activate
# 安装依赖
pip install -r requirements.txt
3. 什么是 Chains
3.1 Chains 的定义
在 LangChain 中,Chain 是一个接口,它接受一些输入并产生一些输出。Chains 可以是简单的(单个组件),也可以是复杂的(多个组件的组合)。它们是 LangChain 中可组合性的核心体现。
Chains 的关键特性包括:
- 输入/输出 - 每个 Chain 都有明确定义的输入和输出
- 可组合性 - Chains 可以与其他 Chains 组合形成更复杂的 Chains
- 标准化接口 - 所有 Chains 都遵循相同的接口,便于使用
- 可配置性 - Chains 可以通过配置进行自定义
LangChain 中的 Chains 设计遵循了 Unix 哲学:“做一件事,并做好”。每个 Chain 都专注于完成特定的任务,通过组合不同的 Chains 来构建复杂的应用。
3.2 Chains 的核心方法
每个 Chain 都实现了以下核心方法:
- invoke() - 处理单个输入并返回单个输出
- batch() - 批量处理多个输入
- stream() - 流式处理输入并逐步返回输出
- ainvoke(), abatch(), astream() - 对应的异步版本
让我们详细看看这些方法的使用:
# chain_methods.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建 Chain
chain = (
ChatPromptTemplate.from_template("告诉我关于{topic}的5个要点")
| llm
| StrOutputParser()
)
# 1. invoke() - 单个输入处理
print("=== invoke() 方法 ===")
result = chain.invoke({"topic": "人工智能"})
print(result)
# 2. batch() - 批量处理
print("\n=== batch() 方法 ===")
inputs = [
{"topic": "机器学习"},
{"topic": "深度学习"},
{"topic": "自然语言处理"}
]
results = chain.batch(inputs)
for i, result in enumerate(results):
print(f"结果 {i+1}: {result}")
# 3. stream() - 流式处理
print("\n=== stream() 方法 ===")
for chunk in chain.stream({"topic": "计算机视觉"}):
print(chunk, end="", flush=True)
print() # 换行
3.3 简单示例
让我们通过一个简单的例子来理解 Chains 的工作原理:
# simple_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("告诉我一个关于{topic}的有趣事实")
# 创建输出解析器
parser = StrOutputParser()
# 使用管道符(|)组合组件创建 Chain
chain = prompt | llm | parser
# 调用 Chain
result = chain.invoke({"topic": "人工智能"})
print("AI 生成的内容:")
print(result)
在这个例子中,我们创建了一个由三个组件组成的 Chain:
prompt- 提示词模板llm- 语言模型parser- 输出解析器
这三个组件通过管道符 | 连接在一起,形成了一个完整的 Chain。
3.4 Chain 的执行流程
当我们调用 chain.invoke() 时,执行流程如下:
- 输入
{"topic": "人工智能"}被传递给prompt prompt将输入格式化为实际的提示词文本- 格式化后的提示词被传递给
llm llm生成响应- 响应被传递给
parser parser处理响应并返回最终结果
这个流程可以用以下图示表示:
输入: {"topic": "人工智能"}
↓
[提示词模板] → "告诉我一个关于人工智能的有趣事实"
↓
[语言模型] → "人工智能的一个有趣事实是..."
↓
[输出解析器] → "人工智能的一个有趣事实是..."
↓
输出: 最终结果
3.5 Chain 的内部机制
让我们深入了解 Chain 的内部工作机制:
# chain_internals.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建组件
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
parser = StrOutputParser()
# 创建 Chain
chain = prompt | llm | parser
# 查看 Chain 的结构
print("Chain 类型:", type(chain))
print("Chain 结构:", chain)
# 查看 Chain 的输入和输出模式
print("\n输入模式:")
print(chain.input_schema.schema())
print("\n输出模式:")
print(chain.output_schema.schema())
# 查看 Chain 的图形表示
print("\nChain 图形表示:")
chain.get_graph().print_ascii()
通过这些方法,我们可以深入了解 Chain 的内部结构和工作原理。
3.6 Chain 的类型系统
LangChain 提供了丰富的类型系统来确保 Chains 的输入输出正确性:
# chain_types.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
# 定义输入模型
class QuestionInput(BaseModel):
question: str = Field(description="用户提出的问题")
context: str = Field(description="问题的上下文信息")
# 定义输出模型
class AnswerOutput(BaseModel):
answer: str = Field(description="问题的答案")
confidence: float = Field(description="答案的置信度", ge=0, le=1)
references: List[str] = Field(description="参考信息")
# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
"基于以下上下文回答问题:\n\n上下文: {context}\n\n问题: {question}"
)
# 查看输入输出类型
print("输入类型:")
print(prompt.input_schema.schema())
print("\n输出类型:")
# 注意:实际的输出类型取决于后续组件
3.7 Chain 的可组合性详解
Chain 的可组合性是其最强大的特性之一。这种特性使得我们可以将简单的组件组合成复杂的处理流程。让我们通过一个更复杂的示例来演示这一点:
# chain_composability.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
# 加载环境变量
load_dotenv()
# 创建语言模型实例
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建多个提示词模板
translation_prompt = ChatPromptTemplate.from_template(
"将以下文本翻译成英文: {text}"
)
summary_prompt = ChatPromptTemplate.from_template(
"为以下文本生成一个简短的摘要: {text}"
)
# 创建 Chain 组件
translator = translation_prompt | llm | StrOutputParser()
summarizer = summary_prompt | llm | StrOutputParser()
# 使用 RunnableParallel 并行执行多个 Chain
parallel_chain = RunnableParallel(
translation=translator,
summary=summarizer
)
# 使用示例
input_text = "人工智能是计算机科学的一个重要分支,它致力于创建能够执行通常需要人类智能的任务的系统。"
result = parallel_chain.invoke({"text": input_text})
print("并行处理结果:")
print(f"翻译结果: {result['translation']}")
print(f"摘要结果: {result['summary']}")
3.8 Chain 的输入输出验证
LangChain 提供了强大的输入输出验证机制,确保数据符合预期的格式:
# input_output_validation.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
# 定义严格的输入模型
class TranslationInput(BaseModel):
text: str = Field(..., min_length=1, max_length=1000, description="要翻译的文本")
target_language: str = Field(..., min_length=2, max_length=20, description="目标语言")
# 定义严格的输出模型
class TranslationOutput(BaseModel):
translated_text: str = Field(..., description="翻译后的文本")
source_language: str = Field(..., description="源语言")
confidence: float = Field(..., ge=0, le=1, description="翻译置信度")
# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
"将以下文本翻译成{target_language},并告诉我源语言是什么:\n\n{text}\n\n"
"请以JSON格式返回结果,包含translated_text、source_language和confidence字段。"
)
print("输入模型结构:")
print(TranslationInput.schema())
print("\n输出模型结构:")
print(TranslationOutput.schema())
4. Chains 与 Runnable 的关系
4.1 Runnable 接口
在 LangChain 中,Chain 是 Runnable 接口的一个实现。Runnable 是 LangChain 中最基础的接口,它定义了所有可运行组件的行为规范。
Runnable 接口的核心特性:
- 统一接口 - 所有 Runnable 组件都有相同的调用方式
- 可组合性 - Runnable 组件可以轻松组合
- 链式调用 - 支持通过管道符进行链式调用
Runnable 接口定义了以下核心方法:
invoke(input, config)- 执行单个输入batch(inputs, config)- 批量执行多个输入stream(input, config)- 流式执行async版本的方法
4.2 Chain 作为 Runnable 的实现
Chain 继承了 Runnable 的所有特性,并添加了一些专门用于构建应用程序的功能:
# chain_as_runnable.py
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 创建一个简单的 RunnableLambda
def uppercase_transform(input_text):
return input_text.upper()
transformer = RunnableLambda(uppercase_transform)
# 创建语言模型
llm = ChatOpenAI(temperature=0.7)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("将以下文本翻译成英文: {text}")
# 创建 Chain(实际上是 Runnable 的组合)
chain = prompt | llm
# 验证 Chain 是 Runnable 的实例
print(f"Chain 是 Runnable 的实例: {isinstance(chain, RunnableLambda)}")
print(f"LLM 是 Runnable 的实例: {isinstance(llm, RunnableLambda)}")
print(f"Prompt 是 Runnable 的实例: {isinstance(prompt, RunnableLambda)}")
4.3 Runnable 与 Chain 的区别
虽然 Chain 是 Runnable 的一种实现,但它们之间有一些细微的区别:
-
概念范围:
- Runnable 是一个通用接口,代表任何可运行的组件
- Chain 是一个更具体的概念,通常指代多个组件的组合
-
使用场景:
- Runnable 更多用于底层组件和接口定义
- Chain 更多用于应用程序级别的组件组合
-
功能特性:
- Runnable 提供基础的运行能力
- Chain 提供更丰富的应用程序构建功能
4.4 Runnable 的其他实现
除了 Chain,LangChain 中还有其他 Runnable 的实现:
# other_runnables.py
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
# RunnableLambda - 将普通函数包装为 Runnable
def add_prefix(text):
return f"前缀: {text}"
prefix_adder = RunnableLambda(add_prefix)
# RunnableParallel - 并行执行多个 Runnable
parallel_runnable = RunnableParallel(
upper=lambda x: x.upper(),
lower=lambda x: x.lower(),
length=lambda x: len(x)
)
# RunnablePassthrough - 直接传递输入
passthrough = RunnablePassthrough()
# 使用示例
print("RunnableLambda:")
print(prefix_adder.invoke("测试文本"))
print("\nRunnableParallel:")
print(parallel_runnable.invoke("Hello World"))
print("\nRunnablePassthrough:")
print(passthrough.invoke("直接传递的文本"))
4.5 Runnable 的组合能力
Runnable 的强大之处在于其组合能力,这使得我们可以构建复杂的处理流程:
# runnable_composition.py
from langchain_core.runnables import RunnableLambda, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 创建处理函数
def text_analyzer(text):
return {
"length": len(text),
"word_count": len(text.split()),
"char_count": len(text.replace(" ", ""))
}
def sentiment_classifier(analysis):
# 模拟情感分类
if analysis["word_count"] > 10:
return "复杂文本"
else:
return "简单文本"
# 创建 Runnable 组件
analyzer = RunnableLambda(text_analyzer)
classifier = RunnableLambda(sentiment_classifier)
llm = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
parser = StrOutputParser()
# 复杂组合示例
complex_chain = (
analyzer
| RunnableParallel(
analysis=lambda x: x,
classification=classifier,
summary=(prompt | llm | parser)
)
)
# 使用示例
result = complex_chain.invoke("这是一个用于测试的示例文本,它包含了一些词汇。")
print("复杂组合结果:")
print(result)
5. 基础 Chain 类型
LLMChain
LLMChain 是最基础也是最常用的 Chain 类型之一。它将提示词模板与语言模型组合在一起。
# llm_chain_example.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template(
"你是一个专业的{profession}。请回答以下问题: {question}"
)
# 创建输出解析器
parser = StrOutputParser()
# 创建 LLMChain(通过组合方式)
llm_chain = prompt | llm | parser
# 使用 Chain
result = llm_chain.invoke({
"profession": "历史学家",
"question": "请简述工业革命对人类社会的影响"
})
print("LLMChain 输出:")
print(result)
LLMChain 的工作原理:
- 接收包含模板变量的输入
- 使用输入数据格式化提示词模板
- 将格式化后的提示词发送给语言模型
- 获取模型响应
- 使用输出解析器处理响应
- 返回最终结果
PromptTemplate + LLM
这是构建 Chain 最基础的方式,通过将提示词模板和语言模型组合在一起:
# prompt_llm_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("写一首关于{subject}的{style}诗")
# 创建 Chain
chain = prompt | llm
# 使用 Chain
result = chain.invoke({
"subject": "春天",
"style": "五言绝句"
})
print("诗歌生成结果:")
print(result.content)
这种方式的优点:
- 简单直观,易于理解
- 灵活性高,可以根据需要组合不同的组件
- 符合函数式编程的思想
Transformation Chain
Transformation Chain 用于对数据进行转换处理:
# transformation_chain.py
from langchain_core.runnables import RunnableLambda
# 定义转换函数
def word_count(text):
return {"text": text, "word_count": len(text.split())}
def uppercase_text(data):
return {"text": data["text"].upper(), "word_count": data["word_count"]}
# 创建 Transformation Chain
word_counter = RunnableLambda(word_count)
uppercaser = RunnableLambda(uppercase_text)
# 组合 Chain
transformation_chain = word_counter | uppercaser
# 使用 Chain
result = transformation_chain.invoke("hello world this is a test")
print("转换结果:")
print(result)
Transformation Chain 的应用场景:
- 数据预处理
- 格式转换
- 特征提取
- 数据清洗
其他基础 Chain 类型
LangChain 还提供了其他一些基础的 Chain 类型:
# other_basic_chains.py
from langchain_core.runnables import RunnableLambda, RunnableParallel
# 条件 Chain
def conditional_processor(data):
if data["score"] > 80:
return "优秀"
elif data["score"] > 60:
return "良好"
else:
return "需要改进"
conditional_chain = RunnableLambda(conditional_processor)
# 并行 Chain
def get_word_count(text):
return len(text.split())
def get_char_count(text):
return len(text)
def get_line_count(text):
return len(text.split('\n'))
parallel_chain = RunnableParallel(
word_count=RunnableLambda(get_word_count),
char_count=RunnableLambda(get_char_count),
line_count=RunnableLambda(get_line_count)
)
# 使用示例
print("条件 Chain:")
print(conditional_chain.invoke({"score": 85}))
print("\n并行 Chain:")
print(parallel_chain.invoke("这是第一行\n这是第二行\n这是第三行"))
6. Chains 的组合
顺序组合
Chains 可以通过管道符 | 进行顺序组合:
# sequential_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建多个提示词模板
prompt1 = ChatPromptTemplate.from_template("将以下内容总结为一句话: {content}")
prompt2 = ChatPromptTemplate.from_template("将以下摘要翻译成英文: {summary}")
# 创建输出解析器
parser = StrOutputParser()
# 创建处理函数
def add_prefix(data):
return f"摘要: {data}"
prefix_adder = RunnableLambda(add_prefix)
# 组合多个 Chain
chain = prompt1 | llm | parser | prefix_adder | prompt2 | llm | parser
# 使用 Chain
result = chain.invoke({
"content": "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、语言识别、图像识别、自然语言处理和专家系统等。",
"summary": "一句话摘要"
})
print("最终结果:")
print(result)
顺序组合的特点:
- 数据流是线性的
- 每个组件的输出是下一个组件的输入
- 执行顺序是确定的
- 错误会沿着链条传播
并行组合
Chains 也可以并行执行:
# parallel_chain.py
from langchain_core.runnables import RunnableParallel
# 定义处理函数
def get_word_count(text):
return len(text.split())
def get_char_count(text):
return len(text)
def get_line_count(text):
return len(text.split('\n'))
# 创建 Runnable 组件
word_counter = RunnableLambda(get_word_count)
char_counter = RunnableLambda(get_char_count)
line_counter = RunnableLambda(get_line_count)
# 并行组合
parallel_chain = RunnableParallel(
word_count=word_counter,
char_count=char_counter,
line_count=line_counter
)
# 使用 Chain
text = """这是第一行文本。
这是第二行文本。
这是第三行文本。"""
result = parallel_chain.invoke(text)
print("并行处理结果:")
print(result)
并行组合的特点:
- 多个组件同时执行
- 各组件之间相互独立
- 结果以字典形式返回
- 可以提高处理效率
复杂组合
我们可以将顺序组合和并行组合结合起来,创建更复杂的 Chains:
# complex_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建分析 Chain
analysis_chain = (
ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
| llm
| StrOutputParser()
)
# 创建摘要 Chain
summary_chain = (
ChatPromptTemplate.from_template("为以下文本生成摘要: {text}")
| llm
| StrOutputParser()
)
# 创建关键词提取 Chain
keywords_chain = (
ChatPromptTemplate.from_template("提取以下文本的关键词: {text}")
| llm
| StrOutputParser()
)
# 并行执行多个分析任务
parallel_analysis = RunnableParallel(
sentiment=analysis_chain,
summary=summary_chain,
keywords=keywords_chain
)
# 创建报告生成 Chain
report_chain = (
ChatPromptTemplate.from_template("""
基于以下分析结果生成报告:
情感分析: {sentiment}
文本摘要: {summary}
关键词: {keywords}
""")
| llm
| StrOutputParser()
)
# 组合完整 Chain
complete_chain = parallel_analysis | report_chain
# 使用 Chain
text = "人工智能技术正在快速发展,它在医疗、教育、交通等领域都有广泛应用。虽然带来了很多便利,但也引发了一些关于就业和隐私的担忧。"
result = complete_chain.invoke({"text": text})
print("完整分析报告:")
print(result)
7. 实际应用示例
文档摘要生成器
# document_summarizer.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
# 创建文档摘要 Chain
document_summarizer = (
ChatPromptTemplate.from_template(
"请为以下文档生成一个简洁的摘要(不超过100字):\n\n{document}"
)
| llm
| StrOutputParser()
)
# 示例文档
sample_document = """
人工智能(Artificial Intelligence,AI)是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、
语言识别、图像识别、自然语言处理和专家系统等。
人工智能从诞生以来,理论和技术日益成熟,应用领域也不断扩大。可以设想,未来人工智能
带来的科技产品,将会是人类智慧的"容器"。人工智能可以对人的意识、思维的信息过程的模拟。
人工智能不是人的智能,但能像人那样思考、也可能超过人的智能。
"""
# 生成摘要
summary = document_summarizer.invoke({"document": sample_document})
print("文档摘要:")
print(summary)
多步骤问答系统
# multi_step_qa.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
# 第一步:理解问题
question_analyzer = (
ChatPromptTemplate.from_template(
"分析以下问题并确定回答该问题需要哪些关键信息:\n\n问题: {question}"
)
| llm
| StrOutputParser()
)
# 第二步:生成答案
answer_generator = (
ChatPromptTemplate.from_template(
"基于以下信息回答问题:\n\n问题: {question}\n\n相关信息: {info}\n\n请提供详细的回答:"
)
| llm
| StrOutputParser()
)
# 组合 Chain
qa_chain = question_analyzer | (lambda info: {
"question": "人工智能是什么?",
"info": info
}) | answer_generator
# 使用 Chain
result = qa_chain.invoke({"question": "人工智能是什么?"})
print("问答结果:")
print(result)
智能邮件分类器
# email_classifier.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
# 加载环境变量
load_dotenv()
# 定义输出结构
class EmailClassification(BaseModel):
category: str = Field(description="邮件分类: 工作、个人、促销、社交、垃圾邮件")
priority: str = Field(description="优先级: 高、中、低")
summary: str = Field(description="邮件摘要")
action_items: List[str] = Field(description="需要采取的行动")
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.3
)
# 创建解析器
parser = JsonOutputParser(pydantic_object=EmailClassification)
# 创建分类 Chain
email_classifier = (
ChatPromptTemplate.from_template("""
请分析以下邮件内容并进行分类:
邮件内容: {email_content}
{format_instructions}
""")
| llm
| parser
)
# 示例邮件
sample_email = """
主题: 项目进度更新会议
亲爱的团队成员,
希望这封邮件找到你们都安好。我想提醒大家,我们的季度项目进度更新会议定于本周五下午2点举行。
请确保你们已经准备好了各自的项目报告,并准备好回答可能的问题。
议程包括:
1. 各项目进度汇报
2. 遇到的挑战和解决方案
3. 下一季度的计划
请确认你们的出席情况。
谢谢,
项目经理
"""
# 分类邮件
result = email_classifier.invoke({
"email_content": sample_email,
"format_instructions": parser.get_format_instructions()
})
print("邮件分类结果:")
print(f"分类: {result['category']}")
print(f"优先级: {result['priority']}")
print(f"摘要: {result['summary']}")
print("行动项:")
for item in result['action_items']:
print(f" - {item}")
8. Chains 的高级特性
配置化 Chains
Chains 支持通过配置来定制其行为,这使得 Chains 更加灵活和可重用:
# configurable_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY")
)
# 创建可配置的提示词模板
configurable_prompt = ChatPromptTemplate.from_template(
"你是一个{role},请用{tone}的语气回答关于{topic}的问题"
)
# 创建 Chain
configurable_chain = configurable_prompt | llm | StrOutputParser()
# 使用不同配置
configs = [
{"role": "教师", "tone": "耐心", "topic": "数学"},
{"role": "工程师", "tone": "专业", "topic": "编程"},
{"role": "朋友", "tone": "轻松", "topic": "旅行"}
]
print("配置化 Chain 示例:")
for i, config in enumerate(configs, 1):
result = configurable_chain.invoke(
{"role": config["role"], "tone": config["tone"], "topic": config["topic"]},
config={"configurable": config}
)
print(f"\n配置 {i}: {config}")
print(f"结果: {result}")
错误处理与重试机制
在实际应用中,我们需要处理可能出现的错误并实现重试机制:
# error_handling_chain.py
import time
import random
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.utils import ConfigurableField
# 模拟可能失败的函数
def unreliable_function(text):
# 模拟 30% 的失败率
if random.random() < 0.3:
raise Exception("模拟的网络错误")
return f"处理结果: {text.upper()}"
# 创建带重试机制的 Runnable
def create_retryable_runnable(func, max_retries=3):
def retry_wrapper(input_data):
for attempt in range(max_retries):
try:
return func(input_data)
except Exception as e:
if attempt == max_retries - 1:
raise e
print(f"尝试 {attempt + 1} 失败: {e},正在重试...")
time.sleep(1) # 等待 1 秒后重试
return None
return RunnableLambda(retry_wrapper)
# 创建 Chain
unreliable_chain = create_retryable_runnable(unreliable_function)
# 测试重试机制
print("错误处理与重试机制示例:")
try:
result = unreliable_chain.invoke("测试文本")
print(f"最终结果: {result}")
except Exception as e:
print(f"所有重试都失败了: {e}")
缓存机制
为了提高性能,我们可以为 Chains 添加缓存机制:
# caching_chain.py
import hashlib
import json
from langchain_core.runnables import RunnableLambda
# 简单的内存缓存实现
class SimpleCache:
def __init__(self):
self.cache = {}
def get(self, key):
return self.cache.get(key)
def set(self, key, value):
self.cache[key] = value
# 创建缓存实例
cache = SimpleCache()
# 带缓存的处理函数
def cached_processing(input_data):
# 生成缓存键
cache_key = hashlib.md5(json.dumps(input_data, sort_keys=True).encode()).hexdigest()
# 检查缓存
cached_result = cache.get(cache_key)
if cached_result:
print("从缓存中获取结果")
return cached_result
# 模拟耗时处理
import time
time.sleep(2) # 模拟 2 秒的处理时间
result = f"处理结果: {input_data}"
# 存储到缓存
cache.set(cache_key, result)
print("处理完成并存储到缓存")
return result
# 创建 Chain
cached_chain = RunnableLambda(cached_processing)
# 测试缓存机制
print("缓存机制示例:")
print("第一次调用:")
result1 = cached_chain.invoke("测试数据")
print(f"结果: {result1}")
print("\n第二次调用(相同输入):")
result2 = cached_chain.invoke("测试数据")
print(f"结果: {result2}")
print("\n第三次调用(不同输入):")
result3 = cached_chain.invoke("不同数据")
print(f"结果: {result3}")
9. Chains 的性能优化
批处理优化
批处理可以显著提高 Chains 的处理效率:
# batch_processing.py
import time
from langchain_core.runnables import RunnableLambda
# 模拟处理函数
def process_item(item):
# 模拟处理时间
time.sleep(0.1)
return f"处理完成: {item}"
# 创建处理 Runnable
processor = RunnableLambda(process_item)
# 单个处理
def single_processing(items):
start_time = time.time()
results = []
for item in items:
result = processor.invoke(item)
results.append(result)
end_time = time.time()
return results, end_time - start_time
# 批处理
def batch_processing(items):
start_time = time.time()
results = processor.batch(items)
end_time = time.time()
return results, end_time - start_time
# 测试数据
test_items = [f"项目{i}" for i in range(10)]
print("性能对比测试:")
print("单个处理:")
single_results, single_time = single_processing(test_items)
print(f"耗时: {single_time:.2f} 秒")
print("\n批处理:")
batch_results, batch_time = batch_processing(test_items)
print(f"耗时: {batch_time:.2f} 秒")
print(f"\n性能提升: {single_time/batch_time:.2f} 倍")
异步处理
异步处理可以进一步提高 Chains 的并发性能:
# async_processing.py
import asyncio
import time
from langchain_core.runnables import RunnableLambda
# 模拟异步处理函数
async def async_process_item(item):
# 模拟异步处理
await asyncio.sleep(0.1)
return f"异步处理完成: {item}"
# 创建异步处理 Runnable
async_processor = RunnableLambda(async_process_item)
# 异步批处理
async def async_batch_processing(items):
start_time = time.time()
results = await async_processor.abatch(items)
end_time = time.time()
return results, end_time - start_time
# 同步批处理对比
def sync_batch_processing(items):
def sync_process_item(item):
time.sleep(0.1) # 模拟同步处理
return f"同步处理完成: {item}"
sync_processor = RunnableLambda(sync_process_item)
start_time = time.time()
results = sync_processor.batch(items)
end_time = time.time()
return results, end_time - start_time
# 测试
async def main():
test_items = [f"项目{i}" for i in range(10)]
print("异步处理测试:")
async_results, async_time = await async_batch_processing(test_items)
print(f"异步处理耗时: {async_time:.2f} 秒")
print("\n同步处理对比:")
sync_results, sync_time = sync_batch_processing(test_items)
print(f"同步处理耗时: {sync_time:.2f} 秒")
print(f"\n性能提升: {sync_time/async_time:.2f} 倍")
# 运行测试
# asyncio.run(main())
10. 调试和监控 Chains
调试 Chains
调试 Chains 是开发过程中的重要环节:
# chain_debugging.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 加载环境变量
load_dotenv()
# 创建带调试信息的处理函数
def debug_processor(input_data):
print(f"调试信息 - 输入: {input_data}")
result = f"处理结果: {input_data}"
print(f"调试信息 - 输出: {result}")
return result
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建组件
debug_runnable = RunnableLambda(debug_processor)
prompt = ChatPromptTemplate.from_template("总结以下内容: {content}")
parser = StrOutputParser()
# 创建 Chain
debug_chain = debug_runnable | prompt | llm | parser
# 使用回调函数进行调试
def debug_callback():
def wrapper(func):
def inner(*args, **kwargs):
print(f"调用函数: {func.__name__}")
print(f"参数: args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"返回值: {result}")
return result
return inner
return wrapper
# 可视化 Chain 结构
print("Chain 结构可视化:")
debug_chain.get_graph().print_ascii()
监控 Chains
监控 Chains 的执行情况有助于优化性能和发现问题:
# chain_monitoring.py
import time
from langchain_core.runnables import RunnableLambda
from functools import wraps
# 性能监控装饰器
def monitor_performance(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"函数 {func.__name__} 执行时间: {execution_time:.4f} 秒")
return result
except Exception as e:
end_time = time.time()
execution_time = end_time - start_time
print(f"函数 {func.__name__} 执行失败,耗时: {execution_time:.4f} 秒,错误: {e}")
raise
return wrapper
# 带监控的处理函数
@monitor_performance
def monitored_processing(input_data):
# 模拟处理时间
time.sleep(0.1)
return f"监控处理结果: {input_data}"
# 创建 Chain
monitored_chain = RunnableLambda(monitored_processing)
# 测试监控
print("监控 Chains 执行:")
result = monitored_chain.invoke("测试数据")
print(f"结果: {result}")
11. Chains 的最佳实践
11.1 代码组织
在编写 Chains 时,建议遵循以下代码组织原则:
- 模块化 - 将不同的功能模块化,便于管理和维护
- 可重用性 - 尽量复用现有的 Chains 和组件
- 清晰性 - 保持代码结构清晰,易于理解
# example_module.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建输出解析器
parser = StrOutputParser()
# 创建语言模型
llm = ChatOpenAI(temperature=0.7)
# 创建 Chain
chain = prompt | llm | parser
# 导出 Chain
__all__ = ["chain"]
11.2 配置管理
使用配置文件来管理 Chains 的配置,便于在不同环境中使用:
# config.py
import os
from dotenv import load_dotenv
# 根据环境变量加载不同的 .env 文件
env = os.getenv("ENVIRONMENT", "development")
dotenv_path = f".env.{env}"
load_dotenv(dotenv_path)
class Config:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
@classmethod
def validate(cls):
"""验证配置是否完整"""
required_vars = [
cls.OPENAI_API_KEY,
cls.OPENAI_BASE_URL,
cls.OPENAI_MODEL
]
if not all(required_vars):
missing = [var for var in required_vars if not var]
raise ValueError(f"缺少必要的环境变量: {missing}")
# 使用示例
try:
Config.validate()
print("配置验证通过")
except ValueError as e:
print(f"配置验证失败: {e}")
11.3 错误处理
在实际应用中,需要对 Chains 的错误进行适当的处理:
# error_handling.py
import time
import random
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.utils import ConfigurableField
# 模拟可能失败的函数
def unreliable_function(text):
# 模拟 30% 的失败率
if random.random() < 0.3:
raise Exception("模拟的网络错误")
return f"处理结果: {text.upper()}"
# 创建带重试机制的 Runnable
def create_retryable_runnable(func, max_retries=3):
def retry_wrapper(input_data):
for attempt in range(max_retries):
try:
return func(input_data)
except Exception as e:
if attempt == max_retries - 1:
raise e
print(f"尝试 {attempt + 1} 失败: {e},正在重试...")
time.sleep(1) # 等待 1 秒后重试
return None
return RunnableLambda(retry_wrapper)
# 创建 Chain
unreliable_chain = create_retryable_runnable(unreliable_function)
# 测试重试机制
print("错误处理与重试机制示例:")
try:
result = unreliable_chain.invoke("测试文本")
print(f"最终结果: {result}")
except Exception as e:
print(f"所有重试都失败了: {e}")
11.4 性能优化
通过批处理和异步处理来优化 Chains 的性能:
# performance_optimization.py
import time
from langchain_core.runnables import RunnableLambda
# 模拟处理函数
def process_item(item):
# 模拟处理时间
time.sleep(0.1)
return f"处理完成: {item}"
# 创建处理 Runnable
processor = RunnableLambda(process_item)
# 单个处理
def single_processing(items):
start_time = time.time()
results = []
for item in items:
result = processor.invoke(item)
results.append(result)
end_time = time.time()
return results, end_time - start_time
# 批处理
def batch_processing(items):
start_time = time.time()
results = processor.batch(items)
end_time = time.time()
return results, end_time - start_time
# 测试数据
test_items = [f"项目{i}" for i in range(10)]
print("性能对比测试:")
print("单个处理:")
single_results, single_time = single_processing(test_items)
print(f"耗时: {single_time:.2f} 秒")
print("\n批处理:")
batch_results, batch_time = batch_processing(test_items)
print(f"耗时: {batch_time:.2f} 秒")
print(f"\n性能提升: {single_time/batch_time:.2f} 倍")
# async_processing.py
import asyncio
import time
from langchain_core.runnables import RunnableLambda
# 模拟异步处理函数
async def async_process_item(item):
# 模拟异步处理
await asyncio.sleep(0.1)
return f"异步处理完成: {item}"
# 创建异步处理 Runnable
async_processor = RunnableLambda(async_process_item)
# 异步批处理
async def async_batch_processing(items):
start_time = time.time()
results = await async_processor.abatch(items)
end_time = time.time()
return results, end_time - start_time
# 同步批处理对比
def sync_batch_processing(items):
def sync_process_item(item):
time.sleep(0.1) # 模拟同步处理
return f"同步处理完成: {item}"
sync_processor = RunnableLambda(sync_process_item)
start_time = time.time()
results = sync_processor.batch(items)
end_time = time.time()
return results, end_time - start_time
# 测试
async def main():
test_items = [f"项目{i}" for i in range(10)]
print("异步处理测试:")
async_results, async_time = await async_batch_processing(test_items)
print(f"异步处理耗时: {async_time:.2f} 秒")
print("\n同步处理对比:")
sync_results, sync_time = sync_batch_processing(test_items)
print(f"同步处理耗时: {sync_time:.2f} 秒")
print(f"\n性能提升: {sync_time/async_time:.2f} 倍")
# 运行测试
# asyncio.run(main())
12. Chains 与微服务架构
12.1 微服务架构的优势
微服务架构具有以下优势:
- 可扩展性 - 可以独立扩展不同的服务
- 灵活性 - 可以使用不同的技术栈
- 可维护性 - 服务之间相互独立,便于管理和维护
- 容错性 - 单个服务的故障不会影响整个系统
- 技术多样性 - 不同的服务可以使用最适合的技术栈
12.2 Chains 与微服务的结合
Chains 可以与微服务架构很好地结合,提供以下好处:
- 模块化 - Chains 可以作为独立的服务运行
- 可组合性 - 可以通过 API 调用组合不同的 Chains
- 可扩展性 - 可以独立扩展不同的 Chains
- 重用性 - 同一个 Chain 服务可以被多个应用调用
# chain_microservice.py
from flask import Flask, request, jsonify
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
app = Flask(__name__)
# 初始化模型
llm = ChatOpenAI(temperature=0.7)
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
parser = StrOutputParser()
# 创建 Chain
summary_chain = prompt | llm | parser
@app.route('/summarize', methods=['POST'])
def summarize():
try:
data = request.json
text = data.get('text', '')
if not text:
return jsonify({'error': '文本不能为空'}), 400
# 调用 Chain
result = summary_chain.invoke({'text': text})
return jsonify({'summary': result})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 5000
# 启动应用
CMD ["python", "chain_microservice.py"]
# docker-compose.yml
version: '3.8'
services:
chain-service:
build: .
ports:
- "5000:5000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_BASE_URL=${OPENAI_BASE_URL}
- OPENAI_MODEL=${OPENAI_MODEL}
env_file:
- .env
12.3 Chains 作为微服务的实现
通过将 Chains 封装为 RESTful API,可以轻松地将它们部署为独立的微服务:
# chain_service_example.py
from flask import Flask, request, jsonify
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 加载环境变量
load_dotenv()
app = Flask(__name__)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('chain_service.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 配置模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建不同类型的 Chains
translation_prompt = ChatPromptTemplate.from_template("将以下文本翻译成{target_language}: {text}")
translation_chain = translation_prompt | llm | StrOutputParser()
sentiment_prompt = ChatPromptTemplate.from_template("分析以下文本的情感倾向: {text}")
sentiment_chain = sentiment_prompt | llm | StrOutputParser()
@app.route('/translate', methods=['POST'])
def translate():
data = request.json
text = data.get('text')
target_language = data.get('target_language', '英文')
try:
result = translation_chain.invoke({
'text': text,
'target_language': target_language
})
return jsonify({'translated_text': result})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/sentiment', methods=['POST'])
def analyze_sentiment():
data = request.json
text = data.get('text')
try:
result = sentiment_chain.invoke({'text': text})
return jsonify({'sentiment': result})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, port=5001)
12.4 Chains 与微服务的通信
在微服务架构中,不同的 Chains 服务可以通过 HTTP API 进行通信:
# chain_client.py
import requests
import json
class ChainServiceClient:
def __init__(self, base_url):
self.base_url = base_url
def translate_text(self, text, target_language="英文"):
"""调用翻译服务"""
url = f"{self.base_url}/translate"
payload = {
"text": text,
"target_language": target_language
}
response = requests.post(url, json=payload)
if response.status_code == 200:
return response.json()['translated_text']
else:
raise Exception(f"翻译服务调用失败: {response.text}")
def analyze_sentiment(self, text):
"""调用情感分析服务"""
url = f"{self.base_url}/sentiment"
payload = {"text": text}
response = requests.post(url, json=payload)
if response.status_code == 200:
return response.json()['sentiment']
else:
raise Exception(f"情感分析服务调用失败: {response.text}")
# 使用示例
# client = ChainServiceClient("http://localhost:5001")
# translation = client.translate_text("你好,世界!", "English")
# sentiment = client.analyze_sentiment("今天天气真好!")
13. Chains 在企业级应用中的部署
13.1 部署环境
在企业级应用中部署 Chains 时,需要考虑以下环境因素:
- 计算资源 - 确保有足够的 CPU 和内存资源
- 网络环境 - 稳定的网络连接以访问外部 API
- 安全性 - 保护 API 密钥和敏感数据
- 监控系统 - 实时监控服务状态和性能指标
- 日志系统 - 记录详细的运行日志用于问题排查
13.2 部署工具
常用的部署工具包括:
- Docker - 容器化部署,确保环境一致性
- Kubernetes - 容器编排,实现自动扩缩容
- CI/CD 工具 - 自动化部署流程
- 云服务提供商 - AWS, Azure, GCP 等
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 5000
# 启动应用
CMD ["python", "chain_microservice.py"]
# docker-compose.yml
version: '3.8'
services:
chain-service:
build: .
ports:
- "5000:5000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_BASE_URL=${OPENAI_BASE_URL}
- OPENAI_MODEL=${OPENAI_MODEL}
env_file:
- .env
13.3 部署流程
企业级部署流程通常包括以下步骤:
- 代码打包 - 将应用代码和依赖打包
- 环境配置 - 配置生产环境变量
- 容器构建 - 构建 Docker 镜像
- 服务部署 - 部署到服务器或云平台
- 健康检查 - 验证服务是否正常运行
- 监控配置 - 配置监控和告警
# 部署脚本示例
#!/bin/bash
# 1. 代码打包
git clone https://github.com/your-repo/chain-app.git
cd chain-app
# 2. 环境配置
cp .env.example .env
# 编辑 .env 文件,设置必要的环境变量
# 3. 容器构建
docker build -t chain-service:latest .
# 4. 服务部署
docker stop chain-service || true
docker rm chain-service || true
docker run -d \
--name chain-service \
-p 5000:5000 \
-e OPENAI_API_KEY=$OPENAI_API_KEY \
-e OPENAI_BASE_URL=$OPENAI_BASE_URL \
-e OPENAI_MODEL=$OPENAI_MODEL \
chain-service:latest
# 5. 健康检查
sleep 5
curl -f http://localhost:5000/health || echo "服务启动失败"
13.4 监控与日志
在生产环境中,监控和日志是确保服务稳定运行的关键:
# monitored_chain_service.py
import logging
from flask import Flask, request, jsonify
import time
from datetime import datetime
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
app = Flask(__name__)
# 初始化日志
logging.basicConfig(level=logging.INFO)
# 加载环境变量
load_dotenv()
# 初始化模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建 Chain
prompt = ChatPromptTemplate.from_template("总结以下文本: {text}")
chain = prompt | llm | StrOutputParser()
# 性能监控装饰器
def monitor_performance(func):
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
# 记录性能日志
logger.info(f"{func.__name__} 执行成功,耗时: {execution_time:.2f}秒")
return result
except Exception as e:
end_time = time.time()
execution_time = end_time - start_time
logger.error(f"{func.__name__} 执行失败,耗时: {execution_time:.2f}秒,错误: {str(e)}")
raise
return wrapper
@app.route('/summarize', methods=['POST'])
@monitor_performance
def summarize():
try:
data = request.json
text = data.get('text', '')
logger.info(f"收到摘要请求,文本长度: {len(text)} 字符")
if not text:
logger.warning("收到空文本请求")
return jsonify({'error': '文本不能为空'}), 400
# 调用 Chain
result = chain.invoke({'text': text})
logger.info("摘要生成成功")
return jsonify({'summary': result})
except Exception as e:
logger.error(f"摘要生成失败: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查端点"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'service': 'chain-service'
})
if __name__ == '__main__':
logger.info("Chain 服务启动中...")
app.run(host='0.0.0.0', port=5000)
14. Chains 的未来发展
LangChain Chains 的未来发展将集中在以下几个方向:
14.1 更强的可组合性
未来的 Chains 将支持更复杂的组合模式,包括:
- 条件组合 - 根据输入或中间结果动态选择处理路径
- 循环组合 - 支持迭代处理直到满足特定条件
- 分支组合 - 同时执行多个处理路径并合并结果
# conditional_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建输出解析器
parser = StrOutputParser()
# 创建 Chain
chain = prompt | llm | parser
# 创建条件处理函数
def conditional_processor(data):
if data["concept"] == "人工智能":
return chain.invoke({"concept": "AI"})
elif data["concept"] == "机器学习":
return chain.invoke({"concept": "ML"})
else:
return "未知概念"
# 创建 RunnableLambda
conditional_chain = RunnableLambda(conditional_processor)
# 使用 Chain
result = conditional_chain.invoke({"concept": "人工智能"})
print("条件 Chain 输出:")
print(result)
# loop_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建输出解析器
parser = StrOutputParser()
# 创建 Chain
chain = prompt | llm | parser
# 创建循环处理函数
def loop_processor(data):
results = []
for concept in data["concepts"]:
result = chain.invoke({"concept": concept})
results.append(result)
return results
# 创建 RunnableLambda
loop_chain = RunnableLambda(loop_processor)
# 使用 Chain
result = loop_chain.invoke({"concepts": ["人工智能", "机器学习", "深度学习"]})
print("循环 Chain 输出:")
print(result)
# branch_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 加载环境变量
load_dotenv()
# 创建语言模型
llm = ChatOpenAI(
model=os.getenv("OPENAI_MODEL"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.7
)
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("解释{concept}的概念")
# 创建输出解析器
parser = StrOutputParser()
# 创建 Chain
chain = prompt | llm | parser
# 创建分支处理函数
def branch_processor(data):
if data["type"] == "AI":
return chain.invoke({"concept": "人工智能"})
elif data["type"] == "ML":
return chain.invoke({"concept": "机器学习"})
else:
return "未知类型"
# 创建 RunnableLambda
branch_chain = RunnableLambda(branch_processor)
# 使用 Chain
result = branch_chain.invoke({"type": "AI"})
print("分支 Chain 输出:")
print(result)
# future_conditional_chain.py
from langchain_core.runnables import RunnableLambda, RunnableBranch
# 条件判断函数
def route_by_length(input_data):
if len(input_data["text"]) < 100:
return "short"
elif len(input_data["text"]) < 1000:
return "medium"
else:
return "long"
# 创建 RunnableLambda
short_chain = RunnableLambda(lambda x: f"短文本: {x}")
medium_chain = RunnableLambda(lambda x: f"中等文本: {x}")
long_chain = RunnableLambda(lambda x: f"长文本: {x}")
# 创建 RunnableBranch
conditional_chain = RunnableBranch(
route_by_length,
short=short_chain,
medium=medium_chain,
long=long_chain
)
# 使用 Chain
result = conditional_chain.invoke({"text": "这是一个简短的文本"})
print("条件 Chain 输出:")
print(result)
更多推荐

所有评论(0)