一、总体思路分层

  • Pass-1:分块抽取(Chunk IE)
    对每个切片做提及级别抽取:实体提及(mention)、关系提及(rel-mention)、证据(原文片段/offset)、置信度。此阶段不要急着“下定论”,只把局部事实 + 证据抛出来。

  • Pass-2:全局对齐(Entity Linking / Coref)
    把每个提及映射到全局实体(canonical entity)。这一步解决同名/别名/代词问题,是把“碎片”粘到“一个人/一个公司”上。

  • Pass-3:关系汇总(Relation Assembly & Consolidation)
    以“全局实体ID”为端点,将所有关系提及合并成全局关系(去重、聚合证据、合并时间戳/属性、计算聚合置信度),并标注来源与证据。

  • Pass-4:入库(Neo4j)
    幂等 MERGE 写库:(:Entity {id})(s)-[:TYPE {…props}]->(t),保留 evidence[]/source_doc_id/chunk_id/offset

关键:Pass-1 是“粒度小+证据充分”Pass-2/3 才做“抽象与合并”。这样最稳、可审计、可回溯。


二、如何把切片“正确喂给”大模型

1. 切割策略(避免上下文断裂)

  • RecursiveCharacterTextSplitter:建议 chunk_size=800~1200 tokenschunk_overlap=150~250 tokens

  • 优先按章节/标题/段落/标点切;必要时用重叠跨界,保证关系动词和主客体尽量在同一块或覆盖于相邻块。

2. 每块输入的上下文打包(非常重要)

对第 i 块输入时,附加两类“轻记忆”:

  • 邻近窗口chunk_{i-1} 的末尾 23 句 + chunk_{i+1} 的开头 23 句(可选),防止句子跨段。

  • 全局实体“摘要索引”(Top-K):Pass-1 后面会有,但首轮没有的话,用逐块内缓存即可。第二轮跑时,把已发现的全局实体摘要(每个1行小卡片)作为上下文附加,帮助模型在 chunk 内就把别名/代词绑定到已有实体。

轻记忆示例(随 prompt 附加):

Known entities so far (id | name | type | key attrs):
E001 | Alice | Person | {nationality: "SG"}
E007 | Acme Corp | Org | {hq: "Singapore"}

3. 输出结构一定要提及级 + 证据

为防止幻觉与错配,要求模型定位原文证据

  • mention:text_span(摘录10~40字)、char_startchar_end

  • relation-mention:subj_mention_idobj_mention_idpredicateevidence_spanwhen/whereconfidence

这样可以在 Pass-3 聚合时做去重与冲突仲裁


三、跨切片构建“整体关系”的算法

1. 实体对齐(Entity Linking / Coreference)

候选生成(越快越好):

  1. 规则匹配:规范化字符串(去空白、大小写、全半角、公司后缀等),精确/近似匹配(可用 rapidfuzz)。

  2. 关键属性:注册号/ISBN/邮箱/URL/地名层级等。

  3. (可选)向量相似:name + type + attrs 生成嵌入,Top-K 候选。

候选裁决(LLM 只做“选择/NEW”而非自由发挥):

  • 把候选列表(最多5个)+ 当前 mention 的证据片段发给 LLM,强制返回 {link_to: <candidate_id | "NEW">, confidence}

  • "NEW",为其生成 canonical_id(哈希),并登记其aliases[]含当前 mention 文本。

裁决提示词见后文模板《链接判定提示词》。

2. 关系汇总(Consolidation)

拿到 subj_global_idobj_global_id 后,把同一对端点、同一关系类型的多条关系提及合并:

  • 关系键(src_id, predicate, tgt_id)

  • 证据聚合:保存去重后的 evidence[];时间/地点/数额等属性用最可信合并集合(如 dates[])。

  • 置信度聚合:例如 p_agg = 1 - ∏(1 - p_i);或按证据权重加权平均。

  • 时间范围:若多处出现时间,合成 valid_from/valid_todates[]

  • 来源与可追溯:保留 source_doc_idchunk_idmention_ids[]offsets[]

3. 生成反向/派生关系

  • 明确定义的可逆关系(如 ACQUIREDWAS_ACQUIRED_BY)自动生成逆向边。

  • 规则推理(如 “创始人 = 在 X 年 时创立 Y”)产出 FOUNDED;但把 derived=true 标记清楚,并把证据链(来源关系ID)挂在属性中。


四、代码关键片段

下面用 LangChain 0.2+ 写法,演示:

  1. 分块+批量送模;2) 提及级输出;3) 实体链接裁决;4) 关系汇总;5) 写入 Neo4j
    模型你可换 OpenAI / 本地 Ollama;此处保留两种写法(择一启用)。

# --- deps ---
# pip install langchain>=0.2 langchain-community langchain-core pydantic neo4j tiktoken rapidfuzz
# pip install openai  # 如用OpenAI
# pip install ollama  # 如用本地Ollama

from __future__ import annotations
import os, json, hashlib
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, ValidationError
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableLambda
from rapidfuzz import fuzz, process
from neo4j import GraphDatabase

USE_OPENAI = False
if USE_OPENAI:
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
else:
    from langchain_community.chat_models import ChatOllama
    llm = ChatOllama(model="qwen2.5:14b", temperature=0)

# ---- Neo4j driver ----
driver = GraphDatabase.driver(
    os.getenv("NEO4J_URI", "bolt://localhost:7687"),
    auth=(os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", "password"))
)

# ---------- 数据结构 ----------
class Mention(BaseModel):
    id: str
    text: str
    type: str                   # 预测类型
    char_start: int
    char_end: int
    attrs: Dict[str, Any] = {}
    confidence: float = 0.8

class RelMention(BaseModel):
    id: str
    subj_mention_id: str
    obj_mention_id: str
    predicate: str
    evidence: str
    char_start: int
    char_end: int
    when: Optional[str] = None
    where: Optional[str] = None
    props: Dict[str, Any] = {}
    confidence: float = 0.7

class ChunkIE(BaseModel):
    chunk_id: int
    mentions: List[Mention] = []
    rel_mentions: List[RelMention] = []

# ---------- 切割 ----------
def split_text(text: str, chunk_size=1200, chunk_overlap=200):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
    )
    return splitter.split_text(text)

# ---------- 稳定ID ----------
def stable_id(*parts) -> str:
    key = "|".join([json.dumps(p, ensure_ascii=False, sort_keys=True) for p in parts])
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]

# ---------- 提及级抽取 Prompt ----------
IE_SYSTEM = """You are a precise IE system. Extract entity mentions and relation mentions.
Return ONLY JSON matching the schema. Use evidence spans from the text (char_start/end)."""

IE_USER = """
DOC_ID: {doc_id}
CHUNK_ID: {chunk_id}/{total}
NEIGHBOR_CONTEXT:
<<<
{neighbor}
>>>
TEXT:
<<<
{chunk}
>>>

Schema:
{schema}
Guidelines:
- Mentions: short span text from this chunk; include char_start/char_end offsets within THIS chunk.
- Relations: use subj_mention_id/obj_mention_id; keep predicate concise (ACQUIRED, FOUNDED, WORKS_AT, PART_OF, etc.)
- Include evidence (a short quote) and optional when/where if present.
- No fabrication. Only facts supported by this text.
"""

ie_prompt = ChatPromptTemplate.from_messages([("system", IE_SYSTEM), ("user", IE_USER)])
ie_parser = JsonOutputParser(pydantic_object=ChunkIE)

def build_neighbor_context(prev_tail: str, next_head: str) -> str:
    return (prev_tail or "").strip() + ("\n---\n" if (prev_tail and next_head) else "") + (next_head or "").strip()

def extract_chunk_ie(doc_id: str, chunk: str, chunk_id: int, total: int, neighbor: str) -> ChunkIE:
    msg = ie_prompt.format_messages(
        doc_id=doc_id, chunk_id=chunk_id, total=total,
        neighbor=neighbor, chunk=chunk,
        schema=json.dumps(ChunkIE.schema(), ensure_ascii=False, indent=2)
    )
    raw = llm.invoke(msg)
    text = getattr(raw, "content", str(raw))
    try:
        data = ie_parser.parse(text)
    except ValidationError:
        start, end = text.find("{"), text.rfind("}")
        data = ie_parser.parse(text[start:end+1])
    # 填mention id(若模型未填)
    for m in data.mentions:
        if not m.id:
            m.id = stable_id(doc_id, chunk_id, m.text, m.type, m.char_start, m.char_end)
    for r in data.rel_mentions:
        if not r.id:
            r.id = stable_id(doc_id, chunk_id, r.subj_mention_id, r.predicate, r.obj_mention_id, r.char_start, r.char_end)
    return data

# ---------- 实体全局对齐 ----------
@dataclass
class CanonicalEntity:
    id: str
    name: str
    type: str
    attrs: Dict[str, Any] = field(default_factory=dict)
    aliases: set = field(default_factory=set)

def name_key(s: str) -> str:
    return s.lower().replace(" ", "").replace("(", "(").replace(")", ")")

def candidates_for(entities: Dict[str, CanonicalEntity], name: str, etype: str, top=5) -> List[CanonicalEntity]:
    # 先快速筛:同type优先
    same_type = [e for e in entities.values() if e.type == etype]
    # 近似匹配
    choices = [(e, max(fuzz.partial_ratio(name, e.name), *(fuzz.partial_ratio(name, a) for a in e.aliases) )) for e in same_type]
    choices.sort(key=lambda x: x[1], reverse=True)
    return [e for e, score in choices[:top] if score >= 80]

LINK_SYSTEM = "Choose the best candidate id for the mention or NEW. Return JSON: {\"link_to\": \"<id|NEW>\", \"confidence\": 0..1}"
LINK_USER = """
Mention:
name: {name}
type: {etype}
evidence: "{evidence}"

Candidates:
{cands}

Rules:
- If none is a clear match, answer NEW.
- Prefer exact/near-exact alias match; consider type and attrs if present.
"""

link_prompt = ChatPromptTemplate.from_messages([("system", LINK_SYSTEM), ("user", LINK_USER)])

def link_with_llm(name: str, etype: str, evidence: str, cands: List[CanonicalEntity]) -> Tuple[str, float]:
    if not cands:
        return "NEW", 1.0
    cands_txt = "\n".join([f"- id={c.id}, name={c.name}, type={c.type}, aliases={list(c.aliases)[:3]}" for c in cands])
    msg = link_prompt.format_messages(name=name, etype=etype, evidence=evidence[:120], cands=cands_txt)
    out = llm.invoke(msg)
    try:
        data = json.loads(out.content)
        return data.get("link_to", "NEW"), float(data.get("confidence", 0.7))
    except Exception:
        return "NEW", 0.6

# ---------- 汇总关系 ----------
@dataclass
class RelAgg:
    src: str
    predicate: str
    tgt: str
    evidences: list = field(default_factory=list)  # [{doc_id, chunk_id, text, offsets}]
    when: set = field(default_factory=set)
    where: set = field(default_factory=set)
    props: dict = field(default_factory=dict)
    confidence_list: list = field(default_factory=list)

def combine_confidences(ps: List[float]) -> float:
    from math import prod
    ps = [max(0.0, min(1.0, p)) for p in ps if p is not None]
    return 1 - prod([(1-p) for p in ps]) if ps else 0.0

# ---------- 入库 ----------
def write_to_neo4j(entities: Dict[str, CanonicalEntity], relations: Dict[Tuple[str,str,str], RelAgg], source_doc_id: str):
    node_cypher = """
    UNWIND $nodes AS n
    MERGE (e:Entity {id:n.id})
      ON CREATE SET e.name=n.name, e.type=n.type, e.aliases=n.aliases, e += n.attrs, e.createdAt=timestamp(), e.updatedAt=timestamp()
      ON MATCH  SET e.name=coalesce(n.name, e.name), e.type=coalesce(n.type, e.type), e.aliases=apoc.coll.toSet(coalesce(e.aliases, []) + n.aliases), e += n.attrs, e.updatedAt=timestamp()
    """
    rel_cypher = """
    UNWIND $rels AS r
    MATCH (s:Entity {id:r.src})
    MATCH (t:Entity {id:r.tgt})
    CALL apoc.merge.relationship(s, r.predicate, {}, {sourceDocId:$source_doc_id, evidences:r.evidences, when:r.when, where:r.where, props:r.props, confidence:r.confidence}, t)
      YIELD rel
    SET rel.updatedAt=timestamp()
    RETURN count(rel)
    """
    with driver.session() as sess:
        nodes = [{"id": e.id, "name": e.name, "type": e.type, "attrs": e.attrs, "aliases": list(e.aliases)} for e in entities.values()]
        sess.run(node_cypher, parameters={"nodes": nodes})
        rels = []
        for (src,pred,tgt), agg in relations.items():
            rels.append({
                "src": src, "predicate": pred, "tgt": tgt,
                "evidences": agg.evidences, "when": list(agg.when), "where": list(agg.where),
                "props": agg.props, "confidence": combine_confidences(agg.confidence_list)
            })
        if rels:
            sess.run(rel_cypher, parameters={"rels": rels, "source_doc_id": source_doc_id})

# ---------- 主流程 ----------
def build_graph_from_text(doc_id: str, raw_text: str, source_doc_id: str):
    chunks = split_text(raw_text)
    total = len(chunks)

    # Pass-1: 分块抽取
    chunk_outputs: List[ChunkIE] = []
    for i, chunk in enumerate(chunks, 1):
        prev_tail = chunks[i-2][-300:] if i-2 >= 0 else ""
        next_head = chunks[i][:300] if i < total else ""
        neighbor = build_neighbor_context(prev_tail, next_head)
        ie = extract_chunk_ie(doc_id, chunk, i, total, neighbor)
        chunk_outputs.append(ie)

    # Pass-2: 实体对齐
    entities: Dict[str, CanonicalEntity] = {}
    mention2entity: Dict[str, str] = {}  # mention_id -> canonical_id
    for ie in chunk_outputs:
        for m in ie.mentions:
            # 候选
            cands = candidates_for(entities, m.text, m.type, top=5)
            decision, conf = link_with_llm(m.text, m.type, m.text, cands)
            if decision == "NEW":
                cid = stable_id(m.text, m.type)
                entities[cid] = CanonicalEntity(id=cid, name=m.text, type=m.type, attrs=m.attrs, aliases={m.text})
            else:
                cid = decision
                # 记录别名
                entities[cid].aliases.add(m.text)
            mention2entity[m.id] = cid

    # Pass-3: 关系汇总
    relations: Dict[Tuple[str,str,str], RelAgg] = {}
    for idx, ie in enumerate(chunk_outputs, 1):
        for r in ie.rel_mentions:
            s = mention2entity.get(r.subj_mention_id)
            t = mention2entity.get(r.obj_mention_id)
            if not s or not t: 
                continue
            key = (s, r.predicate, t)
            if key not in relations:
                relations[key] = RelAgg(src=s, predicate=r.predicate, tgt=t)
            agg = relations[key]
            agg.evidences.append({
                "doc_id": doc_id, "chunk_id": idx,
                "text": r.evidence[:180], "offsets": [r.char_start, r.char_end]
            })
            if r.when: agg.when.add(r.when)
            if r.where: agg.where.add(r.where)
            agg.confidence_list.append(r.confidence)
            # 可选择把金额/比例等 props 合并为列表或选最大置信度那条
            for k, v in (r.props or {}).items():
                if k not in agg.props: agg.props[k] = set()
                agg.props[k].add(v)
    # 将 set 转 list
    for agg in relations.values():
        for k,v in list(agg.props.items()):
            if isinstance(v, set): agg.props[k] = list(v)

    # Pass-4: 入库
    write_to_neo4j(entities, relations, source_doc_id=source_doc_id)

五、三份关键提示词模板

1. “分块抽取”提示词(上文 IE_SYSTEM/IE_USER 已给)

  • 要求只输出 JSON必须给 offsets,关系要用提及ID指向主客体。

  • predicate 控制在规范动词集合(自定义枚举)内,减少花样词。

示例枚举(可放到 System):

Entity types: Person, Org, Product, Event, Location, Law, Concept
Predicates: FOUNDED, ACQUIRED, MERGED_WITH, PART_OF, WORKS_AT, LOCATED_IN, PUBLISHED, REGULATED_BY, INVESTED_IN

2. “实体链接判定”提示词(上文 LINK_SYSTEM/LINK_USER 已给)

  • 输出只能是 {"link_to":"<id|NEW>","confidence":0..1}

  • 传入候选列表,LLM只做“选择或NEW”,避免自由发挥。

3. “全局校对/补关系(可选)”

对全局汇总后的草图(top-N 实体与关系),请 LLM 做一致性检查与遗漏补全(例如从证据语句推导 FOUNDED)。
注意:把新增的标为 derived=true 并保留“来源关系键”。


六、Neo4j 图模式与溯源

建议最小可行模式:

  • (:Entity {id, name, type, aliases[], ...})

  • (:Entity)-[:REL_TYPE {sourceDocId, evidences[], when[], where[], props{}, confidence, derived?}]->(:Entity)

  • (可选):Evidence 节点太重,先用数组属性即可;等数据量/法务需求提升后再拆。

约束与索引:

CREATE CONSTRAINT entity_id IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE;
CREATE INDEX ent_type IF NOT EXISTS FOR (n:Entity) ON (n.type);

七、质量与鲁棒性(实践要点)

  1. 证据先行:强制输出 evidence 与 offsets,后续任何纠错都有依据。

  2. 两阶段思路:先“列举事实片段”,再“汇总定论”。

  3. 别名池aliases 越丰富,链接越稳;可在图中周期性把相同邮箱/域名/注册号的节点合并。

  4. 去重与幂等:写库时用 MERGE + APOC apoc.merge.relationship;关系以 (src, pred, tgt) 作为幂等键。

  5. 跨切片缺词:使用邻近窗口Top-K 全局实体摘要当“轻记忆”。

  6. 冲突仲裁:置信度聚合 + “最高权重证据优先”。

  7. 可观测性:记录每块耗时/失败/重试次数、每文档提及/关系密度分布。

  8. 成本控制chain.batch(chunks, config={"max_concurrency": 6}) 并发;重复文本用缓存;只对“模糊提及”调用链接判定 LLM。


八、你可以直接复用的最小运行清单

  • 输入:原始文本字符串 raw_text

  • 步骤

    1. chunks = split_text(raw_text)

    2. 遍历 chunksextract_chunk_ie(...) → 累积 mentions, rel_mentions

    3. link_with_llm(...) 把 mention 绑定到全局实体

    4. 合并成全局关系 relations[(src,pred,tgt)]

    5. write_to_neo4j(...) MERGE 入库

  • 验证

    MATCH (s:Entity)-[r]->(t:Entity) RETURN s.name, type(r), t.name, r.confidence, r.evidences[0..2] LIMIT 20;
    
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐