【第四章:大模型(LLM)】11.LLM + RAG:构建私有领域专家-(5)Rag实战: Rag完整项目代码详解
本文详细介绍了如何构建一个完整的RAG(检索增强生成)系统,从项目架构到核心模块实现。主要内容包括:1)项目目标与边界,支持文档入库、向量检索、精排和RESTAPI;2)推荐的Python项目目录结构;3)核心模块详解(文本加载、分块、嵌入、索引、检索、精排、提示组装和生成);4)FastAPI接口实现;5)测试评估与调优建议;6)部署方案(Docker+k8s);7)监控与审计方案;8)常见问题
第四章:大模型(LLM)
第十一部分:LLM + RAG 构建私有领域专家
第五节:Rag实战: Rag完整项目代码详解
一、项目总览(目标与边界)
目标:搭建一个可生产化的 RAG 服务,支持:
-
文档入库(文件解析 → chunk → embedding → index)
-
向量检索(FAISS / Milvus)
-
可选精排(cross-encoder)
-
Prompt 组装与生成(可选:云端生成/本地生成)
-
REST API(/ingest, /query, /status)
-
日志、审计与基本监控
边界:示例以 Python + FastAPI 为主,embedding 用通义千问 API(或本地 sentence-transformers),检索用 FAISS(可扩展到 Milvus),生成可配置为通义生成或本地模型。
二、项目目录与说明(推荐结构)
rag_project/
├─ README.md
├─ requirements.txt
├─ app/
│ ├─ main.py # FastAPI 服务入口
│ ├─ config.py # 全局配置(API keys, paths)
│ ├─ logger.py # 日志与审计配置
│ ├─ api/
│ │ ├─ ingest.py # 文档入库接口
│ │ ├─ query.py # 查询接口
│ ├─ core/
│ │ ├─ text_loader.py # 文件解析(pdf/docx/html)
│ │ ├─ chunker.py # chunk策略
│ │ ├─ embedder.py # 调用 embedding api / 本地 embed
│ │ ├─ indexer.py # FAISS / Milvus 操作
│ │ ├─ retriever.py # Retriever logic (dense+sparse hybrid)
│ │ ├─ reranker.py # Cross-encoder rerank(可选)
│ │ ├─ prompt.py # Prompt 模板及拼装
│ │ ├─ generator.py # 调用生成模型(通义 / 本地)
│ ├─ models/ # 保存索引、metadata、模型文件
│ ├─ tests/ # 单元与集成测试
├─ deployments/
│ ├─ Dockerfile
│ ├─ k8s-deployment.yaml
└─ scripts/
├─ build_index.py
├─ ingest_file.py
└─ local_query.py
三、核心模块详解与关键代码
我将每个模块先讲清职责,然后给关键实现(简化版但可直接运行),最后给工程化注意点。
1) config.py
— 配置管理
职责:集中配置(环境变量优先),便于部署环境差异化替换。
# app/config.py
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
DASHSCOPE_API_KEY: str = os.getenv("DASHSCOPE_API_KEY", "")
DASHSCOPE_BASE_URL: str = os.getenv("DASHSCOPE_BASE_URL", "https://dashscope-intl.aliyuncs.com/compatible-mode/v1")
EMBED_MODEL: str = os.getenv("EMBED_MODEL", "text-embedding-v4")
GENERATE_MODEL: str = os.getenv("GENERATE_MODEL", "qwen-vl-plus")
INDEX_DIR: str = os.getenv("INDEX_DIR", "./app/models")
FAISS_INDEX_PATH: str = os.path.join(INDEX_DIR, "faiss.index")
METADATA_PATH: str = os.path.join(INDEX_DIR, "metadatas.jsonl")
CHUNK_SIZE: int = int(os.getenv("CHUNK_SIZE", 1500))
CHUNK_OVERLAP: int = int(os.getenv("CHUNK_OVERLAP", 300))
CROSS_ENCODER_MODEL: str = os.getenv("CROSS_ENCODER_MODEL", "cross-encoder/ms-marco-MiniLM-L-6-v2")
# ... 更多配置
class Config:
env_file = ".env"
settings = Settings()
注意:使用 pydantic
的 BaseSettings 能从 .env
加载并自动类型转换,便于配置管理。
2) text_loader.py
— 文档抽取
职责:把多种格式(PDF/DOCX/HTML/TXT)解析为纯文本,保留元数据。
关键点:处理编码、OCR(可选)、表格提取(如果需要,可用 camelot
/tabula
)。
# app/core/text_loader.py
import pdfplumber, os
from docx import Document
def extract_pdf(path):
texts=[]
with pdfplumber.open(path) as pdf:
for p in pdf.pages:
txt = p.extract_text() or ""
texts.append(txt)
return "\n".join(texts)
def extract_docx(path):
doc = Document(path)
return "\n".join([p.text for p in doc.paragraphs])
def load_file(path):
ext = os.path.splitext(path)[1].lower()
if ext==".pdf":
return extract_pdf(path)
if ext in (".docx", ".doc"):
return extract_docx(path)
if ext in (".txt",):
with open(path,'r',encoding='utf-8') as f:
return f.read()
# html handling, other types...
raise ValueError("Unsupported format")
3) chunker.py
— 文本分片
职责:把长文本切成带 overlap 的 chunk,输出 {"text":..., "meta": {...}}
。
工程要点:为每个 chunk 生成唯一 chunk_id
和保留 source_doc_id
、char_span
。
# app/core/chunker.py
import uuid
def sliding_window_chunk(text, chunk_chars=1500, overlap_chars=300, source_id=None):
start=0; N=len(text)
chunks=[]
idx=0
while start < N:
end = min(N, start+chunk_chars)
chunk_text = text[start:end].strip()
if chunk_text:
chunk_id = f"{source_id or 'doc'}_{idx}_{uuid.uuid4().hex[:8]}"
chunks.append({
"chunk_id": chunk_id,
"text": chunk_text,
"meta": {"source_id": source_id,"char_span": (start,end)}
})
idx+=1
if end==N: break
start = end - overlap_chars
return chunks
4) embedder.py
— 嵌入器(通义 / 本地)
职责:对 batch 文本调用 embedding(云/本地),返回向量。需支持批量、重试与速率限制。
关键实现(通义):
# app/core/embedder.py
import requests, time, numpy as np
from app.config import settings
HEADERS = {"Authorization": f"Bearer {settings.DASHSCOPE_API_KEY}", "Content-Type": "application/json"}
def qwen_embeddings(texts, model=None, batch_size=16, max_retries=3):
url = f"{settings.DASHSCOPE_BASE_URL}/embeddings"
model = model or settings.EMBED_MODEL
embs=[]
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
payload = {"model": model, "input": batch}
for attempt in range(max_retries):
r = requests.post(url, headers=HEADERS, json=payload, timeout=60)
if r.status_code==200:
data = r.json()
embs.extend([np.array(item["embedding"], dtype=np.float32) for item in data["data"]])
break
else:
time.sleep(2**attempt)
if attempt==max_retries-1:
raise RuntimeError("Embedding failed: "+r.text)
return embs
本地 embedding(可选):
from sentence_transformers import SentenceTransformer
_local_model = None
def local_embeddings(texts, model_name="BAAI/bge-large-zh"):
global _local_model
if _local_model is None:
_local_model = SentenceTransformer(model_name)
return _local_model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
5) indexer.py
— FAISS / Milvus 索引管理
职责:建立索引、增量 upsert、保存/加载 metadata(chunks 与 meta)。支持选择不同类型索引(Flat / IVF / HNSW)。
关键实现(FAISS FlatIP 例):
# app/core/indexer.py
import faiss, numpy as np, os, json
from app.config import settings
def build_faiss_index(vectors, dim, index_path=settings.FAISS_INDEX_PATH):
index = faiss.IndexFlatIP(dim)
faiss.normalize_L2(vectors)
index.add(vectors)
os.makedirs(os.path.dirname(index_path), exist_ok=True)
faiss.write_index(index, index_path)
return index
def load_faiss(index_path=settings.FAISS_INDEX_PATH):
if not os.path.exists(index_path):
return None
return faiss.read_index(index_path)
Metadata 存储:把 chunk metadatas 存为 JSONL(按行),索引顺序与 metadata 对齐,便于检索后回查。
6) retriever.py
— 检索策略(Dense / Hybrid)
职责:给定 query,执行:
-
embedding -> q_vec
-
dense search -> top_k_dense
-
optionally BM25 -> top_k_sparse
-
合并两路 candidate -> (optional) cross-encoder rerank -> top_k_final
简化版:
# app/core/retriever.py
from app.core.embedder import qwen_embeddings
from app.core.indexer import load_faiss
index = load_faiss()
metadatas = load_metadata() # implement load_metadata
def retrieve(query, top_k=10):
q_emb = qwen_embeddings([query])[0].astype('float32').reshape(1,-1)
faiss.normalize_L2(q_emb)
D, I = index.search(q_emb, top_k)
results=[]
for score, idx in zip(D[0], I[0]):
md = metadatas[idx]
results.append({"score": float(score), "chunk_id": md["chunk_id"], "text": md["text"], "meta": md["meta"]})
return results
7) reranker.py
— cross-encoder 精排(可选,但强烈推荐)
职责:对候选做更精细语义评分(query + candidate)。通常用 CrossEncoder(sentence-transformers)。
要点:
-
只在 top-N(例如 100)上运行,避免慢请求。
-
cross-encoder 得分对 prompt 的质量影响极大。
# app/core/reranker.py
from sentence_transformers import CrossEncoder
_ce = None
def rerank(query, candidates, model_name=None):
global _ce
model_name = model_name or settings.CROSS_ENCODER_MODEL
if _ce is None:
_ce = CrossEncoder(model_name)
pairs = [(query, c['text']) for c in candidates]
scores = _ce.predict(pairs, show_progress_bar=False)
for c,s in zip(candidates, scores):
c['rerank_score'] = float(s)
candidates = sorted(candidates, key=lambda x: x['rerank_score'], reverse=True)
return candidates
8) prompt.py
— Prompt 模板管理与拼装
职责:把 top-k 文档拼装进 Prompt,控制 token 长度并明确约束(如要求列出来源、不可编造)。
模板示例(中文):
PROMPT_TEMPLATE = """
你是一个专业领域助手。请仅基于下面的检索到资料回答问题,不要编造事实。
检索到的资料:
{retrieved}
用户问题:{query}
请给出清晰、分步的回答,并在答案末尾列出引用来源编号(例如:[1])。
"""
def assemble_prompt(query, hits, top_k=3):
retrieved=""
for i, h in enumerate(hits[:top_k], start=1):
src = h['meta'].get('source_id', 'unknown')
retrieved += f"[{i}] 来源: {src}\n{h['text']}\n\n"
return PROMPT_TEMPLATE.format(retrieved=retrieved, query=query)
注意:当拼接 retrieved 内容时,应优先保留 rerank 得分最高的片段,并限制每段字符数(如截断到 1024 字符)。
9) generator.py
— 生成器(通义或本地)
职责:把 prompt 传给生成模型,返回文本。支持 cloud call(通义)或本地 transformer 调用(量化优化建议)。
通义示例:
# cloud generate
def call_qwen_chat(prompt, model=None, max_tokens=512, temperature=0.0):
url = f"{settings.DASHSCOPE_BASE_URL}/chat/completions"
payload = {
"model": model or settings.GENERATE_MODEL,
"messages": [{"role":"system","content":"你是领域专家。"}, {"role":"user","content":[{"type":"text","text":prompt}]}],
"temperature": temperature,
"max_tokens": max_tokens
}
r = requests.post(url, headers=HEADERS, json=payload, timeout=120)
r.raise_for_status()
data = r.json()
# parse content robustly
content = data['choices'][0]['message']['content']
if isinstance(content, list):
text = "".join([c.get("text","") for c in content if c.get("type")=="text"])
else:
text = content
return text
本地示例(transformers):
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(LOCAL_MODEL, device_map="auto", torch_dtype="auto")
def generate_local(prompt, max_new_tokens=256, temperature=0.0):
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
gen = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=(temperature>0.0), temperature=temperature)
out = tokenizer.decode(gen[0], skip_special_tokens=True)
# extract answer portion if necessary
if out.startswith(prompt):
return out[len(prompt):].strip()
return out
工程建议:
-
对本地模型进行量化(bitsandbytes、QLoRA)以降低显存需求。
-
本地生成建议禁用 sampling(temperature=0)用于稳定性,或开启小温度用于更开放的对话。
10) api/ingest.py
与 api/query.py
— FastAPI 接口
/ingest
:接受文件上传或URL,执行:load → chunk → embed → index add → persist metadata
/query
:接收 query, top_k 参数,执行:embed(query) → retrieve → (optional rerank) → assemble prompt → generate → 返回 answer + sources + retrieval_debug
示例 query
接口:
# app/api/query.py
from fastapi import APIRouter
from pydantic import BaseModel
from app.core.retriever import retrieve
from app.core.reranker import rerank
from app.core.prompt import assemble_prompt
from app.core.generator import generate_local, call_qwen_chat
router = APIRouter()
class QueryReq(BaseModel):
query: str
top_k: int = 5
use_local: bool = True
@router.post("/query")
def query_endpoint(req: QueryReq):
hits = retrieve(req.query, top_k=50) # get candidates
hits = rerank(req.query, hits) # rerank
prompt = assemble_prompt(req.query, hits, top_k=req.top_k)
if req.use_local:
answer = generate_local(prompt)
else:
answer = call_qwen_chat(prompt)
return {"answer": answer, "sources": hits[:req.top_k]}
四、测试、评估与调优
单元测试
-
tests/test_chunker.py
:验证 chunk 长度、overlap -
tests/test_embedder.py
:mock embedding 接口,确保输出维度 -
tests/test_indexer.py
:小数据集建立索引并检索验证
检索评估(离线)
-
Recall@k(gold doc in top-k)
-
MRR / nDCG
生成评估
-
人工评审(accuracy / hallucination rate)
-
自动指标(ROUGE/BLEU)对“有参考答案”的数据集
调优技巧(经验)
-
chunk_size:短文档用 300-800,长文档可 1500;中文多试几组
-
top_k 检索:初期 50 → rerank → 最终取 3~5 片段进 prompt
-
cross-encoder:用于精排,选择支持中文的 cross-encoder
-
nlist / nprobe(FAISS IVF):nlist ~ sqrt(N) 或 N/100,nprobe 设 10~50 试 trade-off
五、部署(Docker + k8s)与运维
Dockerfile(示例)
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
k8s(简要)
-
Deployment:分配 CPU/GPU、env secrets(API keys)
-
Horizontal Pod Autoscaler:基于 CPU 或 custom metrics(latency)
-
PersistentVolume:存放 FAISS index 与 metadata
-
Service + Ingress / API Gateway:对外暴露
六、监控与审计(生产必备)
-
Metrics:Latency分段(embed、retrieval、rerank、generate)、QPS、错误率
-
Logging:Request id、user id(如有)、retrieval ids、source list、生成文本摘要
-
Feedback loop:用户反馈保存到 DB,用作后续训练/微调数据
-
安全审计:保存每次检索到的源与最终答案以便责任追踪
七、常见坑与注意事项(实战总结)
-
Embedding 速率限制:云端 embedding 接口有吞吐和速率限制,必须批量化与重试策略。
-
索引/metadata 不一致:确保向量顺序与 metadata 一一对应,持久化时保存映射文件。
-
prompt 超长:谨慎控制 prompt 长度;使用 rerank 限制 top segments。
-
泄露隐私:对上传数据做 PII 检测与脱敏,加入访问控制。
-
幻觉风险:提示模型“仅基于检索资料回答”,并返回 sources;高风险场景加入人工审查。
-
备份与恢复:对 index 和 metadata 做周期性备份;若使用 IVF 必须保留训练样本以便重构。
八、示例快速启动脚本(scripts/build_index.py
)
# scripts/build_index.py (简化)
from app.core.text_loader import load_file
from app.core.chunker import sliding_window_chunk
from app.core.embedder import qwen_embeddings
from app.core.indexer import build_faiss_index
import json, os
file_path = "data/legal_docs.pdf"
text = load_file(file_path)
chunks = sliding_window_chunk(text, chunk_chars=1500, overlap_chars=300, source_id="legal_docs")
texts = [c['text'] for c in chunks]
vectors = qwen_embeddings(texts, batch_size=16)
import numpy as np
arr = np.stack(vectors).astype('float32')
build_faiss_index(arr, dim=arr.shape[1])
# persist metadata
os.makedirs("app/models", exist_ok=True)
with open("app/models/metadatas.jsonl","w",encoding="utf-8") as f:
for c in chunks:
f.write(json.dumps(c, ensure_ascii=False)+"\n")
九、扩展功能建议(产品级)
-
多语言支持:按语言建立分库或在 embedding 时传语言参数
-
向量压缩与召回分层:使用 PQ/OPQ 为冷数据降维
-
知识图谱结合:把 structured knowledge (triples) 与向量检索混合
-
差分隐私 / 加密索引:在高合规场景下使用加密检索方案
-
在线学习:把用户反馈作为微调数据(LoRA/QLoRA)定期落地
十、交付清单(你将获得)
-
完整项目目录与代码骨架(如上)
-
可运行的脚本:
scripts/build_index.py
、app/main.py
(FastAPI)与示例curl
用法 -
部署材料:
Dockerfile
、k8s 模板、.env 示例 -
调优建议表(如何选 chunk_size、index type、rerank 参数)
-
测试用例模板(检索 recall、生成校验)
更多推荐
所有评论(0)