LangChain基础链:LLMChain与SequentialChain的流程编排逻辑深度解析
LLMChain作为LangChain框架中的最小功能单元,本质上是"PromptTemplate + LLM + OutputParser"的标准化封装。如何将结构化的提示词模板与大语言模型进行标准化连接。则是多链串行执行的编排工具,实现了复杂AI工作流的分步骤处理和数据流转。它将多个独立的处理单元按照依赖关系串联,形成完整的处理管道。LangChain基础链(LLMChain与Sequenti
·
LangChain基础链:LLMChain与SequentialChain的流程编排逻辑深度解析
目录
1. 核心定义与价值
1.1 本质定位
LLMChain作为LangChain框架中的最小功能单元,本质上是"PromptTemplate + LLM + OutputParser"的标准化封装。它解决了AI应用开发中最基础的问题:如何将结构化的提示词模板与大语言模型进行标准化连接。
SequentialChain则是多链串行执行的编排工具,实现了复杂AI工作流的分步骤处理和数据流转。它将多个独立的处理单元按照依赖关系串联,形成完整的处理管道。
1.2 核心痛点解决
传统开发困境
# 手动串联的复杂性和易错性
def manual_process(user_input):
# 步骤1:手动格式化提示词
prompt = f"请分析以下文本的情感:{user_input}"
# 步骤2:调用LLM
response1 = llm.invoke(prompt)
# 步骤3:手动解析输出
sentiment = extract_sentiment(response1)
# 步骤4:构建下一个提示词
prompt2 = f"基于情感分析结果'{sentiment}',生成回复:{user_input}"
# 步骤5:再次调用LLM
response2 = llm.invoke(prompt2)
# 问题:错误处理、状态管理、输入验证都需要手动实现
return response2
基础链解决方案
# LLMChain简化单步处理
sentiment_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["text"],
template="请分析以下文本的情感:{text}"
)
)
# SequentialChain简化多步流程
overall_chain = SequentialChain(
chains=[sentiment_chain, response_chain],
input_variables=["text"],
output_variables=["final_response"]
)
1.3 链路可视化
1.4 核心能力
- 组件解耦:Prompt、LLM、Parser独立配置,支持灵活组合
- 输入输出自动映射:自动处理链间数据传递和类型转换
- 同步/异步支持:完整的async/await支持,适应不同性能需求
- 错误处理机制:内置异常捕获和回调机制
- 内存集成:原生支持对话历史和上下文管理
- 可观测性:完整的回调和日志系统
2. 底层实现逻辑
2.1 LLMChain核心构成
架构组件协同
class LLMChain(Chain):
"""LLMChain的核心组件协同逻辑"""
# 核心组件
prompt: BasePromptTemplate # 提示词模板
llm: Union[Runnable, BaseLanguageModel] # 语言模型
output_parser: BaseLLMOutputParser # 输出解析器
def _call(self, inputs: dict[str, Any], run_manager=None) -> dict[str, str]:
"""完整链路执行逻辑"""
# 1. 提示词准备阶段
prompts, stop = self.prep_prompts([inputs], run_manager)
# 2. LLM推理阶段
response = self.generate([inputs], run_manager)
# 3. 输出处理阶段
return self.create_outputs(response)[0]
def prep_prompts(self, input_list, run_manager=None):
"""提示词准备的详细逻辑"""
prompts = []
for inputs in input_list:
# 提取模板所需变量
selected_inputs = {k: inputs[k] for k in self.prompt.input_variables}
# 格式化提示词
prompt = self.prompt.format_prompt(**selected_inputs)
# 可视化输出(调试用)
if run_manager:
colored_text = get_colored_text(prompt.to_string(), "green")
run_manager.on_text(f"Prompt after formatting:\n{colored_text}")
prompts.append(prompt)
return prompts, stop
数据流转机制
def create_outputs(self, llm_result: LLMResult) -> list[dict[str, Any]]:
"""输出创建的核心逻辑"""
result = []
for generation in llm_result.generations:
# 应用输出解析器
parsed_output = self.output_parser.parse_result(generation)
output_dict = {
self.output_key: parsed_output,
"full_generation": generation # 保留完整生成信息
}
# 根据配置决定返回内容
if self.return_final_only:
output_dict = {self.output_key: parsed_output}
result.append(output_dict)
return result
2.2 SequentialChain串联原理
多链串行核心机制
class SequentialChain(Chain):
"""多链串行执行的核心实现"""
def _call(self, inputs: dict[str, str], run_manager=None) -> dict[str, str]:
"""串行执行的核心逻辑"""
known_values = inputs.copy() # 累积所有已知变量
for i, chain in enumerate(self.chains):
# 为每个链创建子回调管理器
callbacks = run_manager.get_child() if run_manager else None
# 执行当前链,只返回新产生的输出
outputs = chain(known_values, return_only_outputs=True, callbacks=callbacks)
# 将新输出合并到已知变量中
known_values.update(outputs)
# 根据配置返回指定的输出变量
return {k: known_values[k] for k in self.output_variables}
输入输出映射验证
@model_validator(mode="before")
@classmethod
def validate_chains(cls, values: dict) -> Any:
"""链间依赖关系验证"""
chains = values["chains"]
input_variables = values["input_variables"]
# 初始化已知变量集合
known_variables = set(input_variables)
# 如果有内存组件,添加内存变量
if values.get("memory"):
memory_keys = values["memory"].memory_variables
known_variables.update(memory_keys)
# 逐个验证每个链的输入输出
for chain in chains:
# 检查当前链的输入是否都能被满足
missing_vars = set(chain.input_keys).difference(known_variables)
if missing_vars:
raise ValueError(f"Missing required input keys: {missing_vars}")
# 检查输出是否与已知变量冲突
overlapping_keys = known_variables.intersection(chain.output_keys)
if overlapping_keys:
raise ValueError(f"Chain returned keys that already exist: {overlapping_keys}")
# 将当前链的输出添加到已知变量中
known_variables.update(chain.output_keys)
return values
2.3 数据流转逻辑示例
# 示例:文本分析 -> 情感判断 -> 回复生成
def demonstrate_data_flow():
"""演示SequentialChain中的数据流转"""
# 输入: {"user_text": "我今天很开心"}
initial_input = {"user_text": "我今天很开心"}
# Step 1: 文本分析链
# 输入: {"user_text": "我今天很开心"}
# 输出: {"analysis": "积极情绪表达"}
# Step 2: 情感判断链
# 输入: {"user_text": "我今天很开心", "analysis": "积极情绪表达"}
# 输出: {"sentiment": "positive", "confidence": 0.95}
# Step 3: 回复生成链
# 输入: {"user_text": "我今天很开心", "analysis": "积极情绪表达",
# "sentiment": "positive", "confidence": 0.95}
# 输出: {"response": "很高兴听到你今天心情不错!"}
# 最终输出: {"response": "很高兴听到你今天心情不错!"}
---
## 3. 代码实践
### 3.1 基础实践1:LLMChain核心用法
#### 依赖安装
```bash
# 安装核心依赖
pip install langchain langchain-openai langchain-core
# 或使用conda
conda install -c conda-forge langchain
完整可运行代码
import os
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
# 设置API密钥
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
def basic_llm_chain_example():
"""LLMChain基础用法示例"""
# 1. 创建提示词模板
prompt_template = PromptTemplate(
input_variables=["product", "audience"],
template="""
为{product}写一个面向{audience}的营销文案。
要求:
- 突出产品特点
- 符合目标受众特征
- 语言简洁有力
营销文案:
"""
)
# 2. 初始化LLM
llm = OpenAI(
temperature=0.7, # 控制创造性
max_tokens=200 # 限制输出长度
)
# 3. 创建LLMChain
marketing_chain = LLMChain(
llm=llm,
prompt=prompt_template,
output_parser=StrOutputParser(), # 字符串输出解析器
verbose=True # 显示详细执行过程
)
# 4. 执行链
result = marketing_chain.invoke({
"product": "智能手表",
"audience": "健身爱好者"
})
print("生成的营销文案:")
print(result["text"])
return marketing_chain
def advanced_llm_chain_features():
"""LLMChain高级特性演示"""
# 自定义输出解析器
class MarketingOutputParser:
def parse(self, text: str) -> dict:
"""解析营销文案为结构化数据"""
lines = text.strip().split('\n')
return {
"headline": lines[0] if lines else "",
"body": '\n'.join(lines[1:]) if len(lines) > 1 else "",
"word_count": len(text.split())
}
prompt = PromptTemplate(
input_variables=["topic"],
template="写一个关于{topic}的简短介绍,包含标题和正文:"
)
chain = LLMChain(
llm=OpenAI(temperature=0.5),
prompt=prompt,
output_parser=MarketingOutputParser()
)
# 批量处理
topics = ["人工智能", "区块链技术", "量子计算"]
results = chain.apply([{"topic": topic} for topic in topics])
for i, result in enumerate(results):
print(f"\n主题 {i+1}: {topics[i]}")
print(f"标题: {result['headline']}")
print(f"字数: {result['word_count']}")
# 异步执行示例
async def async_llm_chain_example():
"""异步LLMChain使用示例"""
prompt = PromptTemplate(
input_variables=["question"],
template="请简洁回答:{question}"
)
chain = LLMChain(
llm=OpenAI(),
prompt=prompt
)
# 异步执行
result = await chain.ainvoke({"question": "什么是机器学习?"})
print("异步执行结果:", result["text"])
if __name__ == "__main__":
# 运行基础示例
basic_llm_chain_example()
# 运行高级特性示例
advanced_llm_chain_features()
# 运行异步示例(需要在异步环境中)
import asyncio
asyncio.run(async_llm_chain_example())
3.2 基础实践2:SequentialChain多链串联
SimpleSequentialChain vs SequentialChain对比
from langchain.chains import SequentialChain, SimpleSequentialChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
def simple_vs_full_sequential_comparison():
"""对比SimpleSequentialChain与SequentialChain"""
llm = OpenAI(temperature=0.7)
# === SimpleSequentialChain示例 ===
# 限制:每个链只能有一个输入和一个输出
# 第一个链:生成故事大纲
outline_prompt = PromptTemplate(
input_variables=["topic"],
template="为'{topic}'写一个简短的故事大纲:"
)
outline_chain = LLMChain(llm=llm, prompt=outline_prompt)
# 第二个链:扩展故事
story_prompt = PromptTemplate(
input_variables=["outline"],
template="基于以下大纲,写一个完整的短故事:\n{outline}"
)
story_chain = LLMChain(llm=llm, prompt=story_prompt)
# 创建SimpleSequentialChain
simple_chain = SimpleSequentialChain(
chains=[outline_chain, story_chain],
verbose=True
)
# 执行SimpleSequentialChain
simple_result = simple_chain.invoke({"input": "太空探险"})
print("SimpleSequentialChain结果:")
print(simple_result["output"])
# === SequentialChain示例 ===
# 优势:支持多输入多输出,更灵活的数据流转
# 第一个链:分析主题
analysis_prompt = PromptTemplate(
input_variables=["topic", "genre"],
template="分析'{topic}'作为{genre}类型故事的要素:\n主要冲突:\n角色设定:\n背景设置:"
)
analysis_chain = LLMChain(
llm=llm,
prompt=analysis_prompt,
output_key="analysis" # 指定输出键名
)
# 第二个链:创建角色
character_prompt = PromptTemplate(
input_variables=["topic", "analysis"],
template="基于主题'{topic}'和分析'{analysis}',创建主要角色:\n姓名:\n性格:\n背景:"
)
character_chain = LLMChain(
llm=llm,
prompt=character_prompt,
output_key="characters"
)
# 第三个链:生成完整故事
full_story_prompt = PromptTemplate(
input_variables=["topic", "analysis", "characters"],
template="""
基于以下信息创作完整故事:
主题:{topic}
分析:{analysis}
角色:{characters}
故事:
"""
)
full_story_chain = LLMChain(
llm=llm,
prompt=full_story_prompt,
output_key="story"
)
# 创建SequentialChain
full_chain = SequentialChain(
chains=[analysis_chain, character_chain, full_story_chain],
input_variables=["topic", "genre"],
output_variables=["story"], # 只返回最终故事
verbose=True
)
# 执行SequentialChain
full_result = full_chain.invoke({
"topic": "太空探险",
"genre": "科幻"
})
print("\nSequentialChain结果:")
print(full_result["story"])
def complex_sequential_chain_example():
"""复杂SequentialChain实际应用示例"""
llm = OpenAI(temperature=0.3)
# 链1:文本预处理
preprocess_prompt = PromptTemplate(
input_variables=["raw_text"],
template="""
对以下文本进行预处理,提取关键信息:
原文:{raw_text}
请提取:
1. 主要主题
2. 关键实体
3. 情感倾向
提取结果:
"""
)
preprocess_chain = LLMChain(
llm=llm,
prompt=preprocess_prompt,
output_key="processed_info"
)
# 链2:内容分析
analysis_prompt = PromptTemplate(
input_variables=["raw_text", "processed_info"],
template="""
基于原文和预处理信息进行深度分析:
原文:{raw_text}
预处理信息:{processed_info}
分析维度:
1. 内容质量评分(1-10)
2. 目标受众
3. 改进建议
分析结果:
"""
)
analysis_chain = LLMChain(
llm=llm,
prompt=analysis_prompt,
output_key="analysis_result"
)
# 链3:优化建议
optimization_prompt = PromptTemplate(
input_variables=["raw_text", "processed_info", "analysis_result"],
template="""
基于分析结果提供具体的优化建议:
原文:{raw_text}
预处理信息:{processed_info}
分析结果:{analysis_result}
请提供:
1. 3个具体的改进点
2. 重写建议
3. SEO优化建议
优化建议:
"""
)
optimization_chain = LLMChain(
llm=llm,
prompt=optimization_prompt,
output_key="optimization_suggestions"
)
# 创建完整的内容分析链
content_analysis_chain = SequentialChain(
chains=[preprocess_chain, analysis_chain, optimization_chain],
input_variables=["raw_text"],
output_variables=["processed_info", "analysis_result", "optimization_suggestions"],
return_all=True, # 返回所有中间结果
verbose=True
)
# 测试文本
test_text = """
人工智能技术正在快速发展,它改变了我们的生活方式。
从智能手机到自动驾驶汽车,AI无处不在。
但是我们也需要考虑AI带来的挑战和风险。
"""
# 执行分析
result = content_analysis_chain.invoke({"raw_text": test_text})
print("=== 内容分析结果 ===")
print(f"预处理信息:\n{result['processed_info']}\n")
print(f"分析结果:\n{result['analysis_result']}\n")
print(f"优化建议:\n{result['optimization_suggestions']}")
if __name__ == "__main__":
# 运行对比示例
simple_vs_full_sequential_comparison()
print("\n" + "="*50 + "\n")
# 运行复杂示例
complex_sequential_chain_example()
3.3 进阶实践:基础链与Runnable的结合
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import JsonOutputParser
from langchain.chains import LLMChain
def chain_runnable_integration():
"""基础链与Runnable接口的集成使用"""
llm = OpenAI(temperature=0.5)
# 1. 传统LLMChain方式
traditional_prompt = PromptTemplate(
input_variables=["question"],
template="回答问题:{question}"
)
traditional_chain = LLMChain(llm=llm, prompt=traditional_prompt)
# 2. 现代Runnable方式(推荐)
modern_prompt = PromptTemplate(
input_variables=["question"],
template="回答问题:{question}"
)
modern_chain = modern_prompt | llm | StrOutputParser()
# 3. 混合使用:LLMChain + Runnable组合
# 创建并行处理分支
parallel_analysis = RunnableParallel({
"sentiment": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["text"],
template="分析情感:{text}\n情感:"
)
),
"keywords": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["text"],
template="提取关键词:{text}\n关键词:"
)
),
"summary": LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["text"],
template="总结内容:{text}\n总结:"
)
)
})
# 创建综合分析链
synthesis_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["sentiment", "keywords", "summary", "original_text"],
template="""
基于以下分析结果,生成综合报告:
原文:{original_text}
情感分析:{sentiment}
关键词:{keywords}
内容总结:{summary}
综合报告:
"""
)
)
# 组合完整流程
complete_pipeline = (
RunnablePassthrough.assign(
analysis=parallel_analysis
)
| RunnablePassthrough.assign(
report=lambda x: synthesis_chain.invoke({
"sentiment": x["analysis"]["sentiment"]["text"],
"keywords": x["analysis"]["keywords"]["text"],
"summary": x["analysis"]["summary"]["text"],
"original_text": x["text"]
})["text"]
)
)
# 测试流程
test_input = {
"text": "今天的会议非常成功,我们讨论了新产品的发布计划,团队士气很高。"
}
result = complete_pipeline.invoke(test_input)
print("=== 综合分析结果 ===")
print(f"原文:{result['text']}")
print(f"情感分析:{result['analysis']['sentiment']['text']}")
print(f"关键词:{result['analysis']['keywords']['text']}")
print(f"内容总结:{result['analysis']['summary']['text']}")
print(f"综合报告:{result['report']}")
def memory_enabled_sequential_chain():
"""带内存的SequentialChain示例"""
from langchain.memory import ConversationBufferMemory
llm = OpenAI(temperature=0.7)
# 创建对话内存
memory = ConversationBufferMemory(
memory_key="chat_history",
input_key="user_input",
output_key="response"
)
# 上下文理解链
context_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["user_input", "chat_history"],
template="""
基于对话历史理解用户意图:
对话历史:{chat_history}
用户输入:{user_input}
用户意图分析:
"""
),
output_key="intent_analysis"
)
# 回复生成链
response_chain = LLMChain(
llm=llm,
prompt=PromptTemplate(
input_variables=["user_input", "chat_history", "intent_analysis"],
template="""
基于对话历史和意图分析生成回复:
对话历史:{chat_history}
用户输入:{user_input}
意图分析:{intent_analysis}
回复:
"""
),
output_key="response"
)
# 创建带内存的顺序链
conversation_chain = SequentialChain(
chains=[context_chain, response_chain],
input_variables=["user_input"],
output_variables=["response"],
memory=memory,
verbose=True
)
# 模拟多轮对话
conversations = [
"你好,我想了解人工智能",
"它有哪些应用领域?",
"在医疗方面具体有什么用途?",
"谢谢你的解答"
]
print("=== 多轮对话示例 ===")
for i, user_input in enumerate(conversations):
print(f"\n轮次 {i+1}:")
print(f"用户:{user_input}")
result = conversation_chain.invoke({"user_input": user_input})
print(f"AI:{result['response']}")
if __name__ == "__main__":
# 运行集成示例
chain_runnable_integration()
print("\n" + "="*50 + "\n")
# 运行内存示例
memory_enabled_sequential_chain()
4. 设计考量
4.1 组件解耦价值
职责分离原则
LangChain基础链的设计遵循单一职责原则,每个组件都有明确的边界:
# 组件职责清晰分离
class ComponentSeparationDemo:
"""演示组件解耦的价值"""
def __init__(self):
# 提示词模板:负责输入格式化
self.prompt_template = PromptTemplate(
input_variables=["context", "question"],
template="""
上下文:{context}
问题:{question}
请基于上下文回答问题:
"""
)
# 语言模型:负责推理生成
self.llm = OpenAI(temperature=0.3)
# 输出解析器:负责结果处理
self.output_parser = JsonOutputParser()
def demonstrate_flexibility(self):
"""演示组件独立替换的灵活性"""
# 场景1:更换不同的LLM
gpt4_chain = LLMChain(
prompt=self.prompt_template,
llm=ChatOpenAI(model="gpt-4"), # 替换为GPT-4
output_parser=self.output_parser
)
# 场景2:更换不同的提示词策略
cot_prompt = PromptTemplate(
input_variables=["context", "question"],
template="""
上下文:{context}
问题:{question}
让我们一步步思考:
1. 首先分析问题的关键点
2. 然后在上下文中寻找相关信息
3. 最后得出结论
回答:
"""
)
cot_chain = LLMChain(
prompt=cot_prompt, # 替换为思维链提示词
llm=self.llm,
output_parser=StrOutputParser() # 替换为字符串解析器
)
return gpt4_chain, cot_chain
配置驱动的灵活性
def configuration_driven_chains():
"""配置驱动的链构建"""
# 配置文件定义
chain_configs = {
"qa_chain": {
"prompt_template": "基于{context}回答{question}",
"llm_config": {"temperature": 0.1, "max_tokens": 200},
"output_format": "string"
},
"creative_chain": {
"prompt_template": "创造性地扩展这个想法:{idea}",
"llm_config": {"temperature": 0.9, "max_tokens": 500},
"output_format": "json"
},
"analysis_chain": {
"prompt_template": "分析{data}的趋势和模式",
"llm_config": {"temperature": 0.3, "max_tokens": 300},
"output_format": "structured"
}
}
def build_chain_from_config(config_name: str) -> LLMChain:
"""根据配置构建链"""
config = chain_configs[config_name]
# 动态创建提示词模板
prompt = PromptTemplate.from_template(config["prompt_template"])
# 动态配置LLM
llm = OpenAI(**config["llm_config"])
# 动态选择输出解析器
parser_map = {
"string": StrOutputParser(),
"json": JsonOutputParser(),
"structured": PydanticOutputParser(pydantic_object=AnalysisResult)
}
output_parser = parser_map[config["output_format"]]
return LLMChain(
prompt=prompt,
llm=llm,
output_parser=output_parser
)
# 根据需求动态构建不同的链
qa_chain = build_chain_from_config("qa_chain")
creative_chain = build_chain_from_config("creative_chain")
analysis_chain = build_chain_from_config("analysis_chain")
return qa_chain, creative_chain, analysis_chain
4.2 流程复用性
模板化流程设计
class ReusableChainTemplates:
"""可复用的链模板设计"""
@staticmethod
def create_analysis_template(analysis_type: str) -> SequentialChain:
"""创建通用分析流程模板"""
# 数据预处理链
preprocess_chain = LLMChain(
llm=OpenAI(temperature=0.1),
prompt=PromptTemplate(
input_variables=["raw_data"],
template=f"对以下数据进行{analysis_type}分析的预处理:{{raw_data}}"
),
output_key="preprocessed_data"
)
# 核心分析链
analysis_chain = LLMChain(
llm=OpenAI(temperature=0.3),
prompt=PromptTemplate(
input_variables=["raw_data", "preprocessed_data"],
template=f"""
基于预处理结果进行{analysis_type}分析:
原始数据:{{raw_data}}
预处理结果:{{preprocessed_data}}
分析结果:
"""
),
output_key="analysis_result"
)
# 结论生成链
conclusion_chain = LLMChain(
llm=OpenAI(temperature=0.2),
prompt=PromptTemplate(
input_variables=["raw_data", "analysis_result"],
template=f"""
基于{analysis_type}分析结果生成结论和建议:
原始数据:{{raw_data}}
分析结果:{{analysis_result}}
结论和建议:
"""
),
output_key="conclusion"
)
return SequentialChain(
chains=[preprocess_chain, analysis_chain, conclusion_chain],
input_variables=["raw_data"],
output_variables=["conclusion"],
verbose=True
)
4.3 与其他组件协同
内存系统集成
def memory_integration_patterns():
"""内存系统集成模式"""
from langchain.memory import (
ConversationBufferMemory,
ConversationSummaryMemory,
ConversationBufferWindowMemory
)
# 模式1:缓冲内存 - 保存完整对话历史
buffer_memory = ConversationBufferMemory(
memory_key="chat_history",
input_key="user_input",
output_key="ai_response"
)
# 模式2:摘要内存 - 压缩长对话
summary_memory = ConversationSummaryMemory(
llm=OpenAI(temperature=0),
memory_key="chat_summary",
input_key="user_input",
output_key="ai_response"
)
# 模式3:滑动窗口内存 - 保持最近N轮对话
window_memory = ConversationBufferWindowMemory(
k=5, # 保持最近5轮对话
memory_key="recent_chat",
input_key="user_input",
output_key="ai_response"
)
# 创建带不同内存的链
def create_memory_aware_chain(memory_type: str):
memory_map = {
"buffer": buffer_memory,
"summary": summary_memory,
"window": window_memory
}
memory = memory_map[memory_type]
memory_key = memory.memory_key
chain = LLMChain(
llm=OpenAI(temperature=0.7),
prompt=PromptTemplate(
input_variables=["user_input", memory_key],
template=f"""
基于对话历史回复用户:
对话历史:{{{memory_key}}}
用户输入:{{user_input}}
回复:
"""
),
memory=memory,
verbose=True
)
return chain
return create_memory_aware_chain
4.4 开发者体验
调试和可观测性
def debugging_and_observability():
"""调试和可观测性功能"""
from langchain.callbacks import StdOutCallbackHandler
from langchain_core.callbacks import BaseCallbackHandler
class CustomCallbackHandler(BaseCallbackHandler):
"""自定义回调处理器"""
def on_chain_start(self, serialized, inputs, **kwargs):
print(f"🚀 链开始执行: {inputs}")
def on_chain_end(self, outputs, **kwargs):
print(f"✅ 链执行完成: {outputs}")
def on_llm_start(self, serialized, prompts, **kwargs):
print(f"🤖 LLM开始推理,提示词数量: {len(prompts)}")
def on_llm_end(self, response, **kwargs):
print(f"🎯 LLM推理完成,生成数量: {len(response.generations)}")
# 创建带回调的链
chain_with_callbacks = LLMChain(
llm=OpenAI(temperature=0.5),
prompt=PromptTemplate(
input_variables=["question"],
template="回答问题:{question}"
),
callbacks=[CustomCallbackHandler(), StdOutCallbackHandler()],
verbose=True
)
# 测试回调效果
result = chain_with_callbacks.invoke({"question": "什么是LangChain?"})
return result
5. 替代方案与优化空间
5.1 替代实现方案
方案1:纯Runnable实现
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
def pure_runnable_alternative():
"""使用纯Runnable接口替代基础链"""
# 替代LLMChain的现代实现
def create_modern_llm_chain():
prompt = PromptTemplate(
input_variables=["topic"],
template="写一篇关于{topic}的文章"
)
# 传统LLMChain方式
traditional = LLMChain(
llm=OpenAI(),
prompt=prompt
)
# 现代Runnable方式
modern = prompt | OpenAI() | StrOutputParser()
return traditional, modern
# 替代SequentialChain的现代实现
def create_modern_sequential_chain():
# 步骤1:主题分析
topic_analysis = (
PromptTemplate.from_template("分析主题:{topic}")
| OpenAI()
| StrOutputParser()
)
# 步骤2:内容生成
content_generation = (
PromptTemplate.from_template(
"基于分析'{analysis}',为主题'{topic}'生成内容"
)
| OpenAI()
| StrOutputParser()
)
# 组合成完整流程
complete_pipeline = (
RunnablePassthrough.assign(
analysis=topic_analysis
)
| RunnablePassthrough.assign(
content=content_generation
)
)
return complete_pipeline
return create_modern_llm_chain, create_modern_sequential_chain
# 方案2:自定义链实现
def custom_chain_implementation():
"""自定义链实现方案"""
class CustomLLMChain:
"""自定义LLM链实现"""
def __init__(self, llm, prompt_template, output_parser=None):
self.llm = llm
self.prompt_template = prompt_template
self.output_parser = output_parser or StrOutputParser()
def invoke(self, inputs: dict) -> dict:
"""执行链"""
# 1. 格式化提示词
prompt = self.prompt_template.format(**inputs)
# 2. 调用LLM
response = self.llm.invoke(prompt)
# 3. 解析输出
parsed_output = self.output_parser.parse(response)
return {"output": parsed_output}
async def ainvoke(self, inputs: dict) -> dict:
"""异步执行链"""
prompt = self.prompt_template.format(**inputs)
response = await self.llm.ainvoke(prompt)
parsed_output = self.output_parser.parse(response)
return {"output": parsed_output}
class CustomSequentialChain:
"""自定义顺序链实现"""
def __init__(self, chains: list, input_variables: list, output_variables: list):
self.chains = chains
self.input_variables = input_variables
self.output_variables = output_variables
def invoke(self, inputs: dict) -> dict:
"""执行顺序链"""
current_data = inputs.copy()
for chain in self.chains:
# 执行当前链
result = chain.invoke(current_data)
# 合并结果到当前数据
current_data.update(result)
# 返回指定的输出变量
return {k: current_data[k] for k in self.output_variables if k in current_data}
return CustomLLMChain, CustomSequentialChain
5.2 性能优化方向
优化1:并行执行优化
import asyncio
from concurrent.futures import ThreadPoolExecutor
def parallel_execution_optimization():
"""并行执行优化"""
class ParallelSequentialChain:
"""支持并行执行的顺序链"""
def __init__(self, chain_groups: list, max_workers: int = 4):
self.chain_groups = chain_groups # 每组内的链可以并行执行
self.max_workers = max_workers
async def ainvoke(self, inputs: dict) -> dict:
"""异步并行执行"""
current_data = inputs.copy()
for group in self.chain_groups:
if len(group) == 1:
# 单个链,直接执行
result = await group[0].ainvoke(current_data)
current_data.update(result)
else:
# 多个链,并行执行
tasks = [chain.ainvoke(current_data) for chain in group]
results = await asyncio.gather(*tasks)
# 合并所有结果
for result in results:
current_data.update(result)
return current_data
def invoke(self, inputs: dict) -> dict:
"""同步并行执行"""
current_data = inputs.copy()
for group in self.chain_groups:
if len(group) == 1:
result = group[0].invoke(current_data)
current_data.update(result)
else:
# 使用线程池并行执行
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(chain.invoke, current_data)
for chain in group
]
results = [future.result() for future in futures]
for result in results:
current_data.update(result)
return current_data
return ParallelSequentialChain
# 优化2:缓存机制
def caching_optimization():
"""缓存机制优化"""
import hashlib
import json
from functools import wraps
class CachedLLMChain:
"""带缓存的LLM链"""
def __init__(self, llm, prompt, cache_ttl=3600):
self.llm = llm
self.prompt = prompt
self.cache = {}
self.cache_ttl = cache_ttl
def _generate_cache_key(self, inputs: dict) -> str:
"""生成缓存键"""
# 将输入序列化为JSON字符串,然后计算哈希
input_str = json.dumps(inputs, sort_keys=True)
return hashlib.md5(input_str.encode()).hexdigest()
def invoke(self, inputs: dict) -> dict:
"""带缓存的执行"""
cache_key = self._generate_cache_key(inputs)
# 检查缓存
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
print(f"🎯 缓存命中: {cache_key[:8]}...")
return cached_result
# 执行链
prompt_text = self.prompt.format(**inputs)
response = self.llm.invoke(prompt_text)
result = {"text": response}
# 存储到缓存
self.cache[cache_key] = (result, time.time())
print(f"💾 结果已缓存: {cache_key[:8]}...")
return result
return CachedLLMChain
# 优化3:流式输出
def streaming_optimization():
"""流式输出优化"""
class StreamingLLMChain:
"""支持流式输出的LLM链"""
def __init__(self, llm, prompt):
self.llm = llm
self.prompt = prompt
def stream(self, inputs: dict):
"""流式执行"""
prompt_text = self.prompt.format(**inputs)
# 使用流式LLM
for chunk in self.llm.stream(prompt_text):
yield chunk
async def astream(self, inputs: dict):
"""异步流式执行"""
prompt_text = self.prompt.format(**inputs)
async for chunk in self.llm.astream(prompt_text):
yield chunk
return StreamingLLMChain
5.3 功能扩展优化
扩展1:条件分支链
def conditional_chain_extension():
"""条件分支链扩展"""
class ConditionalChain:
"""支持条件分支的链"""
def __init__(self, condition_func, true_chain, false_chain):
self.condition_func = condition_func
self.true_chain = true_chain
self.false_chain = false_chain
def invoke(self, inputs: dict) -> dict:
"""根据条件执行不同的链"""
if self.condition_func(inputs):
return self.true_chain.invoke(inputs)
else:
return self.false_chain.invoke(inputs)
# 使用示例
def create_sentiment_based_chain():
"""创建基于情感的条件链"""
# 情感分析链
sentiment_chain = LLMChain(
llm=OpenAI(temperature=0.1),
prompt=PromptTemplate.from_template("分析情感:{text}\n情感(positive/negative):")
)
# 积极回复链
positive_chain = LLMChain(
llm=OpenAI(temperature=0.7),
prompt=PromptTemplate.from_template("对积极内容的回复:{text}")
)
# 消极回复链
negative_chain = LLMChain(
llm=OpenAI(temperature=0.3),
prompt=PromptTemplate.from_template("对消极内容的安慰回复:{text}")
)
# 条件函数
def is_positive_sentiment(inputs):
sentiment_result = sentiment_chain.invoke(inputs)
return "positive" in sentiment_result["text"].lower()
# 创建条件链
conditional_chain = ConditionalChain(
condition_func=is_positive_sentiment,
true_chain=positive_chain,
false_chain=negative_chain
)
return conditional_chain
return create_sentiment_based_chain
# 扩展2:重试机制
def retry_mechanism_extension():
"""重试机制扩展"""
class RetryChain:
"""支持重试的链"""
def __init__(self, base_chain, max_retries=3, retry_delay=1.0):
self.base_chain = base_chain
self.max_retries = max_retries
self.retry_delay = retry_delay
def invoke(self, inputs: dict) -> dict:
"""带重试的执行"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
result = self.base_chain.invoke(inputs)
if attempt > 0:
print(f"✅ 重试成功,尝试次数: {attempt + 1}")
return result
except Exception as e:
last_exception = e
if attempt < self.max_retries:
print(f"⚠️ 执行失败,{self.retry_delay}秒后重试... (尝试 {attempt + 1}/{self.max_retries + 1})")
time.sleep(self.retry_delay)
else:
print(f"❌ 所有重试都失败了")
raise last_exception
return RetryChain
总结
LangChain基础链(LLMChain与SequentialChain)作为AI应用开发的核心基础设施,通过标准化的组件封装和流程编排,极大地简化了复杂AI工作流的构建。
核心价值总结
- 最小功能单元:LLMChain提供了Prompt+LLM+Parser的标准化封装
- 流程编排工具:SequentialChain实现了多步骤AI处理的自动化串联
- 组件解耦设计:支持灵活的组件替换和配置驱动开发
- 丰富的集成能力:与内存、回调、异步等系统无缝集成
发展趋势
- 现代化迁移:从传统Chain向Runnable接口的演进
- 性能优化:并行执行、缓存机制、流式输出等优化方向
- 功能扩展:条件分支、重试机制、智能路由等高级特性
基础链作为LangChain生态的基石组件,为构建复杂的AI应用提供了坚实的技术基础,是每个LangChain开发者必须掌握的核心技术。
更多推荐
所有评论(0)