Naive RAG Pipeline:从数据到智能检索的完整技术实现
在当今大模型技术浪潮中,检索增强生成(Retrieval-Augmented Generation,RAG)已成为解决大模型幻觉问题、提升知识准确性的关键技术路径。Naive RAG,又称Baseline RAG或Native RAG,是RAG技术的最原始形态,它构成了后续所有高级RAG变体的基础框架。Naive RAG的核心价值在于其简洁的三段式架构:索引(Indexing)→ 检索(Retri
图片来源网络,侵权删

文章目录
引言:RAG技术演进与Naive RAG定位
在当今大模型技术浪潮中,检索增强生成(Retrieval-Augmented Generation,RAG)已成为解决大模型幻觉问题、提升知识准确性的关键技术路径。Naive RAG,又称Baseline RAG或Native RAG,是RAG技术的最原始形态,它构成了后续所有高级RAG变体的基础框架。
Naive RAG的核心价值在于其简洁的三段式架构:索引(Indexing)→ 检索(Retrieval)→ 生成(Generation)。这种"毛坯房"版的设计虽然简单,却为理解RAG系统的工作原理提供了最直观的切入点。本文将深入剖析Naive RAG Pipeline的每个技术环节,结合代码示例、架构图和实践案例,为开发者提供从理论到实践的完整指南。
一、文档数据选项与清洗:构建高质量知识源
1.1 数据源类型与特点
在构建RAG系统时,数据源的选择直接影响最终效果。常见的数据源包括:
- 结构化文档:PDF、Word、Excel等格式文档,通常包含格式化的文本和元数据
- 半结构化数据:HTML网页、Markdown文件,包含一定的结构标记
- 非结构化文本:纯文本文件、日志文件、社交媒体内容
- API数据源:通过API获取的实时或历史数据
1.2 数据清洗的核心流程
数据清洗是确保向量化质量的关键前置步骤。LangChain等框架提供了完整的清洗流程:
# 数据清洗示例代码
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.document_transformers import RemoveRedundantSectionsTransformer
import re
# 1. 文档加载
pdf_loader = PyPDFLoader("技术文档.pdf")
documents = pdf_loader.load()
# 2. 自定义文本清洗函数
def clean_text(text):
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 规范化空白字符
text = re.sub(r'\s+', ' ', text).strip()
# 移除特殊字符
text = re.sub(r'[^\w\s.,;:!?-]', '', text)
return text
# 3. 应用清洗
for doc in documents:
doc.page_content = clean_text(doc.page_content)
# 4. 移除冗余部分
redundancy_remover = RemoveRedundantSectionsTransformer()
cleaned_docs = redundancy_remover.transform_documents(documents)
# 5. 元数据增强
from datetime import datetime
def enhance_metadata(doc_list):
enhanced = []
for i, doc in enumerate(doc_list):
metadata = doc.metadata.copy() if hasattr(doc, 'metadata') else {}
metadata['processed_time'] = datetime.now().isoformat()
metadata['doc_index'] = i
metadata['char_count'] = len(doc.page_content)
enhanced.append(Document(
page_content=doc.page_content,
metadata=metadata
))
return enhanced
清洗关键点:
- 去除HTML标签、页眉页脚等无关内容
- 统一文本格式和编码
- 处理乱码和特殊字符
- 识别并移除重复内容
- 增强元数据信息(来源、时间、长度等)

二、文档加载与摄取:多格式支持与高效解析
2.1 主流文档加载器对比
现代RAG系统需要支持多种文档格式,以下是常用加载器的选择指南:
| 加载器类型 | 支持格式 | 适用场景 | 特点 |
|---|---|---|---|
| PyPDFLoader | PDF文件 | 技术文档、报告 | 提取文本和元数据,支持多页 |
| UnstructuredLoader | PDF、Word、HTML等 | 复杂格式文档 | 基于Unstructured.io,处理能力强 |
| WebBaseLoader | 网页内容 | 在线知识源 | 支持递归抓取,可配置深度 |
| TextLoader | 纯文本文件 | 日志、配置文件 | 简单高效,内存占用小 |
| CSVLoader | CSV文件 | 结构化数据 | 支持列选择和数据转换 |
2.2 Unstructured.io深度集成实践
对于复杂文档格式,Unstructured.io提供了强大的解析能力:
# Unstructured.io集成示例
from langchain_community.document_loaders import UnstructuredLoader
# 安装依赖
# pip install unstructured-client langchain-unstructured
# pip install "unstructured[all-docs]" # 安装所有格式支持
# 使用API模式(需要API密钥)
loader = UnstructuredLoader(
"技术报告.docx",
api_key="your_api_key",
strategy="hi_res", # 高精度解析策略
include_page_breaks=True # 保留分页信息
)
# 或使用本地模式
loader = UnstructuredLoader(
"技术报告.docx",
mode="local",
strategy="fast" # 快速解析模式
)
documents = loader.load()
# 解析结果处理
for doc in documents:
print(f"内容长度: {len(doc.page_content)}")
print(f"元数据: {doc.metadata}")
print(f"来源: {doc.metadata.get('source', '未知')}")
Unstructured.io优势:
- 支持30+文档格式
- 智能表格和列表识别
- 保留文档结构信息
- 可配置的解析策略(fast/hi_res)

三、文档分块策略:平衡语义完整性与检索效率
3.1 传统分块方法分析
文档分块是RAG系统的核心环节,分块质量直接决定检索效果。传统方法主要包括:
1. 固定大小分块(Fixed-size Chunking)
from langchain.text_splitter import CharacterTextSplitter
splitter = CharacterTextSplitter(
chunk_size=1000, # 每块1000字符
chunk_overlap=200, # 重叠200字符
separator="\n" # 按换行符分割
)
chunks = splitter.split_documents(documents)
优点:实现简单,内存可控
缺点:可能切断完整语义单元
2. 递归字符分块(Recursive Character Text Splitter)
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n\n", "\n", ".", " ", ""] # 优先级分割符
)
chunks = splitter.split_documents(documents)
优点:保持语义连贯性
缺点:对无结构文档效果有限
3.2 语义分块新范式:先Embedding再Chunking
2025年,业界开始从"先分块再向量化"向"先向量化再分块"转变。Max-Min Semantic Chunking是这一趋势的代表:
# 语义分块概念实现
import numpy as np
from sentence_transformers import SentenceTransformer
class SemanticChunker:
def __init__(self, model_name='BAAI/bge-small-zh-v1.5'):
self.model = SentenceTransformer(model_name)
def chunk_by_semantics(self, text, max_chunk_size=512, similarity_threshold=0.7):
# 1. 句子分割
sentences = self._split_into_sentences(text)
# 2. 句子向量化
sentence_embeddings = self.model.encode(sentences)
# 3. 动态语义聚类
chunks = []
current_chunk = []
current_embeddings = []
for i, (sentence, embedding) in enumerate(zip(sentences, sentence_embeddings)):
if not current_chunk:
current_chunk.append(sentence)
current_embeddings.append(embedding)
else:
# 计算当前块平均向量与新句子的相似度
avg_embedding = np.mean(current_embeddings, axis=0)
similarity = self._cosine_similarity(avg_embedding, embedding)
if similarity >= similarity_threshold and len(current_chunk) < max_chunk_size:
current_chunk.append(sentence)
current_embeddings.append(embedding)
else:
# 开始新块
chunks.append(" ".join(current_chunk))
current_chunk = [sentence]
current_embeddings = [embedding]
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def _cosine_similarity(self, a, b):
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
语义分块优势:
- 基于句子向量相似度进行动态聚类
- 保持语义连贯性,避免硬性切割
- 适应不同文档类型的语义结构
- 2025年Bhat等人研究发现:事实类问题适合小块(64-128 token),叙事类问题适合大块(512-1024 token)
3.3 分块策略选择矩阵
| 场景类型 | 推荐分块策略 | 分块大小 | 重叠比例 | 说明 |
|---|---|---|---|---|
| 技术文档 | 递归分块 | 500-800字符 | 10-15% | 保持代码段和公式完整 |
| 法律合同 | 语义分块 | 300-500字符 | 20-25% | 确保条款完整性 |
| 新闻文章 | 段落分块 | 800-1200字符 | 5-10% | 按自然段落分割 |
| 学术论文 | 混合分块 | 按章节分割 | 章节间重叠 | 保留引用和图表上下文 |
| 对话记录 | 按对话轮次 | 每轮对话为一块 | 无重叠 | 保持对话连贯性 |

四、Embedding向量化与实操:模型选择与性能优化
4.1 Embedding模型全景对比
选择合适的Embedding模型是RAG系统成功的关键。以下是2025年主流模型对比:
| 模型名称 | 维度 | 支持语言 | 特点 | 适用场景 |
|---|---|---|---|---|
| BGE系列 | 384-1024 | 中英双语 | 针对中文优化,MTEB排名靠前 | 中文知识库、混合语言场景 |
| OpenAI text-embedding-3 | 256-3072 | 多语言 | 上下文长度8K,性能稳定 | 企业级应用、多语言支持 |
| Jina Embeddings v2 | 768-4096 | 多语言 | 支持8K上下文,后期分块友好 | 长文档处理、语义分块 |
| M3E系列 | 768 | 中文 | 针对中文搜索优化 | 中文问答、文档检索 |
| E5系列 | 384-1024 | 多语言 | 指令微调,支持查询-文档不对称 | 问答系统、对话检索 |
4.2 向量化实践代码
# 多种Embedding模型使用示例
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
import numpy as np
# 方案1:使用OpenAI Embeddings(需要API密钥)
openai_embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
dimensions=512, # 可指定输出维度
api_key="your_openai_key"
)
# 方案2:使用本地HuggingFace模型
hf_embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-small-zh-v1.5",
model_kwargs={'device': 'cuda'}, # 使用GPU加速
encode_kwargs={'normalize_embeddings': True} # L2归一化
)
# 方案3:直接使用Sentence Transformers
model = SentenceTransformer('BAAI/bge-large-zh-v1.5')
# 批量向量化函数
def batch_embed_texts(texts, batch_size=32):
"""批量向量化文本,优化内存使用"""
embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
batch_embeddings = model.encode(batch)
embeddings.extend(batch_embeddings)
return np.array(embeddings)
# 向量归一化处理
def normalize_embeddings(embeddings):
"""L2归一化,使余弦相似度计算更准确"""
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
return embeddings / norms
# 实际应用
documents = ["文档内容1", "文档内容2", ...]
chunks = splitter.split_documents(documents)
texts = [chunk.page_content for chunk in chunks]
# 生成向量
embeddings = batch_embed_texts(texts)
normalized_embeddings = normalize_embeddings(embeddings)
4.3 向量化性能优化策略
- 批量处理:利用GPU并行计算能力,设置合适的batch_size
- 缓存机制:对已向量化的文档建立缓存,避免重复计算
- 增量更新:只对新文档或修改文档进行向量化
- 维度选择:根据数据量选择合适维度,平衡精度和效率
- 量化压缩:使用FP16或INT8量化减少存储和计算开销

五、向量相似度计算:数学原理与实现细节
5.1 相似度度量方法对比
向量检索的本质是在高维空间中寻找相似邻居,选择合适的相似度度量至关重要:
1. 余弦相似度(Cosine Similarity)
def cosine_similarity(a, b):
"""计算两个向量的余弦相似度"""
dot_product = np.dot(a, b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
return dot_product / (norm_a * norm_b)
公式:cos(θ) = A·B / (||A|| × ||B||)
范围:[-1, 1],值越大越相似
特点:关注向量方向,忽略长度,适合文本语义匹配
2. 欧氏距离(Euclidean Distance)
def euclidean_distance(a, b):
"""计算欧氏距离"""
return np.linalg.norm(a - b)
公式:d = √Σ(ai - bi)²
范围:[0, ∞),值越小越相似
特点:考虑向量绝对位置,适合图像特征匹配
3. 内积相似度(Dot Product)
def dot_product_similarity(a, b):
"""计算内积相似度"""
return np.dot(a, b)
特点:未归一化时同时受方向和模长影响
关系:当向量L2归一化后,内积 = 余弦相似度
5.2 相似度计算优化
# 高效批量相似度计算
import numpy as np
from numba import jit
@jit(nopython=True, parallel=True)
def batch_cosine_similarity(query_vector, document_vectors):
"""使用NumPy广播和Numba加速的批量余弦相似度计算"""
# 归一化查询向量
query_norm = np.linalg.norm(query_vector)
query_normalized = query_vector / query_norm
# 批量归一化文档向量
doc_norms = np.linalg.norm(document_vectors, axis=1, keepdims=True)
docs_normalized = document_vectors / doc_norms
# 矩阵乘法计算相似度
similarities = np.dot(docs_normalized, query_normalized)
return similarities
# 使用示例
query_embedding = model.encode(["查询问题"])[0]
doc_embeddings = np.array([...]) # 所有文档向量
similarities = batch_cosine_similarity(query_embedding, doc_embeddings)
top_k_indices = np.argsort(similarities)[-5:][::-1] # 获取前5个最相似
5.3 相似度阈值策略
在实际应用中,需要设置相似度阈值来过滤低质量结果:
def filter_by_similarity_threshold(similarities, indices, threshold=0.6):
"""根据阈值过滤检索结果"""
filtered_indices = []
filtered_similarities = []
for sim, idx in zip(similarities, indices):
if sim >= threshold:
filtered_indices.append(idx)
filtered_similarities.append(sim)
return filtered_similarities, filtered_indices
# 动态阈值调整
def dynamic_threshold_adjustment(similarities, strategy='mean_std'):
"""动态计算相似度阈值"""
if strategy == 'mean_std':
mean_sim = np.mean(similarities)
std_sim = np.std(similarities)
return mean_sim - 0.5 * std_sim # 低于均值0.5个标准差
elif strategy == 'percentile':
return np.percentile(similarities, 30) # 前30%分位数
else:
return 0.5 # 默认阈值

六、向量检索基本策略:从暴力搜索到智能优化
6.1 检索策略演进路径
暴力搜索(Brute-force Search) → 近似最近邻(ANN) → 混合检索(Hybrid Search)
# 暴力搜索实现(仅适用于小规模数据)
def brute_force_search(query_vector, document_vectors, top_k=5):
"""暴力搜索:计算所有相似度并排序"""
similarities = []
for i, doc_vec in enumerate(document_vectors):
sim = cosine_similarity(query_vector, doc_vec)
similarities.append((sim, i))
# 按相似度排序
similarities.sort(reverse=True, key=lambda x: x[0])
return similarities[:top_k]
6.2 近似最近邻(ANN)算法原理
当数据量达到百万级时,暴力搜索不可行,必须使用ANN算法:
ANN核心思想:用可控的精度损失换取数量级的性能提升
# ANN检索框架
class ANNRetriever:
def __init__(self, algorithm='hnsw', dimensions=384):
self.algorithm = algorithm
self.dimensions = dimensions
self.index = self._build_index_structure()
def _build_index_structure(self):
"""构建索引结构"""
if self.algorithm == 'hnsw':
# HNSW索引:层次化可导航小世界图
import hnswlib
index = hnswlib.Index(space='cosine', dim=self.dimensions)
index.init_index(max_elements=1000000, ef_construction=200, M=16)
return index
elif self.algorithm == 'ivf':
# IVF索引:倒排文件索引
import faiss
quantizer = faiss.IndexFlatL2(self.dimensions)
index = faiss.IndexIVFFlat(quantizer, self.dimensions, 100)
return index
else:
raise ValueError(f"不支持的算法: {self.algorithm}")
def add_vectors(self, vectors):
"""添加向量到索引"""
if self.algorithm == 'hnsw':
self.index.add_items(vectors)
elif self.algorithm == 'ivf':
self.index.train(vectors)
self.index.add(vectors)
def search(self, query_vector, top_k=10):
"""近似最近邻搜索"""
if self.algorithm == 'hnsw':
labels, distances = self.index.knn_query(query_vector, k=top_k)
return labels[0], distances[0]
elif self.algorithm == 'ivf':
distances, labels = self.index.search(query_vector.reshape(1, -1), top_k)
return labels[0], distances[0]
6.3 主流ANN算法对比
| 算法类型 | 代表算法 | 时间复杂度 | 空间复杂度 | 适用场景 |
|---|---|---|---|---|
| 基于图 | HNSW | O(log N) | O(N) | 高精度、低延迟检索 |
| 基于量化 | PQ/IVF-PQ | O(√N) | O(N) | 超大规模、内存受限 |
| 基于树 | KD-Tree | O(N) | O(N) | 低维数据(d<20) |
| 基于哈希 | LSH | O(1) | O(N) | 快速近似、可并行 |
HNSW算法详解:
- 构建多层稀疏图结构
- 高层用于快速跳转,底层用于精细搜索
- 支持动态插入和删除
- 当前性能最强的通用ANN算法之一

七、主流向量数据库功能分析:选型指南与技术对比
7.1 七大向量数据库全景对比
根据2025年最新技术评估,主流向量数据库的功能对比如下:
| 特性维度 | Milvus | Pinecone | Weaviate | Qdrant | Chroma | PgVector | Redis Stack |
|---|---|---|---|---|---|---|---|
| 核心定位 | 超大规模分布式 | 企业级SaaS | 知识图谱集成 | 高性能生产级 | 轻量级原型 | Postgres扩展 | 内存优先 |
| 标量过滤 | ✅ 强大 | ✅ 支持 | ✅ 原生支持 | ✅ 高效支持 | ✅ 基础 | ✅ SQL原生 | ✅ 有限 |
| 混合搜索 | ✅(需集成) | ❌ 有限 | ✅ BM25+向量 | ✅ 全文+向量 | ❌ | ✅ 有限 | ✅ 有限 |
| 分布式架构 | ✅ 原生支持 | ✅ 云原生 | ✅(v1.20+) | ✅ 集群模式 | ❌ | ❌ | ✅ 集群 |
| GPU加速 | ✅(通过FAISS) | ❌ | ❌ | ✅ 实验性 | ❌ | ❌ | ❌ |
| 部署复杂度 | 高 | 低(托管) | 中 | 中 | 低 | 低 | 中 |
| 数据规模 | 10亿+ | 10亿+ | 1亿级 | 1亿级 | 百万级 | 千万级 | 千万级 |
| 典型延迟 | 45ms | 50ms | 70ms | 38ms | 20ms | 100ms | 30ms |
7.2 各数据库适用场景分析
Milvus:超大规模(千万级以上)、高并发、需GPU加速的场景,如推荐系统、智能客服知识库
Pinecone:预算充足、想快速上线AI产品的团队,SaaS开箱即用,适合初创团队或POC验证
Weaviate:企业知识库、智能问答平台、知识图谱+RAG的复合型系统,Schema驱动设计
Qdrant:多数企业的生产级RAG系统,适合中等规模索引、在线检索、混合查询,开源阵营中最均衡
Chroma:快速验证RAG项目、原型实验、个人或中小应用,最强原型开发伙伴
PgVector:已有Postgres生态、需要向量+关系数据一致性的场景,最"简单"的解决方案
7.3 性能实测数据对比
基于2025年基准测试数据(100万条768维向量,相同硬件环境):
| 系统 | QPS(TopK=10) | P99延迟 | 内存占用 | 索引构建时间 |
|---|---|---|---|---|
| Milvus | ~1200 | 45ms | 18GB | 8分钟 |
| Qdrant | ~950 | 38ms | 12GB | 6分钟 |
| Weaviate | ~600 | 70ms | 15GB | 10分钟 |
| Pinecone | ~800* | 50ms* | N/A | N/A |
| Chroma | ~300 | 20ms | 2GB | 2分钟 |
*注:Pinecone数据基于Serverless实例,实际受网络影响较大

八、主流向量数据库选项实践:部署与集成指南
8.1 部署模式选择策略
云托管 vs 自建部署决策矩阵:
| 考虑因素 | 选择云托管 | 选择自建部署 |
|---|---|---|
| 团队规模 | 小团队(<10人) | 大团队(有专职SRE) |
| 上线时间 | 快速上线(<1周) | 可接受较长部署周期 |
| 运维能力 | 无专职运维人员 | 有K8s和运维经验 |
| 数据合规 | 数据可出境 | 数据需本地化 |
| 成本控制 | 接受按量付费 | 需要固定成本 |
| 定制需求 | 标准功能满足 | 需要深度定制 |
8.2 各数据库部署示例
Chroma本地部署(最简单):
# 安装
pip install chromadb
# 启动服务(可选)
chroma run --path /db_path --host 0.0.0.0 --port 8000
Qdrant Docker部署:
# docker-compose.yml
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:latest
ports:
- "6333:6333"
- "6334:6334"
volumes:
- ./qdrant_storage:/qdrant/storage
environment:
- QDRANT__SERVICE__HTTP_PORT=6333
- QDRANT__SERVICE__GRPC_PORT=6334
Milvus Kubernetes部署:
# 使用Helm部署
helm repo add milvus https://milvus-io.github.io/milvus-helm/
helm upgrade --install milvus milvus/milvus \
--set cluster.enabled=true \
--set etcd.replicaCount=3 \
--set minio.mode=standalone \
--set pulsar.enabled=false
8.3 LangChain集成示例
# 多数据库LangChain集成
from langchain.vectorstores import Chroma, Milvus, Qdrant, Weaviate
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
# Chroma集成
chroma_store = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
# Milvus集成
milvus_store = Milvus.from_documents(
documents=chunks,
embedding=embeddings,
connection_args={
"host": "localhost",
"port": "19530"
},
collection_name="my_collection"
)
# Qdrant集成
qdrant_store = Qdrant.from_documents(
documents=chunks,
embedding=embeddings,
url="http://localhost:6333",
collection_name="my_collection"
)
# Weaviate集成
weaviate_store = Weaviate.from_documents(
documents=chunks,
embedding=embeddings,
weaviate_url="http://localhost:8080",
index_name="MyIndex"
)

九、向量数据库Chroma专题应用实践
9.1 Chroma核心特性与架构
Chroma定位为轻量级开源向量数据库,专为AI应用设计:
核心优势:
- 安装简单,API简洁
- 原生Python生态,与LangChain深度集成
- 本地开发友好,无需复杂部署
- 支持持久化和内存两种模式
- Metadata过滤体验优秀
架构特点:
Chroma架构图:
┌─────────────────────────────────┐
│ Client Layer │
│ (Python/JavaScript/HTTP) │
└──────────────┬──────────────────┘
│
┌──────────────▼──────────────────┐
│ API Server │
│ (Collection/Query/Update) │
└──────────────┬──────────────────┘
│
┌──────────────▼──────────────────┐
│ Storage Engine │
│ ├─ DuckDB (默认) │
│ ├─ ClickHouse (可选) │
│ └─ PostgreSQL (可选) │
└──────────────┬──────────────────┘
│
┌──────────────▼──────────────────┐
│ Embedding Function │
│ ├─ 默认: all-MiniLM-L6-v2 │
│ ├─ OpenAI Embeddings │
│ └─ 自定义模型 │
└─────────────────────────────────┘
9.2 Chroma完整使用示例
# Chroma完整实践示例
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import uuid
# 1. 初始化客户端
# 内存模式(开发测试)
client = chromadb.Client()
# 持久化模式(生产环境)
persistent_client = chromadb.PersistentClient(
path="./chroma_data",
settings=Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./chroma_data"
)
)
# HTTP客户端(连接远程服务)
http_client = chromadb.HttpClient(
host="localhost",
port=8000
)
# 2. 创建集合(Collection)
# 使用默认嵌入模型
collection = client.create_collection(
name="knowledge_base",
metadata={"description": "企业知识库", "hnsw:space": "cosine"}
)
# 使用自定义嵌入函数
model = SentenceTransformer('BAAI/bge-small-zh-v1.5')
def custom_embedding_function(texts):
"""自定义嵌入函数"""
embeddings = model.encode(texts)
return embeddings.tolist()
custom_collection = client.create_collection(
name="custom_knowledge",
embedding_function=custom_embedding_function,
metadata={
"hnsw:space": "cosine",
"hnsw:construction_ef": 200,
"hnsw:M": 16
}
)
# 3. 添加数据
documents = [
"Chroma是一个轻量级向量数据库,适合AI应用开发",
"向量数据库能够高效存储和检索高维向量数据",
"RAG系统通过检索增强生成提升大模型准确性",
"近似最近邻算法是向量检索的核心技术",
"HNSW算法在精度和速度之间取得了良好平衡"
]
metadatas = [
{"source": "官方文档", "category": "介绍", "length": len(documents[0])},
{"source": "技术博客", "category": "原理", "length": len(documents[1])},
{"source": "论文", "category": "应用", "length": len(documents[2])},
{"source": "技术博客", "category": "算法", "length": len(documents[3])},
{"source": "论文", "category": "算法", "length": len(documents[4])}
]
ids = [str(uuid.uuid4()) for _ in range(len(documents))]
collection.add(
documents=documents,
metadatas=metadatas,
ids=ids
)
# 4. 查询数据
# 基于文本查询
results = collection.query(
query_texts=["什么是向量数据库?"],
n_results=3,
where={"category": {"$in": ["介绍", "原理"]}}, # 元数据过滤
where_document={"$contains": "数据库"} # 文档内容过滤
)
print("查询结果:")
for i, (doc, meta, score) in enumerate(zip(
results['documents'][0],
results['metadatas'][0],
results['distances'][0]
)):
print(f"{i+1}. 相似度: {1-score:.3f}")
print(f" 内容: {doc[:50]}...")
print(f" 元数据: {meta}")
print()
# 5. 更新和删除
# 更新文档
collection.update(
ids=[ids[0]],
documents=["更新后的文档内容"],
metadatas=[{"source": "更新", "category": "介绍", "length": 20}]
)
# 删除文档
collection.delete(ids=[ids[1]])
# 6. 高级查询:混合过滤
complex_results = collection.query(
query_texts=["检索算法有哪些?"],
n_results=5,
where={
"$and": [
{"category": {"$eq": "算法"}},
{"length": {"$gt": 30}}
]
}
)
# 7. 获取集合信息
collection_info = collection.get()
print(f"集合名称: {collection_info.name}")
print(f"文档数量: {len(collection_info.ids)}")
print(f"向量维度: {collection_info.embedding_dimension}")
9.3 Chroma生产环境最佳实践
1. 数据预处理管道
class ChromaPipeline:
def __init__(self, collection_name, embedding_model=None):
self.client = chromadb.PersistentClient(path="./chroma_db")
self.collection = self._get_or_create_collection(
collection_name, embedding_model
)
def _get_or_create_collection(self, name, embedding_model):
"""获取或创建集合"""
try:
return self.client.get_collection(name=name)
except:
if embedding_model:
return self.client.create_collection(
name=name,
embedding_function=embedding_model
)
else:
return self.client.create_collection(name=name)
def ingest_documents(self, documents, batch_size=100):
"""批量摄入文档"""
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
self.collection.add(
documents=[doc.page_content for doc in batch],
metadatas=[doc.metadata for doc in batch],
ids=[str(uuid.uuid4()) for _ in range(len(batch))]
)
def semantic_search(self, query, filters=None, top_k=5):
"""语义搜索"""
return self.collection.query(
query_texts=[query],
n_results=top_k,
where=filters
)
2. 性能监控与优化
# Chroma性能监控
import time
from prometheus_client import Counter, Histogram
# 定义指标
query_counter = Counter('chroma_queries_total', 'Total queries')
query_duration = Histogram('chroma_query_duration_seconds', 'Query duration')
def monitored_query(collection, query_text, **kwargs):
"""带监控的查询函数"""
query_counter.inc()
start_time = time.time()
try:
results = collection.query(query_texts=[query_text], **kwargs)
duration = time.time() - start_time
query_duration.observe(duration)
# 记录慢查询
if duration > 0.1: # 超过100ms
print(f"慢查询警告: {duration:.3f}s - {query_text[:50]}...")
return results
except Exception as e:
print(f"查询失败: {e}")
raise
3. 备份与恢复策略
# Chroma数据备份脚本
#!/bin/bash
# backup_chroma.sh
BACKUP_DIR="./chroma_backups"
DATE=$(date +%Y%m%d_%H%M%S)
SOURCE_DIR="./chroma_db"
# 创建备份目录
mkdir -p $BACKUP_DIR
# 停止Chroma服务(如果运行中)
# systemctl stop chroma # 或相应的停止命令
# 备份数据
tar -czf $BACKUP_DIR/chroma_backup_$DATE.tar.gz $SOURCE_DIR
# 恢复示例
# tar -xzf chroma_backup_20251214_143022.tar.gz -C ./

总结
向量数据库作为Naive RAG Pipeline的核心组件,其技术深度和复杂度远超表面所见。从底层的存储引擎优化,到中层的索引算法实现,再到上层的分布式架构设计,每一个环节都影响着最终系统的性能、准确性和可扩展性。
本文通过深入剖析向量数据库的各个技术层面,结合代码示例和真实场景案例,展示了如何构建高性能、高可用的向量检索系统。随着AI技术的不断发展,向量数据库将继续演进,为更智能、更高效的检索增强生成系统提供坚实基础。
在实际应用中,开发者需要根据具体场景需求,在召回率、延迟、成本之间找到最佳平衡点。通过持续的性能监控、参数调优和架构优化,向量数据库能够支撑起从简单问答到复杂决策支持的各类AI应用,真正实现"数据智能"的愿景。
更多推荐



所有评论(0)