LangChain基础链:LLMChain与SequentialChain的流程编排逻辑深度解析

目录

  1. 核心定义与价值
  2. 底层实现逻辑
  3. 代码实践
  4. 设计考量
  5. 替代方案与优化空间

1. 核心定义与价值

1.1 本质定位

LLMChain作为LangChain框架中的最小功能单元,本质上是"PromptTemplate + LLM + OutputParser"的标准化封装。它解决了AI应用开发中最基础的问题:如何将结构化的提示词模板与大语言模型进行标准化连接

SequentialChain则是多链串行执行的编排工具,实现了复杂AI工作流的分步骤处理和数据流转。它将多个独立的处理单元按照依赖关系串联,形成完整的处理管道。

1.2 核心痛点解决

传统开发困境
# 手动串联的复杂性和易错性
def manual_process(user_input):
    # 步骤1:手动格式化提示词
    prompt = f"请分析以下文本的情感:{user_input}"
    
    # 步骤2:调用LLM
    response1 = llm.invoke(prompt)
    
    # 步骤3:手动解析输出
    sentiment = extract_sentiment(response1)
    
    # 步骤4:构建下一个提示词
    prompt2 = f"基于情感分析结果'{sentiment}',生成回复:{user_input}"
    
    # 步骤5:再次调用LLM
    response2 = llm.invoke(prompt2)
    
    # 问题:错误处理、状态管理、输入验证都需要手动实现
    return response2
基础链解决方案
# LLMChain简化单步处理
sentiment_chain = LLMChain(
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["text"],
        template="请分析以下文本的情感:{text}"
    )
)

# SequentialChain简化多步流程
overall_chain = SequentialChain(
    chains=[sentiment_chain, response_chain],
    input_variables=["text"],
    output_variables=["final_response"]
)

1.3 链路可视化

SequentialChain内部
LLMChain内部
输出映射
Chain1执行
输入准备
Chain2执行
输入验证
PromptTemplate格式化
变量替换
Prompt生成
用户输入
LLMChain核心流程
LLM推理
OutputParser解析
输出结果
多链输入
SequentialChain编排
数据流转
Chain3执行
最终输出

1.4 核心能力

  • 组件解耦:Prompt、LLM、Parser独立配置,支持灵活组合
  • 输入输出自动映射:自动处理链间数据传递和类型转换
  • 同步/异步支持:完整的async/await支持,适应不同性能需求
  • 错误处理机制:内置异常捕获和回调机制
  • 内存集成:原生支持对话历史和上下文管理
  • 可观测性:完整的回调和日志系统

2. 底层实现逻辑

2.1 LLMChain核心构成

架构组件协同
class LLMChain(Chain):
    """LLMChain的核心组件协同逻辑"""
    
    # 核心组件
    prompt: BasePromptTemplate      # 提示词模板
    llm: Union[Runnable, BaseLanguageModel]  # 语言模型
    output_parser: BaseLLMOutputParser       # 输出解析器
    
    def _call(self, inputs: dict[str, Any], run_manager=None) -> dict[str, str]:
        """完整链路执行逻辑"""
        # 1. 提示词准备阶段
        prompts, stop = self.prep_prompts([inputs], run_manager)
        
        # 2. LLM推理阶段
        response = self.generate([inputs], run_manager)
        
        # 3. 输出处理阶段
        return self.create_outputs(response)[0]
    
    def prep_prompts(self, input_list, run_manager=None):
        """提示词准备的详细逻辑"""
        prompts = []
        for inputs in input_list:
            # 提取模板所需变量
            selected_inputs = {k: inputs[k] for k in self.prompt.input_variables}
            
            # 格式化提示词
            prompt = self.prompt.format_prompt(**selected_inputs)
            
            # 可视化输出(调试用)
            if run_manager:
                colored_text = get_colored_text(prompt.to_string(), "green")
                run_manager.on_text(f"Prompt after formatting:\n{colored_text}")
            
            prompts.append(prompt)
        return prompts, stop
数据流转机制
def create_outputs(self, llm_result: LLMResult) -> list[dict[str, Any]]:
    """输出创建的核心逻辑"""
    result = []
    for generation in llm_result.generations:
        # 应用输出解析器
        parsed_output = self.output_parser.parse_result(generation)
        
        output_dict = {
            self.output_key: parsed_output,
            "full_generation": generation  # 保留完整生成信息
        }
        
        # 根据配置决定返回内容
        if self.return_final_only:
            output_dict = {self.output_key: parsed_output}
            
        result.append(output_dict)
    return result

2.2 SequentialChain串联原理

多链串行核心机制
class SequentialChain(Chain):
    """多链串行执行的核心实现"""
    
    def _call(self, inputs: dict[str, str], run_manager=None) -> dict[str, str]:
        """串行执行的核心逻辑"""
        known_values = inputs.copy()  # 累积所有已知变量
        
        for i, chain in enumerate(self.chains):
            # 为每个链创建子回调管理器
            callbacks = run_manager.get_child() if run_manager else None
            
            # 执行当前链,只返回新产生的输出
            outputs = chain(known_values, return_only_outputs=True, callbacks=callbacks)
            
            # 将新输出合并到已知变量中
            known_values.update(outputs)
        
        # 根据配置返回指定的输出变量
        return {k: known_values[k] for k in self.output_variables}
输入输出映射验证
@model_validator(mode="before")
@classmethod
def validate_chains(cls, values: dict) -> Any:
    """链间依赖关系验证"""
    chains = values["chains"]
    input_variables = values["input_variables"]
    
    # 初始化已知变量集合
    known_variables = set(input_variables)
    
    # 如果有内存组件,添加内存变量
    if values.get("memory"):
        memory_keys = values["memory"].memory_variables
        known_variables.update(memory_keys)
    
    # 逐个验证每个链的输入输出
    for chain in chains:
        # 检查当前链的输入是否都能被满足
        missing_vars = set(chain.input_keys).difference(known_variables)
        if missing_vars:
            raise ValueError(f"Missing required input keys: {missing_vars}")
        
        # 检查输出是否与已知变量冲突
        overlapping_keys = known_variables.intersection(chain.output_keys)
        if overlapping_keys:
            raise ValueError(f"Chain returned keys that already exist: {overlapping_keys}")
        
        # 将当前链的输出添加到已知变量中
        known_variables.update(chain.output_keys)
    
    return values

2.3 数据流转逻辑示例

# 示例:文本分析 -> 情感判断 -> 回复生成
def demonstrate_data_flow():
    """演示SequentialChain中的数据流转"""
    
    # 输入: {"user_text": "我今天很开心"}
    initial_input = {"user_text": "我今天很开心"}
    
    # Step 1: 文本分析链
    # 输入: {"user_text": "我今天很开心"}
    # 输出: {"analysis": "积极情绪表达"}
    
    # Step 2: 情感判断链  
    # 输入: {"user_text": "我今天很开心", "analysis": "积极情绪表达"}
    # 输出: {"sentiment": "positive", "confidence": 0.95}
    
    # Step 3: 回复生成链
    # 输入: {"user_text": "我今天很开心", "analysis": "积极情绪表达", 
    #       "sentiment": "positive", "confidence": 0.95}
    # 输出: {"response": "很高兴听到你今天心情不错!"}
    
    # 最终输出: {"response": "很高兴听到你今天心情不错!"}
---

## 3. 代码实践

### 3.1 基础实践1:LLMChain核心用法

#### 依赖安装
```bash
# 安装核心依赖
pip install langchain langchain-openai langchain-core

# 或使用conda
conda install -c conda-forge langchain
完整可运行代码
import os
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser

# 设置API密钥
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

def basic_llm_chain_example():
    """LLMChain基础用法示例"""
    
    # 1. 创建提示词模板
    prompt_template = PromptTemplate(
        input_variables=["product", "audience"],
        template="""
        为{product}写一个面向{audience}的营销文案。
        
        要求:
        - 突出产品特点
        - 符合目标受众特征
        - 语言简洁有力
        
        营销文案:
        """
    )
    
    # 2. 初始化LLM
    llm = OpenAI(
        temperature=0.7,  # 控制创造性
        max_tokens=200    # 限制输出长度
    )
    
    # 3. 创建LLMChain
    marketing_chain = LLMChain(
        llm=llm,
        prompt=prompt_template,
        output_parser=StrOutputParser(),  # 字符串输出解析器
        verbose=True  # 显示详细执行过程
    )
    
    # 4. 执行链
    result = marketing_chain.invoke({
        "product": "智能手表",
        "audience": "健身爱好者"
    })
    
    print("生成的营销文案:")
    print(result["text"])
    
    return marketing_chain

def advanced_llm_chain_features():
    """LLMChain高级特性演示"""
    
    # 自定义输出解析器
    class MarketingOutputParser:
        def parse(self, text: str) -> dict:
            """解析营销文案为结构化数据"""
            lines = text.strip().split('\n')
            return {
                "headline": lines[0] if lines else "",
                "body": '\n'.join(lines[1:]) if len(lines) > 1 else "",
                "word_count": len(text.split())
            }
    
    prompt = PromptTemplate(
        input_variables=["topic"],
        template="写一个关于{topic}的简短介绍,包含标题和正文:"
    )
    
    chain = LLMChain(
        llm=OpenAI(temperature=0.5),
        prompt=prompt,
        output_parser=MarketingOutputParser()
    )
    
    # 批量处理
    topics = ["人工智能", "区块链技术", "量子计算"]
    results = chain.apply([{"topic": topic} for topic in topics])
    
    for i, result in enumerate(results):
        print(f"\n主题 {i+1}: {topics[i]}")
        print(f"标题: {result['headline']}")
        print(f"字数: {result['word_count']}")

# 异步执行示例
async def async_llm_chain_example():
    """异步LLMChain使用示例"""
    
    prompt = PromptTemplate(
        input_variables=["question"],
        template="请简洁回答:{question}"
    )
    
    chain = LLMChain(
        llm=OpenAI(),
        prompt=prompt
    )
    
    # 异步执行
    result = await chain.ainvoke({"question": "什么是机器学习?"})
    print("异步执行结果:", result["text"])

if __name__ == "__main__":
    # 运行基础示例
    basic_llm_chain_example()
    
    # 运行高级特性示例
    advanced_llm_chain_features()
    
    # 运行异步示例(需要在异步环境中)
    import asyncio
    asyncio.run(async_llm_chain_example())

3.2 基础实践2:SequentialChain多链串联

SimpleSequentialChain vs SequentialChain对比
from langchain.chains import SequentialChain, SimpleSequentialChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import OpenAI

def simple_vs_full_sequential_comparison():
    """对比SimpleSequentialChain与SequentialChain"""
    
    llm = OpenAI(temperature=0.7)
    
    # === SimpleSequentialChain示例 ===
    # 限制:每个链只能有一个输入和一个输出
    
    # 第一个链:生成故事大纲
    outline_prompt = PromptTemplate(
        input_variables=["topic"],
        template="为'{topic}'写一个简短的故事大纲:"
    )
    outline_chain = LLMChain(llm=llm, prompt=outline_prompt)
    
    # 第二个链:扩展故事
    story_prompt = PromptTemplate(
        input_variables=["outline"],
        template="基于以下大纲,写一个完整的短故事:\n{outline}"
    )
    story_chain = LLMChain(llm=llm, prompt=story_prompt)
    
    # 创建SimpleSequentialChain
    simple_chain = SimpleSequentialChain(
        chains=[outline_chain, story_chain],
        verbose=True
    )
    
    # 执行SimpleSequentialChain
    simple_result = simple_chain.invoke({"input": "太空探险"})
    print("SimpleSequentialChain结果:")
    print(simple_result["output"])
    
    # === SequentialChain示例 ===
    # 优势:支持多输入多输出,更灵活的数据流转
    
    # 第一个链:分析主题
    analysis_prompt = PromptTemplate(
        input_variables=["topic", "genre"],
        template="分析'{topic}'作为{genre}类型故事的要素:\n主要冲突:\n角色设定:\n背景设置:"
    )
    analysis_chain = LLMChain(
        llm=llm, 
        prompt=analysis_prompt,
        output_key="analysis"  # 指定输出键名
    )
    
    # 第二个链:创建角色
    character_prompt = PromptTemplate(
        input_variables=["topic", "analysis"],
        template="基于主题'{topic}'和分析'{analysis}',创建主要角色:\n姓名:\n性格:\n背景:"
    )
    character_chain = LLMChain(
        llm=llm,
        prompt=character_prompt,
        output_key="characters"
    )
    
    # 第三个链:生成完整故事
    full_story_prompt = PromptTemplate(
        input_variables=["topic", "analysis", "characters"],
        template="""
        基于以下信息创作完整故事:
        主题:{topic}
        分析:{analysis}
        角色:{characters}
        
        故事:
        """
    )
    full_story_chain = LLMChain(
        llm=llm,
        prompt=full_story_prompt,
        output_key="story"
    )
    
    # 创建SequentialChain
    full_chain = SequentialChain(
        chains=[analysis_chain, character_chain, full_story_chain],
        input_variables=["topic", "genre"],
        output_variables=["story"],  # 只返回最终故事
        verbose=True
    )
    
    # 执行SequentialChain
    full_result = full_chain.invoke({
        "topic": "太空探险",
        "genre": "科幻"
    })
    
    print("\nSequentialChain结果:")
    print(full_result["story"])

def complex_sequential_chain_example():
    """复杂SequentialChain实际应用示例"""
    
    llm = OpenAI(temperature=0.3)
    
    # 链1:文本预处理
    preprocess_prompt = PromptTemplate(
        input_variables=["raw_text"],
        template="""
        对以下文本进行预处理,提取关键信息:
        原文:{raw_text}
        
        请提取:
        1. 主要主题
        2. 关键实体
        3. 情感倾向
        
        提取结果:
        """
    )
    preprocess_chain = LLMChain(
        llm=llm,
        prompt=preprocess_prompt,
        output_key="processed_info"
    )
    
    # 链2:内容分析
    analysis_prompt = PromptTemplate(
        input_variables=["raw_text", "processed_info"],
        template="""
        基于原文和预处理信息进行深度分析:
        原文:{raw_text}
        预处理信息:{processed_info}
        
        分析维度:
        1. 内容质量评分(1-10)
        2. 目标受众
        3. 改进建议
        
        分析结果:
        """
    )
    analysis_chain = LLMChain(
        llm=llm,
        prompt=analysis_prompt,
        output_key="analysis_result"
    )
    
    # 链3:优化建议
    optimization_prompt = PromptTemplate(
        input_variables=["raw_text", "processed_info", "analysis_result"],
        template="""
        基于分析结果提供具体的优化建议:
        
        原文:{raw_text}
        预处理信息:{processed_info}
        分析结果:{analysis_result}
        
        请提供:
        1. 3个具体的改进点
        2. 重写建议
        3. SEO优化建议
        
        优化建议:
        """
    )
    optimization_chain = LLMChain(
        llm=llm,
        prompt=optimization_prompt,
        output_key="optimization_suggestions"
    )
    
    # 创建完整的内容分析链
    content_analysis_chain = SequentialChain(
        chains=[preprocess_chain, analysis_chain, optimization_chain],
        input_variables=["raw_text"],
        output_variables=["processed_info", "analysis_result", "optimization_suggestions"],
        return_all=True,  # 返回所有中间结果
        verbose=True
    )
    
    # 测试文本
    test_text = """
    人工智能技术正在快速发展,它改变了我们的生活方式。
    从智能手机到自动驾驶汽车,AI无处不在。
    但是我们也需要考虑AI带来的挑战和风险。
    """
    
    # 执行分析
    result = content_analysis_chain.invoke({"raw_text": test_text})
    
    print("=== 内容分析结果 ===")
    print(f"预处理信息:\n{result['processed_info']}\n")
    print(f"分析结果:\n{result['analysis_result']}\n")
    print(f"优化建议:\n{result['optimization_suggestions']}")

if __name__ == "__main__":
    # 运行对比示例
    simple_vs_full_sequential_comparison()
    
    print("\n" + "="*50 + "\n")
    
    # 运行复杂示例
    complex_sequential_chain_example()

3.3 进阶实践:基础链与Runnable的结合

from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import JsonOutputParser
from langchain.chains import LLMChain

def chain_runnable_integration():
    """基础链与Runnable接口的集成使用"""
    
    llm = OpenAI(temperature=0.5)
    
    # 1. 传统LLMChain方式
    traditional_prompt = PromptTemplate(
        input_variables=["question"],
        template="回答问题:{question}"
    )
    traditional_chain = LLMChain(llm=llm, prompt=traditional_prompt)
    
    # 2. 现代Runnable方式(推荐)
    modern_prompt = PromptTemplate(
        input_variables=["question"],
        template="回答问题:{question}"
    )
    modern_chain = modern_prompt | llm | StrOutputParser()
    
    # 3. 混合使用:LLMChain + Runnable组合
    
    # 创建并行处理分支
    parallel_analysis = RunnableParallel({
        "sentiment": LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["text"],
                template="分析情感:{text}\n情感:"
            )
        ),
        "keywords": LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["text"],
                template="提取关键词:{text}\n关键词:"
            )
        ),
        "summary": LLMChain(
            llm=llm,
            prompt=PromptTemplate(
                input_variables=["text"],
                template="总结内容:{text}\n总结:"
            )
        )
    })
    
    # 创建综合分析链
    synthesis_chain = LLMChain(
        llm=llm,
        prompt=PromptTemplate(
            input_variables=["sentiment", "keywords", "summary", "original_text"],
            template="""
            基于以下分析结果,生成综合报告:
            
            原文:{original_text}
            情感分析:{sentiment}
            关键词:{keywords}
            内容总结:{summary}
            
            综合报告:
            """
        )
    )
    
    # 组合完整流程
    complete_pipeline = (
        RunnablePassthrough.assign(
            analysis=parallel_analysis
        )
        | RunnablePassthrough.assign(
            report=lambda x: synthesis_chain.invoke({
                "sentiment": x["analysis"]["sentiment"]["text"],
                "keywords": x["analysis"]["keywords"]["text"],
                "summary": x["analysis"]["summary"]["text"],
                "original_text": x["text"]
            })["text"]
        )
    )
    
    # 测试流程
    test_input = {
        "text": "今天的会议非常成功,我们讨论了新产品的发布计划,团队士气很高。"
    }
    
    result = complete_pipeline.invoke(test_input)
    
    print("=== 综合分析结果 ===")
    print(f"原文:{result['text']}")
    print(f"情感分析:{result['analysis']['sentiment']['text']}")
    print(f"关键词:{result['analysis']['keywords']['text']}")
    print(f"内容总结:{result['analysis']['summary']['text']}")
    print(f"综合报告:{result['report']}")

def memory_enabled_sequential_chain():
    """带内存的SequentialChain示例"""
    
    from langchain.memory import ConversationBufferMemory
    
    llm = OpenAI(temperature=0.7)
    
    # 创建对话内存
    memory = ConversationBufferMemory(
        memory_key="chat_history",
        input_key="user_input",
        output_key="response"
    )
    
    # 上下文理解链
    context_chain = LLMChain(
        llm=llm,
        prompt=PromptTemplate(
            input_variables=["user_input", "chat_history"],
            template="""
            基于对话历史理解用户意图:
            
            对话历史:{chat_history}
            用户输入:{user_input}
            
            用户意图分析:
            """
        ),
        output_key="intent_analysis"
    )
    
    # 回复生成链
    response_chain = LLMChain(
        llm=llm,
        prompt=PromptTemplate(
            input_variables=["user_input", "chat_history", "intent_analysis"],
            template="""
            基于对话历史和意图分析生成回复:
            
            对话历史:{chat_history}
            用户输入:{user_input}
            意图分析:{intent_analysis}
            
            回复:
            """
        ),
        output_key="response"
    )
    
    # 创建带内存的顺序链
    conversation_chain = SequentialChain(
        chains=[context_chain, response_chain],
        input_variables=["user_input"],
        output_variables=["response"],
        memory=memory,
        verbose=True
    )
    
    # 模拟多轮对话
    conversations = [
        "你好,我想了解人工智能",
        "它有哪些应用领域?",
        "在医疗方面具体有什么用途?",
        "谢谢你的解答"
    ]
    
    print("=== 多轮对话示例 ===")
    for i, user_input in enumerate(conversations):
        print(f"\n轮次 {i+1}:")
        print(f"用户:{user_input}")
        
        result = conversation_chain.invoke({"user_input": user_input})
        print(f"AI:{result['response']}")

if __name__ == "__main__":
    # 运行集成示例
    chain_runnable_integration()
    
    print("\n" + "="*50 + "\n")
    
    # 运行内存示例
    memory_enabled_sequential_chain()

4. 设计考量

4.1 组件解耦价值

职责分离原则

LangChain基础链的设计遵循单一职责原则,每个组件都有明确的边界:

# 组件职责清晰分离
class ComponentSeparationDemo:
    """演示组件解耦的价值"""
    
    def __init__(self):
        # 提示词模板:负责输入格式化
        self.prompt_template = PromptTemplate(
            input_variables=["context", "question"],
            template="""
            上下文:{context}
            问题:{question}
            
            请基于上下文回答问题:
            """
        )
        
        # 语言模型:负责推理生成
        self.llm = OpenAI(temperature=0.3)
        
        # 输出解析器:负责结果处理
        self.output_parser = JsonOutputParser()
    
    def demonstrate_flexibility(self):
        """演示组件独立替换的灵活性"""
        
        # 场景1:更换不同的LLM
        gpt4_chain = LLMChain(
            prompt=self.prompt_template,
            llm=ChatOpenAI(model="gpt-4"),  # 替换为GPT-4
            output_parser=self.output_parser
        )
        
        # 场景2:更换不同的提示词策略
        cot_prompt = PromptTemplate(
            input_variables=["context", "question"],
            template="""
            上下文:{context}
            问题:{question}
            
            让我们一步步思考:
            1. 首先分析问题的关键点
            2. 然后在上下文中寻找相关信息
            3. 最后得出结论
            
            回答:
            """
        )
        
        cot_chain = LLMChain(
            prompt=cot_prompt,  # 替换为思维链提示词
            llm=self.llm,
            output_parser=StrOutputParser()  # 替换为字符串解析器
        )
        
        return gpt4_chain, cot_chain
配置驱动的灵活性
def configuration_driven_chains():
    """配置驱动的链构建"""
    
    # 配置文件定义
    chain_configs = {
        "qa_chain": {
            "prompt_template": "基于{context}回答{question}",
            "llm_config": {"temperature": 0.1, "max_tokens": 200},
            "output_format": "string"
        },
        "creative_chain": {
            "prompt_template": "创造性地扩展这个想法:{idea}",
            "llm_config": {"temperature": 0.9, "max_tokens": 500},
            "output_format": "json"
        },
        "analysis_chain": {
            "prompt_template": "分析{data}的趋势和模式",
            "llm_config": {"temperature": 0.3, "max_tokens": 300},
            "output_format": "structured"
        }
    }
    
    def build_chain_from_config(config_name: str) -> LLMChain:
        """根据配置构建链"""
        config = chain_configs[config_name]
        
        # 动态创建提示词模板
        prompt = PromptTemplate.from_template(config["prompt_template"])
        
        # 动态配置LLM
        llm = OpenAI(**config["llm_config"])
        
        # 动态选择输出解析器
        parser_map = {
            "string": StrOutputParser(),
            "json": JsonOutputParser(),
            "structured": PydanticOutputParser(pydantic_object=AnalysisResult)
        }
        output_parser = parser_map[config["output_format"]]
        
        return LLMChain(
            prompt=prompt,
            llm=llm,
            output_parser=output_parser
        )
    
    # 根据需求动态构建不同的链
    qa_chain = build_chain_from_config("qa_chain")
    creative_chain = build_chain_from_config("creative_chain")
    analysis_chain = build_chain_from_config("analysis_chain")
    
    return qa_chain, creative_chain, analysis_chain

4.2 流程复用性

模板化流程设计
class ReusableChainTemplates:
    """可复用的链模板设计"""
    
    @staticmethod
    def create_analysis_template(analysis_type: str) -> SequentialChain:
        """创建通用分析流程模板"""
        
        # 数据预处理链
        preprocess_chain = LLMChain(
            llm=OpenAI(temperature=0.1),
            prompt=PromptTemplate(
                input_variables=["raw_data"],
                template=f"对以下数据进行{analysis_type}分析的预处理:{{raw_data}}"
            ),
            output_key="preprocessed_data"
        )
        
        # 核心分析链
        analysis_chain = LLMChain(
            llm=OpenAI(temperature=0.3),
            prompt=PromptTemplate(
                input_variables=["raw_data", "preprocessed_data"],
                template=f"""
                基于预处理结果进行{analysis_type}分析:
                原始数据:{{raw_data}}
                预处理结果:{{preprocessed_data}}
                
                分析结果:
                """
            ),
            output_key="analysis_result"
        )
        
        # 结论生成链
        conclusion_chain = LLMChain(
            llm=OpenAI(temperature=0.2),
            prompt=PromptTemplate(
                input_variables=["raw_data", "analysis_result"],
                template=f"""
                基于{analysis_type}分析结果生成结论和建议:
                原始数据:{{raw_data}}
                分析结果:{{analysis_result}}
                
                结论和建议:
                """
            ),
            output_key="conclusion"
        )
        
        return SequentialChain(
            chains=[preprocess_chain, analysis_chain, conclusion_chain],
            input_variables=["raw_data"],
            output_variables=["conclusion"],
            verbose=True
        )

4.3 与其他组件协同

内存系统集成
def memory_integration_patterns():
    """内存系统集成模式"""
    
    from langchain.memory import (
        ConversationBufferMemory,
        ConversationSummaryMemory,
        ConversationBufferWindowMemory
    )
    
    # 模式1:缓冲内存 - 保存完整对话历史
    buffer_memory = ConversationBufferMemory(
        memory_key="chat_history",
        input_key="user_input",
        output_key="ai_response"
    )
    
    # 模式2:摘要内存 - 压缩长对话
    summary_memory = ConversationSummaryMemory(
        llm=OpenAI(temperature=0),
        memory_key="chat_summary",
        input_key="user_input",
        output_key="ai_response"
    )
    
    # 模式3:滑动窗口内存 - 保持最近N轮对话
    window_memory = ConversationBufferWindowMemory(
        k=5,  # 保持最近5轮对话
        memory_key="recent_chat",
        input_key="user_input",
        output_key="ai_response"
    )
    
    # 创建带不同内存的链
    def create_memory_aware_chain(memory_type: str):
        memory_map = {
            "buffer": buffer_memory,
            "summary": summary_memory,
            "window": window_memory
        }
        
        memory = memory_map[memory_type]
        memory_key = memory.memory_key
        
        chain = LLMChain(
            llm=OpenAI(temperature=0.7),
            prompt=PromptTemplate(
                input_variables=["user_input", memory_key],
                template=f"""
                基于对话历史回复用户:
                
                对话历史:{{{memory_key}}}
                用户输入:{{user_input}}
                
                回复:
                """
            ),
            memory=memory,
            verbose=True
        )
        
        return chain
    
    return create_memory_aware_chain

4.4 开发者体验

调试和可观测性
def debugging_and_observability():
    """调试和可观测性功能"""
    
    from langchain.callbacks import StdOutCallbackHandler
    from langchain_core.callbacks import BaseCallbackHandler
    
    class CustomCallbackHandler(BaseCallbackHandler):
        """自定义回调处理器"""
        
        def on_chain_start(self, serialized, inputs, **kwargs):
            print(f"🚀 链开始执行: {inputs}")
        
        def on_chain_end(self, outputs, **kwargs):
            print(f"✅ 链执行完成: {outputs}")
        
        def on_llm_start(self, serialized, prompts, **kwargs):
            print(f"🤖 LLM开始推理,提示词数量: {len(prompts)}")
        
        def on_llm_end(self, response, **kwargs):
            print(f"🎯 LLM推理完成,生成数量: {len(response.generations)}")
    
    # 创建带回调的链
    chain_with_callbacks = LLMChain(
        llm=OpenAI(temperature=0.5),
        prompt=PromptTemplate(
            input_variables=["question"],
            template="回答问题:{question}"
        ),
        callbacks=[CustomCallbackHandler(), StdOutCallbackHandler()],
        verbose=True
    )
    
    # 测试回调效果
    result = chain_with_callbacks.invoke({"question": "什么是LangChain?"})
    return result

5. 替代方案与优化空间

5.1 替代实现方案

方案1:纯Runnable实现
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser

def pure_runnable_alternative():
    """使用纯Runnable接口替代基础链"""
    
    # 替代LLMChain的现代实现
    def create_modern_llm_chain():
        prompt = PromptTemplate(
            input_variables=["topic"],
            template="写一篇关于{topic}的文章"
        )
        
        # 传统LLMChain方式
        traditional = LLMChain(
            llm=OpenAI(),
            prompt=prompt
        )
        
        # 现代Runnable方式
        modern = prompt | OpenAI() | StrOutputParser()
        
        return traditional, modern
    
    # 替代SequentialChain的现代实现
    def create_modern_sequential_chain():
        # 步骤1:主题分析
        topic_analysis = (
            PromptTemplate.from_template("分析主题:{topic}")
            | OpenAI()
            | StrOutputParser()
        )
        
        # 步骤2:内容生成
        content_generation = (
            PromptTemplate.from_template(
                "基于分析'{analysis}',为主题'{topic}'生成内容"
            )
            | OpenAI()
            | StrOutputParser()
        )
        
        # 组合成完整流程
        complete_pipeline = (
            RunnablePassthrough.assign(
                analysis=topic_analysis
            )
            | RunnablePassthrough.assign(
                content=content_generation
            )
        )
        
        return complete_pipeline
    
    return create_modern_llm_chain, create_modern_sequential_chain

# 方案2:自定义链实现
def custom_chain_implementation():
    """自定义链实现方案"""
    
    class CustomLLMChain:
        """自定义LLM链实现"""
        
        def __init__(self, llm, prompt_template, output_parser=None):
            self.llm = llm
            self.prompt_template = prompt_template
            self.output_parser = output_parser or StrOutputParser()
        
        def invoke(self, inputs: dict) -> dict:
            """执行链"""
            # 1. 格式化提示词
            prompt = self.prompt_template.format(**inputs)
            
            # 2. 调用LLM
            response = self.llm.invoke(prompt)
            
            # 3. 解析输出
            parsed_output = self.output_parser.parse(response)
            
            return {"output": parsed_output}
        
        async def ainvoke(self, inputs: dict) -> dict:
            """异步执行链"""
            prompt = self.prompt_template.format(**inputs)
            response = await self.llm.ainvoke(prompt)
            parsed_output = self.output_parser.parse(response)
            return {"output": parsed_output}
    
    class CustomSequentialChain:
        """自定义顺序链实现"""
        
        def __init__(self, chains: list, input_variables: list, output_variables: list):
            self.chains = chains
            self.input_variables = input_variables
            self.output_variables = output_variables
        
        def invoke(self, inputs: dict) -> dict:
            """执行顺序链"""
            current_data = inputs.copy()
            
            for chain in self.chains:
                # 执行当前链
                result = chain.invoke(current_data)
                
                # 合并结果到当前数据
                current_data.update(result)
            
            # 返回指定的输出变量
            return {k: current_data[k] for k in self.output_variables if k in current_data}
    
    return CustomLLMChain, CustomSequentialChain

5.2 性能优化方向

优化1:并行执行优化
import asyncio
from concurrent.futures import ThreadPoolExecutor

def parallel_execution_optimization():
    """并行执行优化"""
    
    class ParallelSequentialChain:
        """支持并行执行的顺序链"""
        
        def __init__(self, chain_groups: list, max_workers: int = 4):
            self.chain_groups = chain_groups  # 每组内的链可以并行执行
            self.max_workers = max_workers
        
        async def ainvoke(self, inputs: dict) -> dict:
            """异步并行执行"""
            current_data = inputs.copy()
            
            for group in self.chain_groups:
                if len(group) == 1:
                    # 单个链,直接执行
                    result = await group[0].ainvoke(current_data)
                    current_data.update(result)
                else:
                    # 多个链,并行执行
                    tasks = [chain.ainvoke(current_data) for chain in group]
                    results = await asyncio.gather(*tasks)
                    
                    # 合并所有结果
                    for result in results:
                        current_data.update(result)
            
            return current_data
        
        def invoke(self, inputs: dict) -> dict:
            """同步并行执行"""
            current_data = inputs.copy()
            
            for group in self.chain_groups:
                if len(group) == 1:
                    result = group[0].invoke(current_data)
                    current_data.update(result)
                else:
                    # 使用线程池并行执行
                    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                        futures = [
                            executor.submit(chain.invoke, current_data)
                            for chain in group
                        ]
                        results = [future.result() for future in futures]
                    
                    for result in results:
                        current_data.update(result)
            
            return current_data
    
    return ParallelSequentialChain

# 优化2:缓存机制
def caching_optimization():
    """缓存机制优化"""
    
    import hashlib
    import json
    from functools import wraps
    
    class CachedLLMChain:
        """带缓存的LLM链"""
        
        def __init__(self, llm, prompt, cache_ttl=3600):
            self.llm = llm
            self.prompt = prompt
            self.cache = {}
            self.cache_ttl = cache_ttl
        
        def _generate_cache_key(self, inputs: dict) -> str:
            """生成缓存键"""
            # 将输入序列化为JSON字符串,然后计算哈希
            input_str = json.dumps(inputs, sort_keys=True)
            return hashlib.md5(input_str.encode()).hexdigest()
        
        def invoke(self, inputs: dict) -> dict:
            """带缓存的执行"""
            cache_key = self._generate_cache_key(inputs)
            
            # 检查缓存
            if cache_key in self.cache:
                cached_result, timestamp = self.cache[cache_key]
                if time.time() - timestamp < self.cache_ttl:
                    print(f"🎯 缓存命中: {cache_key[:8]}...")
                    return cached_result
            
            # 执行链
            prompt_text = self.prompt.format(**inputs)
            response = self.llm.invoke(prompt_text)
            result = {"text": response}
            
            # 存储到缓存
            self.cache[cache_key] = (result, time.time())
            print(f"💾 结果已缓存: {cache_key[:8]}...")
            
            return result
    
    return CachedLLMChain

# 优化3:流式输出
def streaming_optimization():
    """流式输出优化"""
    
    class StreamingLLMChain:
        """支持流式输出的LLM链"""
        
        def __init__(self, llm, prompt):
            self.llm = llm
            self.prompt = prompt
        
        def stream(self, inputs: dict):
            """流式执行"""
            prompt_text = self.prompt.format(**inputs)
            
            # 使用流式LLM
            for chunk in self.llm.stream(prompt_text):
                yield chunk
        
        async def astream(self, inputs: dict):
            """异步流式执行"""
            prompt_text = self.prompt.format(**inputs)
            
            async for chunk in self.llm.astream(prompt_text):
                yield chunk
    
    return StreamingLLMChain

5.3 功能扩展优化

扩展1:条件分支链
def conditional_chain_extension():
    """条件分支链扩展"""
    
    class ConditionalChain:
        """支持条件分支的链"""
        
        def __init__(self, condition_func, true_chain, false_chain):
            self.condition_func = condition_func
            self.true_chain = true_chain
            self.false_chain = false_chain
        
        def invoke(self, inputs: dict) -> dict:
            """根据条件执行不同的链"""
            if self.condition_func(inputs):
                return self.true_chain.invoke(inputs)
            else:
                return self.false_chain.invoke(inputs)
    
    # 使用示例
    def create_sentiment_based_chain():
        """创建基于情感的条件链"""
        
        # 情感分析链
        sentiment_chain = LLMChain(
            llm=OpenAI(temperature=0.1),
            prompt=PromptTemplate.from_template("分析情感:{text}\n情感(positive/negative):")
        )
        
        # 积极回复链
        positive_chain = LLMChain(
            llm=OpenAI(temperature=0.7),
            prompt=PromptTemplate.from_template("对积极内容的回复:{text}")
        )
        
        # 消极回复链
        negative_chain = LLMChain(
            llm=OpenAI(temperature=0.3),
            prompt=PromptTemplate.from_template("对消极内容的安慰回复:{text}")
        )
        
        # 条件函数
        def is_positive_sentiment(inputs):
            sentiment_result = sentiment_chain.invoke(inputs)
            return "positive" in sentiment_result["text"].lower()
        
        # 创建条件链
        conditional_chain = ConditionalChain(
            condition_func=is_positive_sentiment,
            true_chain=positive_chain,
            false_chain=negative_chain
        )
        
        return conditional_chain
    
    return create_sentiment_based_chain

# 扩展2:重试机制
def retry_mechanism_extension():
    """重试机制扩展"""
    
    class RetryChain:
        """支持重试的链"""
        
        def __init__(self, base_chain, max_retries=3, retry_delay=1.0):
            self.base_chain = base_chain
            self.max_retries = max_retries
            self.retry_delay = retry_delay
        
        def invoke(self, inputs: dict) -> dict:
            """带重试的执行"""
            last_exception = None
            
            for attempt in range(self.max_retries + 1):
                try:
                    result = self.base_chain.invoke(inputs)
                    if attempt > 0:
                        print(f"✅ 重试成功,尝试次数: {attempt + 1}")
                    return result
                except Exception as e:
                    last_exception = e
                    if attempt < self.max_retries:
                        print(f"⚠️ 执行失败,{self.retry_delay}秒后重试... (尝试 {attempt + 1}/{self.max_retries + 1})")
                        time.sleep(self.retry_delay)
                    else:
                        print(f"❌ 所有重试都失败了")
            
            raise last_exception
    
    return RetryChain

总结

LangChain基础链(LLMChain与SequentialChain)作为AI应用开发的核心基础设施,通过标准化的组件封装和流程编排,极大地简化了复杂AI工作流的构建。

核心价值总结

  1. 最小功能单元:LLMChain提供了Prompt+LLM+Parser的标准化封装
  2. 流程编排工具:SequentialChain实现了多步骤AI处理的自动化串联
  3. 组件解耦设计:支持灵活的组件替换和配置驱动开发
  4. 丰富的集成能力:与内存、回调、异步等系统无缝集成

发展趋势

  • 现代化迁移:从传统Chain向Runnable接口的演进
  • 性能优化:并行执行、缓存机制、流式输出等优化方向
  • 功能扩展:条件分支、重试机制、智能路由等高级特性

基础链作为LangChain生态的基石组件,为构建复杂的AI应用提供了坚实的技术基础,是每个LangChain开发者必须掌握的核心技术。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐