玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!


摘要

LCEL(LangChain Expression Language)是 LangChain v1.0 引入的革命性特性,它提供了一种声明式、可组合的方式来构建 AI 应用链。本文将深入解析 LCEL 的核心概念、Runnable 接口、链式组合、并行执行、条件路由等高级特性,帮助开发者构建灵活、可维护的 AI 应用。


一、LCEL 概述

1.1 什么是 LCEL

LCEL(LangChain Expression Language)是一种声明式的链式编排语言,它让开发者可以用简洁的语法定义复杂的数据处理流程。

代码量大

简洁优雅

LCEL 方式

声明式定义

自动数据流转

内置错误处理

传统方式

手动调用组件

手动传递数据

手动处理错误

复杂度高

易于维护

1.2 LCEL 核心优势

优势 说明
声明式语法 用管道符串联组件,代码简洁
自动数据流转 组件间自动传递输入输出
统一接口 所有组件实现 Runnable 接口
流式支持 自动支持流式输出
异步原生 同步/异步 API 自动生成
可观测性 内置追踪和回调支持

1.3 Hello LCEL

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 定义组件
model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
parser = StrOutputParser()

# LCEL 链式组合
chain = prompt | model | parser

# 调用
result = chain.invoke({"topic": "程序员"})
print(result)

二、Runnable 接口

2.1 Runnable 协议

所有 LCEL 组件都实现 Runnable 接口,提供统一的调用方式:

实现组件

ChatPromptTemplate

ChatOpenAI

StrOutputParser

RunnableLambda

RunnableParallel

Runnable 接口

invoke
同步调用

ainvoke
异步调用

stream
流式输出

astream
异步流式

batch
批量调用

abatch
异步批量

2.2 核心方法

from langchain_core.runnables import Runnable
from typing import Protocol

class RunnableProtocol(Protocol):
    """Runnable 协议定义。"""
  
    # 同步调用
    def invoke(self, input: Any, config: RunnableConfig = None) -> Any: ...
  
    # 异步调用
    async def ainvoke(self, input: Any, config: RunnableConfig = None) -> Any: ...
  
    # 流式输出
    def stream(self, input: Any, config: RunnableConfig = None) -> Iterator[Any]: ...
  
    # 异步流式
    async def astream(self, input: Any, config: RunnableConfig = None) -> AsyncIterator[Any]: ...
  
    # 批量调用
    def batch(self, inputs: list[Any], config: RunnableConfig = None) -> list[Any]: ...
  
    # 异步批量
    async def abatch(self, inputs: list[Any], config: RunnableConfig = None) -> list[Any]: ...

2.3 使用示例

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("翻译成英文: {text}")
chain = prompt | model

# 1. 同步调用
result = chain.invoke({"text": "你好世界"})
print(result.content)

# 2. 异步调用
import asyncio
async def async_example():
    result = await chain.ainvoke({"text": "你好世界"})
    print(result.content)
asyncio.run(async_example())

# 3. 流式输出
for chunk in chain.stream({"text": "你好世界"}):
    print(chunk.content, end="", flush=True)

# 4. 批量调用
results = chain.batch([
    {"text": "你好"},
    {"text": "世界"},
    {"text": "Python"},
])
for r in results:
    print(r.content)

# 5. 异步流式
async def async_stream():
    async for chunk in chain.astream({"text": "你好世界"}):
        print(chunk.content, end="", flush=True)
asyncio.run(async_stream())

三、链式组合

3.1 管道操作符

LCEL 使用管道操作符串联组件,数据自动流转:

输入
topic: Python

Prompt
生成提示词

Model
调用 LLM

Parser
解析输出

输出
字符串结果

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 组件定义
prompt = ChatPromptTemplate.from_template(
    "用一句话解释 {concept},面向 {audience} 受众"
)
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

# 链式组合
chain = prompt | model | parser

# 调用
result = chain.invoke({
    "concept": "机器学习",
    "audience": "小学生"
})
print(result)

3.2 多步骤链

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

# 步骤1:生成大纲
outline_prompt = ChatPromptTemplate.from_template(
    "为关于 {topic} 的文章生成一个大纲"
)
outline_chain = outline_prompt | model | StrOutputParser()

# 步骤2:根据大纲写文章
article_prompt = ChatPromptTemplate.from_template(
    "根据以下大纲写一篇详细的文章:\n\n{outline}"
)
article_chain = article_prompt | model | StrOutputParser()

# 步骤3:总结文章
summary_prompt = ChatPromptTemplate.from_template(
    "用 3 句话总结以下文章:\n\n{article}"
)
summary_chain = summary_prompt | model | StrOutputParser()

# 组合成完整链
full_chain = (
    {"outline": outline_chain}
    | article_chain
    | {"summary": summary_chain}
)

# 调用
result = full_chain.invoke({"topic": "人工智能的未来"})
print(result["summary"])

3.3 RunnableLambda - 自定义函数

from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 自定义处理函数
def to_uppercase(text: str) -> str:
    return text.upper()

def add_prefix(text: str) -> str:
    return f"【AI 回答】{text}"

def count_words(text: str) -> dict:
    return {
        "text": text,
        "word_count": len(text.split()),
        "char_count": len(text)
    }

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("用 50 字解释 {concept}")

# 使用 RunnableLambda 包装函数
chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(to_uppercase)
    | RunnableLambda(add_prefix)
    | RunnableLambda(count_words)
)

result = chain.invoke({"concept": "量子计算"})
print(result)
# {'text': '【AI 回答】量子计算是利用量子力学原理进行信息处理的技术...', 'word_count': 45, 'char_count': 150}

四、RunnableParallel - 并行执行

4.1 并行执行多个链

并行执行

输入
topic: Python

翻译链

总结链

关键词链

英文翻译

中文总结

关键词列表

合并结果

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

model = ChatOpenAI(model="gpt-4")

# 定义多个并行链
translate_chain = (
    ChatPromptTemplate.from_template("将以下内容翻译成英文: {text}")
    | model
    | StrOutputParser()
)

summarize_chain = (
    ChatPromptTemplate.from_template("用一句话总结: {text}")
    | model
    | StrOutputParser()
)

keywords_chain = (
    ChatPromptTemplate.from_template("提取以下内容的关键词(逗号分隔): {text}")
    | model
    | StrOutputParser()
)

# 并行执行
parallel_chain = RunnableParallel(
    translation=translate_chain,
    summary=summarize_chain,
    keywords=keywords_chain,
)

# 调用
result = parallel_chain.invoke({
    "text": "人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。"
})

print(result["translation"])  # 英文翻译
print(result["summary"])      # 中文总结
print(result["keywords"])     # 关键词

4.2 字典语法糖

# RunnableParallel 的简写形式
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

# 使用字典语法(等同于 RunnableParallel)
chain = {
    "joke": (
        ChatPromptTemplate.from_template("讲一个关于 {topic} 的笑话")
        | model
        | StrOutputParser()
    ),
    "poem": (
        ChatPromptTemplate.from_template("写一首关于 {topic} 的诗")
        | model
        | StrOutputParser()
    ),
    "story": (
        ChatPromptTemplate.from_template("写一个关于 {topic} 的短故事")
        | model
        | StrOutputParser()
    ),
}

result = chain.invoke({"topic": "程序员"})
print(result["joke"])
print(result["poem"])
print(result["story"])

4.3 嵌套并行

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

# 嵌套并行结构
chain = {
    "analysis": {
        "sentiment": (
            ChatPromptTemplate.from_template("分析情感: {text}")
            | model | StrOutputParser()
        ),
        "topics": (
            ChatPromptTemplate.from_template("提取主题: {text}")
            | model | StrOutputParser()
        ),
    },
    "generation": {
        "summary": (
            ChatPromptTemplate.from_template("总结: {text}")
            | model | StrOutputParser()
        ),
        "questions": (
            ChatPromptTemplate.from_template("生成 3 个相关问题: {text}")
            | model | StrOutputParser()
        ),
    },
}

result = chain.invoke({"text": "今天天气真好,心情特别愉快!"})
print(result["analysis"]["sentiment"])
print(result["generation"]["summary"])

五、RunnablePassthrough - 数据传递

5.1 基本用法

from langchain_core.runnables import RunnablePassthrough

# RunnablePassthrough 直接传递输入
chain = RunnablePassthrough() | (lambda x: f"收到: {x}")

result = chain.invoke("你好")
print(result)  # 收到: 你好

5.2 保留原始输入

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# 保留原始输入,同时添加处理结果
chain = (
    {
        "original": RunnablePassthrough(),  # 保留原始输入
        "analysis": (
            ChatPromptTemplate.from_template("分析这段文字: {text}")
            | model
            | StrOutputParser()
        ),
    }
)

result = chain.invoke({"text": "人工智能正在改变世界"})
print(result["original"])  # {"text": "人工智能正在改变世界"}
print(result["analysis"])  # 分析结果

5.3 RunnablePassthrough.assign

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# assign 方法添加新字段
chain = (
    RunnablePassthrough.assign(
        # 添加 word_count 字段
        word_count=lambda x: len(x["text"].split()),
    )
    .assign(
        # 添加 summary 字段(依赖 word_count)
        summary=(
            ChatPromptTemplate.from_template(
                "用 {word_count} 个词左右总结: {text}"
            )
            | model
            | StrOutputParser()
        )
    )
)

result = chain.invoke({"text": "人工智能是计算机科学的一个分支"})
print(result)
# {'text': '人工智能是计算机科学的一个分支', 'word_count': 5, 'summary': '...'}

六、条件路由

6.1 RunnableBranch

技术问题

生活问题

其他

输入

分类判断

技术专家链

生活顾问链

通用助手链

响应

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch

model = ChatOpenAI(model="gpt-4")

# 定义不同类型的处理链
tech_chain = (
    ChatPromptTemplate.from_template(
        "作为技术专家,回答以下问题: {question}"
    )
    | model | StrOutputParser()
)

life_chain = (
    ChatPromptTemplate.from_template(
        "作为生活顾问,回答以下问题: {question}"
    )
    | model | StrOutputParser()
)

general_chain = (
    ChatPromptTemplate.from_template(
        "回答以下问题: {question}"
    )
    | model | StrOutputParser()
)

# 分类函数
def classify_question(input_dict: dict) -> str:
    question = input_dict["question"].lower()
    tech_keywords = ["代码", "编程", "python", "算法", "数据库", "api"]
    life_keywords = ["健康", "美食", "旅游", "运动", "生活"]
  
    for keyword in tech_keywords:
        if keyword in question:
            return "tech"
    for keyword in life_keywords:
        if keyword in question:
            return "life"
    return "general"

# 条件路由
branch = RunnableBranch(
    (lambda x: classify_question(x) == "tech", tech_chain),
    (lambda x: classify_question(x) == "life", life_chain),
    general_chain,  # 默认分支
)

# 调用
print(branch.invoke({"question": "Python 如何实现单例模式?"}))
print(branch.invoke({"question": "如何保持健康的生活方式?"}))
print(branch.invoke({"question": "今天天气怎么样?"}))

6.2 使用 LLM 分类

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

model = ChatOpenAI(model="gpt-4")

# LLM 分类链
classify_prompt = ChatPromptTemplate.from_template(
    """判断以下问题属于哪个类别,只返回类别名称(tech/life/other):
  
问题: {question}

类别:"""
)
classify_chain = classify_prompt | model | StrOutputParser()

# 处理链
tech_chain = (
    ChatPromptTemplate.from_template("技术回答: {question}")
    | model | StrOutputParser()
)
life_chain = (
    ChatPromptTemplate.from_template("生活建议: {question}")
    | model | StrOutputParser()
)
other_chain = (
    ChatPromptTemplate.from_template("通用回答: {question}")
    | model | StrOutputParser()
)

# 完整路由链
def route_by_llm(input_dict: dict):
    category = classify_chain.invoke(input_dict).strip().lower()
    input_dict["category"] = category
    return input_dict

full_chain = (
    RunnableLambda(route_by_llm)
    | RunnableBranch(
        (lambda x: x["category"] == "tech", tech_chain),
        (lambda x: x["category"] == "life", life_chain),
        other_chain,
    )
)

result = full_chain.invoke({"question": "Python 如何读取文件?"})

七、重试与回退

7.1 RunnableRetry

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableRetry

model = ChatOpenAI(model="gpt-4")

chain = (
    ChatPromptTemplate.from_template("解释: {concept}")
    | model
    | StrOutputParser()
)

# 添加重试机制
retry_chain = RunnableRetry(
    bound=chain,
    max_attempt_count=3,  # 最大尝试次数
    wait_exponential_jitter=True,  # 指数退避
    max_wait=10,  # 最大等待时间
    retry_on_exception=lambda e: True,  # 所有异常都重试
)

result = retry_chain.invoke({"concept": "量子纠缠"})

7.2 with_fallbacks - 降级策略

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 多个模型
primary_model = ChatOpenAI(model="gpt-4")
fallback_model = ChatAnthropic(model="claude-3-5-sonnet")
last_resort_model = ChatOpenAI(model="gpt-3.5-turbo")

prompt = ChatPromptTemplate.from_template("回答: {question}")
parser = StrOutputParser()

# 主链
primary_chain = prompt | primary_model | parser
fallback_chain = prompt | fallback_model | parser
last_resort_chain = prompt | last_resort_model | parser

# 配置降级
chain_with_fallbacks = primary_chain.with_fallbacks(
    [fallback_chain, last_resort_chain],
    exceptions_to_handle=(Exception,),  # 捕获所有异常
)

# 调用(自动降级)
result = chain_with_fallbacks.invoke({"question": "你好"})

八、配置与参数绑定

8.1 bind 方法

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

# 绑定模型参数
creative_model = model.bind(temperature=1.2, max_tokens=500)
precise_model = model.bind(temperature=0.1, max_tokens=1000)

prompt = ChatPromptTemplate.from_template("写一段关于 {topic} 的文字")

creative_chain = prompt | creative_model | StrOutputParser()
precise_chain = prompt | precise_model | StrOutputParser()

print(creative_chain.invoke({"topic": "春天"}))  # 创意输出
print(precise_chain.invoke({"topic": "春天"}))   # 精确输出

8.2 with_config 方法

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("解释: {concept}")
chain = prompt | model | StrOutputParser()

# 配置标签和元数据
configured_chain = chain.with_config(
    tags=["production", "v1"],
    metadata={"version": "1.0", "environment": "prod"},
    callbacks=[...],  # 回调处理器
    max_concurrency=5,  # 最大并发
)

result = configured_chain.invoke({"concept": "机器学习"})

8.3 configurable_fields - 动态配置

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4", temperature=0.7)
prompt = ChatPromptTemplate.from_template("解释: {concept}")

# 声明可配置字段
chain = (
    prompt
    | model.configurable_fields(
        model=...,
        temperature=...,
    )
    | StrOutputParser()
)

# 运行时配置
result = chain.invoke(
    {"concept": "机器学习"},
    config={
        "configurable": {
            "model": "gpt-4o",
            "temperature": 0.3,
        }
    }
)

九、流式处理详解

9.1 流式输出

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("写一篇关于 {topic} 的短文")
chain = prompt | model | StrOutputParser()

# 同步流式
for chunk in chain.stream({"topic": "人工智能"}):
    print(chunk, end="", flush=True)

# 异步流式
import asyncio

async def async_stream():
    async for chunk in chain.astream({"topic": "人工智能"}):
        print(chunk, end="", flush=True)

asyncio.run(async_stream())

9.2 流式事件

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("解释: {concept}")
chain = prompt | model | StrOutputParser()

async def stream_with_events():
    async for event in chain.astream_events(
        {"concept": "量子计算"},
        version="v2",
    ):
        # 事件类型
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        elif event["event"] == "on_prompt_start":
            print("\n[Prompt 开始]")
        elif event["event"] == "on_llm_start":
            print("\n[LLM 开始]")
        elif event["event"] == "on_parser_start":
            print("\n[解析开始]")

asyncio.run(stream_with_events())

9.3 流式输出解析

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int
    hobbies: list[str]

model = ChatOpenAI(model="gpt-4")
parser = JsonOutputParser(pydantic_object=Person)

prompt = ChatPromptTemplate.from_template(
    "生成一个人物信息:\n{format_instructions}"
).partial(format_instructions=parser.get_format_instructions())

chain = prompt | model | parser

# 流式解析 JSON
async def stream_json():
    async for partial in chain.astream({}):
        print(partial)  # 逐步构建的 JSON 对象

asyncio.run(stream_json())

十、最佳实践

10.1 链的设计原则

原则 说明
单一职责 每个组件只做一件事
可测试性 组件可独立测试
可复用性 组件可在多个链中复用
可观测性 添加标签和元数据便于追踪

10.2 错误处理最佳实践

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

def safe_parse(output: str) -> dict:
    """安全的 JSON 解析。"""
    import json
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        return {"error": "解析失败", "raw": output}

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("生成 JSON: {request}")

# 带错误处理的链
chain = (
    prompt
    | model
    | StrOutputParser()
    | RunnableLambda(safe_parse)
    | RunnableLambda(lambda x: x if "error" not in x else {"result": None})
)

10.3 性能优化

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 1. 使用批量处理
model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("翻译: {text}")
chain = prompt | model | StrOutputParser()

# 批量处理多个请求
texts = ["hello", "world", "python"]
results = chain.batch([{"text": t} for t in texts])

# 2. 并行执行独立任务
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    translation=chain,
    sentiment=(
        ChatPromptTemplate.from_template("情感分析: {text}")
        | model
        | StrOutputParser()
    ),
)

# 3. 使用缓存
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

set_llm_cache(InMemoryCache())

十一、总结

11.1 LCEL 核心概念

特性支持

流式输出

异步原生

重试机制

降级策略

动态配置

核心概念

Runnable 接口
统一调用方式

管道操作符
链式组合

RunnableParallel
并行执行

RunnablePassthrough
数据传递

RunnableBranch
条件路由

11.2 LCEL vs 传统方式

方面 传统方式 LCEL
代码量
可读性 一般
流式支持 需手动实现 自动支持
异步支持 需单独编写 自动生成
可组合性
可观测性 需手动添加 内置支持

参考资料

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐