LangChain的智能体教程
本文提供了一个从零基础到精通的LangChain智能体实战教程,通过10个由浅入深的代码案例,帮助读者掌握构建企业级AI智能体的核心技能。教程首先介绍环境配置和核心概念(Agent、Tools、State、LLM),然后通过案例演示如何创建反射型智能体、自定义工具(如油藏数据查询)、增加记忆功能,最后展示高阶的LangGraph复杂工作流和多智能体协作。案例涵盖Python实现、状态机设计、用户偏
这是一个从零基础到精通的LangChain智能体(Agent)实战教程。我们将通过由浅入深的10个代码案例,带你掌握构建企业级AI智能体的核心技能。
本教程假设你已具备基础Python知识。
🚀 第一部分:筑基(环境与核心概念)
在成为大师之前,我们先把地基打牢。
1. 环境配置
首先,安装必要的库。我们将使用 LangChain 的最新架构(LangGraph)和 OpenAI 模型。
pip install langchain langchain-openai langgraph python-dotenv tavily-python
创建一个 .env 文件:
OPENAI_API_KEY=你的sk-...
TAVILY_API_KEY=你的tvly-... (用于搜索,去 https://tavily.com 免费申请)
2. 核心概念图谱
在写代码前,理解这几个核心组件:
- Agent (智能体):大脑,负责思考和决策。
- Tools (工具):手脚,如搜索、计算器、数据库。
- State (状态):记忆,存储对话历史和中间变量。
- LLM (大模型):核心算力。
🏃 第二部分:初阶(你的第一个智能体)
案例 1:极简反射型智能体 (ReAct)
目标:创建一个能搜索网络和做数学题的助手。
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import END, StateGraph, MessagesState
from langgraph.prebuilt import ToolNode
# 1. 初始化
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [TavilySearchResults(max_results=2)]
# 2. 定义 Agent 节点
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个强大的AI助手。善用工具,回答要简洁精准。"),
MessagesPlaceholder(variable_name="messages"),
])
# 绑定工具
llm_with_tools = prompt | llm.bind_tools(tools)
# 3. 定义图逻辑
def agent_node(state: MessagesState):
return {"messages": [llm_with_tools.invoke(state)]}
# 4. 构建图
workflow = StateGraph(MessagesState)
tool_node = ToolNode(tools=tools)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
# 5. 定义边(逻辑流转)
workflow.set_entry_point("agent")
# 条件边:如果 LLM 说要调用工具,就去 tools 节点,否则结束
def should_continue(state: MessagesState):
last_message = state["messages"][-1]
return "tools" if last_message.tool_calls else END
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
# 6. 编译运行
app = workflow.compile()
# 测试
result = app.invoke({"messages": [("user", "2025年成都大运会的吉祥物是什么?它的身高乘以2等于多少?")]})
print(result["messages"][-1].content)
大师心得:这就是 LangGraph 的精髓——状态机。你在控制一个循环:思考 -> 工具 -> 思考 -> 结束。
🛠️ 第三部分:中阶(自定义工具与记忆)
只会调用搜索不算大师,我们要让 Agent 为我们干私活。
案例 2:打造专属工具 (RAG + 数据库查询)
目标:假设你是石油工程师(用户洞察),让 Agent 查你的私人油藏数据。
from langchain_core.tools import tool
from typing import List, Dict
import random
# 模拟一个油藏数据库
reservoir_db = {
"X1-井": {"产量": 120, "含水率": 0.65, "压力": 2800},
"X2-井": {"产量": 85, "含水率": 0.82, "压力": 2100},
}
# 1. 自定义工具装饰器
@tool
def query_well_data(well_name: str) -> Dict:
"""查询指定油井的实时生产数据。"""
print(f"[系统正在查询数据库: {well_name}]")
return reservoir_db.get(well_name, {"错误": "未找到该井"})
@tool
def calculate_npv(rate: float, cost: float) -> float:
"""计算简单的经济净现值 (NPV)。"""
return rate * 1000 - cost
# 2. 重新定义 Tools 列表
custom_tools = [query_well_data, calculate_npv]
# --- 此处复用案例1的图构建代码,只需把 tools 换成 custom_tools ---
llm_with_custom_tools = llm.bind_tools(custom_tools)
def agent_node(state: MessagesState):
return {"messages": [llm_with_custom_tools.invoke(state)]}
# 构建图流程(同上,略)...
workflow = StateGraph(MessagesState)
tool_node = ToolNode(custom_tools)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue) # 复用上面的 should_continue
workflow.add_edge("tools", "agent")
app = workflow.compile()
# 测试
result = app.invoke({"messages": [("user", "查看 X1-井 的数据,如果产量大于100,计算它在成本为50000时的NPV。")]})
print(result["messages"][-1].content)
案例 3:增加记忆 (Memory)
目标:让 Agent 记住你孩子的名字(用户洞察),不要每次都问。
LangGraph 的 State 本身就是记忆。我们只需扩展 State。
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage
import operator
# 1. 定义复杂状态
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
user_preferences: dict # 新增:存储用户偏好
# 2. 修改节点
def agent_with_memory(state: AgentState):
# 把偏好注入到 System Prompt 中
system_msg = f"你是贴心助手。用户偏好: {state.get('user_preferences', {})}"
# 构造消息列表
full_messages = [("system", system_msg)] + state["messages"]
response = llm.invoke(full_messages)
return {"messages": [response]}
# 3. 初始化图(这次不带工具,纯聊天演示记忆)
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_with_memory)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
app = workflow.compile()
# 4. 运行:传入初始状态(记忆)
initial_state = {
"messages": [HumanMessage(content="我今晚想给孩子讲个故事")],
"user_preferences": {"孩子名字": "小明", "喜欢的主题": "恐龙"}
}
result = app.invoke(initial_state)
print(result["messages"][-1].content)
# 输出将会是:“好的,小明爸爸/妈妈,今天我们讲一个霸王龙的故事...”
🧙 第四部分:高阶(LangGraph 复杂工作流)
这是区分普通玩家和大师的关键。我们不再是简单的“思考-行动”循环,而是编排一个工厂流水线。
案例 4:多智能体协作 (Multi-Agent)
场景:一个石油工程软件团队(用户洞察)。
- Boss (老板):分配任务。
- Coder (码农):写 Python 代码。
- Reviewer (审查):检查代码是否有 Bug。
from langchain_core.messages import SystemMessage, AIMessage
# 定义状态
class MultiAgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
next: str # 关键:决定下一个是谁
# 1. 定义各个 Agent 的 Prompt
BOSS_PROMPT = "你是项目经理。用户需求是:{messages[-1].content}。决定让谁干活:Coder 还是 直接回复用户。只回复 'Coder' 或 'User'。"
CODER_PROMPT = "你是资深Python油藏工程师。写一个计算达西定律的函数。只写代码,不要废话。"
REVIEWER_PROMPT = "你是代码审查员。检查代码是否有错误。如果有,指出;如果没有,只说 'PASS'。"
# 2. 定义节点
def boss_node(state: MultiAgentState):
response = llm.invoke([SystemMessage(content=BOSS_PROMPT.format(**state))] + state["messages"])
next_agent = "coder" if "Coder" in response.content else END
return {"next": next_agent, "messages": [response]}
def coder_node(state: MultiAgentState):
response = llm.invoke([SystemMessage(content=CODER_PROMPT)] + state["messages"])
return {"messages": [response], "next": "reviewer"}
def reviewer_node(state: MultiAgentState):
response = llm.invoke([SystemMessage(content=REVIEWER_PROMPT)] + state["messages"])
# 如果审查通过就结束,不通过就打回给 Coder (这里简化为结束)
return {"messages": [response], "next": END}
# 3. 构建图
workflow = StateGraph(MultiAgentState)
workflow.add_node("boss", boss_node)
workflow.add_node("coder", coder_node)
workflow.add_node("reviewer", reviewer_node)
workflow.set_entry_point("boss")
# 条件路由
def router(state):
return state["next"]
workflow.add_conditional_edges("boss", router)
workflow.add_edge("coder", "reviewer")
workflow.add_conditional_edges("reviewer", router)
app = workflow.compile()
# 测试
result = app.invoke({"messages": [HumanMessage(content="我需要一个达西定律的脚本")]})
for msg in result["messages"]:
print(f"--- {type(msg).__name__} ---\n{msg.content}\n")
🏆 第五部分:大师之路(工程化落地)
案例 5:持久化与可视化 (Checkpoint & Debug)
大师的代码不是一次性的,是可追溯的。
from langgraph.checkpoint.sqlite import SqliteSaver
# 使用 SQLite 保存状态(断点续传)
memory = SqliteSaver.from_conn_string(":memory:") # 生产环境换成文件路径 "checkpoints.sqlite"
# 编译时加上 checkpointer
app = workflow.compile(checkpointer=memory)
# 配置线程 ID (用于区分不同用户的会话)
config = {"configurable": {"thread_id": "user_1_session_abc"}}
# 运行
app.invoke({"messages": [("user", "你好")]}, config=config)
# ...... 过了一天 ......
# 再次运行,自动带上记忆
result = app.invoke({"messages": [("user", "我刚才说什么了?")]}, config=config)
print(result["messages"][-1].content) # 它会记得!
案例 6:生成数字分身视频脚本 (用户洞察)
结合用户之前的需求,我们用 Agent 生成一个分镜脚本。
@tool
def save_script(script_content: str, file_name: str) -> str:
"""将最终脚本保存到本地文件"""
with open(file_name, "w", encoding="utf-8") as f:
f.write(script_content)
return f"保存成功: {file_name}"
# 这里的 LLM 专门用于创作
script_llm = ChatOpenAI(model="gpt-4o", temperature=0.9)
# 构建一个专门写分镜的 Agent (流程同案例1,工具换成 save_script)
# ... 省略图构建代码 ...
# Prompt 要点:要求按 5秒 一个片段拆分(用户洞察)
📚 推荐学习路径
- 周 1-2:把上面的代码敲一遍,理解
StateGraph。 - 周 3:去 LangChain Hub 找别人写好的 Prompt,学会用
LangSmith调试。 - 周 4:尝试用
LangServe把你的 Agent 部署成 API。
最后的忠告
LangChain 只是脚手架,核心竞争力在于:
- Tool Design (工具设计):如何把你的业务逻辑封装成 LLM 能懂的函数。
- Prompt Engineering (提示词):如何让 LLM 不瞎搞。
- State Management (状态管理):如何在复杂流程中不丢失上下文。
好的,我们来构建一个深度结合石油工程场景的多智能体协作系统。
🎯 场景定义:油藏动态分析与决策工作组
我们将模拟一个油田的技术分析团队。当你输入一口井号时,系统会自动完成以下流程:
- 数据工程师 (Data Engineer):从数据库提取该井的历史生产数据。
- 油藏工程师 (Reservoir Engineer):分析递减曲线,预测未来产量。
- 经济评价师 (Economist):基于产量预测计算经济指标。
- 项目经理 (Manager):汇总以上信息,生成最终的中文决策报告。
💻 完整落地代码
import os
import json
from typing import TypedDict, Annotated, Sequence, Literal
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
import operator
# ================= 1. 初始化与环境配置 =================
load_dotenv()
# 建议使用 GPT-4o 以获得更稳定的函数调用和推理能力
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# ================= 2. 模拟石油工程数据库与工具 =================
# 模拟:真实场景中这里会连接你的 PI System、Oracle 或 Python 数据接口
class ReservoirDB:
@staticmethod
def get_production_history(well_name: str):
"""模拟返回过去12个月的产量数据 (吨/月)"""
data = {
"X3-井": [120, 118, 115, 112, 108, 105, 101, 98, 95, 92, 89, 86],
"Y1-井": [500, 480, 450, 300, 290, 280, 275, 270, 265, 260, 255, 250] # 假设这口井有过措施
}
return data.get(well_name, [])
# 定义专业工具
@tool
def fetch_well_data(well_name: str) -> str:
"""(数据工程师) 获取指定井的生产历史数据"""
print(f"[工具调用] 正在查询数据库: {well_name}")
history = ReservoirDB.get_production_history(well_name)
return json.dumps({"well": well_name, "monthly_production_tons": history}, ensure_ascii=False)
@tool
def analyze_decline(production_data_json: str) -> str:
"""(油藏工程师) 输入JSON数据,计算递减率并预测后12个月产量"""
print(f"[工具调用] 正在进行递减分析...")
data = json.loads(production_data_json)
history = data["monthly_production_tons"]
# 简化的算术递减计算 (真实场景用 Arps 公式)
if len(history) < 2:
return "数据不足"
# 计算月递减率
decline_rate = (history[0] - history[-1]) / history[0] / len(history)
last_prod = history[-1]
# 预测未来
forecast = []
for i in range(1, 13):
forecast.append(round(last_prod * (1 - decline_rate) ** i, 2))
return json.dumps({
"monthly_decline_rate": round(decline_rate * 100, 2),
"next_12_months_forecast": forecast,
"total_forecast_volume": round(sum(forecast), 2)
}, ensure_ascii=False)
@tool
def calculate_economics(forecast_json: str, oil_price: float = 5000) -> str:
"""(经济评价师) 基于预测产量和油价 (元/吨) 计算利润"""
print(f"[工具调用] 正在计算经济指标...")
forecast = json.loads(forecast_json)
volume = forecast["total_forecast_volume"]
revenue = volume * oil_price
# 简单假设成本为收入的 40%
profit = revenue * 0.6
return json.dumps({
"predicted_revenue": round(revenue, 2),
"predicted_profit": round(profit, 2),
"oil_price_assumption": oil_price
}, ensure_ascii=False)
# ================= 3. 定义状态与智能体 =================
# 定义状态图
class TeamState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
well_name: str # 井号
raw_data: str # 原始数据 (JSON)
engineering_analysis: str # 工程分析 (JSON)
economic_report: str # 经济报告 (JSON)
next_agent: Literal["data", "engineer", "economist", "manager", "end"] # 路由控制
# 定义各个 Agent 的 Prompt 模板
PROMPTS = {
"data": """你是油田数据工程师。你的任务是获取井的生产数据。
步骤:
1. 从 State 中获取 well_name。
2. 调用 fetch_well_data 工具。
3. 获取结果后,将结果存入 raw_data,并告诉下一个节点是 'engineer'。
只做你的事,不要越权分析。""",
"engineer": """你是资深油藏工程师。
步骤:
1. 读取 raw_data。
2. 调用 analyze_decline 工具进行递减分析。
3. 将结果存入 engineering_analysis,并告诉下一个节点是 'economist'。""",
"economist": """你是经济评价师。
步骤:
1. 读取 engineering_analysis。
2. 调用 calculate_economics 工具。
3. 将结果存入 economic_report,并告诉下一个节点是 'manager'。""",
"manager": """你是项目经理。请根据所有数据生成一份中文简报。
数据包括:raw_data, engineering_analysis, economic_report。
请用 Markdown 格式输出,包含:
1. 井号概况
2. 递减分析结论
3. 经济效益预测
4. 措施建议 (是否需要压裂/堵水?)
不要输出任何 JSON,只输出最终报告。"""
}
# ================= 4. 构建节点逻辑 =================
def create_agent_node(agent_name: str, tools_list):
"""通用节点生成器"""
llm_with_tools = llm.bind_tools(tools_list)
def node(state: TeamState):
system_msg = SystemMessage(content=PROMPTS[agent_name])
# 把当前 State 的关键信息拼接到消息里,让 LLM 知道上下文
context_msg = f"当前状态:\nWell: {state.get('well_name')}\nRawData: {state.get('raw_data')}\nEngAnalysis: {state.get('engineering_analysis')}"
response = llm_with_tools.invoke([system_msg, HumanMessage(content=context_msg)] + state["messages"])
# 更新状态
updates = {"messages": [response]}
# 简单的路由逻辑 (实际生产中建议用 LLM 做路由决策,这里简化)
if agent_name == "data":
updates["next_agent"] = "engineer"
elif agent_name == "engineer":
updates["next_agent"] = "economist"
elif agent_name == "economist":
updates["next_agent"] = "manager"
else:
updates["next_agent"] = "end"
return updates
return node
# 定义 Tool 执行节点
def tool_node(state: TeamState):
last_message = state["messages"][-1]
# 这里我们需要手动判断一下刚才是谁调用的工具,把结果存到对应的 State 字段里
# 为了简化教程,我们使用 LangGraph 预定义的 ToolNode 逻辑,但在这个特定场景下,
# 我们需要手动处理一下存储。
# 实际上,更优雅的做法是让 Agent 在返回的 AIMessage 里附带指令,
# 或者在 Tool 执行后通过解析 ToolMessage 来更新 State。
# 这里为了保持代码简洁,我们假设工具调用后,结果会被正确传递。
# 我们使用一个通用的 ToolNode
tools = [fetch_well_data, analyze_decline, calculate_economics]
tool_executor = ToolNode(tools)
tool_msgs = tool_executor.invoke(state)
# 关键逻辑:根据工具名称,将结果存入 State 的对应槽位
new_state = {"messages": tool_msgs["messages"]}
for msg in tool_msgs["messages"]:
if msg.name == "fetch_well_data":
new_state["raw_data"] = msg.content
elif msg.name == "analyze_decline":
new_state["engineering_analysis"] = msg.content
elif msg.name == "calculate_economics":
new_state["economic_report"] = msg.content
return new_state
# ================= 5. 组装图 =================
workflow = StateGraph(TeamState)
# 定义节点
workflow.add_node("data_agent", create_agent_node("data", [fetch_well_data]))
workflow.add_node("engineer_agent", create_agent_node("engineer", [analyze_decline]))
workflow.add_node("economist_agent", create_agent_node("economist", [calculate_economics]))
workflow.add_node("manager_agent", create_agent_node("manager", []))
workflow.add_node("tools", tool_node)
# 定义边
workflow.set_entry_point("data_agent")
# 通用路由函数:检查最后一条消息是否是工具调用
def should_use_tools(state):
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and len(last_msg.tool_calls) > 0:
return "tools"
else:
return state["next_agent"]
# 连接 Agent -> Tools -> Agent
workflow.add_conditional_edges("data_agent", should_use_tools, {"tools": "tools", "engineer": "engineer_agent"})
workflow.add_conditional_edges("engineer_agent", should_use_tools, {"tools": "tools", "economist": "economist_agent"})
workflow.add_conditional_edges("economist_agent", should_use_tools, {"tools": "tools", "manager": "manager_agent"})
# Tools 执行完后,回到刚才的 Agent (通过 state 里的 next_agent 或者消息历史判断,这里简化处理直接路由回逻辑)
# 实际上 ToolNode 执行完,通常需要回到触发它的 Agent。
# 为了简化这个教程,我们让 Tool 执行完后,强制进入下一个业务节点。
# 更严谨的做法是 Agent -> Tools -> Agent -> Next Agent。
# 这里我们做一个简化版的边:
workflow.add_edge("tools", "data_agent") # 这只是一个示例占位,实际生产中建议用更细致的状态机
# 修正逻辑:为了让这个教程能跑通,我们把流程线性化,不做太复杂的循环
# 重新定义边 (简化版):
# 1. Data Agent 调用工具 -> 2. Tools 执行 -> 3. Engineer Agent -> ...
# 我们重新构建一个更简单的线性图来保证效果:
# --- 重新开始构建简化但能跑通的图 ---
workflow = StateGraph(TeamState)
# 节点定义
def data_step(state):
print("🤖 数据工程师: 正在取数...")
data = fetch_well_data.invoke({"well_name": state["well_name"]})
return {"raw_data": data, "messages": [AIMessage(content="数据已获取")]}
def eng_step(state):
print("🔬 油藏工程师: 正在分析...")
analysis = analyze_decline.invoke({"production_data_json": state["raw_data"]})
return {"engineering_analysis": analysis, "messages": [AIMessage(content="分析完成")]}
def econ_step(state):
print("💰 经济评价师: 正在算账...")
econ = calculate_economics.invoke({"forecast_json": state["engineering_analysis"]})
return {"economic_report": econ, "messages": [AIMessage(content="经济评价完成")]}
def manager_step(state):
print("👔 项目经理: 正在写报告...")
context = f"""
井号: {state['well_name']}
数据: {state['raw_data']}
工程分析: {state['engineering_analysis']}
经济报告: {state['economic_report']}
"""
response = llm.invoke([SystemMessage(content=PROMPTS["manager"]), HumanMessage(content=context)])
return {"messages": [response]}
workflow.add_node("data", data_step)
workflow.add_node("eng", eng_step)
workflow.add_node("econ", econ_step)
workflow.add_node("manager", manager_step)
workflow.set_entry_point("data")
workflow.add_edge("data", "eng")
workflow.add_edge("eng", "econ")
workflow.add_edge("econ", "manager")
workflow.add_edge("manager", END)
app = workflow.compile()
# ================= 6. 执行测试 =================
print("🚀 启动油藏分析多智能体系统...")
final_state = app.invoke({"well_name": "X3-井", "messages": []})
print("\n" + "="*30)
print("📊 最终分析报告:")
print("="*30)
print(final_state["messages"][-1].content)
🔍 大师级进阶建议
-
连接真实数据源:
将ReservoirDB类替换为真实的 SQLAlchemy 或 PySpark 连接,直接读取你的油田生产数据库。 -
加入人工介入 (Human-in-the-loop):
在eng_step和manager_step之间,使用 LangGraph 的interrupt_before功能,让人类专家(也就是你)可以审核工程分析结果,确认后再继续算经济账。 -
结合数字分身:
最后一步不仅生成文本报告,再调用你的“数字分身”工具,自动生成一个讲解该报告的 5 分钟分镜视频脚本。
更多推荐



所有评论(0)