目录

(续前文:项目实战:LangExtract知识图谱构建和混合RAG在工业故障诊断中的应用(一))

第四部分:项目实践 —— 端到端工业故障诊断问答机器人

现在,万事俱备。我们已经有了知识图谱(大脑的记忆)和混合检索器(大脑的思考方式),是时候将它们组装起来,打造一个可以交互的问答机器人了。

4.1 整合所有组件:构建完整的Q&A链

我们将使用LangChain的LCEL(LangChain Expression Language)来优雅地将各个组件串联起来。这个链条的流程是:问题 -> 混合检索器 -> 格式化上下文 -> 提示词模板 -> LLM -> 答案。

from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# 假设 hybrid_retriever, llm 已经按照上一节的方式初始化好了

# 提示词模板
template = """
你是一个世界级的工业设备故障诊断专家。请根据下面提供的上下文信息,简洁、清晰、专业地回答用户的问题。
如果上下文中没有足够信息,就直接说“根据现有知识库,我无法回答这个问题”。
不要编造任何信息。

上下文:
{context}

问题: {question}

专家回答:
"""
prompt = ChatPromptTemplate.from_template(template)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 构建完整的RAG链
rag_chain = (
    {"context": hybrid_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# --- 开始提问 ---
# query = "C-101压缩机振动超标,可能是什么原因?"
# answer = rag_chain.invoke(query)
# print(answer)

这个rag_chain就是一个功能完备的问答系统。它接收一个问题字符串,内部自动完成混合检索、上下文构建、提示词生成和最终答案的生成,整个过程对用户完全透明。

4.2 实战演练:与我们的“工业大脑”对话

让我们来模拟几个真实的工业场景,看看这个系统的表现如何。

场景一:诊断复杂故障

用户提问: “主蒸汽涡轮机的2号轴承温度持续升高,超过了95摄氏度,该怎么办?”

系统内部流程:

  1. 向量检索: 找到与“涡轮机 轴承 温度升高”相关的故障报告和维修手册片段。
  2. 实体抽取: 从问题中识别出“主蒸汽涡轮机”、“2号轴承”。
  3. 图检索: 在KG中查询与“主蒸汽涡轮机”和“轴承”相关的故障,发现“润滑油污染”和“冷却水流量不足”是常见原因,对应的解决方案分别是“更换润滑油”和“检查冷却水泵”。
  4. 生成答案: LLM综合所有信息,生成如下回答: “根据知识库,主蒸汽涡轮机轴承温度过高(超过95℃)的可能原因及处理建议如下: 1. 根本原因: * 润滑油污染或劣化,导致润滑性能下降。 * 冷却水系统流量不足或中断,导致冷却效果不佳。 2. 排查步骤: * 立即检查润滑油站的油质分析报告,确认油品是否合格。 * 检查冷却水泵出口压力和流量,确认冷却水供应正常。 3. 解决方案: * 如果油质不合格,请安排停机更换润滑油并清洗油路。 * 如果冷却水异常,请立即排查并修复冷却水系统。 请优先排查冷却水问题,因为其风险更高。”

场景二:查询备件信息

用户提问: “更换P-205泵的机械密封需要哪些工具和备件型号?”

系统内部流程:

  1. 向量检索: 找到关于“P-205泵 机械密封 更换”的维修规程。
  2. 实体抽取: 识别“P-205泵”、“机械密封”。
  3. 图检索: 在KG中找到 (Solution: 更换机械密封) -[:REQUIRES]-> (Component: XX型号密封) 的关系,以及相关的工具信息。
  4. 生成答案: “更换P-205泵的机械密封,您需要: * 备件: 机械密封,型号为 ‘Garlock-2100’。 * 工具: 内六角扳手一套、扭矩扳手、专用拆卸工具。 请参考《P-205泵维修标准作业程序》第3.2节获取详细步骤。”

这些例子清晰地展示了混合RAG系统的强大能力。它不仅能“复述”知识,更能通过连接和推理,提供真正具有“洞察力”的答案。

4.3 完整项目代码

为了方便读者实践,这里提供一个整合了上述所有步骤的、简化的端到端Python脚本。请注意,你需要准备自己的文本数据(如.txt文件放在data目录下),并相应调整Schema和Prompt。

# main_project.py

import os
import textwrap
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Optional

# --- LangChain and Neo4j Imports ---
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_neo4j import Neo4jVector
from neo4j import GraphDatabase

# --- Google LangExtract (Simulated) ---
# 在实际项目中,你需要安装并使用 langextract 库
# import langextract as lx
# 这里我们用一个模拟函数代替
def simulated_langextract(text, schema_class):
    # This is a mock function. In a real scenario, you would use LangExtract
    # and a powerful LLM to extract this information.
    if "V-201反应釜" in text and "螺栓松动" in text:
        return [
            schema_class(
                fault_id='F002',
                equipment_name='V-201反应釜',
                component='机械密封',
                symptom='泄漏',
                cause='法兰连接螺栓松动',
                solution='紧固所有螺栓',
                document='report_002.txt'
            )
        ]
    if "C-101压缩机" in text and "振动" in text:
         return [
            schema_class(
                fault_id='F003',
                equipment_name='C-101压缩机',
                component='转子',
                symptom='振动超标',
                cause='转子不平衡',
                solution='进行动平衡校正',
                document='report_003.txt'
            )
        ]
    return []

# --- 1. 知识抽取与图谱构建 ---

@dataclass
class FaultKnowledge:
    fault_id: str
    equipment_name: str
    component: str
    symptom: str
    cause: str
    solution: str
    document: str

class KnowledgeGraphBuilder:
    def __init__(self, driver):
        self.driver = driver

    def build_from_text(self, text_content: str, doc_name: str):
        # 1.1 Extract knowledge using LangExtract (simulated)
        extractions = simulated_langextract(text_content, FaultKnowledge)
        
        # 1.2 Load into Neo4j
        with self.driver.session() as session:
            for record in extractions:
                session.execute_write(self._create_graph_nodes, record)
        return extractions

    @staticmethod
    def _create_graph_nodes(tx, record: FaultKnowledge):
        query = """
        MERGE (doc:Document {title: $document})
        MERGE (equip:Equipment {name: $equipment_name})
        MERGE (comp:Component {name: $component})
        MERGE (symp:Symptom {description: $symptom})
        MERGE (ca:Cause {description: $cause})
        MERGE (sol:Solution {description: $solution})
        MERGE (f:Fault {id: $fault_id})
        
        MERGE (f)-[:OCCURS_ON]->(equip)
        MERGE (f)-[:HAS_SYMPTOM]->(symp)
        MERGE (f)-[:HAS_CAUSE]->(ca)
        MERGE (f)-[:HAS_SOLUTION]->(sol)
        MERGE (f)-[:MENTIONED_IN]->(doc)
        MERGE (equip)-[:HAS_COMPONENT]->(comp)
        MERGE (sol)-[:AFFECTS]->(comp)
        """
        tx.run(query, record.__dict__)

# --- 2. 混合检索器 ---

class HybridRetriever(BaseRetriever):
    # (代码同上一节,此处为简洁省略,请直接复制上一节的HybridRetriever类定义)
    def __init__(self, neo4j_vector_store: Neo4jVector, neo4j_driver, llm, k: int = 2):
        super().__init__()
        self.vector_store = neo4j_vector_store
        self.driver = neo4j_driver
        self.llm = llm
        self.k = k

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        
        vector_results = self.vector_store.similarity_search(query, k=self.k)
        
        entity_extraction_prompt = f"From the following question, extract equipment or component names. Question: \"{query}\". Return only comma-separated names."
        response = self.llm.invoke(entity_extraction_prompt)
        entities = [e.strip() for e in response.content.split(',') if e.strip()]
        
        graph_docs = []
        if entities:
            with self.driver.session() as session:
                for entity in entities:
                    cypher_query = """
                    MATCH (e) WHERE e.name CONTAINS $entity
                    OPTIONAL MATCH (f:Fault)-[:OCCURS_ON|AFFECTS]->(e)
                    OPTIONAL MATCH (f)-[:HAS_CAUSE]->(c:Cause)
                    OPTIONAL MATCH (f)-[:HAS_SOLUTION]->(s:Solution)
                    WITH e, c, s
                    WHERE c IS NOT NULL AND s IS NOT NULL
                    RETURN "For equipment '" + e.name + "', a potential cause is '" + c.description + "' with solution '" + s.description + "'." AS text
                    LIMIT 2
                    """
                    result = session.run(cypher_query, entity=entity)
                    for record in result:
                        graph_docs.append(Document(page_content=record["text"]))

        combined_docs = vector_results + graph_docs
        unique_contents = set()
        final_docs = [doc for doc in combined_docs if doc.page_content not in unique_contents and not unique_contents.add(doc.page_content)]
        
        return final_docs

# --- 3. 主程序 ---

def main():
    # --- 配置 ---
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "your_openai_api_key")
    NEO4J_URI = "bolt://localhost:7687"
    NEO4J_USER = "neo4j"
    NEO4J_PASSWORD = "your_password" # !!!请替换为你的Neo4j密码

    if OPENAI_API_KEY == "your_openai_api_key" or NEO4J_PASSWORD == "your_password":
        print("请设置 OPENAI_API_KEY 环境变量和 NEO4J_PASSWORD。")
        return

    # --- 初始化连接 ---
    llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)
    embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
    neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

    # --- 步骤一: 构建知识图谱 ---
    print("--- 1. 开始构建知识图谱 ---")
    # 准备示例文本数据
    documents_data = {
        "report_002.txt": "2025年9月11日,当班操作员报告,V-201反应釜的机械密封出现泄漏,现场有明显物料滴落。经检查,确认为法兰连接螺栓松动导致。已派维修工紧固所有螺栓,泄漏停止。",
        "report_003.txt": "故障警报:C-101压缩机发生高频振动,振动值超限。分析认为是转子不平衡引起的。解决方案是停机后对转子进行动平衡校正。"
    }
    
    kg_builder = KnowledgeGraphBuilder(neo4j_driver)
    all_docs_for_vector_index = []
    for name, content in documents_data.items():
        extractions = kg_builder.build_from_text(content, name)
        print(f"从 {name} 抽取并存入KG: {len(extractions)} 条知识")
        all_docs_for_vector_index.append(Document(page_content=content, metadata={"source": name}))

    # --- 步骤二: 创建向量索引 ---
    print("\n--- 2. 创建或加载向量索引 ---")
    vector_index_name = "fault_reports"
    # 清理旧索引(仅为演示)
    try:
        neo4j_driver.execute_query(f"DROP INDEX {vector_index_name} IF EXISTS")
    except Exception as e:
        print(f"清理索引时出错 (可能不存在): {e}")

    vector_store = Neo4jVector.from_documents(
        all_docs_for_vector_index,
        embedding=embeddings,
        url=NEO4J_URI,
        username=NEO4J_USER,
        password=NEO4J_PASSWORD,
        index_name=vector_index_name,
        database="neo4j"
    )
    print(f"向量索引 '{vector_index_name}' 创建成功。")

    # --- 步骤三: 初始化混合检索器和RAG链 ---
    print("\n--- 3. 初始化混合RAG系统 ---")
    hybrid_retriever = HybridRetriever(
        neo4j_vector_store=vector_store,
        neo4j_driver=neo4j_driver,
        llm=llm
    )

    template = """You are an expert industrial fault diagnosis assistant. Answer the user's question based on the provided context. If the context is insufficient, say so. Do not invent information.
Context:
{context}
Question: {question}
Answer:"""
    prompt = ChatPromptTemplate.from_template(template)

    rag_chain = (
        {"context": hybrid_retriever | (lambda docs: "\n\n".join(doc.page_content for doc in docs)), "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    print("RAG系统准备就绪。")

    # --- 步骤四: 交互式问答 ---
    print("\n--- 4. 进入交互式问答环节 (输入 'exit' 退出) ---")
    while True:
        query = input("你问: ")
        if query.lower() == 'exit':
            break
        answer = rag_chain.invoke(query)
        print(f"工业大脑: {answer}\n")

    # --- 清理 ---
    neo4j_driver.close()
    print("程序结束。")

if __name__ == "__main__":
    main()

第五部分:企业级部署的挑战与远景

一个成功的原型系统只是万里长征的第一步。在真实的企业环境中部署和运维这样一套复杂的AI系统,我们还会面临诸多挑战。

5.1 从“能用”到“好用”:性能评估与优化

如何科学地评估我们的“工业大脑”?这本身就是一个复杂的问题。单一的准确率指标远远不够,我们需要一个多维度的评估框架。

  • 检索质量:
    • Context Precision (上下文精确率): 检索出的上下文中,有多少是真正与答案相关的?
    • Context Recall (上下文召回率): 所有相关的上下文信息,被检索出来了多少?
  • 生成质量:
    • Faithfulness (忠实度): 生成的答案是否完全基于给定的上下文,没有“幻觉”?
    • Answer Relevancy (答案相关性): 答案是否直接回应了用户的问题?
  • 端到端性能:
    • Latency (延迟): 从提问到获得答案需要多长时间?
    • Cost (成本): 每次查询调用LLM API的费用是多少?

像RAGAs、DeepEval等框架提供了自动化评估上述指标的能力。通过持续的基准测试,我们可以针对性地优化系统的各个环节,例如调整LangExtract的Prompt、优化知识图谱的Schema、或是调整混合检索中向量和图的权重。

5.2 治理与维护:让知识图谱“活”起来

知识图谱不是一次性构建完成的,它需要随着企业知识的增长而不断演进。这涉及到一套完整的**数据治理(Data Governance)**和维护流程。

  • 增量更新: 如何高效地将新的故障报告、维修手册增补到知识图谱中?需要建立自动化的ETL(抽取、转换、加载)流水线。
  • 知识校验: 新知识入库前,如何验证其准确性?可以设计“人机协同”的审核流程,让领域专家对LLM抽取的结果进行最终确认。
  • 版本控制: 对知识图谱的Schema变更和数据更新进行版本管理,确保系统的稳定和可追溯。
  • 访问控制: 在企业内部,不同角色的员工(如操作员、维修工、技术专家)对知识的访问权限应有所不同。图数据库天然支持精细化的权限控制。

一个“活”的、不断进化的知识图谱,才是企业最有价值的数字资产。

5.3 未来展望:从辅助诊断到“数字孪生”与“自主智能体”

我们构建的问答系统,仅仅是冰山一角。基于这个强大的知识图谱和RAG架构,未来充满了想象空间。

  • 与数字孪生(Digital Twin)结合: 将知识图谱与设备的实时运行数据(来自IoT传感器)相结合,可以构建一个“认知数字孪生”。系统不仅能回答“发生了什么”,还能基于实时数据进行预测性维护,告诉你“将要发生什么”。
  • 多模态RAG: 未来的故障诊断不仅依赖文本。设备异常的声波、热成像图、振动波形图等都可以作为RAG的检索源。LLM将能够理解“这张红外图中温度异常的区域”与“手册中描述的过热故障”之间的关联。
  • 智能体(Agent)化: 最终,系统将从一个被动的问答工具,进化为一个主动的诊断智能体。它能主动监控设备状态,发现异常后自动执行一系列动作:查询知识图谱、分析历史数据、生成诊断报告、创建维修工单,甚至直接调整设备控制参数,实现真正的自主运维。

结语:开启工业智能的新篇章

我们通过结合LangExtract的精准抽取、知识图谱的深度连接以及混合RAG的智能检索,一步步构建了一个功能强大的工业故障诊断系统。这个过程清晰地表明,大语言模型不再是只能聊天、写诗的“玩具”,它们正在成为驱动产业变革的核心引擎。

将非结构化的企业知识转化为可计算、可推理的智能,是所有行业数字化转型的关键一步。本文所展示的架构和方法,不仅适用于工业维修,同样可以推广到法律、金融、医疗等任何知识密集型领域。

请访问我的微信公众号“大模型RAG和Agent技术实践”,有更丰富的内容

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐