0x0 设计理念

维度 LangChain GraphRAG LlamaIndex GraphRAG
架构定位 Agent/Workflow 导向 数据索引层导向
设计哲学 图是推理流程的控制结构 图是知识表示结构
核心目标 复杂任务编排 提高知识检索质量
面向人群 构建复杂多步骤 Agent 的工程师 构建知识库 / QA 系统的人

一句话总结:

  • LangChain:Graph = 计算流程图
  • LlamaIndex:Graph = 知识图谱

换句话理解

LangChain GraphRAG  = orchestration layer
LlamaIndex GraphRAG = data layer


0x1 图数据存储 Graph Databases

LangChain 的 GraphRAG 更偏向workflow graph,而不是 knowledge graph。本身并不绑定 Neo4j,但支持Neo4j,ArangoDB,TigerGraph,Neptune,自定义 GraphStore。

LlamaIndex与Neo4j是最成熟、最常用的一个。本身也支持NebulaGraph,Memgraph,FalkorDB,Amazon Neptune

三大云厂商都提供

  • 🟢 Amazon Neptune
  • 🟢 Google Cloud Spanner Graph
  • 🟢 Azure Cosmos DB(Gremlin API)
产品 Graph 类型 成熟度
AWS Neptune Property + RDF ⭐⭐⭐⭐
GCP Spanner Graph Property ⭐⭐(较新)
Azure Cosmos DB Property ⭐⭐⭐


0x2 LangChain的GraphRAG例子→写入查询

以下的完整代码参考此Github Gist : https://gist.github.com/VictorZhang2014/219e97082bd0308ed453ec6b007bb501

PDF 文件
   │
   ▼
MarkItDown 解析 → Markdown 文本
   │
   ▼
LangChain 分块 → 800字/块,重叠150字
   │
   ├──→ OpenAI Embedding → 向量 → Neo4j 向量索引
   │
   └──→ GPT-4o-mini 实体抽取 → 实体/关系节点 → Neo4j 图谱
   
查询时:
问题 → Embedding → 向量检索 Top-5 Chunk
                 + 图谱实体关系子图
                 → GPT-4o 生成回答

a. 依赖安装

pip install markitdown langchain langchain-openai neo4j openai python-dotenv

b. 配置文件

NEO4J_URI      = os.getenv("NEO4J_URI",      "bolt://localhost:7687")
NEO4J_USER     = os.getenv("NEO4J_USER",     "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "")          # 必填
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
EMBED_MODEL    = os.getenv("EMBED_MODEL",    "text-embedding-3-small")
LLM_MODEL      = os.getenv("LLM_MODEL",      "gpt-4o")
CHUNK_SIZE     = int(os.getenv("CHUNK_SIZE", "800"))
CHUNK_OVERLAP  = int(os.getenv("CHUNK_OVERLAP", "150"))
TOP_K          = int(os.getenv("TOP_K", "5"))
  • CHUNK_SIZE:PDF 文本会被切成一段一段的小块再存入数据库,CHUNK_SIZE 控制每块最多多少个字符。
    • 为什么要分块? LLM 有 Token 限制,整篇 PDF 无法一次性放入,分块后只取最相关的几块传给 LLM。
    • 调大(如 1500)→ 每块信息更完整,但检索精度下降,费用略高
    • 调小(如 400)→ 检索更精准,但可能切断完整语义
    • 默认 800 适合大多数中文文档
  • CHUNK_OVERLAP: 相邻两块之间的共享字符数,防止重要内容被切断丢失。如下所示
		CHUNK_SIZE = 800
        CHUNK_OVERLAP = 150

第1块  │←───────── 800字 ─────────→│
第2块           │←─150字重叠─→│←──── 800字 ────→│
第3块                        │←─150字重叠─→│←── 800字 ──→│

举个例子: 如果一句关键结论恰好跨在两块的边界,没有重叠的话这句话会被切成两半,两块都不完整。有了重叠,这句话会同时出现在第1块末尾和第2块开头。

设值参考:一般设为 CHUNK_SIZE15%~25% 比较合理。重叠太大会产生大量冗余数据,增加存储和费用

  • TOP_K:检索反馈最相关的K个chunk后交给LLM生成答案,如下所示:
用户提问:"什么是深度学习?"
         │
         ▼
向量检索 → 全库匹配
         │
         ▼ TOP_K = 5
    ┌────────────────────────┐
    │ Chunk #12  相关度 0.94 │ ✅
    │ Chunk #8   相关度 0.91 │ ✅
    │ Chunk #23  相关度 0.87 │ ✅  → 这5块拼成上下文 → GPT-4o 回答
    │ Chunk #5   相关度 0.82 │ ✅
    │ Chunk #31  相关度 0.79 │ ✅
    │ Chunk #2   相关度 0.61 │ ❌ 被丢弃
    └────────────────────────┘

设值参考:调大(如 10)→ 给 LLM 更多参考内容,回答更全面,但 Token 费用更高,也可能引入不相关内容。调小(如 3)→ 更快更省钱,但可能遗漏关键信息。

c. 数据库基本代码

from neo4j import GraphDatabase

def get_driver():
    return GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

def run(driver, query, **params):
    with driver.session() as s:
        return list(s.run(query, **params))

d. 数据库初始化 for Neo4j

def init_schema(driver):
    """创建约束和向量索引"""
    stmts = [
        "CREATE CONSTRAINT doc_id IF NOT EXISTS FOR (d:Document) REQUIRE d.id IS UNIQUE",
        "CREATE CONSTRAINT chunk_id IF NOT EXISTS FOR (c:Chunk) REQUIRE c.id IS UNIQUE",
        "CREATE CONSTRAINT entity_uniq IF NOT EXISTS FOR (e:Entity) REQUIRE (e.name, e.type) IS UNIQUE",
        # Neo4j AuraDB 5.x 向量索引
        """CREATE VECTOR INDEX chunk_embedding_index IF NOT EXISTS
           FOR (c:Chunk) ON c.embedding
           OPTIONS {indexConfig: {`vector.dimensions`:1536, `vector.similarity_function`:'cosine'}}""",
    ]
    with driver.session() as s:
        for q in stmts:
            try:
                s.run(q)
            except Exception as e:
                log.debug("Schema: %s", e)

    # 等待索引上线
    import time
    for _ in range(20):
        with driver.session() as s:
            result = s.run(
                "SHOW VECTOR INDEXES WHERE name = 'chunk_embedding_index'"
            ).data()
            if result and result[0].get("state") == "ONLINE":
                log.info("✅ 向量索引 chunk_embedding_index 已就绪")
                return
        log.info("⏳ 等待向量索引上线...")
        time.sleep(2)
    log.warning("⚠️  向量索引可能尚未就绪,继续执行")
def cmd_init(args):
    driver = get_driver()
    print("🔧 正在初始化 Schema 和向量索引...")
    init_schema(driver)
    driver.close()

e. 上传数据到neo4j的Graph Database

上传指令 python3 main.py upload guide_msca.pdf

比如pdf文件,先将其转换成MarkDown

def parse_pdf(path: str) -> str:
    from markitdown import MarkItDown
    result = MarkItDown().convert(path)
    text = result.text_content
    if not text or not text.strip():
        raise ValueError("PDF 解析结果为空")
    log.info("📄 解析完成,%d 字符", len(text))
    return text

然后分块chunk,单个chunk最大800,重叠部分最大150

def split_text(text: str) -> list[str]:
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", "。", ".", " ", ""],
    )
    chunks = [c.strip() for c in splitter.split_text(text) if len(c.strip()) >= 30]
    log.info("🧩 分块完成:%d 块", len(chunks))
    return chunks

然后将text转换成embedding的向量化

def embed_texts(texts: list[str]) -> list[list[float]]:
    from langchain_openai import OpenAIEmbeddings
    embedder = OpenAIEmbeddings(model=EMBED_MODEL, openai_api_key=OPENAI_API_KEY)
    return embedder.embed_documents(texts)

最后插入到数据库

def ingest(file_path: str, driver):
    path = Path(file_path)
    doc_id = hashlib.md5(path.read_bytes()).hexdigest()
    filename = path.name

    # 1. 解析 PDF
    text = parse_pdf(str(path))

    # 2. 写入 Document 节点
    run(driver,
        "MERGE (d:Document {id:$id}) SET d.filename=$fn, d.createdAt=datetime()",
        id=doc_id, fn=filename)

    # 3. 分块 + 嵌入
    chunks = split_text(text)
    log.info("🔢 生成嵌入向量...")
    embeddings = embed_texts(chunks)

    # 4. 写入 Chunk 节点 + 实体抽取
    for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
        cid = f"{doc_id}_{i}"
        run(driver,
            """MERGE (c:Chunk {id:$cid})
               SET c.text=$text, c.embedding=$emb, c.idx=$i
               WITH c
               MATCH (d:Document {id:$did})
               MERGE (d)-[:HAS_CHUNK]->(c)""",
            cid=cid, text=chunk, emb=emb, i=i, did=doc_id)

        # 仅对前 40 块做实体抽取(省钱)
        if i < 40:
            kg = extract_kg(chunk)
            for e in kg.get("entities", []):
                if e.get("name"):
                    run(driver,
                        """MERGE (e:Entity {name:$name, type:$type})
                           SET e.desc=$desc
                           WITH e MATCH (c:Chunk {id:$cid})
                           MERGE (c)-[:MENTIONS]->(e)""",
                        name=e["name"], type=e.get("type","OTHER"),
                        desc=e.get("desc",""), cid=cid)
            for r in kg.get("relations", []):
                if r.get("src") and r.get("tgt"):
                    run(driver,
                        """MERGE (s:Entity {name:$src, type:$st})
                           MERGE (t:Entity {name:$tgt, type:$tt})
                           MERGE (s)-[:RELATES_TO {rel:$rel}]->(t)""",
                        src=r["src"], st=r.get("src_type","OTHER"),
                        tgt=r["tgt"], tt=r.get("tgt_type","OTHER"),
                        rel=r.get("rel","related"))

        if (i + 1) % 10 == 0:
            log.info("  %d/%d 块处理完毕", i+1, len(chunks))

    log.info("✅ 摄入完成: %s  doc_id=%s", filename, doc_id)
    return doc_id
def cmd_upload(args):
    path = Path(args.file)
    if not path.exists():
        print(f"❌ 文件不存在: {path}"); sys.exit(1)
    driver = get_driver()
    init_schema(driver)
    try:
        ingest(str(path), driver)
    finally:
        driver.close()

f. 查询已上传的文件数据

查询指令 python3 main.py list

def cmd_list(args):
    driver = get_driver()
    try:
        docs = run(driver,
            """MATCH (d:Document)
               OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c)
               RETURN d.id AS id, d.filename AS name,
                      d.createdAt AS created, count(c) AS chunks
               ORDER BY d.createdAt DESC""")
        if not docs:
            print("📭 暂无文档"); return
        print(f"\n{'ID':<34} {'文件名':<28} {'块数':>5}")
        print("-"*70)
        for d in docs:
            print(f"{d['id']:<34} {d['name'][:26]:<28} {d['chunks']:>5}")
    finally:
        driver.close()

g. 更新已上传的数据到neo4j的Graph Database

更新指令 python3 main.py update New_Guided_MSCA.pdf

更新数据的逻辑是先删除→再插入新的数据

def cmd_update(args):
    """删除旧文档后重新上传"""
    path = Path(args.file)
    if not path.exists():
        print(f"❌ 文件不存在: {path}"); sys.exit(1)
    driver = get_driver()
    # 按文件名找旧文档
    docs = run(driver,
        "MATCH (d:Document) WHERE d.filename = $fn RETURN d.id AS id, d.filename AS name",
        fn=path.name)
    if docs:
        for d in docs:
            run(driver,
                """MATCH (d:Document {id:$id})
                   OPTIONAL MATCH (d)-[:HAS_CHUNK]->(c:Chunk)
                   DETACH DELETE d, c""",
                id=d["id"])
            print(f"🗑️  已删除旧版本: {d['name']}")
    else:
        print(f"ℹ️  未找到旧版本,直接上传")
    driver.close()
    # 重新上传
    driver = get_driver()
    try:
        ingest(str(path), driver)
    finally:
        driver.close()

h. 最后关键一步:用户查询→LLM和NEO4J交互

执行命令是:python3 main.py query,然后等待提示,就可以输出问题,比如问题:什么是MSCA项目
LLM通过neo4j graph db的回复是:

根据提供的上下文,MSCA项目指的是Marie Skłodowska-Curie Actions(玛丽·居里行动),这是欧盟的一项计划,旨在支持研究人员的职业发展和培训。具体到2024年的博士后奖学金(Postdoctoral Fellowships),该计划为研究人员提供在欧盟成员国或Horizon Europe关联国家进行研究的机会。申请者需要满足特定的资格条件,并且可以选择在非学术部门进行最多六个月的额外实习期。MSCA项目还包括其他元素,如第二次派遣和非学术实习,以增加项目的价值和影响。

LLM的代码

SYSTEM_PROMPT = """你是专业文档问答助手。严格基于提供的上下文回答问题,不要编造内容。
如上下文不足,请如实说明。回答要清晰、准确、有条理。"""

def ask(question: str, context: str, history: list) -> str:
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

    llm = ChatOpenAI(model=LLM_MODEL, temperature=0.1, openai_api_key=OPENAI_API_KEY)

    messages = [SystemMessage(content=SYSTEM_PROMPT)]
    for h, a in history[-4:]:  # 最近4轮历史
        messages += [HumanMessage(content=h), AIMessage(content=a)]

    user_content = f"上下文:\n{context}\n\n问题: {question}" if context else question
    messages.append(HumanMessage(content=user_content))

    return llm.invoke(messages).content

从neo4j数据库查询的代码

def retrieve(question: str, driver) -> str:
    qvec = embed_query(question)

    # 向量检索
    chunks = run(driver,
        """CALL db.index.vector.queryNodes('chunk_embedding_index', $k, $vec)
           YIELD node, score
           MATCH (d:Document)-[:HAS_CHUNK]->(node)
           RETURN node.text AS text, d.filename AS filename, score
           ORDER BY score DESC""",
        k=TOP_K, vec=qvec)

    if not chunks:
        return ""

    chunk_context = "\n\n".join(
        f"[{r['filename']} | 相关度:{r['score']:.3f}]\n{r['text']}"
        for r in chunks
    )

    # 图谱实体关系检索
    chunk_texts = [r["text"][:100] for r in chunks]
    graph_facts = run(driver,
        """UNWIND $texts AS t
           MATCH (c:Chunk) WHERE c.text STARTS WITH t
           MATCH (c)-[:MENTIONS]->(e:Entity)
           OPTIONAL MATCH (e)-[r:RELATES_TO]->(e2:Entity)
           RETURN DISTINCT e.name AS src, e.type AS src_type,
                  r.rel AS rel, e2.name AS tgt, e2.type AS tgt_type
           LIMIT 30""",
        texts=chunk_texts)

    graph_context = ""
    if graph_facts:
        lines = []
        seen = set()
        for r in graph_facts:
            if r["rel"] and r["tgt"]:
                key = (r["src"], r["rel"], r["tgt"])
                if key not in seen:
                    seen.add(key)
                    lines.append(f"  [{r['src_type']}]{r['src']} --{r['rel']}--> [{r['tgt_type']}]{r['tgt']}")
            elif r["src"] and (r["src"],) not in seen:
                seen.add((r["src"],))
                lines.append(f"  [{r['src_type']}]{r['src']}")
        if lines:
            graph_context = "\n知识图谱关联事实:\n" + "\n".join(lines)

    return chunk_context + graph_context

查询串通逻辑的代码:

def cmd_query(args):
    driver = get_driver()
    init_schema(driver)
    print("\n🤖 GraphRAG PDF 问答(输入 exit 退出,clear 清空历史)\n")
    history = []
    try:
        while True:
            try:
                q = input("你: ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\n👋"); break
            if not q: continue
            if q.lower() in ("exit", "quit"): print("👋"); break
            if q.lower() == "clear":
                history.clear(); print("🗑️ 历史已清空\n"); continue

            context = retrieve(q, driver)
            if not context:
                print("AI: 未找到相关文档内容,请先上传 PDF。\n")
                continue
            answer = ask(q, context, history)
            print(f"\nAI: {answer}\n")
            history.append((q, answer))
    finally:
        driver.close()

完成代码,参考github gist: https://gist.github.com/VictorZhang2014/219e97082bd0308ed453ec6b007bb501

还有CSDN的资源库:https://download.csdn.net/download/u013538542/92676979

最后效果演示

提问

🤖 GraphRAG PDF 问答(输入 exit 退出,clear 清空历史)

你: MSCA给研究者有什么好处

AI回复

AI: 根据提供的上下文,Marie Skłodowska-Curie Actions (MSCA) 为研究者提供以下好处:

1. **研究资助**:MSCA 提供资金支持研究者进行高水平的研究活动。

2. **知识转移和培训**:MSCA 项目包括知识转移和培训活动,帮助研究者提升技能和知识。

3. **国际合作机会**:研究者有机会在不同国家和机构之间进行合作,扩大国际视野。

4. **职业发展**:通过参与 MSCA 项目,研究者可以获得职业发展的机会,包括在非学术部门的实习。

5. **全球交流**:特别是通过 Global Fellowships,研究者可以在全球范围内进行研究和交流。

这些好处旨在支持研究者的职业发展和研究能力的提升。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐