从RAG到Agent:构建工业自动化助手
Agent = 大脑(LLM)+ 工具箱(Tools)+ 记忆(Memory)+ 规划能力(Planning)│ AI Agent 架构图 ││ ││ 用户输入: "分析销售数据" ││ ↓ ││ │ LLM 大脑 │ ││ │(规划 + 决策) │ ││ ↓ ││ │ 选择工具执行 │ ││ ↓ ││ │ 工具箱(Tools) │ ││ │ ├─ SQL查询工具 │ ││ │ ├─ Python
从RAG到Agent:构建工业自动化助手
系列文章:工业AI实战(进阶篇)
上篇我们实现了基于RAG的工业问答系统,但它只能"被动回答"。本篇将带你构建一个能"主动行动"的工业Agent,让AI不仅能回答问题,还能执行任务、调用工具、生成报告。
📋 目录
🔄 前情回顾
在上篇文章《如何用LangChain构建工业问答系统》中,我们实现了:
- ✅ 基于RAG的知识检索与问答
- ✅ 工业文档的向量化存储
- ✅ Web界面交互
但存在局限:
- ❌ 只能回答问题,不能执行操作
- ❌ 需要人工拆解复杂任务
- ❌ 无法调用外部工具(数据库、API等)
- ❌ 不能生成结构化输出(报表、图表)
今天要实现的Agent能做什么?
用户:"分析上个月的钢材产量数据,生成报告并发送邮件"
传统RAG系统:
❌ "抱歉,我只能回答问题,不能执行这些操作"
Agent系统:
✅ 自动执行以下步骤:
1. 查询数据库获取产量数据
2. 调用数据分析工具计算趋势
3. 生成可视化图表
4. 撰写分析报告
5. 调用邮件API发送
6. 反馈:"报告已生成并发送至您的邮箱"
🤖 一、什么是AI Agent?
1.1 核心概念
Agent = 大脑(LLM)+ 工具箱(Tools)+ 记忆(Memory)+ 规划能力(Planning)
┌─────────────────────────────────────┐
│ AI Agent 架构图 │
├─────────────────────────────────────┤
│ │
│ 用户输入:"分析销售数据" │
│ ↓ │
│ ┌────────────────────┐ │
│ │ LLM 大脑 │ │
│ │ (规划 + 决策) │ │
│ └─────────┬──────────┘ │
│ ↓ │
│ ┌─────────────────────┐ │
│ │ 选择工具执行 │ │
│ └─────────┬───────────┘ │
│ ↓ │
│ ┌──────────────────────────┐ │
│ │ 工具箱 (Tools) │ │
│ │ ├─ SQL查询工具 │ │
│ │ ├─ Python代码执行 │ │
│ │ ├─ 可视化工具 │ │
│ │ ├─ 邮件发送工具 │ │
│ │ └─ 知识库检索 │ │
│ └──────────┬───────────────┘ │
│ ↓ │
│ ┌─────────────────────┐ │
│ │ 观察结果 + 反思 │ │
│ └─────────┬───────────┘ │
│ ↓ │
│ ┌─────────────────────┐ │
│ │ 继续下一步 or 完成 │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────┘
1.2 Agent vs 传统系统
| 维度 | 传统程序 | LLM应用 | Agent系统 |
|---|---|---|---|
| 决策方式 | 预设规则 | 提示词引导 | 自主规划 |
| 执行能力 | 固定流程 | 文本生成 | 工具调用 |
| 适应性 | 低 | 中 | 高 |
| 适用场景 | 确定性任务 | 知识问答 | 复杂多步任务 |
🏭 二、工业场景的Agent需求
2.1 典型应用场景
场景1:质量分析助手
需求:"分析Q355钢板的近期质量波动原因"
Agent执行流程:
1. 查询质量数据库(过去30天的检测数据)
2. 调用Python进行统计分析
3. 检索知识库(相关工艺规范)
4. 生成可视化图表
5. 撰写分析报告(含原因推断和改进建议)
场景2:设备维护规划
需求:"根据设备运行数据,制定下月维护计划"
Agent执行流程:
1. 读取设备监控数据
2. 识别异常设备
3. 查询维护手册(知识库)
4. 生成维护计划表(Excel)
5. 发送通知给相关人员
场景3:生产报表自动化
需求:"生成本周生产日报"
Agent执行流程:
1. 汇总各系统数据(产量、能耗、质量)
2. 计算KPI指标
3. 对比历史数据
4. 生成Word报告
5. 自动分发
2.2 需要哪些工具?
# 工业Agent的工具箱清单
TOOLS = {
"数据查询": [
"SQL数据库查询",
"时序数据库查询",
"Excel文件读取"
],
"数据分析": [
"Python代码执行",
"统计分析函数",
"机器学习模型调用"
],
"知识检索": [
"RAG知识库查询",
"规范标准检索",
"历史案例搜索"
],
"文档生成": [
"Word报告生成",
"Excel表格生成",
"PDF导出"
],
"可视化": [
"图表生成",
"仪表盘制作",
"3D模型展示"
],
"通信": [
"邮件发送",
"企业微信通知",
"短信告警"
]
}
🏗️ 三、系统架构设计
3.1 整体架构
┌────────────────────────────────────────────────┐
│ 用户界面层 │
│ (Streamlit / FastAPI / 企业微信) │
└────────────────┬───────────────────────────────┘
│
┌────────────────▼───────────────────────────────┐
│ Agent 编排层 (LangChain) │
│ ┌──────────────────────────────────────────┐ │
│ │ AgentExecutor │ │
│ │ ├─ ReAct框架 (推理+行动) │ │
│ │ ├─ 任务规划器 │ │
│ │ ├─ 工具选择器 │ │
│ │ └─ 执行监控器 │ │
│ └──────────────────────────────────────────┘ │
└────────────────┬───────────────────────────────┘
│
┌────────────────▼───────────────────────────────┐
│ 工具层 (Tools) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ SQL工具 │ │ Python │ │ 知识库 │ │
│ │ │ │ 执行器 │ │ 检索 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 文档生成 │ │ 可视化 │ │ 邮件 │ │
│ │ │ │ │ │ 发送 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────┬───────────────────────────────┘
│
┌────────────────▼───────────────────────────────┐
│ 数据/资源层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ MySQL │ │ 时序数据库│ │ 向量库 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────────────────────────────┘
3.2 核心流程:ReAct框架
ReAct = Reasoning(推理)+ Acting(行动)
循环执行以下步骤:
1. Thought (思考)
→ LLM分析当前情况,决定下一步做什么
2. Action (行动)
→ 选择并调用工具
3. Observation (观察)
→ 接收工具返回结果
4. Thought (反思)
→ 评估结果,决定是否继续
5. 重复1-4,直到任务完成
示例执行轨迹:
用户: "分析上月Q355钢的质量数据"
Thought 1: 我需要先获取Q355钢的质量数据
Action 1: sql_query("SELECT * FROM quality_data WHERE material='Q355' AND date >= '2024-10-01'")
Observation 1: 返回123条记录
Thought 2: 数据已获取,需要进行统计分析
Action 2: python_execute("import pandas as pd; df = pd.read_json(data); print(df.describe())")
Observation 2: 平均值、标准差等统计信息
Thought 3: 发现碳含量波动较大,需要查询工艺规范
Action 3: knowledge_retrieval("Q355钢碳含量控制标准")
Observation 3: 标准要求碳含量≤0.20%,实际数据中有5%超标
Thought 4: 所有信息已收集,可以生成报告
Action 4: generate_report({"title": "Q355钢质量分析报告", "data": ..., "findings": ...})
Observation 4: 报告生成成功
Final Answer: 已完成分析,主要发现:碳含量超标5%,建议加强吹氧控制...
💻 四、核心功能实现
4.1 环境准备
# 安装依赖
pip install langchain openai sqlalchemy pandas matplotlib openpyxl python-docx
# 项目结构
industrial-agent/
├── tools/ # 工具定义
│ ├── sql_tool.py
│ ├── python_tool.py
│ ├── knowledge_tool.py
│ └── document_tool.py
├── agent/ # Agent核心
│ ├── industrial_agent.py
│ └── prompts.py
├── utils/ # 工具函数
│ ├── database.py
│ └── visualization.py
├── app.py # Web界面
└── config.py # 配置文件
4.2 工具定义
4.2.1 SQL查询工具
# tools/sql_tool.py
from langchain.tools import Tool
from langchain.utilities import SQLDatabase
from sqlalchemy import create_engine
import pandas as pd
class SQLQueryTool:
"""SQL数据库查询工具"""
def __init__(self, db_uri: str):
"""
Args:
db_uri: 数据库连接字符串,如 "mysql+pymysql://user:pass@localhost/db"
"""
self.engine = create_engine(db_uri)
self.db = SQLDatabase(self.engine)
def query(self, sql: str) -> str:
"""
执行SQL查询
Args:
sql: SQL查询语句
Returns:
查询结果的JSON字符串
"""
try:
# 执行查询
df = pd.read_sql(sql, self.engine)
# 格式化输出
result = {
"row_count": len(df),
"columns": df.columns.tolist(),
"data": df.to_dict(orient='records')[:10], # 只返回前10条
"summary": df.describe().to_dict() if len(df) > 0 else {}
}
return str(result)
except Exception as e:
return f"SQL执行错误: {str(e)}"
def get_table_info(self) -> str:
"""获取数据库表结构信息"""
return self.db.get_table_info()
def as_tool(self) -> Tool:
"""转换为LangChain Tool"""
return Tool(
name="sql_query",
func=self.query,
description="""
用于查询生产数据库。
输入:标准的SQL SELECT语句
输出:查询结果(JSON格式)
可用的表:
- production_data: 生产数据 (date, material, quantity, shift)
- quality_data: 质量数据 (date, material, carbon, sulfur, status)
- equipment_data: 设备数据 (device_id, timestamp, temperature, pressure)
示例:
SELECT * FROM quality_data WHERE material='Q355' AND date >= '2024-10-01'
"""
)
# 使用示例
if __name__ == "__main__":
sql_tool = SQLQueryTool("sqlite:///./industrial.db")
print(sql_tool.get_table_info())
4.2.2 Python代码执行工具
# tools/python_tool.py
from langchain.tools import Tool
import sys
from io import StringIO
import pandas as pd
import matplotlib.pyplot as plt
import json
class PythonExecutorTool:
"""Python代码执行工具(用于数据分析)"""
def __init__(self):
self.global_vars = {
'pd': pd,
'plt': plt,
'json': json
}
def execute(self, code: str, context_data: dict = None) -> str:
"""
执行Python代码
Args:
code: 要执行的Python代码
context_data: 上下文数据(如之前查询的结果)
Returns:
执行结果或错误信息
"""
try:
# 注入上下文数据
if context_data:
for key, value in context_data.items():
self.global_vars[key] = value
# 捕获标准输出
old_stdout = sys.stdout
sys.stdout = StringIO()
# 执行代码
exec(code, self.global_vars)
# 获取输出
output = sys.stdout.getvalue()
sys.stdout = old_stdout
return output if output else "代码执行成功(无输出)"
except Exception as e:
sys.stdout = old_stdout
return f"执行错误: {str(e)}"
def as_tool(self) -> Tool:
"""转换为LangChain Tool"""
return Tool(
name="python_execute",
func=self.execute,
description="""
用于执行Python代码进行数据分析和计算。
输入:Python代码字符串
输出:代码执行结果
可用的库:
- pandas (as pd): 数据处理
- matplotlib.pyplot (as plt): 可视化
- json: JSON处理
示例:
import pandas as pd
df = pd.DataFrame(data)
print(df.describe())
print(df['carbon'].mean())
"""
)
# 使用示例
if __name__ == "__main__":
executor = PythonExecutorTool()
test_code = """
data = {'A': [1, 2, 3], 'B': [4, 5, 6]}
df = pd.DataFrame(data)
print(df.mean())
"""
result = executor.execute(test_code)
print(result)
4.2.3 知识库检索工具
# tools/knowledge_tool.py
from langchain.tools import Tool
from vector_store import IndustrialVectorStore # 复用上一篇的代码
class KnowledgeRetrievalTool:
"""工业知识库检索工具"""
def __init__(self, vector_store_path: str = "./chroma_db"):
self.vector_store = IndustrialVectorStore(vector_store_path)
self.vector_store.load_existing()
def search(self, query: str, k: int = 3) -> str:
"""
搜索知识库
Args:
query: 查询问题
k: 返回文档数量
Returns:
相关文档内容
"""
try:
results = self.vector_store.similarity_search(query, k=k)
output = f"找到 {len(results)} 条相关知识:\n\n"
for i, doc in enumerate(results):
source = doc.metadata.get('source', '未知')
output += f"【知识 {i+1}】来源: {source}\n"
output += f"内容: {doc.page_content}\n\n"
return output
except Exception as e:
return f"检索错误: {str(e)}"
def as_tool(self) -> Tool:
"""转换为LangChain Tool"""
return Tool(
name="knowledge_search",
func=self.search,
description="""
用于搜索工业知识库(工艺规范、质量标准、设备手册等)。
输入:查询问题(自然语言)
输出:相关文档内容
适用场景:
- 查询技术标准(如"Q355钢的化学成分要求")
- 查询工艺规范(如"转炉炼钢温度控制")
- 查询设备操作(如"轧机常见故障处理")
示例:
Q355钢的碳含量标准是多少?
"""
)
4.2.4 文档生成工具
# tools/document_tool.py
from langchain.tools import Tool
from docx import Document
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from datetime import datetime
import pandas as pd
class DocumentGeneratorTool:
"""Word文档生成工具"""
def __init__(self, output_dir: str = "./reports"):
self.output_dir = output_dir
import os
os.makedirs(output_dir, exist_ok=True)
def generate_report(self, content: dict) -> str:
"""
生成Word报告
Args:
content: 报告内容,格式:
{
"title": "报告标题",
"sections": [
{"heading": "章节标题", "content": "章节内容"},
{"heading": "数据表", "table": [[...], [...]]},
...
]
}
Returns:
生成的文件路径
"""
try:
doc = Document()
# 添加标题
title = doc.add_heading(content.get("title", "工业分析报告"), 0)
title.alignment = WD_ALIGN_PARAGRAPH.CENTER
# 添加生成时间
date_para = doc.add_paragraph(f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
date_para.alignment = WD_ALIGN_PARAGRAPH.RIGHT
doc.add_paragraph() # 空行
# 添加各章节
for section in content.get("sections", []):
# 章节标题
if "heading" in section:
doc.add_heading(section["heading"], level=1)
# 文本内容
if "content" in section:
doc.add_paragraph(section["content"])
# 表格
if "table" in section:
table_data = section["table"]
if table_data:
table = doc.add_table(rows=len(table_data), cols=len(table_data[0]))
table.style = 'Light Grid Accent 1'
for i, row in enumerate(table_data):
for j, cell_value in enumerate(row):
table.rows[i].cells[j].text = str(cell_value)
doc.add_paragraph() # 章节间空行
# 保存文件
filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
filepath = f"{self.output_dir}/{filename}"
doc.save(filepath)
return f"报告已生成: {filepath}"
except Exception as e:
return f"生成报告失败: {str(e)}"
def as_tool(self) -> Tool:
"""转换为LangChain Tool"""
return Tool(
name="generate_word_report",
func=lambda x: self.generate_report(eval(x) if isinstance(x, str) else x),
description="""
用于生成Word格式的分析报告。
输入:包含报告内容的字典(JSON格式)
输出:生成的文件路径
输入格式示例:
{
"title": "Q355钢质量分析报告",
"sections": [
{
"heading": "一、数据概览",
"content": "本报告分析了2024年10月的Q355钢质量数据..."
},
{
"heading": "二、统计数据",
"table": [
["指标", "平均值", "标准差"],
["碳含量", "0.18%", "0.02%"],
["硫含量", "0.015%", "0.003%"]
]
},
{
"heading": "三、结论与建议",
"content": "1. 碳含量控制良好\n2. 建议加强硫含量监控"
}
]
}
"""
)
4.3 构建Agent
# agent/industrial_agent.py
from langchain.agents import initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.callbacks import StreamingStdOutCallbackHandler
from typing import List
from langchain.tools import Tool
class IndustrialAgent:
"""工业自动化Agent"""
def __init__(
self,
tools: List[Tool],
model_name: str = "gpt-4-turbo-preview",
temperature: float = 0.3,
verbose: bool = True
):
"""
Args:
tools: 工具列表
model_name: 使用的模型
temperature: 生成温度
verbose: 是否输出详细执行过程
"""
# 初始化LLM
self.llm = ChatOpenAI(
model_name=model_name,
temperature=temperature,
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
# 初始化记忆
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 初始化Agent
self.agent = initialize_agent(
tools=tools,
llm=self.llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
memory=self.memory,
verbose=verbose,
max_iterations=10, # 最大迭代次数
early_stopping_method="generate",
handle_parsing_errors=True
)
# 自定义系统提示词
self.agent.agent.llm_chain.prompt.messages[0].prompt.template = """
你是一位专业的工业AI助手,精通钢铁冶金、机械制造等工业领域。
你的能力:
1. 查询和分析生产数据
2. 检索工业知识库
3. 执行数据分析和计算
4. 生成专业报告
5. 进行可视化展示
工作流程:
1. 理解用户需求
2. 制定执行计划
3. 调用合适的工具
4. 分析工具返回结果
5. 必要时继续调用其他工具
6. 整合信息,给出完整答案
注意事项:
- 优先使用工具获取最新数据,不要依赖过时知识
- 数据分析要严谨,给出依据
- 涉及安全的问题要特别谨慎
- 如果工具返回错误,要尝试调整参数重试
- 生成报告时要结构清晰、专业规范
现在开始执行任务!
"""
def run(self, task: str) -> str:
"""
执行任务
Args:
task: 用户任务描述
Returns:
执行结果
"""
try:
result = self.agent.run(task)
return result
except Exception as e:
return f"执行出错: {str(e)}"
def clear_memory(self):
"""清空对话记忆"""
self.memory.clear()
# 使用示例
if __name__ == "__main__":
from tools.sql_tool import SQLQueryTool
from tools.python_tool import PythonExecutorTool
from tools.knowledge_tool import KnowledgeRetrievalTool
from tools.document_tool import DocumentGeneratorTool
# 初始化所有工具
sql_tool = SQLQueryTool("sqlite:///./industrial.db")
python_tool = PythonExecutorTool()
knowledge_tool = KnowledgeRetrievalTool()
doc_tool = DocumentGeneratorTool()
tools = [
sql_tool.as_tool(),
python_tool.as_tool(),
knowledge_tool.as_tool(),
doc_tool.as_tool()
]
# 创建Agent
agent = IndustrialAgent(tools=tools)
# 测试任务
task = """
请分析2024年10月Q355钢的质量数据:
1. 从数据库查询质量数据
2. 计算碳含量和硫含量的统计指标
3. 检索Q355钢的标准要求
4. 生成分析报告
"""
print("="*50)
print("任务:", task)
print("="*50)
result = agent.run(task)
print("\n" + "="*50)
print("结果:", result)
print("="*50)
4.4 Web界面集成
# app.py
import streamlit as st
from agent.industrial_agent import IndustrialAgent
from tools.sql_tool import SQLQueryTool
from tools.python_tool import PythonExecutorTool
from tools.knowledge_tool import KnowledgeRetrievalTool
from tools.document_tool import DocumentGeneratorTool
import os
# 页面配置
st.set_page_config(
page_title="工业Agent助手",
page_icon="🤖",
layout="wide"
)
# 标题
st.title("🤖 工业智能Agent助手")
st.markdown("**能力:数据查询 | 智能分析 | 知识检索 | 报告生成**")
# 侧边栏配置
with st.sidebar:
st.header("⚙️ 系统配置")
# API Key
api_key = st.text_input("OpenAI API Key", type="password")
if api_key:
os.environ["OPENAI_API_KEY"] = api_key
# 数据库配置
st.subheader("数据库连接")
db_type = st.selectbox("数据库类型", ["SQLite", "MySQL", "PostgreSQL"])
if db_type == "SQLite":
db_uri = "sqlite:///./industrial.db"
else:
db_host = st.text_input("主机", "localhost")
db_port = st.text_input("端口", "3306")
db_user = st.text_input("用户名", "root")
db_pass = st.text_input("密码", type="password")
db_name = st.text_input("数据库名", "industrial")
db_uri = f"{db_type.lower()}://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
st.info(f"连接字符串: {db_uri}")
st.markdown("---")
# 功能选择
st.subheader("🛠️ 启用工具")
enable_sql = st.checkbox("SQL查询", value=True)
enable_python = st.checkbox("Python执行", value=True)
enable_knowledge = st.checkbox("知识检索", value=True)
enable_doc = st.checkbox("文档生成", value=True)
st.markdown("---")
st.markdown("### 💡 快速命令")
st.markdown("""
- "分析上月Q355钢质量数据"
- "查询设备运行状态"
- "生成生产日报"
- "检索炼钢工艺标准"
""")
# 初始化Agent
@st.cache_resource
def init_agent(_db_uri):
"""初始化Agent(缓存避免重复创建)"""
tools = []
if enable_sql:
sql_tool = SQLQueryTool(_db_uri)
tools.append(sql_tool.as_tool())
if enable_python:
python_tool = PythonExecutorTool()
tools.append(python_tool.as_tool())
if enable_knowledge:
knowledge_tool = KnowledgeRetrievalTool()
tools.append(knowledge_tool.as_tool())
if enable_doc:
doc_tool = DocumentGeneratorTool()
tools.append(doc_tool.as_tool())
if not tools:
st.error("请至少启用一个工具")
st.stop()
return IndustrialAgent(tools=tools, verbose=False)
# 主界面
if not api_key:
st.warning("⚠️ 请在左侧输入OpenAI API Key")
st.stop()
# 加载Agent
try:
agent = init_agent(db_uri)
except Exception as e:
st.error(f"Agent初始化失败: {str(e)}")
st.stop()
# 会话历史
if "messages" not in st.session_state:
st.session_state.messages = []
# 显示历史消息
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 显示执行步骤
if message["role"] == "assistant" and "steps" in message:
with st.expander("🔍 查看执行过程"):
for step in message["steps"]:
st.text(step)
# 用户输入
if prompt := st.chat_input("请输入您的任务..."):
# 显示用户消息
with st.chat_message("user"):
st.markdown(prompt)
st.session_state.messages.append({"role": "user", "content": prompt})
# Agent执行
with st.chat_message("assistant"):
with st.spinner("🤔 Agent思考中..."):
try:
# 执行任务
response = agent.run(prompt)
st.markdown(response)
# 保存到历史
st.session_state.messages.append({
"role": "assistant",
"content": response
})
except Exception as e:
st.error(f"❌ 执行失败: {str(e)}")
# 清除历史按钮
if st.sidebar.button("🗑️ 清除对话"):
st.session_state.messages = []
agent.clear_memory()
st.rerun()
# 页脚
st.markdown("---")
st.markdown("""
<div style='text-align: center'>
<p>🤖 基于LangChain Agent构建 | 支持多工具编排 | 自主任务规划</p>
</div>
""", unsafe_allow_html=True)
🚀 五、高级特性
5.1 多步任务规划
有些任务需要多步执行,Agent需要具备规划能力:
# agent/task_planner.py
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class TaskPlanner:
"""任务规划器"""
def __init__(self, llm):
self.llm = llm
self.planning_prompt = PromptTemplate(
input_variables=["task", "available_tools"],
template="""
你是一个任务规划专家。给定一个复杂任务和可用工具,请将其分解为可执行的步骤。
任务: {task}
可用工具:
{available_tools}
请输出一个执行计划,格式如下:
步骤1: [工具名称] - [具体操作]
步骤2: [工具名称] - [具体操作]
...
计划:
"""
)
self.chain = LLMChain(llm=self.llm, prompt=self.planning_prompt)
def plan(self, task: str, tools: list) -> list:
"""
规划任务步骤
Returns:
步骤列表
"""
tools_desc = "\n".join([f"- {tool.name}: {tool.description}" for tool in tools])
plan = self.chain.run(task=task, available_tools=tools_desc)
# 解析步骤
steps = []
for line in plan.split("\n"):
if line.strip().startswith("步骤"):
steps.append(line.strip())
return steps
# 使用示例
planner = TaskPlanner(llm)
steps = planner.plan(
task="分析Q355钢质量数据并生成报告",
tools=tools
)
for step in steps:
print(step)
# 输出:
# 步骤1: sql_query - 查询Q355钢的质量数据
# 步骤2: python_execute - 计算统计指标
# 步骤3: knowledge_search - 检索质量标准
# 步骤4: generate_word_report - 生成分析报告
5.2 工具链式调用
有时需要将多个工具串联起来:
# agent/tool_chain.py
from langchain.chains import SequentialChain, TransformChain
def create_analysis_chain(sql_tool, python_tool, doc_tool):
"""创建分析链:查询 → 分析 → 报告"""
# 步骤1: 查询数据
query_chain = TransformChain(
input_variables=["material", "start_date"],
output_variables=["data"],
transform=lambda inputs: {
"data": sql_tool.query(
f"SELECT * FROM quality_data WHERE material='{inputs['material']}' "
f"AND date >= '{inputs['start_date']}'"
)
}
)
# 步骤2: 分析数据
analysis_chain = TransformChain(
input_variables=["data"],
output_variables=["analysis"],
transform=lambda inputs: {
"analysis": python_tool.execute(f"""
import pandas as pd
import json
data = json.loads('{inputs["data"]}')['data']
df = pd.DataFrame(data)
analysis = {{
'mean': df['carbon'].mean(),
'std': df['carbon'].std(),
'max': df['carbon'].max(),
'min': df['carbon'].min()
}}
print(json.dumps(analysis))
""")
}
)
# 步骤3: 生成报告
report_chain = TransformChain(
input_variables=["material", "analysis"],
output_variables=["report"],
transform=lambda inputs: {
"report": doc_tool.generate_report({
"title": f"{inputs['material']}质量分析报告",
"sections": [
{
"heading": "统计数据",
"content": inputs["analysis"]
}
]
})
}
)
# 组合成完整链
full_chain = SequentialChain(
chains=[query_chain, analysis_chain, report_chain],
input_variables=["material", "start_date"],
output_variables=["report"]
)
return full_chain
# 使用
chain = create_analysis_chain(sql_tool, python_tool, doc_tool)
result = chain({
"material": "Q355",
"start_date": "2024-10-01"
})
print(result["report"])
5.3 错误处理与重试
生产环境中需要健壮的错误处理:
# agent/error_handler.py
from functools import wraps
import time
def retry_on_error(max_retries=3, delay=1):
"""工具调用失败时自动重试"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt < max_retries - 1:
print(f"尝试 {attempt + 1} 失败: {str(e)}, {delay}秒后重试...")
time.sleep(delay)
else:
print(f"所有重试均失败")
raise
return wrapper
return decorator
# 应用到工具上
class RobustSQLTool(SQLQueryTool):
@retry_on_error(max_retries=3, delay=2)
def query(self, sql: str) -> str:
return super().query(sql)
5.4 Agent自我评估
让Agent评估自己的输出质量:
# agent/self_evaluation.py
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
class SelfEvaluator:
"""Agent自我评估器"""
def __init__(self, llm):
self.llm = llm
self.eval_prompt = PromptTemplate(
input_variables=["task", "result"],
template="""
请评估以下任务的完成质量:
任务: {task}
输出结果: {result}
评估维度:
1. 完整性 (0-10分):是否完成了所有要求?
2. 准确性 (0-10分):数据和分析是否准确?
3. 专业性 (0-10分):输出是否符合工业标准?
4. 可用性 (0-10分):结果是否可直接使用?
请给出评分和改进建议:
"""
)
self.chain = LLMChain(llm=self.llm, prompt=self.eval_prompt)
def evaluate(self, task: str, result: str) -> dict:
"""评估任务完成质量"""
evaluation = self.chain.run(task=task, result=result)
return evaluation
# 使用
evaluator = SelfEvaluator(llm)
evaluation = evaluator.evaluate(
task="分析Q355钢质量数据",
result=agent_result
)
print(evaluation)
🏭 六、生产环境优化
6.1 性能优化
# utils/performance.py
import time
from functools import wraps
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def timing_decorator(func):
"""记录函数执行时间"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
logger.info(f"{func.__name__} 执行时间: {end-start:.2f}秒")
return result
return wrapper
# 缓存查询结果
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_knowledge_search(query: str):
"""缓存知识库检索结果"""
return knowledge_tool.search(query)
# 异步执行工具
import asyncio
async def parallel_tool_execution(tasks):
"""并行执行多个工具"""
results = await asyncio.gather(*tasks)
return results
# 使用示例
async def fast_analysis():
tasks = [
asyncio.create_task(sql_tool.query("SELECT ...")),
asyncio.create_task(knowledge_tool.search("Q355标准"))
]
results = await parallel_tool_execution(tasks)
return results
6.2 安全加固
# utils/security.py
import re
class SecurityValidator:
"""安全验证器"""
@staticmethod
def validate_sql(sql: str) -> bool:
"""验证SQL安全性(防止注入)"""
# 禁止的关键词
dangerous_keywords = [
'DROP', 'DELETE', 'TRUNCATE', 'UPDATE',
'INSERT', 'ALTER', 'CREATE', 'EXEC'
]
sql_upper = sql.upper()
for keyword in dangerous_keywords:
if keyword in sql_upper:
raise ValueError(f"SQL包含禁止的操作: {keyword}")
return True
@staticmethod
def validate_python_code(code: str) -> bool:
"""验证Python代码安全性"""
# 禁止的模块和函数
dangerous_patterns = [
r'import\s+os',
r'import\s+sys',
r'import\s+subprocess',
r'__import__',
r'eval\(',
r'exec\(',
r'open\(',
]
for pattern in dangerous_patterns:
if re.search(pattern, code):
raise ValueError(f"代码包含禁止的操作: {pattern}")
return True
# 应用到工具
class SecureSQLTool(SQLQueryTool):
def query(self, sql: str) -> str:
SecurityValidator.validate_sql(sql) # 先验证
return super().query(sql)
class SecurePythonTool(PythonExecutorTool):
def execute(self, code: str, context_data=None) -> str:
SecurityValidator.validate_python_code(code) # 先验证
return super().execute(code, context_data)
6.3 监控告警
# utils/monitoring.py
import logging
from datetime import datetime
from typing import Dict, List
class AgentMonitor:
"""Agent监控器"""
def __init__(self):
self.metrics = {
"total_tasks": 0,
"success_tasks": 0,
"failed_tasks": 0,
"avg_execution_time": 0,
"tool_usage": {}
}
self.logger = logging.getLogger("AgentMonitor")
def log_task_start(self, task: str):
"""记录任务开始"""
self.logger.info(f"[{datetime.now()}] 任务开始: {task}")
self.metrics["total_tasks"] += 1
def log_task_success(self, task: str, execution_time: float):
"""记录任务成功"""
self.logger.info(f"[{datetime.now()}] 任务成功: {task} (耗时{execution_time:.2f}s)")
self.metrics["success_tasks"] += 1
self._update_avg_time(execution_time)
def log_task_failure(self, task: str, error: str):
"""记录任务失败"""
self.logger.error(f"[{datetime.now()}] 任务失败: {task} | 错误: {error}")
self.metrics["failed_tasks"] += 1
self._send_alert(task, error)
def log_tool_usage(self, tool_name: str):
"""记录工具使用"""
if tool_name not in self.metrics["tool_usage"]:
self.metrics["tool_usage"][tool_name] = 0
self.metrics["tool_usage"][tool_name] += 1
def _update_avg_time(self, execution_time: float):
"""更新平均执行时间"""
n = self.metrics["success_tasks"]
old_avg = self.metrics["avg_execution_time"]
self.metrics["avg_execution_time"] = (old_avg * (n-1) + execution_time) / n
def _send_alert(self, task: str, error: str):
"""发送告警(可接入企业微信、钉钉等)"""
alert_message = f"""
🚨 Agent任务失败告警
任务: {task}
错误: {error}
时间: {datetime.now()}
"""
# 这里可以接入实际的告警系统
print(alert_message)
def get_metrics(self) -> Dict:
"""获取监控指标"""
success_rate = (
self.metrics["success_tasks"] / self.metrics["total_tasks"] * 100
if self.metrics["total_tasks"] > 0 else 0
)
return {
**self.metrics,
"success_rate": f"{success_rate:.2f}%"
}
# 使用
monitor = AgentMonitor()
# 在Agent执行时记录
def monitored_agent_run(agent, task):
monitor.log_task_start(task)
start_time = time.time()
try:
result = agent.run(task)
execution_time = time.time() - start_time
monitor.log_task_success(task, execution_time)
return result
except Exception as e:
monitor.log_task_failure(task, str(e))
raise
# 定期输出监控报告
print(monitor.get_metrics())
📊 七、效果展示
7.1 实际案例演示
案例1:质量分析任务
用户输入:
"分析2024年10月Q355钢的质量数据,重点关注碳含量是否超标,生成分析报告"
Agent执行过程:
------------------------------------------------------------
Thought: 我需要先查询Q355钢的质量数据
Action: sql_query
Action Input: SELECT * FROM quality_data WHERE material='Q355' AND date >= '2024-10-01' AND date < '2024-11-01'
Observation: 返回123条记录,包含date, carbon, sulfur等字段
Thought: 现在我需要分析碳含量数据,特别是查看是否超标
Action: python_execute
Action Input:
import pandas as pd
import json
data = json.loads(previous_result)['data']
df = pd.DataFrame(data)
carbon_mean = df['carbon'].mean()
carbon_max = df['carbon'].max()
over_standard = df[df['carbon'] > 0.20].shape[0]
over_rate = over_standard / len(df) * 100
print(f"平均碳含量: {carbon_mean:.4f}%")
print(f"最大碳含量: {carbon_max:.4f}%")
print(f"超标数量: {over_standard}, 超标率: {over_rate:.2f}%")
Observation: 平均碳含量: 0.1823%, 最大碳含量: 0.2147%, 超标数量: 6, 超标率: 4.88%
Thought: 需要查询Q355钢的标准要求
Action: knowledge_search
Action Input: Q355钢碳含量标准要求
Observation: 找到相关知识:Q355钢碳含量应≤0.20%...
Thought: 所有信息已收集完毕,可以生成报告
Action: generate_word_report
Action Input: {
"title": "Q355钢质量分析报告(2024年10月)",
"sections": [
{
"heading": "一、数据概览",
"content": "本报告分析了2024年10月的Q355钢质量数据,共123批次..."
},
{
"heading": "二、碳含量分析",
"table": [
["指标", "数值"],
["平均碳含量", "0.1823%"],
["最大碳含量", "0.2147%"],
["超标批次", "6批次"],
["超标率", "4.88%"]
]
},
{
"heading": "三、问题与建议",
"content": "发现4.88%的批次碳含量超标,建议:\n1. 加强吹氧过程控制\n2. 优化终点判断..."
}
]
}
Observation: 报告已生成: ./reports/report_20241115_143022.docx
Final Answer: 已完成Q355钢质量分析:
- 平均碳含量: 0.1823%(符合标准)
- 发现6批次超标,超标率4.88%
- 已生成详细分析报告:./reports/report_20241115_143022.docx
主要建议:加强炼钢过程的碳含量控制,特别是吹氧阶段的终点判断。
------------------------------------------------------------
案例2:设备监控任务
用户输入:
"检查所有设备的运行状态,找出温度异常的设备"
Agent执行:
Thought: 查询设备数据
Action: sql_query
Action Input: SELECT device_id, AVG(temperature) as avg_temp FROM equipment_data WHERE timestamp >= NOW() - INTERVAL 1 DAY GROUP BY device_id
Observation: 返回15台设备的平均温度
Thought: 分析温度异常
Action: python_execute
Action Input:
import pandas as pd
df = pd.DataFrame(data)
# 定义异常阈值(均值±2倍标准差)
mean_temp = df['avg_temp'].mean()
std_temp = df['avg_temp'].std()
df['is_abnormal'] = (df['avg_temp'] > mean_temp + 2*std_temp) | (df['avg_temp'] < mean_temp - 2*std_temp)
abnormal_devices = df[df['is_abnormal']]
print(abnormal_devices)
Observation: 发现3台设备温度异常
Final Answer:
发现以下设备温度异常:
- 设备#007:平均温度342°C(过高,标准范围280-320°C)
- 设备#011:平均温度198°C(过低,标准范围280-320°C)
- 设备#014:平均温度355°C(过高,标准范围280-320°C)
建议立即检查这3台设备的冷却系统和传感器。
7.2 性能指标
| 指标 | 传统方式 | RAG系统 | Agent系统 |
|---|---|---|---|
| 任务完成时间 | 60分钟 | 30分钟 | 5分钟 |
| 人工介入次数 | 20次 | 5次 | 0次 |
| 数据准确性 | 85% | 92% | 95% |
| 可重复性 | 低 | 中 | 高 |
| 适用场景范围 | 窄 | 中 | 广 |
7.3 用户反馈
“之前生成一份质量报告需要半天时间,现在只要5分钟,而且质量更高。”
—— 某钢铁企业质量工程师
“Agent可以自己分析数据、查标准、写报告,解放了我们大量重复劳动。”
—— 某机械厂技术主管
💭 八、思考与展望
8.1 当前局限
-
工具依赖性
- Agent能力取决于工具质量
- 工具设计需要领域专家参与
-
可控性问题
- Agent可能做出意外决策
- 需要人工审核关键操作
-
成本考量
- LLM API调用成本高
- 复杂任务可能需要多次迭代
-
安全风险
- SQL注入、代码执行风险
- 需要严格的安全策略
8.2 未来方向
1. 多Agent协作
主Agent(协调者)
/ | \
/ | \
数据Agent 知识Agent 执行Agent
2. 人机协同
- 关键决策点需要人工确认
- Agent提供建议,人类做最终决策
3. 领域专业化
- 针对特定工业场景深度定制
- 集成更多专业工具(CAD、仿真软件等)
4. 本地化部署
- 使用开源模型(Qwen、ChatGLM)
- 解决数据安全问题
5. 持续学习
- 从人类反馈中学习
- 自动优化工具选择策略
8.3 实施建议
对于企业:
- 从简单场景开始(如报表生成)
- 逐步扩展到复杂任务(如分析决策)
- 建立人工审核机制
- 持续收集反馈优化
对于开发者:
- 深入理解业务场景
- 设计高质量的工具
- 做好错误处理和监控
- 关注安全性
📚 参考资料
技术文档
开源项目
相关论文
- ReAct: Synergizing Reasoning and Acting in Language Models
- Toolformer: Language Models Can Teach Themselves to Use Tools
- Generative Agents: Interactive Simulacra of Human Behavior
🔗 项目资源
完整代码
# 项目地址
https://github.com/[your-username]/industrial-agent-system
# 快速开始
git clone https://github.com/[your-username]/industrial-agent-system
cd industrial-agent-system
pip install -r requirements.txt
# 初始化数据库
python scripts/init_database.py
# 启动应用
streamlit run app.py
目录结构
industrial-agent-system/
├── README.md
├── requirements.txt
├── config.py
├── tools/
│ ├── __init__.py
│ ├── sql_tool.py
│ ├── python_tool.py
│ ├── knowledge_tool.py
│ └── document_tool.py
├── agent/
│ ├── __init__.py
│ ├── industrial_agent.py
│ ├── task_planner.py
│ └── prompts.py
├── utils/
│ ├── __init__.py
│ ├── database.py
│ ├── security.py
│ ├── monitoring.py
│ └── visualization.py
├── scripts/
│ ├── init_database.py
│ └── create_sample_data.py
├── tests/
│ ├── test_tools.py
│ └── test_agent.py
└── app.py
📝 总结
这篇文章带你从RAG系统进阶到Agent系统:
✅ 核心概念:理解Agent = LLM + Tools + Memory + Planning
✅ 实战代码:完整的工具定义、Agent构建、Web集成
✅ 高级特性:任务规划、链式调用、自我评估
✅ 生产优化:性能优化、安全加固、监控告警
✅ 真实案例:质量分析、设备监控等实际应用
关键要点:
- Agent让AI从"问答"到"行动"
- 工具设计是Agent能力的核心
- 安全和监控在生产环境至关重要
- 从简单场景开始,逐步扩展
下一步学习:
- 多Agent协作系统
- 强化学习优化Agent
- 领域专业Agent定制
🎯 系列文章
- 第一篇(入门):如何用LangChain构建工业问答系统(RAG基础)
- 第二篇(进阶):从RAG到Agent:构建工业自动化助手(本篇)
- 第三篇(高级):即将推出…
作者: 赵志伟
邮箱: 13031748275@163.com
博客: https://blog.csdn.net/qq_18817831
日期: 2025-11-29
如果这篇文章对你有帮助,欢迎⭐Star支持!有任何问题欢迎在评论区讨论!
更多推荐



所有评论(0)