剖析graph-rag中最核心的一步:切片文本如何输入到大模型并且构建整体的关系
本文提出了一种基于大模型的信息抽取与知识图谱构建方法,采用四阶段处理流程:1)分块抽取实体和关系提及;2)全局实体对齐;3)关系汇总;4)Neo4j入库。通过分块策略保证上下文连贯,要求模型输出包含证据位置的结构化结果。关键创新点包括:实体链接时的候选裁决机制、关系证据聚合算法,以及支持溯源的存储设计。文章详细说明了文本切割策略、提示词模板设计、核心算法实现和Neo4j图模式,并提供了可直接复用的
一、总体思路分层
-
Pass-1:分块抽取(Chunk IE)
对每个切片做提及级别抽取:实体提及(mention)、关系提及(rel-mention)、证据(原文片段/offset)、置信度。此阶段不要急着“下定论”,只把局部事实 + 证据抛出来。 -
Pass-2:全局对齐(Entity Linking / Coref)
把每个提及映射到全局实体(canonical entity)。这一步解决同名/别名/代词问题,是把“碎片”粘到“一个人/一个公司”上。 -
Pass-3:关系汇总(Relation Assembly & Consolidation)
以“全局实体ID”为端点,将所有关系提及合并成全局关系(去重、聚合证据、合并时间戳/属性、计算聚合置信度),并标注来源与证据。 -
Pass-4:入库(Neo4j)
幂等 MERGE 写库:(:Entity {id})
,(s)-[:TYPE {…props}]->(t)
,保留evidence[]/source_doc_id/chunk_id/offset
。
关键:Pass-1 是“粒度小+证据充分”;Pass-2/3 才做“抽象与合并”。这样最稳、可审计、可回溯。
二、如何把切片“正确喂给”大模型
1. 切割策略(避免上下文断裂)
-
RecursiveCharacterTextSplitter
:建议chunk_size=800~1200 tokens
,chunk_overlap=150~250 tokens
。 -
优先按章节/标题/段落/标点切;必要时用重叠跨界,保证关系动词和主客体尽量在同一块或覆盖于相邻块。
2. 每块输入的上下文打包(非常重要)
对第 i 块输入时,附加两类“轻记忆”:
-
邻近窗口:
chunk_{i-1}
的末尾 23 句 +chunk_{i+1}
的开头 23 句(可选),防止句子跨段。 -
全局实体“摘要索引”(Top-K):Pass-1 后面会有,但首轮没有的话,用逐块内缓存即可。第二轮跑时,把已发现的全局实体摘要(每个1行小卡片)作为上下文附加,帮助模型在 chunk 内就把别名/代词绑定到已有实体。
轻记忆示例(随 prompt 附加):
Known entities so far (id | name | type | key attrs):
E001 | Alice | Person | {nationality: "SG"}
E007 | Acme Corp | Org | {hq: "Singapore"}
3. 输出结构一定要提及级 + 证据
为防止幻觉与错配,要求模型定位原文证据:
-
mention:
text_span
(摘录10~40字)、char_start
、char_end
-
relation-mention:
subj_mention_id
、obj_mention_id
、predicate
、evidence_span
、when/where
、confidence
这样可以在 Pass-3 聚合时做去重与冲突仲裁。
三、跨切片构建“整体关系”的算法
1. 实体对齐(Entity Linking / Coreference)
候选生成(越快越好):
-
规则匹配:规范化字符串(去空白、大小写、全半角、公司后缀等),精确/近似匹配(可用
rapidfuzz
)。 -
关键属性:注册号/ISBN/邮箱/URL/地名层级等。
-
(可选)向量相似:
name + type + attrs
生成嵌入,Top-K 候选。
候选裁决(LLM 只做“选择/NEW”而非自由发挥):
-
把候选列表(最多5个)+ 当前 mention 的证据片段发给 LLM,强制返回
{link_to: <candidate_id | "NEW">, confidence}
。 -
若
"NEW"
,为其生成canonical_id
(哈希),并登记其aliases[]
含当前 mention 文本。
裁决提示词见后文模板《链接判定提示词》。
2. 关系汇总(Consolidation)
拿到 subj_global_id
与 obj_global_id
后,把同一对端点、同一关系类型的多条关系提及合并:
-
关系键:
(src_id, predicate, tgt_id)
-
证据聚合:保存去重后的
evidence[]
;时间/地点/数额等属性用最可信或合并集合(如dates[]
)。 -
置信度聚合:例如
p_agg = 1 - ∏(1 - p_i)
;或按证据权重加权平均。 -
时间范围:若多处出现时间,合成
valid_from/valid_to
或dates[]
。 -
来源与可追溯:保留
source_doc_id
、chunk_id
、mention_ids[]
、offsets[]
。
3. 生成反向/派生关系
-
明确定义的可逆关系(如
ACQUIRED
↔WAS_ACQUIRED_BY
)自动生成逆向边。 -
规则推理(如 “创始人 = 在 X 年 时创立 Y”)产出
FOUNDED
;但把derived=true
标记清楚,并把证据链(来源关系ID)挂在属性中。
四、代码关键片段
下面用 LangChain 0.2+ 写法,演示:
分块+批量送模;2) 提及级输出;3) 实体链接裁决;4) 关系汇总;5) 写入 Neo4j。
模型你可换 OpenAI / 本地 Ollama;此处保留两种写法(择一启用)。
# --- deps ---
# pip install langchain>=0.2 langchain-community langchain-core pydantic neo4j tiktoken rapidfuzz
# pip install openai # 如用OpenAI
# pip install ollama # 如用本地Ollama
from __future__ import annotations
import os, json, hashlib
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, ValidationError
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableLambda
from rapidfuzz import fuzz, process
from neo4j import GraphDatabase
USE_OPENAI = False
if USE_OPENAI:
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
else:
from langchain_community.chat_models import ChatOllama
llm = ChatOllama(model="qwen2.5:14b", temperature=0)
# ---- Neo4j driver ----
driver = GraphDatabase.driver(
os.getenv("NEO4J_URI", "bolt://localhost:7687"),
auth=(os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", "password"))
)
# ---------- 数据结构 ----------
class Mention(BaseModel):
id: str
text: str
type: str # 预测类型
char_start: int
char_end: int
attrs: Dict[str, Any] = {}
confidence: float = 0.8
class RelMention(BaseModel):
id: str
subj_mention_id: str
obj_mention_id: str
predicate: str
evidence: str
char_start: int
char_end: int
when: Optional[str] = None
where: Optional[str] = None
props: Dict[str, Any] = {}
confidence: float = 0.7
class ChunkIE(BaseModel):
chunk_id: int
mentions: List[Mention] = []
rel_mentions: List[RelMention] = []
# ---------- 切割 ----------
def split_text(text: str, chunk_size=1200, chunk_overlap=200):
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size, chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
)
return splitter.split_text(text)
# ---------- 稳定ID ----------
def stable_id(*parts) -> str:
key = "|".join([json.dumps(p, ensure_ascii=False, sort_keys=True) for p in parts])
return hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]
# ---------- 提及级抽取 Prompt ----------
IE_SYSTEM = """You are a precise IE system. Extract entity mentions and relation mentions.
Return ONLY JSON matching the schema. Use evidence spans from the text (char_start/end)."""
IE_USER = """
DOC_ID: {doc_id}
CHUNK_ID: {chunk_id}/{total}
NEIGHBOR_CONTEXT:
<<<
{neighbor}
>>>
TEXT:
<<<
{chunk}
>>>
Schema:
{schema}
Guidelines:
- Mentions: short span text from this chunk; include char_start/char_end offsets within THIS chunk.
- Relations: use subj_mention_id/obj_mention_id; keep predicate concise (ACQUIRED, FOUNDED, WORKS_AT, PART_OF, etc.)
- Include evidence (a short quote) and optional when/where if present.
- No fabrication. Only facts supported by this text.
"""
ie_prompt = ChatPromptTemplate.from_messages([("system", IE_SYSTEM), ("user", IE_USER)])
ie_parser = JsonOutputParser(pydantic_object=ChunkIE)
def build_neighbor_context(prev_tail: str, next_head: str) -> str:
return (prev_tail or "").strip() + ("\n---\n" if (prev_tail and next_head) else "") + (next_head or "").strip()
def extract_chunk_ie(doc_id: str, chunk: str, chunk_id: int, total: int, neighbor: str) -> ChunkIE:
msg = ie_prompt.format_messages(
doc_id=doc_id, chunk_id=chunk_id, total=total,
neighbor=neighbor, chunk=chunk,
schema=json.dumps(ChunkIE.schema(), ensure_ascii=False, indent=2)
)
raw = llm.invoke(msg)
text = getattr(raw, "content", str(raw))
try:
data = ie_parser.parse(text)
except ValidationError:
start, end = text.find("{"), text.rfind("}")
data = ie_parser.parse(text[start:end+1])
# 填mention id(若模型未填)
for m in data.mentions:
if not m.id:
m.id = stable_id(doc_id, chunk_id, m.text, m.type, m.char_start, m.char_end)
for r in data.rel_mentions:
if not r.id:
r.id = stable_id(doc_id, chunk_id, r.subj_mention_id, r.predicate, r.obj_mention_id, r.char_start, r.char_end)
return data
# ---------- 实体全局对齐 ----------
@dataclass
class CanonicalEntity:
id: str
name: str
type: str
attrs: Dict[str, Any] = field(default_factory=dict)
aliases: set = field(default_factory=set)
def name_key(s: str) -> str:
return s.lower().replace(" ", "").replace("(", "(").replace(")", ")")
def candidates_for(entities: Dict[str, CanonicalEntity], name: str, etype: str, top=5) -> List[CanonicalEntity]:
# 先快速筛:同type优先
same_type = [e for e in entities.values() if e.type == etype]
# 近似匹配
choices = [(e, max(fuzz.partial_ratio(name, e.name), *(fuzz.partial_ratio(name, a) for a in e.aliases) )) for e in same_type]
choices.sort(key=lambda x: x[1], reverse=True)
return [e for e, score in choices[:top] if score >= 80]
LINK_SYSTEM = "Choose the best candidate id for the mention or NEW. Return JSON: {\"link_to\": \"<id|NEW>\", \"confidence\": 0..1}"
LINK_USER = """
Mention:
name: {name}
type: {etype}
evidence: "{evidence}"
Candidates:
{cands}
Rules:
- If none is a clear match, answer NEW.
- Prefer exact/near-exact alias match; consider type and attrs if present.
"""
link_prompt = ChatPromptTemplate.from_messages([("system", LINK_SYSTEM), ("user", LINK_USER)])
def link_with_llm(name: str, etype: str, evidence: str, cands: List[CanonicalEntity]) -> Tuple[str, float]:
if not cands:
return "NEW", 1.0
cands_txt = "\n".join([f"- id={c.id}, name={c.name}, type={c.type}, aliases={list(c.aliases)[:3]}" for c in cands])
msg = link_prompt.format_messages(name=name, etype=etype, evidence=evidence[:120], cands=cands_txt)
out = llm.invoke(msg)
try:
data = json.loads(out.content)
return data.get("link_to", "NEW"), float(data.get("confidence", 0.7))
except Exception:
return "NEW", 0.6
# ---------- 汇总关系 ----------
@dataclass
class RelAgg:
src: str
predicate: str
tgt: str
evidences: list = field(default_factory=list) # [{doc_id, chunk_id, text, offsets}]
when: set = field(default_factory=set)
where: set = field(default_factory=set)
props: dict = field(default_factory=dict)
confidence_list: list = field(default_factory=list)
def combine_confidences(ps: List[float]) -> float:
from math import prod
ps = [max(0.0, min(1.0, p)) for p in ps if p is not None]
return 1 - prod([(1-p) for p in ps]) if ps else 0.0
# ---------- 入库 ----------
def write_to_neo4j(entities: Dict[str, CanonicalEntity], relations: Dict[Tuple[str,str,str], RelAgg], source_doc_id: str):
node_cypher = """
UNWIND $nodes AS n
MERGE (e:Entity {id:n.id})
ON CREATE SET e.name=n.name, e.type=n.type, e.aliases=n.aliases, e += n.attrs, e.createdAt=timestamp(), e.updatedAt=timestamp()
ON MATCH SET e.name=coalesce(n.name, e.name), e.type=coalesce(n.type, e.type), e.aliases=apoc.coll.toSet(coalesce(e.aliases, []) + n.aliases), e += n.attrs, e.updatedAt=timestamp()
"""
rel_cypher = """
UNWIND $rels AS r
MATCH (s:Entity {id:r.src})
MATCH (t:Entity {id:r.tgt})
CALL apoc.merge.relationship(s, r.predicate, {}, {sourceDocId:$source_doc_id, evidences:r.evidences, when:r.when, where:r.where, props:r.props, confidence:r.confidence}, t)
YIELD rel
SET rel.updatedAt=timestamp()
RETURN count(rel)
"""
with driver.session() as sess:
nodes = [{"id": e.id, "name": e.name, "type": e.type, "attrs": e.attrs, "aliases": list(e.aliases)} for e in entities.values()]
sess.run(node_cypher, parameters={"nodes": nodes})
rels = []
for (src,pred,tgt), agg in relations.items():
rels.append({
"src": src, "predicate": pred, "tgt": tgt,
"evidences": agg.evidences, "when": list(agg.when), "where": list(agg.where),
"props": agg.props, "confidence": combine_confidences(agg.confidence_list)
})
if rels:
sess.run(rel_cypher, parameters={"rels": rels, "source_doc_id": source_doc_id})
# ---------- 主流程 ----------
def build_graph_from_text(doc_id: str, raw_text: str, source_doc_id: str):
chunks = split_text(raw_text)
total = len(chunks)
# Pass-1: 分块抽取
chunk_outputs: List[ChunkIE] = []
for i, chunk in enumerate(chunks, 1):
prev_tail = chunks[i-2][-300:] if i-2 >= 0 else ""
next_head = chunks[i][:300] if i < total else ""
neighbor = build_neighbor_context(prev_tail, next_head)
ie = extract_chunk_ie(doc_id, chunk, i, total, neighbor)
chunk_outputs.append(ie)
# Pass-2: 实体对齐
entities: Dict[str, CanonicalEntity] = {}
mention2entity: Dict[str, str] = {} # mention_id -> canonical_id
for ie in chunk_outputs:
for m in ie.mentions:
# 候选
cands = candidates_for(entities, m.text, m.type, top=5)
decision, conf = link_with_llm(m.text, m.type, m.text, cands)
if decision == "NEW":
cid = stable_id(m.text, m.type)
entities[cid] = CanonicalEntity(id=cid, name=m.text, type=m.type, attrs=m.attrs, aliases={m.text})
else:
cid = decision
# 记录别名
entities[cid].aliases.add(m.text)
mention2entity[m.id] = cid
# Pass-3: 关系汇总
relations: Dict[Tuple[str,str,str], RelAgg] = {}
for idx, ie in enumerate(chunk_outputs, 1):
for r in ie.rel_mentions:
s = mention2entity.get(r.subj_mention_id)
t = mention2entity.get(r.obj_mention_id)
if not s or not t:
continue
key = (s, r.predicate, t)
if key not in relations:
relations[key] = RelAgg(src=s, predicate=r.predicate, tgt=t)
agg = relations[key]
agg.evidences.append({
"doc_id": doc_id, "chunk_id": idx,
"text": r.evidence[:180], "offsets": [r.char_start, r.char_end]
})
if r.when: agg.when.add(r.when)
if r.where: agg.where.add(r.where)
agg.confidence_list.append(r.confidence)
# 可选择把金额/比例等 props 合并为列表或选最大置信度那条
for k, v in (r.props or {}).items():
if k not in agg.props: agg.props[k] = set()
agg.props[k].add(v)
# 将 set 转 list
for agg in relations.values():
for k,v in list(agg.props.items()):
if isinstance(v, set): agg.props[k] = list(v)
# Pass-4: 入库
write_to_neo4j(entities, relations, source_doc_id=source_doc_id)
五、三份关键提示词模板
1. “分块抽取”提示词(上文 IE_SYSTEM
/IE_USER
已给)
-
要求只输出 JSON,必须给 offsets,关系要用提及ID指向主客体。
-
predicate
控制在规范动词集合(自定义枚举)内,减少花样词。
示例枚举(可放到 System):
Entity types: Person, Org, Product, Event, Location, Law, Concept
Predicates: FOUNDED, ACQUIRED, MERGED_WITH, PART_OF, WORKS_AT, LOCATED_IN, PUBLISHED, REGULATED_BY, INVESTED_IN
2. “实体链接判定”提示词(上文 LINK_SYSTEM
/LINK_USER
已给)
-
输出只能是
{"link_to":"<id|NEW>","confidence":0..1}
。 -
传入候选列表,LLM只做“选择或NEW”,避免自由发挥。
3. “全局校对/补关系(可选)”
对全局汇总后的草图(top-N 实体与关系),请 LLM 做一致性检查与遗漏补全(例如从证据语句推导 FOUNDED
)。
注意:把新增的标为 derived=true
并保留“来源关系键”。
六、Neo4j 图模式与溯源
建议最小可行模式:
-
(:Entity {id, name, type, aliases[], ...})
-
(:Entity)-[:REL_TYPE {sourceDocId, evidences[], when[], where[], props{}, confidence, derived?}]->(:Entity)
-
(可选)
:Evidence
节点太重,先用数组属性即可;等数据量/法务需求提升后再拆。
约束与索引:
CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE;
CREATE INDEX ent_type IF NOT EXISTS FOR (n:Entity) ON (n.type);
七、质量与鲁棒性(实践要点)
-
证据先行:强制输出 evidence 与 offsets,后续任何纠错都有依据。
-
两阶段思路:先“列举事实片段”,再“汇总定论”。
-
别名池:
aliases
越丰富,链接越稳;可在图中周期性把相同邮箱/域名/注册号的节点合并。 -
去重与幂等:写库时用
MERGE
+ APOCapoc.merge.relationship
;关系以(src, pred, tgt)
作为幂等键。 -
跨切片缺词:使用邻近窗口与Top-K 全局实体摘要当“轻记忆”。
-
冲突仲裁:置信度聚合 + “最高权重证据优先”。
-
可观测性:记录每块耗时/失败/重试次数、每文档提及/关系密度分布。
-
成本控制:
chain.batch(chunks, config={"max_concurrency": 6})
并发;重复文本用缓存;只对“模糊提及”调用链接判定 LLM。
八、你可以直接复用的最小运行清单
-
输入:原始文本字符串
raw_text
-
步骤:
-
chunks = split_text(raw_text)
-
遍历
chunks
→extract_chunk_ie(...)
→ 累积mentions, rel_mentions
-
link_with_llm(...)
把 mention 绑定到全局实体 -
合并成全局关系
relations[(src,pred,tgt)]
-
write_to_neo4j(...)
MERGE 入库
-
-
验证:
MATCH (s:Entity)-[r]->(t:Entity) RETURN s.name, type(r), t.name, r.confidence, r.evidences[0..2] LIMIT 20;
更多推荐
所有评论(0)