企业级 Agent 知识图谱构建实战指南:从零到一
schema.py是《员工手册》:规定了公司里只能用什么术语,防止沟通障碍。data/是“档案员”:负责每天阅读大量文件,按手册规定整理成卡片存入档案室(Neo4j)。是“档案检索员”:他懂复杂的档案索引语言(Cypher),能快速找到卡片。是“接待专员”:他坐在前台面对客户(用户),听懂客户需求,指挥检索员干活,最后把检索员给的生硬数据(JSON)翻译成优美的客套话(自然语言)反馈给客户。原则说
企业级 Agent 知识图谱构建实战指南:从零到一
0. 项目概览与环境准备
0.1 项目核心逻辑
企业级智能体(Agent)与普通聊天机器人的核心差异在于结构化的知识沉淀与可追溯的逻辑推理。本项目通过「非结构化文本→结构化三元组→知识图谱存储→Agent 智能查询」的闭环,解决大模型 “幻觉” 问题,实现基于真实业务数据的精准推理。
核心技术栈:
- LangChain:大模型编排与 Agent 框架,负责文本解析、工具调用、智能决策
- Neo4j:业界主流的图数据库,存储实体 - 关系 - 属性结构化数据
- GPT-4 Turbo:高精度实体 / 关系提取,保证图谱构建的准确性
- Python-dotenv/Pydantic:环境配置与数据校验,保障生产级稳定性
0.2 完整文件结构(含详细作用说明)
knowledge-agent-project/
├── data/ # 数据目录
│ ├── company_policy.txt # 原始非结构化数据源(企业员工/项目规则)
│ └── raw_docs/ # 批量处理时的多文件目录(支持txt/pdf)
├── src/ # 核心代码目录
│ ├── __init__.py # 包初始化,标记Python包
│ ├── config.py # 全局配置(数据库、模型、路径)
│ ├── schema.py # 【核心】图谱本体(节点/关系定义+数据校验)
│ ├── graph_builder.py # 【核心】文本→三元组→Neo4j入库全流程
│ ├── retrieval.py # 图谱查询工具(自然语言→Cypher→结果返回)
│ ├── main_agent.py # 面向用户的交互Agent(集成图谱查询工具)
│ └── utils.py # 通用工具(日志、文本分块、数据去重)
├── logs/ # 运行日志目录(自动生成)
├── .env # 环境变量(敏感信息,不提交代码库)
├── requirements.txt # 依赖清单(指定版本,保证环境一致性)
└── README.md # 项目说明(部署/运行/调试指南)
0.2.2 核心模块深度解析 (The Components)
我们按照数据流动的方向,将核心代码分为三个逻辑层次:
1. 立法与规范层 (The Lawmaker)
src/schema.py
- 角色:图谱的“宪法”与“校验器”。
- 深度解析:这是项目中最关键的文件之一。它不包含业务逻辑,但定义了 Agent 的世界观。
- 本体定义:明确规定了系统里只能有
Employee、Department等节点,防止 LLM 自由发挥产生幻觉。 - Pydantic 校验:利用 Python 的类型系统,在数据入库前进行严格清洗。例如,强制
Employee必须有name属性,否则拒绝入库,保证了图谱数据的“工业级”质量。
- 本体定义:明确规定了系统里只能有
- 协作:被
graph_builder.py调用以约束提取结果,被retrieval.py读取以生成准确的 Cypher 语句。
2. 构建与存储层 (The Builder)
src/graph_builder.py
- 角色:数据 ETL 引擎。
- 深度解析:实现了从“非结构化文本”到“结构化图谱”的完整流水线。
- Chunking:调用
utils.py将长文本切分为适合 LLM 处理的片段。 - Extraction:利用
schema.py定义的 Prompt,指挥 LLM 提取实体和关系。 - Deduping:并在入库前执行去重逻辑(例如:合并多个文档中提到的同一个 “Alice”)。
- Chunking:调用
- 关键产出:将清洗后的三元组写入 Neo4j 数据库。
data/ 目录
- 角色:知识的原材料。
- 深度解析:这里存放的是原始的非结构化数据(如企业手册、SOP)。Agent 最终回答问题的依据全部来源于此,但需经过图谱化处理。
3. 推理与交互层 (The Brain)
src/retrieval.py
- 角色:语义翻译官 (Semantic Translator)。
- 深度解析:连接 LLM 与 Graph Database 的桥梁。
- 它不直接存储答案,而是存储“查询能力”。它接收用户的自然语言(“谁是 Alice 的经理?”),动态编写 Cypher 代码(
MATCH (e:Employee {name:'Alice'})<-[:MANAGES]-(m) RETURN m),并执行查询。 - 内置了缓存机制 (TTL Cache),对相同问题直接返回结果,大幅降低 Token 消耗和延迟。
- 它不直接存储答案,而是存储“查询能力”。它接收用户的自然语言(“谁是 Alice 的经理?”),动态编写 Cypher 代码(
src/main_agent.py
- 角色:总控大脑 (Orchestrator)。
- 深度解析:这是面向用户的最终交付物。
- 工具调度:它决定何时查图谱(调用
retrieval.py),何时进行通用闲聊,何时调用计算器。 - 记忆管理:利用
ConversationBufferMemory维护上下文,让 Agent 能理解多轮对话中的指代(如“她属于哪个部门”)。 - System Prompt:在此处注入“人设”,约束 Agent 不回答图谱之外的无关信息。
- 工具调度:它决定何时查图谱(调用
4. 基础设施层 (Infrastructure)
.env & config.py
- 角色:神经中枢与密钥管理。
- 深度解析:
.env负责隔离敏感信息(OpenAI API Key, Database Password),确保代码开源时的安全性。config.py将环境变量转化为全局单例配置,供各模块调用,便于在开发/测试/生产环境间无缝切换。
src/utils.py & logs/
- 角色:调试与监控系统。
- 深度解析:在 Agent 开发中,能够看到 LLM 的“思考过程”至关重要。
utils.py封装了loguru,将 Agent 的每一次工具调用、每一次 Cypher 生成错误都记录在logs/中,是排查“Agent 为什么答错”的唯一依据。
0.2.3 这些文件是如何协作的?(System Workflow)
本系统的核心协作逻辑遵循:“Schema 定义世界,Builder 构建世界,Agent 探索世界”。
1. 系统协作全景图 (The Pipeline)
我们将系统分为两个平行的时空:离线构建时(左)与在线交互时(右),它们通过数据库和 Schema 紧密连接。
[时空一:离线构建阶段 (Graph Construction)] [时空二:在线交互阶段 (Agent Interaction)]
(一次性或定期运行) (实时响应用户请求)
│ │
▼ ▼
┌─────────────────────────────────────┐ ┌─────────────────────────────────────┐
│ 1. 数据摄入 (Ingestion) │ │ 1. 意图识别 (Intent Recognition) │
│ ├── 输入: data/company_policy.txt │ │ ├── 用户: "Alice的经理是谁?" │
│ ├── 动作: 读取 & 清洗 │ │ ├── 大脑: src/main_agent.py │
│ └── 工具: src/utils.py (分块) │ │ └── 决策: 需要查库,调用检索工具 │
└──────────────────┬──────────────────┘ └──────────────────┬──────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────┐ ┌─────────────────────────────────────┐
│ 2. 知识提取 (Extraction) │ │ 2. 语义翻译 (Translation) │
│ ├── 核心: src/graph_builder.py │ │ ├── 工具: src/retrieval.py │
│ ├── 依据: src/schema.py (本体约束) │◄─────────┤ 依据: src/schema.py (读取定义) │
│ │ (LLM仅提取定义好的实体类型) │ │ │ (确保生成的Cypher符合库表结构) │
│ └── 输出: 实体节点 + 关系边 │ │ └── 输出: MATCH (n)-[:MANAGES]... │
└──────────────────┬──────────────────┘ └──────────────────┬──────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────┐ ┌─────────────────────────────────────┐
│ 3. 图谱存储 (Storage) │ │ 3. 数据检索 (Execution) │
│ ├── 仓库: Neo4j Database │ │ ├── 动作: 执行Cypher查询 │
│ ├── 凭证: .env (账号密码) │─────────►│ 结果: "Bob" (结构化数据) │
│ └── 状态: 形成完整的企业知识网络 │ │ └── 优化: 存入 Cache (避免重复查) │
└─────────────────────────────────────┘ └──────────────────┬──────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 4. 答案合成 (Synthesis) │
│ ├── 输入: 原始数据 "Bob" │
│ ├── 大脑: src/main_agent.py │
│ └── 输出: "Alice的经理是Bob。" │
└─────────────────────────────────────┘
2. 协作细节深度解析 (Deep Dive)
这套系统之所以能运转,是因为各文件之间存在严格的“契约”关系。
A. 宪法与执行者的协作 (Schema & Builder/Retrieval)
这是系统中最关键的协作。
- 协作逻辑:
- 写入时 (
builder调用schema):graph_builder.py在提取信息时,会把schema.py中的 Pydantic 模型作为 Prompt 发送给 GPT-4。这就像是给了 LLM 一张“填空题试卷”,强迫它只能填Employee或Project,而不能填“好员工”这种非标准词。 - 读取时 (
retrieval调用schema):retrieval.py在生成查询语句时,必须先读取schema.py。因为它需要知道数据库里到底是叫name还是user_name,是BELONGS_TO还是IN_DEPT。schema.py保证了“存进去的”和“查出来的”是一套语言。
- 写入时 (
B. 大脑与手臂的协作 (Main Agent & Tools)
- 协作逻辑:
- 任务分发:
main_agent.py是唯一的决策者。它不知道怎么写 Cypher 语句,但它知道“一旦用户问了人事关系,就找EnterpriseKnowledgeBase工具”。 - 黑盒调用:
retrieval.py封装了所有的复杂性(Cypher 生成、报错重试、格式化)。对 Agent 来说,它只是传入一个字符串问题,得到一个字符串答案。这种解耦使得 Agent 可以轻松扩展其他工具(如计算器、联网搜索),而无需修改核心逻辑。
- 任务分发:
C. 神经系统与基础设施 (Env/Config/Logs)
- 协作逻辑:
- 全链路追踪:从
builder开始处理文档,到agent回答用户问题,所有模块都会调用utils.py中的日志记录器。这意味着你可以在logs/文件夹中看到一条完整的证据链:10:00:01Builder 提取了 “Alice-WORKS_ON->Apollo”10:05:00User 问 “Alice做什么项目?”10:05:02Agent 生成 Cypher 查询 Apollo10:05:05Agent 回答 “Apollo”
- 这种协作机制是企业级项目排查“幻觉”问题的根本保障。
- 全链路追踪:从
3. 总结 (The Analogy)
如果把这个 Agent 比作一家**“情报公司”**:
schema.py是《员工手册》:规定了公司里只能用什么术语,防止沟通障碍。data/&graph_builder.py是“档案员”:负责每天阅读大量文件,按手册规定整理成卡片存入档案室(Neo4j)。retrieval.py是“档案检索员”:他懂复杂的档案索引语言(Cypher),能快速找到卡片。main_agent.py是“接待专员”:他坐在前台面对客户(用户),听懂客户需求,指挥检索员干活,最后把检索员给的生硬数据(JSON)翻译成优美的客套话(自然语言)反馈给客户。
0.3 环境配置
0.3.1 依赖清单(requirements.txt,指定稳定版本)
langchain==0.2.14
langchain-openai==0.1.19
langchain-community==0.2.14
langchain-experimental==0.0.64
neo4j==5.20.0
python-dotenv==1.0.1
pydantic==2.7.4
python-multipart==0.0.9
pdfplumber==0.11.4 # 扩展:支持PDF文档解析
loguru==0.7.2 # 日志工具
cachetools==5.3.3 # 扩展:查询缓存
0.3.2 环境变量(.env,需手动创建)
# OpenAI 配置
OPENAI_API_KEY=your_openai_api_key # 替换为实际API Key
OPENAI_BASE_URL=https://api.openai.com/v1 # 国内可配置代理地址
# Neo4j 配置
NEO4J_URI=bolt://localhost:7687 # Neo4j默认端口
NEO4J_USERNAME=neo4j # 默认用户名
NEO4J_PASSWORD=your_neo4j_password # 安装Neo4j时设置的密码
# 项目配置
LOG_LEVEL=INFO # 日志级别:DEBUG/INFO/WARNING/ERROR
CACHE_TTL=300 # 缓存有效期(秒)
MAX_TEXT_CHUNK_SIZE=1000 # 文本分块大小(避免LLM上下文超限)
0.3.3 基础环境准备
-
Neo4j 安装
:
- 下载 Neo4j Desktop(免费版):https://neo4j.com/download/
- 安装后创建新数据库(版本推荐 5.x),设置密码,启动数据库(默认端口 7687)。
-
Python 环境
:
运行
# 创建虚拟环境(可选但推荐) conda create -n kg-agent python=3.10 conda activate kg-agent # 安装依赖 pip install -r requirements.txt
第一步:定义 “世界观”—— 本体设计 (Schema Design)
1.1 本体设计的核心原则
企业级知识图谱的本体(Ontology)是 “规则手册”,需满足:
| 原则 | 说明 |
|---|---|
| 业务对齐 | 节点 / 关系必须匹配实际业务场景(如 “员工 - 属于 - 部门” 对应企业组织架构) |
| 最小够用 | 避免过度设计(如无需定义 “员工 - 使用 - 电脑” 这类非核心关系) |
| 可扩展 | 预留扩展字段(如节点增加create_time属性,便于数据溯源) |
| 标准化 | 实体 / 关系名称统一(如 “部门” 统一用 Department,避免混用 Dept) |
1.2 本体定义与数据校验(src/schema.py)
不仅定义节点 / 关系类型,还通过 Pydantic 实现数据校验,避免脏数据入库:
运行
# src/schema.py
from pydantic import BaseModel, Field, validator
from typing import Dict, List, Optional
import re
# ===================== 基础定义 =====================
# 节点标签(英文为Neo4j存储值,中文为展示值)
NODE_LABELS: Dict[str, str] = {
"Employee": "员工",
"Department": "部门",
"Project": "项目",
"Skill": "技能"
}
# 关系类型(英文为Neo4j存储值,中文为展示值)
REL_TYPES: Dict[str, str] = {
"BELONGS_TO": "属于", # Employee -> Department
"WORKS_ON": "参与", # Employee -> Project
"HAS_SKILL": "拥有技能", # Employee -> Skill
"MANAGES": "管理" # Employee -> Employee
}
# 节点属性约束(保证数据规范性)
NODE_PROPERTIES = {
"Employee": ["name", "position", "create_time"], # 必选+可选属性
"Department": ["name", "location"],
"Project": ["name", "start_date", "leader"],
"Skill": ["name", "level"] # level: 初级/中级/高级
}
# ===================== 数据校验模型 =====================
class KGNode(BaseModel):
"""知识图谱节点校验模型"""
label: str = Field(..., description="节点类型(如Employee)")
properties: Dict[str, str] = Field(..., description="节点属性")
@validator("label")
def validate_label(cls, v):
"""校验节点类型是否在预定义列表中"""
if v not in NODE_LABELS.keys():
raise ValueError(f"节点类型非法,仅支持:{list(NODE_LABELS.keys())}")
return v
@validator("properties")
def validate_properties(cls, v, values):
"""校验节点属性是否符合约束"""
label = values.get("label")
if not label:
return v
required_props = NODE_PROPERTIES[label]
# 检查必选属性(第一个属性为必选)
if required_props[0] not in v:
raise ValueError(f"{label}节点缺少必选属性:{required_props[0]}")
# 检查属性值格式(如名称不能包含特殊字符)
for prop_name, prop_value in v.items():
if not re.match(r'^[\w\s\u4e00-\u9fa5]+$', prop_value):
raise ValueError(f"属性{prop_name}值非法,仅支持中英文、数字、空格")
return v
class KGRelationship(BaseModel):
"""知识图谱关系校验模型"""
source: KGNode = Field(..., description="起始节点")
target: KGNode = Field(..., description="目标节点")
type: str = Field(..., description="关系类型(如BELONGS_TO)")
properties: Optional[Dict[str, str]] = Field({}, description="关系属性(如创建时间)")
@validator("type")
def validate_rel_type(cls, v):
"""校验关系类型是否在预定义列表中"""
if v not in REL_TYPES.keys():
raise ValueError(f"关系类型非法,仅支持:{list(REL_TYPES.keys())}")
return v
# ===================== Prompt约束模板 =====================
def get_kg_extraction_prompt() -> str:
"""生成实体/关系提取的Prompt模板(约束LLM输出符合本体)"""
prompt = f"""
你是一个专业的知识图谱构建助手,需要从文本中提取实体和关系,严格遵循以下规则:
1. 实体类型仅允许:{[f"{k}({v})" for k, v in NODE_LABELS.items()]}
2. 关系类型仅允许:{[f"{k}({v})" for k, v in REL_TYPES.items()]}
3. 每个实体必须包含必选属性:
- Employee: name(必选)、position(可选)
- Department: name(必选)、location(可选)
- Project: name(必选)、start_date(可选)、leader(可选)
- Skill: name(必选)、level(可选)
4. 输出格式要求(JSON):
{{
"nodes": [{{"label": "Employee", "properties": {{"name": "Alice", "position": "Senior Engineer"}}}}],
"relationships": [{{
"source": {{"label": "Employee", "properties": {{"name": "Alice"}}}},
"target": {{"label": "Skill", "properties": {{"name": "Python"}}}},
"type": "HAS_SKILL",
"properties": {{"create_time": "2024-08-01"}}
}}]
}}
请严格按照上述格式输出,不要添加额外解释,仅返回JSON字符串。
"""
return prompt
1.3 本体设计的可视化与验证
- 可视化工具:Neo4j Browser(内置)、Linkurious(企业级)、PyVis(Python 可视化)
- 验证方法:编写单元测试校验本体约束(如测试非法节点类型是否被拦截):
运行
# src/schema.py 末尾添加单元测试
if __name__ == "__main__":
# 测试合法节点
valid_node = KGNode(label="Employee", properties={"name": "Alice", "position": "Senior Engineer"})
print("合法节点校验通过:", valid_node)
# 测试非法节点(缺少必选属性)
try:
invalid_node = KGNode(label="Employee", properties={"position": "Senior Engineer"})
except ValueError as e:
print("非法节点校验触发错误(预期):", e)
# 测试非法关系类型
try:
invalid_rel = KGRelationship(
source=valid_node,
target=KGNode(label="Skill", properties={"name": "Python"}),
type="INVALID_REL"
)
except ValueError as e:
print("非法关系校验触发错误(预期):", e)
第二步:构建 “图谱构建者”—— 提取与入库 (The Builder)
2.1 核心工具说明
| 工具 | 作用 | 关键参数 |
|---|---|---|
| LLMGraphTransformer | 文本→图谱数据转换核心,基于 LLM 提取实体 / 关系 | allowed_nodes/allowed_relationships(约束提取范围) |
| Neo4jGraph | LangChain 与 Neo4j 的连接器 | url/username/password(数据库连接信息) |
| Document | LangChain 文本封装对象 | page_content(文本内容)、metadata(元数据) |
2.1.1 核心工具详情
1. LLMGraphTransformer (提取引擎)
-
定义:这是图谱构建的“大脑”。它利用大语言模型(LLM)的推理能力,将非结构化文本转化为符合图谱 Schema 的结构化数据(
GraphDocument对象)。 -
来源库:
langchain_experimental.graph_transformers -
关键参数详解:
llm(BaseLanguageModel): 必须是一个支持结构化输出或指令遵循能力强的模型(推荐 GPT-4 或 Qwen2.5-72B)。allowed_nodes(List[str]): Schema 强约束。告诉 LLM 只能提取这些类型的节点(如["Employee", "Department"]),防止提取出无关实体。allowed_relationships(List[str]): Schema 强约束。告诉 LLM 只能提取这些类型的关系(如["WORKS_ON", "MANAGES"])。strict_mode(bool): 是否严格过滤掉不在 allowed 列表中的内容(建议True)。
-
代码实战:
from langchain_experimental.graph_transformers import LLMGraphTransformer # 初始化转换器 llm_transformer = LLMGraphTransformer( llm=chat_model, # 传入初始化好的 ChatOpenAI 对象 allowed_nodes=["Employee", "Department"], allowed_relationships=["BELONGS_TO"], strict_mode=True # 【生产建议】开启严格模式,减少幻觉 ) # 使用方法:转换 Document 列表 # docs 是 Document 对象列表,graph_docs 是包含节点/关系的结构化对象 graph_docs = llm_transformer.convert_to_graph_documents(docs)
2. Neo4jGraph (存储连接器)
-
定义:LangChain 与 Neo4j 数据库之间的“通信桥梁”。它既负责将结构化数据写入数据库,也负责将自然语言查询转换为 Cypher 并执行。
-
来源库:
langchain_community.graphs -
关键参数详解:
url(str): 数据库连接地址,必须使用 Bolt 协议(如bolt://localhost:7687)。username/password(str): 数据库认证信息。refresh_schema(bool): 初始化时是否自动拉取数据库 Schema(生产环境建议设为False以手动控制,避免启动慢)。
-
代码实战:
from langchain_community.graphs import Neo4jGraph # 建立连接 graph = Neo4jGraph( url="bolt://localhost:7687", username="neo4j", password="password", refresh_schema=False ) # 方法 A:刷新 Schema(当数据库结构变更后必须调用,否则 LLM 不知道库里有什么) graph.refresh_schema() print(graph.schema) # 方法 B:写入数据 graph.add_graph_documents(graph_docs)
3. Document (数据载体)
-
定义:LangChain 中最基础的文本单元。所有的文本处理(分块、向量化、图谱提取)都以
Document对象为单位流转。 -
来源库:
langchain_core.documents -
关键参数详解:
page_content(str): 核心文本。这是 LLM 读取并进行分析的实际内容。metadata(dict): 数据血缘。用于存储原文来源、页码、分块 ID、创建时间等。这在图谱构建中至关重要,因为它可以作为属性写入 Neo4j,实现“答案溯源”。
-
代码实战:
from langchain_core.documents import Document from datetime import datetime # 手动创建一个文档对象 doc = Document( page_content="Alice works at AI_Lab.", metadata={ "source": "company_policy.txt", "chunk_id": 101, "created_at": datetime.now().isoformat() } ) # 在图谱中,metadata 会被转换为节点/关系的属性(需配置)
2.1.2 工具协作数据流 (Data Flow Pipeline)
理解这三个工具如何串联,是掌握代码逻辑的关键:
代码段
graph LR
A[原始文本 String] -->|封装 & 分块| B(Document 对象)
B -->|作为输入| C{LLMGraphTransformer}
C -->|调用 LLM 提取| D[GraphDocument 对象]
D -->|包含 Nodes/Relationships| E[Neo4jGraph 连接器]
E -->|执行 Cypher 写入| F[(Neo4j 数据库)]
- 封装:原始文本被切分并封装进
Document,携带 metadata。 - 提取:
LLMGraphTransformer读取Document.page_content,根据 Schema 提取实体。 - 转换:生成
GraphDocument,这是一种中间格式,既包含图数据,也保留了源Document的 metadata。 - 入库:
Neo4jGraph接收GraphDocument,自动生成CREATE或MERGE语句写入数据库。
2.2 完整图谱构建代码(src/graph_builder.py)
包含文本分块、数据去重、日志配置、错误处理、批量入库等生产级特性:
运行
# src/graph_builder.py
import os
import json
import time
from datetime import datetime
from typing import List, Optional
from loguru import logger
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
from schema import KGNode, KGRelationship, get_kg_extraction_prompt, NODE_LABELS, REL_TYPES
from utils import init_logger # 后续补充utils.py
# ===================== 初始化配置 =====================
load_dotenv()
# 初始化日志
init_logger(log_level=os.getenv("LOG_LEVEL", "INFO"))
# 初始化Neo4j连接
graph = Neo4jGraph(
url=os.getenv("NEO4J_URI"),
username=os.getenv("NEO4J_USERNAME"),
password=os.getenv("NEO4J_PASSWORD")
)
# 初始化LLM(指定版本,关闭随机性保证结果稳定)
llm = ChatOpenAI(
temperature=0,
model="gpt-4-turbo",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)
# ===================== 文本预处理 =====================
def split_text_into_chunks(text: str, chunk_size: int = None, chunk_overlap: int = 50) -> List[Document]:
"""
文本分块处理(避免LLM上下文超限)
:param text: 原始文本
:param chunk_size: 分块大小(默认从环境变量读取)
:param chunk_overlap: 分块重叠长度(保证上下文连贯)
:return: 分块后的Document列表
"""
chunk_size = chunk_size or int(os.getenv("MAX_TEXT_CHUNK_SIZE", 1000))
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ".", ",", "。"] # 中文文本分隔符
)
chunks = text_splitter.split_text(text)
documents = [Document(page_content=chunk, metadata={"chunk_id": i, "create_time": datetime.now().isoformat()})
for i, chunk in enumerate(chunks)]
logger.info(f"文本分块完成,共生成{len(documents)}个块")
return documents
# ===================== 图谱转换器配置 =====================
def init_llm_graph_transformer() -> LLMGraphTransformer:
"""初始化图谱转换器(自定义Prompt,约束提取规则)"""
# 自定义Prompt(覆盖默认Prompt,确保符合本体约束)
extraction_prompt = get_kg_extraction_prompt()
llm_transformer = LLMGraphTransformer(
llm=llm,
allowed_nodes=list(NODE_LABELS.keys()),
allowed_relationships=list(REL_TYPES.keys()),
node_properties=NODE_PROPERTIES,
# 自定义Prompt模板
prompt=extraction_prompt
)
return llm_transformer
# ===================== 数据去重 =====================
def deduplicate_graph_data(graph_documents) -> List:
"""
去重图谱数据(避免重复节点/关系)
:param graph_documents: LLMGraphTransformer输出的图谱文档
:return: 去重后的图谱文档
"""
seen_nodes = set()
seen_rels = set()
deduped_nodes = []
deduped_rels = []
for doc in graph_documents:
# 节点去重(按label+name唯一标识)
for node in doc.nodes:
node_key = (node.label, node.properties.get("name"))
if node_key not in seen_nodes and node.properties.get("name"):
seen_nodes.add(node_key)
deduped_nodes.append(node)
# 关系去重(按源节点+关系类型+目标节点)
for rel in doc.relationships:
rel_key = (
rel.source.properties.get("name"),
rel.type,
rel.target.properties.get("name")
)
if rel_key not in seen_rels and all(rel_key):
seen_rels.add(rel_key)
deduped_rels.append(rel)
# 重新构建图谱文档
doc.nodes = deduped_nodes
doc.relationships = deduped_rels
logger.info(f"数据去重完成:节点保留{len(deduped_nodes)}个,关系保留{len(deduped_rels)}条")
return [doc]
# ===================== 核心入库函数 =====================
def build_graph_from_text(text_content: str, batch_size: int = 5) -> None:
"""
从文本构建知识图谱(主函数)
:param text_content: 原始文本
:param batch_size: 批量入库大小(避免单次入库过多)
"""
try:
logger.info(">>> 开始处理文本,提取知识三元组 <<<")
# 1. 文本分块
documents = split_text_into_chunks(text_content)
# 2. 初始化图谱转换器
llm_transformer = init_llm_graph_transformer()
# 3. 批量转换文本为图谱数据
all_graph_docs = []
for i in range(0, len(documents), batch_size):
batch_docs = documents[i:i+batch_size]
logger.info(f"处理第{i//batch_size + 1}批文本块(共{len(batch_docs)}个)")
graph_docs = llm_transformer.convert_to_graph_documents(batch_docs)
all_graph_docs.extend(graph_docs)
time.sleep(1) # 避免LLM API请求超限
# 4. 数据去重
deduped_graph_docs = deduplicate_graph_data(all_graph_docs)
# 5. 数据校验(基于Pydantic模型)
validated_nodes = []
validated_rels = []
for doc in deduped_graph_docs:
# 校验节点
for node in doc.nodes:
try:
validated_node = KGNode(label=node.label, properties=node.properties)
validated_nodes.append(validated_node)
except ValueError as e:
logger.warning(f"节点校验失败,跳过:{node},错误:{e}")
# 校验关系
for rel in doc.relationships:
try:
validated_rel = KGRelationship(
source=KGNode(label=rel.source.label, properties=rel.source.properties),
target=KGNode(label=rel.target.label, properties=rel.target.properties),
type=rel.type,
properties={"create_time": datetime.now().isoformat()}
)
validated_rels.append(validated_rel)
except ValueError as e:
logger.warning(f"关系校验失败,跳过:{rel},错误:{e}")
# 6. 批量入库Neo4j
if validated_nodes and validated_rels:
# 重新封装为LangChain GraphDocument格式
from langchain_experimental.graph_transformers.base import GraphDocument
final_graph_doc = GraphDocument(
nodes=[n.dict() for n in validated_nodes],
relationships=[r.dict() for r in validated_rels],
source=Document(page_content=text_content[:100] + "...") # 源文本摘要
)
# 入库
graph.add_graph_documents([final_graph_doc])
logger.info(f">>> 图谱入库完成:共写入{len(validated_nodes)}个节点,{len(validated_rels)}条关系 <<<")
else:
logger.warning(">>> 无有效图谱数据,跳过入库 <<<")
except Exception as e:
logger.error(f"图谱构建失败:{str(e)}", exc_info=True)
raise
# ===================== 批量处理文件 =====================
def build_graph_from_file(file_path: str) -> None:
"""
从文件构建知识图谱(支持txt/pdf)
:param file_path: 文件路径
"""
# 读取文本(扩展:支持PDF解析)
if file_path.endswith(".txt"):
with open(file_path, "r", encoding="utf-8") as f:
text_content = f.read()
elif file_path.endswith(".pdf"):
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text_content = "\n".join([page.extract_text() for page in pdf.pages if page.extract_text()])
else:
raise ValueError("仅支持txt/pdf格式文件")
logger.info(f"读取文件完成:{file_path},文本长度:{len(text_content)}字符")
build_graph_from_text(text_content)
# ===================== 测试运行 =====================
if __name__ == "__main__":
# 模拟企业数据
sample_text = """
Alice works as a Senior Engineer at the AI_Lab department, located in Beijing.
She is skilled in Python (高级) and DeepLearning (中级).
Bob manages Alice. Bob is the leader of the Apollo Project, which started on 2024-01-01.
The Apollo Project requires DeepLearning skills and is part of the AI_Lab department.
Charlie is a Junior Engineer in AI_Lab, skilled in Java (初级) and works on the Apollo Project.
"""
# 运行图谱构建
build_graph_from_text(sample_text)
# 扩展:从文件构建
# build_graph_from_file("data/company_policy.txt")
2.3 通用工具模块(src/utils.py)
补充日志初始化、数据格式转换等通用功能:
运行
# src/utils.py
import os
import logging
from loguru import logger
from datetime import datetime
def init_logger(log_level: str = "INFO", log_dir: str = "logs") -> None:
"""
初始化日志配置(输出到文件+控制台)
:param log_level: 日志级别
:param log_dir: 日志目录
"""
# 创建日志目录
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 日志文件名
log_file = os.path.join(log_dir, f"kg_builder_{datetime.now().strftime('%Y%m%d')}.log")
# 配置Loguru
logger.remove() # 清除默认配置
# 添加控制台输出
logger.add(
sink=lambda msg: print(msg, end=""),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# 添加文件输出
logger.add(
sink=log_file,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {file} | {line} | {message}",
rotation="1 day", # 按天分割日志
retention="7 days", # 保留7天日志
encoding="utf-8"
)
logger.info("日志配置初始化完成")
def neo4j_cleanup() -> None:
"""清空Neo4j图谱数据(测试用)"""
from langchain_community.graphs import Neo4jGraph
from dotenv import load_dotenv
load_dotenv()
graph = Neo4jGraph(
url=os.getenv("NEO4J_URI"),
username=os.getenv("NEO4J_USERNAME"),
password=os.getenv("NEO4J_PASSWORD")
)
graph.query("MATCH (n) DETACH DELETE n")
logger.info("Neo4j数据已清空")
2.4 运行与验证
2.4.1 运行命令
运行
# 激活虚拟环境
conda activate kg-agent
# 运行图谱构建脚本
python src/graph_builder.py
2.4.2 Neo4j 验证
打开 Neo4j Browser(http://localhost:7474/),执行 Cypher 查询查看数据:
# 查询所有员工节点
MATCH (e:Employee) RETURN e.name, e.position LIMIT 10;
# 查询Alice的所有关系
MATCH (a:Employee {name: "Alice"})-[r]->(n) RETURN a.name, r.type, n.name;
2.4.3 常见问题
| 问题 | 解决方案 |
|---|---|
| LLM API 调用失败 | 检查 OPENAI_API_KEY 是否正确,国内需配置代理地址 |
| Neo4j 连接超时 | 确认 Neo4j 数据库已启动,URI / 用户名 / 密码正确 |
| 实体提取不全 | 减小文本分块大小,优化 Prompt 约束,更换更高精度 LLM |
第三步:赋予 Agent “查询大脑”—— 检索工具 (Retrieval Tool)
3.1 核心能力设计
企业级检索工具需满足:
- 自然语言→Cypher 自动转换(无需用户懂图查询语言)
- 查询缓存(避免重复计算,提升响应速度)
- 异常处理(Cypher 语法错误、查询超时)
- 结果格式化(返回自然语言答案,而非原始数据)
3.2 完整检索工具代码(src/retrieval.py)
运行
# src/retrieval.py
import os
import time
from typing import Optional
from loguru import logger
from cachetools import TTLCache
from langchain.chains import GraphCypherQAChain
from langchain_openai import ChatOpenAI
from langchain_community.graphs import Neo4jGraph
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
from utils import init_logger
# ===================== 初始化配置 =====================
load_dotenv()
init_logger(log_level=os.getenv("LOG_LEVEL", "INFO"))
# 初始化Neo4j连接
graph = Neo4jGraph(
url=os.getenv("NEO4J_URI"),
username=os.getenv("NEO4J_USERNAME"),
password=os.getenv("NEO4J_PASSWORD")
)
# 初始化LLM
llm = ChatOpenAI(
temperature=0,
model="gpt-4-turbo",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)
# ===================== 查询缓存配置 =====================
# 缓存配置:key=用户问题,value=查询结果,TTL=300秒(5分钟)
QUERY_CACHE = TTLCache(
maxsize=100, # 最大缓存数量
ttl=int(os.getenv("CACHE_TTL", 300)) # 缓存有效期
)
# ===================== Cypher生成Prompt优化 =====================
# 自定义Cypher生成Prompt(约束查询规则,避免危险操作)
CYPHER_GENERATION_PROMPT = PromptTemplate(
input_variables=["schema", "question"],
template="""
你是一个Neo4j Cypher查询专家,需要根据知识图谱schema和用户问题生成合法的Cypher语句,严格遵循以下规则:
1. Schema信息:
{schema}
2. 生成规则:
- 仅允许查询(MATCH/RETURN),禁止删除/修改(DELETE/SET)等写操作
- 限制返回结果数量(最多10条),使用LIMIT子句
- 节点属性仅使用name/position/location/start_date/leader/level
- 关系类型仅使用BELONGS_TO/WORKS_ON/HAS_SKILL/MANAGES
3. 示例:
用户问题:Who manages Alice?
Cypher语句:MATCH (m:Employee)-[:MANAGES]->(e:Employee {name: "Alice"}) RETURN m.name LIMIT 10;
用户问题:{question}
请仅返回Cypher语句,不要添加任何解释。
"""
)
# ===================== 核心查询函数 =====================
def query_knowledge_graph(question: str, use_cache: bool = True) -> str:
"""
知识图谱查询核心函数(Agent调用的工具)
:param question: 用户自然语言问题
:param use_cache: 是否使用缓存
:return: 自然语言答案
"""
# 1. 缓存检查
if use_cache and question in QUERY_CACHE:
logger.info(f"命中缓存,直接返回结果:{question[:20]}...")
return QUERY_CACHE[question]
try:
logger.info(f"开始处理查询:{question}")
# 2. 初始化Cypher QA链
graph_chain = GraphCypherQAChain.from_llm(
llm=llm,
graph=graph,
verbose=True, # 打印Cypher生成过程(调试用)
allow_dangerous_requests=True, # 企业内网允许执行
cypher_prompt=CYPHER_GENERATION_PROMPT,
# 配置查询超时(避免长时间阻塞)
cypher_kwargs={"timeout": 10},
# 结果格式化(返回自然语言)
return_intermediate_steps=True # 保留中间步骤(调试用)
)
# 3. 执行查询
start_time = time.time()
response = graph_chain.invoke({"query": question})
end_time = time.time()
logger.info(f"查询耗时:{end_time - start_time:.2f}秒")
# 4. 结果处理(提取答案,格式化)
raw_result = response.get("result", "未找到相关信息")
# 补充中间步骤日志(调试)
intermediate_steps = response.get("intermediate_steps", [])
if intermediate_steps:
cypher_query = intermediate_steps[0].get("query", "")
logger.info(f"生成的Cypher语句:{cypher_query}")
# 5. 缓存结果
if use_cache:
QUERY_CACHE[question] = raw_result
return raw_result
except Exception as e:
error_msg = f"查询失败:{str(e)}"
logger.error(error_msg, exc_info=True)
return error_msg
# ===================== 批量查询测试 =====================
def batch_test_queries(queries: list) -> None:
"""批量测试查询"""
for q in queries:
logger.info(f"\n=== 测试查询:{q} ===")
result = query_knowledge_graph(q)
logger.info(f"查询结果:{result}")
# ===================== 测试运行 =====================
if __name__ == "__main__":
# 测试查询列表
test_queries = [
"Who manages the person that knows Python?",
"What skills does Alice have?",
"Which project is Charlie working on?",
"Where is the AI_Lab department located?"
]
batch_test_queries(test_queries)
3.3 检索工具增强技巧
3.3.1 Cypher 优化
-
限制返回条数(LIMIT),避免大数据量查询超时
-
为节点 name 属性创建索引,提升查询速度:
CREATE INDEX idx_employee_name FOR (e:Employee) ON (e.name); CREATE INDEX idx_department_name FOR (d:Department) ON (d.name); CREATE INDEX idx_project_name FOR (p:Project) ON (p.name); CREATE INDEX idx_skill_name FOR (s:Skill) ON (s.name);
3.3.2 多模态检索(RAG+KG 融合)
将知识图谱检索结果与文档检索结果融合,提升答案丰富度:
运行
# 扩展:RAG+KG融合示例
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
def rag_kg_fusion_query(question: str) -> str:
"""RAG+KG融合查询"""
# 1. KG检索
kg_result = query_knowledge_graph(question)
# 2. 文档检索(RAG)
loader = TextLoader("data/company_policy.txt")
docs = loader.load()
embeddings = OpenAIEmbeddings()
vector_db = Chroma.from_documents(docs, embeddings)
rag_result = vector_db.similarity_search(question, k=1)[0].page_content
# 3. 结果融合
fusion_result = f"基于企业知识图谱:{kg_result}\n基于原始文档:{rag_result}"
return fusion_result
第四步:组装最终 Agent (The User-Facing Agent)
4.1 Agent 设计要点
| 设计维度 | 实现方案 |
|---|---|
| 工具管理 | 明确工具描述(让 Agent 正确选择),设置工具优先级 |
| 记忆管理 | 集成 ConversationBufferMemory,支持多轮对话 |
| Prompt 优化 | 定义 System Prompt,约束 Agent 行为逻辑 |
| 调试能力 | 开启 verbose 模式,打印 Agent 思考过程 |
4.2 完整 Agent 代码(src/main_agent.py)
运行
# src/main_agent.py
import os
from loguru import logger
from langchain.agents import initialize_agent, Tool, AgentType
from langchain.agents.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
from langchain.prompts import SystemMessagePromptTemplate, PromptTemplate
from dotenv import load_dotenv
from retrieval import query_knowledge_graph
from utils import init_logger
# ===================== 初始化配置 =====================
load_dotenv()
init_logger(log_level=os.getenv("LOG_LEVEL", "INFO"))
# 初始化LLM
llm = ChatOpenAI(
temperature=0.1, # 少量随机性,提升回答自然度
model="gpt-4-turbo",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)
# ===================== 工具定义 =====================
def define_agent_tools() -> list:
"""定义Agent可用的工具集"""
# 1. 知识图谱查询工具(核心)
kg_tool = Tool(
name="EnterpriseKnowledgeBase",
func=query_knowledge_graph,
description="""
用于回答关于企业员工、部门、项目、技能之间关系的问题,例如:
- 谁管理懂Python的员工?
- Alice拥有哪些技能?
- Charlie参与了哪个项目?
- AI_Lab部门位于哪里?
只有遇到上述类型的问题时才调用此工具,其他问题无需调用。
"""
)
# 2. 扩展工具:计算器(示例)
def math_calculator(expression: str) -> str:
"""简单计算器工具"""
try:
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算失败:{str(e)}"
math_tool = Tool(
name="MathCalculator",
func=math_calculator,
description="用于解决数学计算问题,例如:1+2*3,仅处理纯数学表达式"
)
return [kg_tool, math_tool]
# ===================== 记忆配置 =====================
def init_agent_memory() -> ConversationBufferMemory:
"""初始化Agent对话记忆(支持多轮交互)"""
memory = ConversationBufferMemory(
memory_key="chat_history", # 记忆变量名(与Prompt对应)
return_messages=True, # 返回消息对象(而非字符串)
output_key="output" # 输出变量名
)
return memory
# ===================== System Prompt 配置 =====================
def get_agent_system_prompt() -> SystemMessagePromptTemplate:
"""定义Agent的系统提示(约束行为逻辑)"""
system_prompt = PromptTemplate(
input_variables=[],
template="""
你是企业级智能助手,负责回答关于企业组织架构、员工技能、项目信息的问题,严格遵循以下规则:
1. 优先使用EnterpriseKnowledgeBase工具获取准确数据,不编造信息;
2. 只有数学计算问题才使用MathCalculator工具,其他问题禁止使用;
3. 回答简洁、准确,使用自然语言,避免技术术语(如Cypher);
4. 记住用户之前的问题和你的回答,保持对话连贯;
5. 如果工具返回错误或无结果,明确告知用户“未查询到相关信息”,不要猜测。
"""
)
return SystemMessagePromptTemplate(prompt=system_prompt)
# ===================== Agent 初始化 =====================
def init_enterprise_agent() -> object:
"""初始化企业级Agent"""
# 1. 定义工具
tools = define_agent_tools()
# 2. 初始化记忆
memory = init_agent_memory()
# 3. 初始化System Prompt
system_message = get_agent_system_prompt()
# 4. 初始化Agent
agent = initialize_agent(
tools=tools,
llm=llm,
agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
memory=memory,
verbose=True, # 打印Agent思考过程(调试用)
max_iterations=5, # 最大迭代次数(避免无限循环)
handle_parsing_errors="请重试,无法解析工具调用结果",
system_message=system_message
)
logger.info("企业级Agent初始化完成")
return agent
# ===================== 交互函数 =====================
def agent_chat_interface(agent) -> None:
"""Agent交互界面(命令行)"""
print("=== 企业级智能助手(输入 'quit' 退出)===")
while True:
try:
user_input = input("\n用户:").strip()
if user_input.lower() == "quit":
print("助手:再见!")
logger.info("用户退出交互")
break
if not user_input:
print("助手:请输入有效问题!")
continue
logger.info(f"用户输入:{user_input}")
# 执行Agent推理
response = agent.run(user_input)
print(f"助手:{response}")
logger.info(f"Agent回答:{response}")
except KeyboardInterrupt:
print("\n助手:交互被中断,再见!")
break
except Exception as e:
error_msg = f"交互出错:{str(e)}"
print(f"助手:{error_msg}")
logger.error(error_msg, exc_info=True)
# ===================== 测试运行 =====================
if __name__ == "__main__":
# 初始化Agent
agent = init_enterprise_agent()
# 启动交互界面
agent_chat_interface(agent)
4.3 运行与交互示例
4.3.1 运行命令
运行
conda activate kg-agent
python src/main_agent.py
4.3.2 交互流程示例
=== 企业级智能助手(输入 'quit' 退出)===
用户:Who manages the person that knows Python?
> Entering new AgentExecutor chain...
思考:我需要调用EnterpriseKnowledgeBase工具来回答这个问题。
调用工具:EnterpriseKnowledgeBase
工具输入:Who manages the person that knows Python?
工具输出:Bob manages Alice, who knows Python.
思考:我现在可以回答用户的问题了。
> Finished chain.
助手:Bob manages Alice, who knows Python.
用户:What skills does she have?
> Entering new AgentExecutor chain...
思考:用户问的是“她有什么技能”,结合上下文,“她”指Alice,需要调用EnterpriseKnowledgeBase工具。
调用工具:EnterpriseKnowledgeBase
工具输入:What skills does Alice have?
工具输出:Alice has Python (高级) and DeepLearning (中级) skills.
思考:我现在可以回答用户的问题了。
> Finished chain.
助手:Alice has Python (高级) and DeepLearning (中级) skills.
用户:quit
助手:再见!
定制化指南:不同场景的图谱与 Agent 适配
5.1 法律合规 Agent (Legal Agent)
5.1.1 本体定制(src/schema.py 修改)
运行
# 法律场景节点/关系定义
NODE_LABELS = {
"Law": "法律",
"Clause": "条款",
"Case": "案例",
"Entity": "涉及主体"
}
REL_TYPES = {
"CONTRADICTS": "抵触",
"CITES": "引用",
"APPLIES_TO": "适用于"
}
NODE_PROPERTIES = {
"Law": ["name", "issuer", "issue_date"],
"Clause": ["number", "content", "law_name"],
"Case": ["case_id", "parties", "verdict"],
"Entity": ["name", "type"] # type: 企业/个人/政府
}
5.1.2 数据源与特殊处理
-
数据源:法律法规文本、裁判文书网案例、企业合规文档
-
特殊逻辑
:
- 条款引用关系校验(确保 CITES 关系准确)
- 抵触关系检测(需 LLM 对比条款内容)
- 人工审核环节(关键关系需人工确认后入库)
5.2 供应链管理 Agent (Supply Chain Agent)
5.2.1 本体定制(src/schema.py 修改)
运行
# 供应链场景节点/关系定义
NODE_LABELS = {
"Supplier": "供应商",
"Part": "零件",
"Product": "产品",
"Location": "地点"
}
REL_TYPES = {
"SUPPLIES": "供应",
"LOCATED_IN": "位于",
"DEPENDS_ON": "依赖"
}
# 关系属性扩展(添加交货周期、价格等)
REL_PROPERTIES = {
"SUPPLIES": ["lead_time", "price", "quality_level"]
}
5.2.2 特殊逻辑(src/graph_builder.py 修改)
运行
# 扩展关系属性提取
def extract_rel_properties(text: str, rel_type: str) -> dict:
"""提取供应链关系属性(如交货周期)"""
if rel_type == "SUPPLIES":
# 从文本中提取lead_time(交货周期)
import re
lead_time_match = re.search(r"lead_time=(\d+)days", text)
price_match = re.search(r"price=(\d+\.\d+)", text)
return {
"lead_time": lead_time_match.group(1) if lead_time_match else "",
"price": price_match.group(1) if price_match else "",
"create_time": datetime.now().isoformat()
}
return {"create_time": datetime.now().isoformat()}
5.3 医疗诊断 Agent (Medical Agent)
5.3.1 本体定制(src/schema.py 修改)
运行
# 医疗场景节点/关系定义
NODE_LABELS = {
"Symptom": "症状",
"Disease": "疾病",
"Drug": "药物",
"Protein": "蛋白质"
}
REL_TYPES = {
"CAUSES": "导致",
"TREATS": "治疗",
"INTERACTS_WITH": "相互作用"
}
5.3.2 特殊约束
- 数据源:仅使用 PubMed、药监局等权威数据源
- 准确性校验:基于医疗知识库验证提取的关系
- 权限控制:不同角色(医生 / 患者)看到不同的查询结果
生产级优化与部署
6.1 性能优化
| 优化方向 | 实现方案 |
|---|---|
| 速度 | 开启查询缓存、Neo4j 索引、LLM 请求批处理 |
| 准确性 | 优化 Prompt 约束、增加数据校验、人工审核关键数据 |
| 稳定性 | 增加重试机制(LLM/Neo4j 调用)、超时控制、异常捕获 |
1. 速度优化 (Speed): “天下武功,唯快不破”
优化目标:减少 LLM 等待时间,降低数据库查询延迟。
A. Neo4j 索引优化 (Database Indexing)
原理:默认情况下,Neo4j 查找节点是全表扫描。建立索引可将查询复杂度从 O(N) 降为 O(logN)。
🛠️ 代码实现 (Cypher 脚本) 请在 Neo4j Browser 中执行以下语句(建议在项目初始化脚本中自动执行):
// 1. 唯一性约束 (最快,同时防止重复数据)
// 作用:加速根据 name 查找 Employee 的速度,并确保 name 唯一
CREATE CONSTRAINT FOR (e:Employee) REQUIRE e.name IS UNIQUE;
CREATE CONSTRAINT FOR (p:Project) REQUIRE p.name IS UNIQUE;
// 2. 复合索引 (针对多条件查询)
// 作用:加速 "查找属于某部门的某职位员工"
CREATE INDEX emp_dept_pos FOR (e:Employee) ON (e.department, e.position);
// 3. 全文索引 (模糊搜索神器)
// 作用:支持 "包含 'AI' 的项目" 这种模糊查询,比 CONTAINS 快得多
CREATE FULLTEXT INDEX project_name_index FOR (p:Project) ON EACH [p.name, p.description];
B. LLM 请求批处理 (Batch Processing)
原理:LLM API 通常支持并发或批量处理。一个个处理 Chunk 极其缓慢,批量发送可以显著提升吞吐量。
🛠️ 代码实现 (修改 src/graph_builder.py)
import asyncio
from langchain_core.documents import Document
# 异步批处理函数
async def process_chunks_in_parallel(transformer, documents, batch_size=5):
"""
并发处理文档块,提升图谱构建速度
:param transformer: LLMGraphTransformer 实例
:param documents: 切分后的文档列表
:param batch_size: 并发数量(根据 API Rate Limit 设定)
"""
results = []
# 将文档切分为小批次
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
# 使用 asyncio.gather 并发发送请求
# 注意: convert_to_graph_documents 本身若不支持 async,
# 需要改用 ThreadPoolExecutor 或 transformer 的异步方法
print(f"🚀 正在并发处理第 {i} - {i+batch_size} 个块...")
# 模拟并发调用(LangChain 新版支持 aconvert_to_graph_documents)
# 如果版本不支持,可使用 run_in_executor
batch_results = await transformer.aconvert_to_graph_documents(batch)
results.extend(batch_results)
return results
# 调用示例
# asyncio.run(process_chunks_in_parallel(llm_transformer, all_chunks))
C. 多级查询缓存 (Caching)
原理:用户问过的问题,第二次毫秒级返回。
🛠️ 代码实现 (修改 src/retrieval.py)
from langchain.globals import set_llm_cache
from langchain_community.cache import RedisCache
from redis import Redis
def init_production_cache():
"""配置 Redis 生产级缓存"""
# 相比内存缓存,Redis 支持持久化且多实例共享
redis_client = Redis(host="localhost", port=6379, db=0)
set_llm_cache(RedisCache(redis_client=redis_client))
print("✅ Redis 缓存已开启,重复查询将不再消耗 Token")
# 在 main.py 启动时调用
# init_production_cache()
2. 准确性优化 (Accuracy): “拒绝幻觉,精准可控”
优化目标:确保提取的数据符合业务逻辑,防止脏数据入库。
A. 增强型 Prompt 约束 (Prompt Engineering)
原理:不要只告诉 LLM 做什么,还要告诉它不能做什么,并提供反例。
🛠️ 代码实现 (修改 src/schema.py)
def get_strict_extraction_prompt() -> str:
return """
你是一个严谨的数据录入员。请从文本提取实体和关系。
【严格禁止】
1. 禁止创造 Schema 中不存在的节点类型(如 'Person', 'Company' 都是非法的,必须映射到 'Employee', 'Department')。
2. 禁止提取模糊的时间(如 "去年", "下个月"),如果无法确定具体日期,请忽略 start_date 属性。
3. 禁止推断不存在的关系(如文本只说 "Alice 和 Bob 聊天",不要提取 MANAGES 关系)。
【数据清洗规则】
- 人名必须去除称谓(如 "Mr. Wang" -> "Wang")。
- 部门名称统一为全称(如 "研发部" -> "Research & Development")。
【Schema 定义】
{schema}
输入文本: {input}
输出 JSON:
"""
B. Pydantic 逻辑校验 (Data Validation)
原理:利用代码逻辑进行二次清洗,拦截 LLM 的格式错误。
🛠️ 代码实现 (修改 src/schema.py)
from pydantic import BaseModel, validator, Field
from datetime import datetime
class ProjectNode(BaseModel):
label: str = "Project"
name: str
start_date: str = Field(None, description="ISO 8601 格式日期")
@validator("start_date")
def validate_date_format(cls, v):
"""校验日期格式,清洗脏数据"""
if not v:
return None
try:
# 尝试解析日期,如果 LLM 返回 "2023/01/01",自动转为 "2023-01-01"
dt = datetime.strptime(v.replace("/", "-"), "%Y-%m-%d")
return dt.strftime("%Y-%m-%d")
except ValueError:
print(f"⚠️ 警告: 丢弃非法日期格式 '{v}'")
return None # 格式错误则丢弃该属性,而不是让程序崩溃
@validator("name")
def clean_name(cls, v):
"""清洗名称"""
return v.strip().replace("项目", "") # 去除冗余后缀
3. 稳定性优化 (Stability): “坚如磐石,永不宕机”
优化目标:应对网络抖动、API 限流和数据库连接失败。
A. 自动重试机制 (Retry Logic)
原理:使用 tenacity 库,当 API 报错时自动指数退避重试,而不是直接抛出异常导致进程退出。
🛠️ 代码实现 (修改 src/utils.py 或 API 调用处)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import APITimeoutError, RateLimitError
from neo4j.exceptions import ServiceUnavailable
# 定义重试策略装饰器
def robust_api_call():
return retry(
# 遇到以下异常时重试
retry=retry_if_exception_type((APITimeoutError, RateLimitError, ConnectionError)),
# 最多重试 5 次
stop=stop_after_attempt(5),
# 指数退避:等待 1s, 2s, 4s... 最大 10s
wait=wait_exponential(multiplier=1, min=4, max=10),
# 重试前打印日志
before_sleep=lambda retry_state: print(f"🔄 API 调用失败,正在进行第 {retry_state.attempt_number} 次重试...")
)
def robust_db_query():
return retry(
retry=retry_if_exception_type(ServiceUnavailable),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=5)
)
# --- 使用示例 ---
@robust_api_call()
def call_llm_with_retry(prompt):
return llm.invoke(prompt)
@robust_db_query()
def execute_cypher_with_retry(query):
return graph.query(query)
B. 超时控制 (Timeout Control)
原理:防止某个复杂的 Cypher 查询或卡死的 LLM 请求无限期占用资源。
🛠️ 代码实现 (修改 src/retrieval.py)
# 1. LLM 超时设置
llm = ChatOpenAI(
model="gpt-4-turbo",
request_timeout=30, # 【关键】单次请求超过30秒强制中断
max_retries=2 # 内置重试次数
)
# 2. Neo4j 查询超时设置 (在 Cypher 语句中限制)
def safe_query_knowledge_graph(question: str):
chain = GraphCypherQAChain.from_llm(
llm=llm,
graph=graph,
cypher_llm_kwargs={"request_timeout": 10}, # 生成 Cypher 的超时
# 【关键】给数据库查询设置事务超时时间 (毫秒)
# 避免全图扫描导致数据库卡死
return_direct=True
)
# 也可以在 Cypher 语句前加超时指令
# CALL db.ms.transactions.timeout(5000)
# 或者配置 Neo4j 驱动级别的超时
C. 兜底异常捕获 (Fallback Mechanism)
原理:当所有重试都失败时,Agent 应该优雅地降级,而不是报错堆栈。
🛠️ 代码实现 (修改 src/main_agent.py)
def safe_tool_execution(func):
"""工具执行的装饰器:异常兜底"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# 记录详细错误日志
logger.error(f"工具执行严重错误: {str(e)}", exc_info=True)
# 返回友好的用户提示,而不是 Crash
return "⚠️ 系统暂时繁忙,无法获取知识图谱数据。请稍后再试,或联系管理员。"
return wrapper
# 应用到工具函数
kg_tool = Tool(
name="EnterpriseKnowledgeBase",
func=safe_tool_execution(query_knowledge_graph), # 包裹安全装饰器
description="..."
)
总结
通过以上代码优化,您的 Agent 将实现质的飞跃:
- 快:Redis 拦截了 50% 的重复问题,Neo4j 索引让查询从秒级变毫秒级。
- 准:Pydantic 像守门员一样挡住了日期格式错误和非法实体。
- 稳:Tenacity 重试机制让系统无惧网络波动,通过了压力测试的考验。
6.2 部署方案
-
本地部署:直接运行 Python 脚本(测试 / 开发环境)
-
容器化部署
:
-
编写 Dockerfile:
FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . ENV PYTHONPATH=/app CMD ["python", "src/main_agent.py"] -
启动命令:
运行
docker build -t kg-agent . docker run -it --env-file .env kg-agent
-
-
服务化部署:基于 FastAPI 封装 Agent 为 HTTP 接口(参考 LangChain 示例)
6.3 调试与监控
-
LangSmith 调试
:集成 LangSmith 追踪 Agent 思考过程、工具调用、LLM 请求
运行
# src/main_agent.py 添加LangSmith配置 os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_API_KEY"] = "your_langsmith_api_key" os.environ["LANGCHAIN_PROJECT"] = "enterprise-kg-agent" -
Neo4j 监控:使用 Neo4j Browser 查看查询性能、数据量
-
日志分析:通过 logs 目录下的日志文件定位错误
常见问题解答
| 问题 | 解决方案 |
|---|---|
| Agent 未调用知识图谱工具 | 优化工具 description,更清晰描述适用场景;调整 System Prompt |
| Cypher 生成错误 | 优化 CYPHER_GENERATION_PROMPT,增加示例;限制查询复杂度 |
| 图谱数据重复 | 完善 deduplicate_graph_data 函数,增加更多去重维度 |
| LLM API 费用过高 | 开启缓存、使用本地开源模型(如 Qwen2.5)替代 GPT-4 |
| Neo4j 数据量过大 | 分库分表、定期归档历史数据、使用 Neo4j 企业版集群 |
✅ 解决方案 A:Prompt Engineering - 场景显性化 (修改 src/main_agent.py)
LLM 是通过语义相似度来决定是否调用工具的。你需要把触发关键词直接写进描述里,并使用负面约束。
# src/main_agent.py
def define_agent_tools() -> list:
kg_tool = Tool(
name="EnterpriseKnowledgeBase",
func=query_knowledge_graph,
# 【修改前】描述太简单
# description="用于回答关于企业员工、部门的问题。"
# 【修改后】场景化 + 关键词堆砌 + 负向约束
description="""
【必须调用此工具的场景】:
1. 查询员工信息:职位、技能、上下级关系(关键词:谁是、经理、汇报给、擅长、技能);
2. 查询部门/项目:部门位置、项目负责人、项目周期(关键词:哪里、何时开始、谁负责);
3. 任何涉及实体关系的多跳推理。
【禁止场景】:
1. 简单的数学计算(请用计算器);
2. 翻译任务;
3. 通用常识(如“天空为什么是蓝的”)。
注意:如果你不确定答案,必须先调用此工具查询,禁止直接根据训练数据回答。
"""
)
return [kg_tool, ...]
✅ 解决方案 B:System Prompt 强制思维链 (CoT)
在 System Prompt 中强制要求 Agent 在回答前“大声思考”是否需要查库。
# src/main_agent.py
def get_agent_system_prompt() -> SystemMessagePromptTemplate:
template = """
...
【思考协议】
在回答用户问题前,你必须进行以下自我提问:
Step 1: 用户的问题是否涉及具体的企业实体(人名、部门名、项目名)?
Step 2: 如果是,我必须使用 'EnterpriseKnowledgeBase' 工具。
Step 3: 只有当工具返回“未找到信息”时,才告知用户无法回答。
...
"""
return SystemMessagePromptTemplate(prompt=PromptTemplate(template=template, input_variables=[]))
5. Cypher 生成错误 (Cypher Syntax Error)
现象:报错 ClientError: ... Invalid input,或者生成的 Cypher 查不到数据(如方向搞反了)。 核心原因:模型不知道 Schema 细节,或者生成了 Neo4j 不支持的语法。
✅ 解决方案:Few-Shot Prompt + Schema 注入 (修改 src/retrieval.py)
不要只给规则,要给**“正确答案的样本”**,并动态注入当前的 Schema。
# src/retrieval.py
# 1. 动态获取最新的 Schema(而非写死)
def get_current_schema(graph):
return graph.get_schema
# 2. 增强版 Prompt
CYPHER_GENERATION_TEMPLATE = """
你是一个 Neo4j Cypher 专家。根据下面的 Schema 和用户问题编写 Cypher 查询。
【Schema 信息】
{schema}
【编写规则】
1. 不要使用 Schema 中不存在的关系类型或属性。
2. 始终在查询末尾添加 `LIMIT 10`。
3. 使用不区分大小写的匹配:`WHERE toLower(n.name) CONTAINS toLower('Alice')`。
4. 仅返回关系的一跳或两跳,避免全图扫描。
【参考示例】(Few-Shot Learning)
问题: "Alice 的经理是谁?"
Cypher: MATCH (e:Employee {{name: "Alice"}})<-[:MANAGES]-(m:Employee) RETURN m.name LIMIT 5;
问题: "AI_Lab 部门有哪些项目?"
Cypher: MATCH (d:Department {{name: "AI_Lab"}})<-[:BELONGS_TO]-(p:Project) RETURN p.name LIMIT 10;
【当前任务】
问题: {question}
Cypher:"""
CYPHER_PROMPT = PromptTemplate(
input_variables=["schema", "question"],
template=CYPHER_GENERATION_TEMPLATE
)
# 在初始化链时使用
chain = GraphCypherQAChain.from_llm(
cypher_prompt=CYPHER_PROMPT,
# ...
)
6. 图谱数据重复 (Duplicate Data)
现象:图谱里出现了 3 个 “Alice” 节点,或者 5 条相同的 “BELONGS_TO” 边。 核心原因:deduplicate_graph_data 逻辑简单,或者没有利用 Neo4j 的 MERGE 机制。
✅ 解决方案:基于内容哈希的去重 (修改 src/graph_builder.py)
在 Python 端做一次严格的指纹去重,确保同一批次内不重复;入库时依靠 Neo4j 的 MERGE 保证库内不重复。
# src/graph_builder.py
import hashlib
def generate_node_id(node) -> str:
"""生成节点唯一指纹 (Label + Name)"""
# 假设 name 是唯一标识属性
content = f"{node.type}:{node.properties.get('name', '')}"
return hashlib.md5(content.encode()).hexdigest()
def deduplicate_graph_data(graph_documents: List[GraphDocument]) -> List[GraphDocument]:
"""增强版去重函数"""
unique_nodes = {}
unique_rels = set()
deduped_docs = []
for doc in graph_documents:
# 1. 节点去重
clean_nodes = []
for node in doc.nodes:
node_id = generate_node_id(node)
if node_id not in unique_nodes:
unique_nodes[node_id] = node
clean_nodes.append(node)
# 2. 关系去重
clean_rels = []
for rel in doc.relationships:
# 生成关系指纹: SourceID - Type - TargetID
source_id = generate_node_id(rel.source)
target_id = generate_node_id(rel.target)
rel_id = f"{source_id}-{rel.type}-{target_id}"
if rel_id not in unique_rels:
unique_rels.add(rel_id)
clean_rels.append(rel)
if clean_nodes or clean_rels:
deduped_docs.append(GraphDocument(
nodes=clean_nodes,
relationships=clean_rels,
source=doc.source
))
return deduped_docs
# 补充说明:LangChain 的 add_graph_documents 默认使用 MERGE 操作
# 但前提是节点必须有明确的 id 属性或 unique constraint。
# 建议在 Neo4j 中先执行:
# CREATE CONSTRAINT FOR (e:Employee) REQUIRE e.name IS UNIQUE;
7. LLM API 费用过高 (High Cost)
现象:开发阶段每天跑测试消耗几十美元,或者生产环境 Token 量巨大。 核心原因:使用了昂贵的 GPT-4,且没有缓存机制。
✅ 解决方案 A:本地模型替换 (集成 Ollama/Qwen2.5)
在 src/config.py 或初始化处,增加本地模型支持。
# src/config.py 或 main_agent.py
# 引入 Ollama 集成
from langchain_community.chat_models import ChatOllama
def init_llm(use_local=False):
if use_local:
# 使用 Qwen2.5-14b (需先安装 Ollama 并 pull qwen2.5)
logger.info("📡 正在使用本地模型: Qwen2.5")
return ChatOllama(
model="qwen2.5:14b", # 推荐 14b 或 72b 以获得较好逻辑能力
temperature=0,
base_url="http://localhost:11434"
)
else:
# 使用 OpenAI
return ChatOpenAI(model="gpt-4-turbo", ...)
✅ 解决方案 B:持久化缓存 (Redis/SQLite)
之前的 TTLCache 是内存级的,重启就没了。生产环境建议用 SQLite 或 Redis。
# src/retrieval.py
from langchain.cache import SQLiteCache
import langchain
# 开启全局 LLM 缓存(所有相同的 Prompt 请求直接读本地文件)
langchain.llm_cache = SQLiteCache(database_path=".langchain.db")
# 或者在 Retrieval 函数层做业务缓存
def query_knowledge_graph(question):
# 先查 Redis
cache_key = f"kg_query:{hash(question)}"
if redis_client.exists(cache_key):
return redis_client.get(cache_key)
# ... 执行查询 ...
# 存入 Redis
redis_client.setex(cache_key, 3600, result)
8. Neo4j 数据量过大 (Data Volume / Performance)
现象:查询变慢,磁盘空间不足,或者图谱包含了 5 年前的过时项目数据。 核心原因:未清理历史数据,单节点瓶颈。
✅ 解决方案 A:定期归档脚本 (Data Archiving Script)
创建一个维护脚本 maintenance.py,定期清理旧数据。
# src/maintenance.py
from neo4j import GraphDatabase
import os
from dotenv import load_dotenv
load_dotenv()
def archive_old_data(days_to_keep=365):
"""
清理超过指定天数的数据
策略:删除旧的 Project 及其关联关系,但保留 Employee(因为员工可能还在)
"""
uri = os.getenv("NEO4J_URI")
auth = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD"))
driver = GraphDatabase.driver(uri, auth=auth)
query = """
MATCH (p:Project)
WHERE date(p.end_date) < date() - duration($duration)
DETACH DELETE p
RETURN count(p) as deleted_count
"""
with driver.session() as session:
result = session.run(query, duration=f"P{days_to_keep}D")
count = result.single()["deleted_count"]
print(f"🧹 已清理 {days_to_keep} 天前的项目数据,共删除 {count} 个节点。")
if __name__ == "__main__":
archive_old_data(365) # 清理1年前的数据
✅ 解决方案 B:Neo4j 索引优化 (Index Tuning)
除了删数据,建索引是解决慢查询最直接的方法。不要只给 ID 建索引,要给查询高频字段建索引。
// 在 Neo4j Browser 中执行
// 1. 基础约束(防止重复 + 加速 ID 查找)
CREATE CONSTRAINT FOR (e:Employee) REQUIRE e.name IS UNIQUE;
// 2. 全文索引(加速模糊搜索,如 WHERE n.name CONTAINS 'Ali')
CREATE FULLTEXT INDEX employeeNameIdx FOR (n:Employee) ON EACH [n.name];
// 3. 复合索引(加速组合查询,如“找 AI_Lab 的 Senior Engineer”)
CREATE INDEX employeeDeptPos FOR (e:Employee) ON (e.department, e.position);
总结
本指南完整实现了 “文本→知识图谱→Agent 查询” 的企业级闭环,核心价值在于:
- 结构化知识沉淀:解决大模型 “幻觉” 问题,答案可追溯至原始数据
- 可扩展架构:通过本体设计适配不同业务场景(法律 / 供应链 / 医疗)
- 生产级特性:包含缓存、日志、错误处理、数据校验等企业级必备能力
读者可基于本指南:
- 替换数据源为实际业务文本(如企业手册、行业报告)
- 扩展本体设计适配特定场景
- 集成更多工具(如文档检索、API 调用)增强 Agent 能力
- 部署为服务化接口,对接前端应用
企业级 Agent 的核心竞争力不是 “聊天”,而是 “基于结构化知识的精准推理”—— 这正是知识图谱赋予 Agent 的核心能力。
更多推荐


所有评论(0)