项目实战:LangExtract知识图谱构建和混合RAG在工业故障诊断中的应用(二)
本文介绍了如何利用生成式AI构建工业故障诊断知识图谱及混合检索问答系统。首先通过LangExtract工具从非结构化维修手册和故障报告中抽取结构化知识,设计知识图谱本体并存储到Neo4j数据库。然后结合向量检索和图检索优势,在LangChain中实现混合RAG架构,最终构建端到端的工业故障诊断问答机器人。系统能通过语义理解和知识推理,提供专业诊断建议,如分析轴承温度异常原因、查询备件更换流程等。文
目录
- 第一部分:工业知识的“非结构化”困境与生成式AI的破局
- 第二部分:构建工业故障诊断知识图谱
- 第三部分:混合检索(Hybrid RAG)的实现与超越
- 第四部分:项目实践 —— 端到端工业故障诊断问答机器人
- 第五部分:企业级部署的挑战与远景
- 结语:开启工业智能的新篇章
(续前文:项目实战:LangExtract知识图谱构建和混合RAG在工业故障诊断中的应用(一))
第四部分:项目实践 —— 端到端工业故障诊断问答机器人
现在,万事俱备。我们已经有了知识图谱(大脑的记忆)和混合检索器(大脑的思考方式),是时候将它们组装起来,打造一个可以交互的问答机器人了。
4.1 整合所有组件:构建完整的Q&A链
我们将使用LangChain的LCEL(LangChain Expression Language)来优雅地将各个组件串联起来。这个链条的流程是:问题 -> 混合检索器 -> 格式化上下文 -> 提示词模板 -> LLM -> 答案。
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
# 假设 hybrid_retriever, llm 已经按照上一节的方式初始化好了
# 提示词模板
template = """
你是一个世界级的工业设备故障诊断专家。请根据下面提供的上下文信息,简洁、清晰、专业地回答用户的问题。
如果上下文中没有足够信息,就直接说“根据现有知识库,我无法回答这个问题”。
不要编造任何信息。
上下文:
{context}
问题: {question}
专家回答:
"""
prompt = ChatPromptTemplate.from_template(template)
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 构建完整的RAG链
rag_chain = (
{"context": hybrid_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# --- 开始提问 ---
# query = "C-101压缩机振动超标,可能是什么原因?"
# answer = rag_chain.invoke(query)
# print(answer)
这个rag_chain就是一个功能完备的问答系统。它接收一个问题字符串,内部自动完成混合检索、上下文构建、提示词生成和最终答案的生成,整个过程对用户完全透明。
4.2 实战演练:与我们的“工业大脑”对话
让我们来模拟几个真实的工业场景,看看这个系统的表现如何。
场景一:诊断复杂故障
用户提问: “主蒸汽涡轮机的2号轴承温度持续升高,超过了95摄氏度,该怎么办?”
系统内部流程:
- 向量检索: 找到与“涡轮机 轴承 温度升高”相关的故障报告和维修手册片段。
- 实体抽取: 从问题中识别出“主蒸汽涡轮机”、“2号轴承”。
- 图检索: 在KG中查询与“主蒸汽涡轮机”和“轴承”相关的故障,发现“润滑油污染”和“冷却水流量不足”是常见原因,对应的解决方案分别是“更换润滑油”和“检查冷却水泵”。
- 生成答案: LLM综合所有信息,生成如下回答: “根据知识库,主蒸汽涡轮机轴承温度过高(超过95℃)的可能原因及处理建议如下: 1. 根本原因: * 润滑油污染或劣化,导致润滑性能下降。 * 冷却水系统流量不足或中断,导致冷却效果不佳。 2. 排查步骤: * 立即检查润滑油站的油质分析报告,确认油品是否合格。 * 检查冷却水泵出口压力和流量,确认冷却水供应正常。 3. 解决方案: * 如果油质不合格,请安排停机更换润滑油并清洗油路。 * 如果冷却水异常,请立即排查并修复冷却水系统。 请优先排查冷却水问题,因为其风险更高。”
场景二:查询备件信息
用户提问: “更换P-205泵的机械密封需要哪些工具和备件型号?”
系统内部流程:
- 向量检索: 找到关于“P-205泵 机械密封 更换”的维修规程。
- 实体抽取: 识别“P-205泵”、“机械密封”。
- 图检索: 在KG中找到 (Solution: 更换机械密封) -[:REQUIRES]-> (Component: XX型号密封) 的关系,以及相关的工具信息。
- 生成答案: “更换P-205泵的机械密封,您需要: * 备件: 机械密封,型号为 ‘Garlock-2100’。 * 工具: 内六角扳手一套、扭矩扳手、专用拆卸工具。 请参考《P-205泵维修标准作业程序》第3.2节获取详细步骤。”
这些例子清晰地展示了混合RAG系统的强大能力。它不仅能“复述”知识,更能通过连接和推理,提供真正具有“洞察力”的答案。
4.3 完整项目代码
为了方便读者实践,这里提供一个整合了上述所有步骤的、简化的端到端Python脚本。请注意,你需要准备自己的文本数据(如.txt文件放在data目录下),并相应调整Schema和Prompt。
# main_project.py
import os
import textwrap
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Optional
# --- LangChain and Neo4j Imports ---
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_neo4j import Neo4jVector
from neo4j import GraphDatabase
# --- Google LangExtract (Simulated) ---
# 在实际项目中,你需要安装并使用 langextract 库
# import langextract as lx
# 这里我们用一个模拟函数代替
def simulated_langextract(text, schema_class):
# This is a mock function. In a real scenario, you would use LangExtract
# and a powerful LLM to extract this information.
if "V-201反应釜" in text and "螺栓松动" in text:
return [
schema_class(
fault_id='F002',
equipment_name='V-201反应釜',
component='机械密封',
symptom='泄漏',
cause='法兰连接螺栓松动',
solution='紧固所有螺栓',
document='report_002.txt'
)
]
if "C-101压缩机" in text and "振动" in text:
return [
schema_class(
fault_id='F003',
equipment_name='C-101压缩机',
component='转子',
symptom='振动超标',
cause='转子不平衡',
solution='进行动平衡校正',
document='report_003.txt'
)
]
return []
# --- 1. 知识抽取与图谱构建 ---
@dataclass
class FaultKnowledge:
fault_id: str
equipment_name: str
component: str
symptom: str
cause: str
solution: str
document: str
class KnowledgeGraphBuilder:
def __init__(self, driver):
self.driver = driver
def build_from_text(self, text_content: str, doc_name: str):
# 1.1 Extract knowledge using LangExtract (simulated)
extractions = simulated_langextract(text_content, FaultKnowledge)
# 1.2 Load into Neo4j
with self.driver.session() as session:
for record in extractions:
session.execute_write(self._create_graph_nodes, record)
return extractions
@staticmethod
def _create_graph_nodes(tx, record: FaultKnowledge):
query = """
MERGE (doc:Document {title: $document})
MERGE (equip:Equipment {name: $equipment_name})
MERGE (comp:Component {name: $component})
MERGE (symp:Symptom {description: $symptom})
MERGE (ca:Cause {description: $cause})
MERGE (sol:Solution {description: $solution})
MERGE (f:Fault {id: $fault_id})
MERGE (f)-[:OCCURS_ON]->(equip)
MERGE (f)-[:HAS_SYMPTOM]->(symp)
MERGE (f)-[:HAS_CAUSE]->(ca)
MERGE (f)-[:HAS_SOLUTION]->(sol)
MERGE (f)-[:MENTIONED_IN]->(doc)
MERGE (equip)-[:HAS_COMPONENT]->(comp)
MERGE (sol)-[:AFFECTS]->(comp)
"""
tx.run(query, record.__dict__)
# --- 2. 混合检索器 ---
class HybridRetriever(BaseRetriever):
# (代码同上一节,此处为简洁省略,请直接复制上一节的HybridRetriever类定义)
def __init__(self, neo4j_vector_store: Neo4jVector, neo4j_driver, llm, k: int = 2):
super().__init__()
self.vector_store = neo4j_vector_store
self.driver = neo4j_driver
self.llm = llm
self.k = k
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> List[Document]:
vector_results = self.vector_store.similarity_search(query, k=self.k)
entity_extraction_prompt = f"From the following question, extract equipment or component names. Question: \"{query}\". Return only comma-separated names."
response = self.llm.invoke(entity_extraction_prompt)
entities = [e.strip() for e in response.content.split(',') if e.strip()]
graph_docs = []
if entities:
with self.driver.session() as session:
for entity in entities:
cypher_query = """
MATCH (e) WHERE e.name CONTAINS $entity
OPTIONAL MATCH (f:Fault)-[:OCCURS_ON|AFFECTS]->(e)
OPTIONAL MATCH (f)-[:HAS_CAUSE]->(c:Cause)
OPTIONAL MATCH (f)-[:HAS_SOLUTION]->(s:Solution)
WITH e, c, s
WHERE c IS NOT NULL AND s IS NOT NULL
RETURN "For equipment '" + e.name + "', a potential cause is '" + c.description + "' with solution '" + s.description + "'." AS text
LIMIT 2
"""
result = session.run(cypher_query, entity=entity)
for record in result:
graph_docs.append(Document(page_content=record["text"]))
combined_docs = vector_results + graph_docs
unique_contents = set()
final_docs = [doc for doc in combined_docs if doc.page_content not in unique_contents and not unique_contents.add(doc.page_content)]
return final_docs
# --- 3. 主程序 ---
def main():
# --- 配置 ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "your_openai_api_key")
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "your_password" # !!!请替换为你的Neo4j密码
if OPENAI_API_KEY == "your_openai_api_key" or NEO4J_PASSWORD == "your_password":
print("请设置 OPENAI_API_KEY 环境变量和 NEO4J_PASSWORD。")
return
# --- 初始化连接 ---
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)
embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
# --- 步骤一: 构建知识图谱 ---
print("--- 1. 开始构建知识图谱 ---")
# 准备示例文本数据
documents_data = {
"report_002.txt": "2025年9月11日,当班操作员报告,V-201反应釜的机械密封出现泄漏,现场有明显物料滴落。经检查,确认为法兰连接螺栓松动导致。已派维修工紧固所有螺栓,泄漏停止。",
"report_003.txt": "故障警报:C-101压缩机发生高频振动,振动值超限。分析认为是转子不平衡引起的。解决方案是停机后对转子进行动平衡校正。"
}
kg_builder = KnowledgeGraphBuilder(neo4j_driver)
all_docs_for_vector_index = []
for name, content in documents_data.items():
extractions = kg_builder.build_from_text(content, name)
print(f"从 {name} 抽取并存入KG: {len(extractions)} 条知识")
all_docs_for_vector_index.append(Document(page_content=content, metadata={"source": name}))
# --- 步骤二: 创建向量索引 ---
print("\n--- 2. 创建或加载向量索引 ---")
vector_index_name = "fault_reports"
# 清理旧索引(仅为演示)
try:
neo4j_driver.execute_query(f"DROP INDEX {vector_index_name} IF EXISTS")
except Exception as e:
print(f"清理索引时出错 (可能不存在): {e}")
vector_store = Neo4jVector.from_documents(
all_docs_for_vector_index,
embedding=embeddings,
url=NEO4J_URI,
username=NEO4J_USER,
password=NEO4J_PASSWORD,
index_name=vector_index_name,
database="neo4j"
)
print(f"向量索引 '{vector_index_name}' 创建成功。")
# --- 步骤三: 初始化混合检索器和RAG链 ---
print("\n--- 3. 初始化混合RAG系统 ---")
hybrid_retriever = HybridRetriever(
neo4j_vector_store=vector_store,
neo4j_driver=neo4j_driver,
llm=llm
)
template = """You are an expert industrial fault diagnosis assistant. Answer the user's question based on the provided context. If the context is insufficient, say so. Do not invent information.
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
rag_chain = (
{"context": hybrid_retriever | (lambda docs: "\n\n".join(doc.page_content for doc in docs)), "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
print("RAG系统准备就绪。")
# --- 步骤四: 交互式问答 ---
print("\n--- 4. 进入交互式问答环节 (输入 'exit' 退出) ---")
while True:
query = input("你问: ")
if query.lower() == 'exit':
break
answer = rag_chain.invoke(query)
print(f"工业大脑: {answer}\n")
# --- 清理 ---
neo4j_driver.close()
print("程序结束。")
if __name__ == "__main__":
main()
第五部分:企业级部署的挑战与远景
一个成功的原型系统只是万里长征的第一步。在真实的企业环境中部署和运维这样一套复杂的AI系统,我们还会面临诸多挑战。
5.1 从“能用”到“好用”:性能评估与优化
如何科学地评估我们的“工业大脑”?这本身就是一个复杂的问题。单一的准确率指标远远不够,我们需要一个多维度的评估框架。
- 检索质量:
- Context Precision (上下文精确率): 检索出的上下文中,有多少是真正与答案相关的?
- Context Recall (上下文召回率): 所有相关的上下文信息,被检索出来了多少?
- 生成质量:
- Faithfulness (忠实度): 生成的答案是否完全基于给定的上下文,没有“幻觉”?
- Answer Relevancy (答案相关性): 答案是否直接回应了用户的问题?
- 端到端性能:
- Latency (延迟): 从提问到获得答案需要多长时间?
- Cost (成本): 每次查询调用LLM API的费用是多少?
像RAGAs、DeepEval等框架提供了自动化评估上述指标的能力。通过持续的基准测试,我们可以针对性地优化系统的各个环节,例如调整LangExtract的Prompt、优化知识图谱的Schema、或是调整混合检索中向量和图的权重。
5.2 治理与维护:让知识图谱“活”起来
知识图谱不是一次性构建完成的,它需要随着企业知识的增长而不断演进。这涉及到一套完整的**数据治理(Data Governance)**和维护流程。
- 增量更新: 如何高效地将新的故障报告、维修手册增补到知识图谱中?需要建立自动化的ETL(抽取、转换、加载)流水线。
- 知识校验: 新知识入库前,如何验证其准确性?可以设计“人机协同”的审核流程,让领域专家对LLM抽取的结果进行最终确认。
- 版本控制: 对知识图谱的Schema变更和数据更新进行版本管理,确保系统的稳定和可追溯。
- 访问控制: 在企业内部,不同角色的员工(如操作员、维修工、技术专家)对知识的访问权限应有所不同。图数据库天然支持精细化的权限控制。
一个“活”的、不断进化的知识图谱,才是企业最有价值的数字资产。
5.3 未来展望:从辅助诊断到“数字孪生”与“自主智能体”
我们构建的问答系统,仅仅是冰山一角。基于这个强大的知识图谱和RAG架构,未来充满了想象空间。
- 与数字孪生(Digital Twin)结合: 将知识图谱与设备的实时运行数据(来自IoT传感器)相结合,可以构建一个“认知数字孪生”。系统不仅能回答“发生了什么”,还能基于实时数据进行预测性维护,告诉你“将要发生什么”。
- 多模态RAG: 未来的故障诊断不仅依赖文本。设备异常的声波、热成像图、振动波形图等都可以作为RAG的检索源。LLM将能够理解“这张红外图中温度异常的区域”与“手册中描述的过热故障”之间的关联。
- 智能体(Agent)化: 最终,系统将从一个被动的问答工具,进化为一个主动的诊断智能体。它能主动监控设备状态,发现异常后自动执行一系列动作:查询知识图谱、分析历史数据、生成诊断报告、创建维修工单,甚至直接调整设备控制参数,实现真正的自主运维。
结语:开启工业智能的新篇章
我们通过结合LangExtract的精准抽取、知识图谱的深度连接以及混合RAG的智能检索,一步步构建了一个功能强大的工业故障诊断系统。这个过程清晰地表明,大语言模型不再是只能聊天、写诗的“玩具”,它们正在成为驱动产业变革的核心引擎。
将非结构化的企业知识转化为可计算、可推理的智能,是所有行业数字化转型的关键一步。本文所展示的架构和方法,不仅适用于工业维修,同样可以推广到法律、金融、医疗等任何知识密集型领域。
请访问我的微信公众号“大模型RAG和Agent技术实践”,有更丰富的内容
更多推荐
所有评论(0)