第四章:大模型(LLM)

第十一部分:LLM + RAG 构建私有领域专家

第五节:Rag实战: Rag完整项目代码详解


一、项目总览(目标与边界)

目标:搭建一个可生产化的 RAG 服务,支持:

  • 文档入库(文件解析 → chunk → embedding → index)

  • 向量检索(FAISS / Milvus)

  • 可选精排(cross-encoder)

  • Prompt 组装与生成(可选:云端生成/本地生成)

  • REST API(/ingest, /query, /status)

  • 日志、审计与基本监控

边界:示例以 Python + FastAPI 为主,embedding 用通义千问 API(或本地 sentence-transformers),检索用 FAISS(可扩展到 Milvus),生成可配置为通义生成或本地模型。


二、项目目录与说明(推荐结构)

rag_project/
├─ README.md
├─ requirements.txt
├─ app/
│  ├─ main.py                # FastAPI 服务入口
│  ├─ config.py              # 全局配置(API keys, paths)
│  ├─ logger.py              # 日志与审计配置
│  ├─ api/
│  │  ├─ ingest.py           # 文档入库接口
│  │  ├─ query.py            # 查询接口
│  ├─ core/
│  │  ├─ text_loader.py      # 文件解析(pdf/docx/html)
│  │  ├─ chunker.py          # chunk策略
│  │  ├─ embedder.py         # 调用 embedding api / 本地 embed
│  │  ├─ indexer.py          # FAISS / Milvus 操作
│  │  ├─ retriever.py        # Retriever logic (dense+sparse hybrid)
│  │  ├─ reranker.py         # Cross-encoder rerank(可选)
│  │  ├─ prompt.py           # Prompt 模板及拼装
│  │  ├─ generator.py        # 调用生成模型(通义 / 本地)
│  ├─ models/                # 保存索引、metadata、模型文件
│  ├─ tests/                 # 单元与集成测试
├─ deployments/
│  ├─ Dockerfile
│  ├─ k8s-deployment.yaml
└─ scripts/
   ├─ build_index.py
   ├─ ingest_file.py
   └─ local_query.py

三、核心模块详解与关键代码

我将每个模块先讲清职责,然后给关键实现(简化版但可直接运行),最后给工程化注意点。


1) config.py — 配置管理

职责:集中配置(环境变量优先),便于部署环境差异化替换。

# app/config.py
import os
from pydantic import BaseSettings

class Settings(BaseSettings):
    DASHSCOPE_API_KEY: str = os.getenv("DASHSCOPE_API_KEY", "")
    DASHSCOPE_BASE_URL: str = os.getenv("DASHSCOPE_BASE_URL", "https://dashscope-intl.aliyuncs.com/compatible-mode/v1")
    EMBED_MODEL: str = os.getenv("EMBED_MODEL", "text-embedding-v4")
    GENERATE_MODEL: str = os.getenv("GENERATE_MODEL", "qwen-vl-plus")
    INDEX_DIR: str = os.getenv("INDEX_DIR", "./app/models")
    FAISS_INDEX_PATH: str = os.path.join(INDEX_DIR, "faiss.index")
    METADATA_PATH: str = os.path.join(INDEX_DIR, "metadatas.jsonl")
    CHUNK_SIZE: int = int(os.getenv("CHUNK_SIZE", 1500))
    CHUNK_OVERLAP: int = int(os.getenv("CHUNK_OVERLAP", 300))
    CROSS_ENCODER_MODEL: str = os.getenv("CROSS_ENCODER_MODEL", "cross-encoder/ms-marco-MiniLM-L-6-v2")
    # ... 更多配置
    class Config:
        env_file = ".env"

settings = Settings()

注意:使用 pydantic 的 BaseSettings 能从 .env 加载并自动类型转换,便于配置管理。


2) text_loader.py — 文档抽取

职责:把多种格式(PDF/DOCX/HTML/TXT)解析为纯文本,保留元数据。

关键点:处理编码、OCR(可选)、表格提取(如果需要,可用 camelot/tabula)。

# app/core/text_loader.py
import pdfplumber, os
from docx import Document

def extract_pdf(path):
    texts=[]
    with pdfplumber.open(path) as pdf:
        for p in pdf.pages:
            txt = p.extract_text() or ""
            texts.append(txt)
    return "\n".join(texts)

def extract_docx(path):
    doc = Document(path)
    return "\n".join([p.text for p in doc.paragraphs])

def load_file(path):
    ext = os.path.splitext(path)[1].lower()
    if ext==".pdf":
        return extract_pdf(path)
    if ext in (".docx", ".doc"):
        return extract_docx(path)
    if ext in (".txt",):
        with open(path,'r',encoding='utf-8') as f:
            return f.read()
    # html handling, other types...
    raise ValueError("Unsupported format")

3) chunker.py — 文本分片

职责:把长文本切成带 overlap 的 chunk,输出 {"text":..., "meta": {...}}

工程要点:为每个 chunk 生成唯一 chunk_id 和保留 source_doc_idchar_span

# app/core/chunker.py
import uuid

def sliding_window_chunk(text, chunk_chars=1500, overlap_chars=300, source_id=None):
    start=0; N=len(text)
    chunks=[]
    idx=0
    while start < N:
        end = min(N, start+chunk_chars)
        chunk_text = text[start:end].strip()
        if chunk_text:
            chunk_id = f"{source_id or 'doc'}_{idx}_{uuid.uuid4().hex[:8]}"
            chunks.append({
                "chunk_id": chunk_id,
                "text": chunk_text,
                "meta": {"source_id": source_id,"char_span": (start,end)}
            })
            idx+=1
        if end==N: break
        start = end - overlap_chars
    return chunks

4) embedder.py — 嵌入器(通义 / 本地)

职责:对 batch 文本调用 embedding(云/本地),返回向量。需支持批量、重试与速率限制。

关键实现(通义):

# app/core/embedder.py
import requests, time, numpy as np
from app.config import settings

HEADERS = {"Authorization": f"Bearer {settings.DASHSCOPE_API_KEY}", "Content-Type": "application/json"}

def qwen_embeddings(texts, model=None, batch_size=16, max_retries=3):
    url = f"{settings.DASHSCOPE_BASE_URL}/embeddings"
    model = model or settings.EMBED_MODEL
    embs=[]
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        payload = {"model": model, "input": batch}
        for attempt in range(max_retries):
            r = requests.post(url, headers=HEADERS, json=payload, timeout=60)
            if r.status_code==200:
                data = r.json()
                embs.extend([np.array(item["embedding"], dtype=np.float32) for item in data["data"]])
                break
            else:
                time.sleep(2**attempt)
                if attempt==max_retries-1:
                    raise RuntimeError("Embedding failed: "+r.text)
    return embs

本地 embedding(可选):

from sentence_transformers import SentenceTransformer
_local_model = None
def local_embeddings(texts, model_name="BAAI/bge-large-zh"):
    global _local_model
    if _local_model is None:
        _local_model = SentenceTransformer(model_name)
    return _local_model.encode(texts, show_progress_bar=False, normalize_embeddings=True)

5) indexer.py — FAISS / Milvus 索引管理

职责:建立索引、增量 upsert、保存/加载 metadata(chunks 与 meta)。支持选择不同类型索引(Flat / IVF / HNSW)。

关键实现(FAISS FlatIP 例):

# app/core/indexer.py
import faiss, numpy as np, os, json
from app.config import settings

def build_faiss_index(vectors, dim, index_path=settings.FAISS_INDEX_PATH):
    index = faiss.IndexFlatIP(dim)
    faiss.normalize_L2(vectors)
    index.add(vectors)
    os.makedirs(os.path.dirname(index_path), exist_ok=True)
    faiss.write_index(index, index_path)
    return index

def load_faiss(index_path=settings.FAISS_INDEX_PATH):
    if not os.path.exists(index_path):
        return None
    return faiss.read_index(index_path)

Metadata 存储:把 chunk metadatas 存为 JSONL(按行),索引顺序与 metadata 对齐,便于检索后回查。


6) retriever.py — 检索策略(Dense / Hybrid)

职责:给定 query,执行:

  • embedding -> q_vec

  • dense search -> top_k_dense

  • optionally BM25 -> top_k_sparse

  • 合并两路 candidate -> (optional) cross-encoder rerank -> top_k_final

简化版:

# app/core/retriever.py
from app.core.embedder import qwen_embeddings
from app.core.indexer import load_faiss

index = load_faiss()
metadatas = load_metadata()  # implement load_metadata

def retrieve(query, top_k=10):
    q_emb = qwen_embeddings([query])[0].astype('float32').reshape(1,-1)
    faiss.normalize_L2(q_emb)
    D, I = index.search(q_emb, top_k)
    results=[]
    for score, idx in zip(D[0], I[0]):
        md = metadatas[idx]
        results.append({"score": float(score), "chunk_id": md["chunk_id"], "text": md["text"], "meta": md["meta"]})
    return results

7) reranker.py — cross-encoder 精排(可选,但强烈推荐)

职责:对候选做更精细语义评分(query + candidate)。通常用 CrossEncoder(sentence-transformers)。

要点:

  • 只在 top-N(例如 100)上运行,避免慢请求。

  • cross-encoder 得分对 prompt 的质量影响极大。

# app/core/reranker.py
from sentence_transformers import CrossEncoder
_ce = None
def rerank(query, candidates, model_name=None):
    global _ce
    model_name = model_name or settings.CROSS_ENCODER_MODEL
    if _ce is None:
        _ce = CrossEncoder(model_name)
    pairs = [(query, c['text']) for c in candidates]
    scores = _ce.predict(pairs, show_progress_bar=False)
    for c,s in zip(candidates, scores):
        c['rerank_score'] = float(s)
    candidates = sorted(candidates, key=lambda x: x['rerank_score'], reverse=True)
    return candidates

8) prompt.py — Prompt 模板管理与拼装

职责:把 top-k 文档拼装进 Prompt,控制 token 长度并明确约束(如要求列出来源、不可编造)。

模板示例(中文):

PROMPT_TEMPLATE = """
你是一个专业领域助手。请仅基于下面的检索到资料回答问题,不要编造事实。
检索到的资料:
{retrieved}

用户问题:{query}

请给出清晰、分步的回答,并在答案末尾列出引用来源编号(例如:[1])。
"""
def assemble_prompt(query, hits, top_k=3):
    retrieved=""
    for i, h in enumerate(hits[:top_k], start=1):
        src = h['meta'].get('source_id', 'unknown')
        retrieved += f"[{i}] 来源: {src}\n{h['text']}\n\n"
    return PROMPT_TEMPLATE.format(retrieved=retrieved, query=query)

注意:当拼接 retrieved 内容时,应优先保留 rerank 得分最高的片段,并限制每段字符数(如截断到 1024 字符)。


9) generator.py — 生成器(通义或本地)

职责:把 prompt 传给生成模型,返回文本。支持 cloud call(通义)或本地 transformer 调用(量化优化建议)。

通义示例:

# cloud generate
def call_qwen_chat(prompt, model=None, max_tokens=512, temperature=0.0):
    url = f"{settings.DASHSCOPE_BASE_URL}/chat/completions"
    payload = {
        "model": model or settings.GENERATE_MODEL,
        "messages": [{"role":"system","content":"你是领域专家。"}, {"role":"user","content":[{"type":"text","text":prompt}]}],
        "temperature": temperature,
        "max_tokens": max_tokens
    }
    r = requests.post(url, headers=HEADERS, json=payload, timeout=120)
    r.raise_for_status()
    data = r.json()
    # parse content robustly
    content = data['choices'][0]['message']['content']
    if isinstance(content, list):
        text = "".join([c.get("text","") for c in content if c.get("type")=="text"])
    else:
        text = content
    return text

本地示例(transformers):

from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL, device_map="auto", torch_dtype="auto")

def generate_local(prompt, max_new_tokens=256, temperature=0.0):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    gen = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=(temperature>0.0), temperature=temperature)
    out = tokenizer.decode(gen[0], skip_special_tokens=True)
    # extract answer portion if necessary
    if out.startswith(prompt):
        return out[len(prompt):].strip()
    return out

工程建议

  • 对本地模型进行量化(bitsandbytes、QLoRA)以降低显存需求。

  • 本地生成建议禁用 sampling(temperature=0)用于稳定性,或开启小温度用于更开放的对话。


10) api/ingest.pyapi/query.py — FastAPI 接口

/ingest:接受文件上传或URL,执行:load → chunk → embed → index add → persist metadata

/query:接收 query, top_k 参数,执行:embed(query) → retrieve → (optional rerank) → assemble prompt → generate → 返回 answer + sources + retrieval_debug

示例 query 接口:

# app/api/query.py
from fastapi import APIRouter
from pydantic import BaseModel
from app.core.retriever import retrieve
from app.core.reranker import rerank
from app.core.prompt import assemble_prompt
from app.core.generator import generate_local, call_qwen_chat

router = APIRouter()

class QueryReq(BaseModel):
    query: str
    top_k: int = 5
    use_local: bool = True

@router.post("/query")
def query_endpoint(req: QueryReq):
    hits = retrieve(req.query, top_k=50)  # get candidates
    hits = rerank(req.query, hits)        # rerank
    prompt = assemble_prompt(req.query, hits, top_k=req.top_k)
    if req.use_local:
        answer = generate_local(prompt)
    else:
        answer = call_qwen_chat(prompt)
    return {"answer": answer, "sources": hits[:req.top_k]}

四、测试、评估与调优

单元测试

  • tests/test_chunker.py:验证 chunk 长度、overlap

  • tests/test_embedder.py:mock embedding 接口,确保输出维度

  • tests/test_indexer.py:小数据集建立索引并检索验证

检索评估(离线)

  • Recall@k(gold doc in top-k)

  • MRR / nDCG

生成评估

  • 人工评审(accuracy / hallucination rate)

  • 自动指标(ROUGE/BLEU)对“有参考答案”的数据集

调优技巧(经验)

  • chunk_size:短文档用 300-800,长文档可 1500;中文多试几组

  • top_k 检索:初期 50 → rerank → 最终取 3~5 片段进 prompt

  • cross-encoder:用于精排,选择支持中文的 cross-encoder

  • nlist / nprobe(FAISS IVF):nlist ~ sqrt(N) 或 N/100,nprobe 设 10~50 试 trade-off


五、部署(Docker + k8s)与运维

Dockerfile(示例)

FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

k8s(简要)

  • Deployment:分配 CPU/GPU、env secrets(API keys)

  • Horizontal Pod Autoscaler:基于 CPU 或 custom metrics(latency)

  • PersistentVolume:存放 FAISS index 与 metadata

  • Service + Ingress / API Gateway:对外暴露


六、监控与审计(生产必备)

  • Metrics:Latency分段(embed、retrieval、rerank、generate)、QPS、错误率

  • Logging:Request id、user id(如有)、retrieval ids、source list、生成文本摘要

  • Feedback loop:用户反馈保存到 DB,用作后续训练/微调数据

  • 安全审计:保存每次检索到的源与最终答案以便责任追踪


七、常见坑与注意事项(实战总结)

  1. Embedding 速率限制:云端 embedding 接口有吞吐和速率限制,必须批量化与重试策略。

  2. 索引/metadata 不一致:确保向量顺序与 metadata 一一对应,持久化时保存映射文件。

  3. prompt 超长:谨慎控制 prompt 长度;使用 rerank 限制 top segments。

  4. 泄露隐私:对上传数据做 PII 检测与脱敏,加入访问控制。

  5. 幻觉风险:提示模型“仅基于检索资料回答”,并返回 sources;高风险场景加入人工审查。

  6. 备份与恢复:对 index 和 metadata 做周期性备份;若使用 IVF 必须保留训练样本以便重构。


八、示例快速启动脚本(scripts/build_index.py

# scripts/build_index.py (简化)
from app.core.text_loader import load_file
from app.core.chunker import sliding_window_chunk
from app.core.embedder import qwen_embeddings
from app.core.indexer import build_faiss_index
import json, os

file_path = "data/legal_docs.pdf"
text = load_file(file_path)
chunks = sliding_window_chunk(text, chunk_chars=1500, overlap_chars=300, source_id="legal_docs")
texts = [c['text'] for c in chunks]
vectors = qwen_embeddings(texts, batch_size=16)
import numpy as np
arr = np.stack(vectors).astype('float32')
build_faiss_index(arr, dim=arr.shape[1])
# persist metadata
os.makedirs("app/models", exist_ok=True)
with open("app/models/metadatas.jsonl","w",encoding="utf-8") as f:
    for c in chunks:
        f.write(json.dumps(c, ensure_ascii=False)+"\n")

九、扩展功能建议(产品级)

  • 多语言支持:按语言建立分库或在 embedding 时传语言参数

  • 向量压缩与召回分层:使用 PQ/OPQ 为冷数据降维

  • 知识图谱结合:把 structured knowledge (triples) 与向量检索混合

  • 差分隐私 / 加密索引:在高合规场景下使用加密检索方案

  • 在线学习:把用户反馈作为微调数据(LoRA/QLoRA)定期落地


十、交付清单(你将获得)

  • 完整项目目录与代码骨架(如上)

  • 可运行的脚本:scripts/build_index.pyapp/main.py(FastAPI)与示例 curl 用法

  • 部署材料:Dockerfile、k8s 模板、.env 示例

  • 调优建议表(如何选 chunk_size、index type、rerank 参数)

  • 测试用例模板(检索 recall、生成校验)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐