概述

向量数据库是专门为存储和管理向量嵌入而设计的数据库系统,在AI、机器学习和相似性搜索场景中发挥着关键作用。以下是当前主流开源向量数据库的详细分析。


1. Milvus

简介

Milvus是一个云原生的开源向量数据库,专为大规模AI应用而设计,由LF AI & Data Foundation托管。

技术特点

  • 架构: 分布式、云原生架构
  • 语言: Go + C++
  • 索引算法: ANNOY, HNSW, IVF, PQ 等多种算法
  • 存储: 支持对象存储(S3、MinIO等)
  • 扩展性: 支持水平扩展,处理万亿级向量

优点

  • 企业级特性: 高可用、容灾、负载均衡
  • 性能优异: 毫秒级查询响应,支持批量操作
  • 生态完整: 活跃的社区,丰富的SDK和集成
  • 多语言支持: Python、Java、Go、Node.js等
  • GPU加速: 支持GPU索引加速
  • 标量过滤: 支持复杂的元数据过滤

缺点

  • 复杂性高: 部署和维护相对复杂
  • 资源消耗大: 内存和存储需求较高
  • 学习成本: 需要理解分布式架构概念
  • 过度工程: 对于小规模应用可能过于复杂

适用场景

  • 大规模企业AI应用
  • 高并发相似性搜索
  • 需要高可用性的生产环境
  • 图像/视频检索系统

2. Qdrant

简介

Qdrant是用Rust编写的开源向量搜索引擎,专注于性能和易用性。

技术特点

  • 架构: 单体架构,支持集群部署
  • 语言: Rust
  • 索引算法: HNSW
  • API: REST API + gRPC
  • 持久化: RocksDB

优点

  • 性能卓越: Rust带来的高性能和内存安全
  • 过滤能力强: 强大的payload过滤功能
  • 部署简单: 单二进制文件,易于部署
  • 实时性好: 支持实时索引更新
  • 多租户: 原生支持多租户架构
  • 地理位置: 支持地理位置查询

缺点

  • 生态较新: 相对较新,生态系统还在发展
  • 单点故障: 单体架构存在单点故障风险
  • 扩展限制: 垂直扩展为主,水平扩展支持有限
  • 文档不足: 某些高级功能文档相对缺乏

适用场景

  • 中等规模的AI应用
  • 需要复杂过滤的场景
  • 地理位置相关的搜索
  • 注重性能的应用

3. Weaviate

简介

Weaviate是一个开源的向量数据库,结合了向量搜索和知识图谱功能。

技术特点

  • 架构: 图数据库 + 向量搜索
  • 语言: Go
  • 特色: 自动向量化,知识图谱
  • API: GraphQL + REST
  • 模型集成: 内置多种embedding模型

优点

  • 混合搜索: 结合向量和传统搜索
  • 知识图谱: 支持复杂的关系查询
  • 自动向量化: 内置文本/图像向量化
  • 模块化: 丰富的模块生态系统
  • 多模态: 支持文本、图像等多种数据类型
  • 实时性: 支持实时CRUD操作

缺点

  • 复杂性: 学习曲线陡峭
  • 资源消耗: 内存使用量较大
  • 性能: 在纯向量搜索上不如专门的向量数据库
  • 依赖多: 需要管理多个组件

适用场景

  • 需要复杂语义理解的应用
  • 知识管理系统
  • 多模态搜索应用
  • 企业知识图谱

4. Chroma

简介

ChromaDB是一个简单易用的开源向量数据库,专为AI应用设计。

技术特点

  • 架构: 轻量级,支持嵌入式部署
  • 语言: Python
  • 存储: SQLite/DuckDB
  • 特色: 开发者友好,集成简单

优点

  • 易于使用: 简单的API设计
  • 快速启动: pip install即可使用
  • 轻量级: 资源消耗小
  • 开发友好: excellent开发体验
  • 集成便捷: 与主流ML框架集成良好
  • 免费: 完全开源免费

缺点

  • 扩展性: 不适合大规模部署
  • 性能限制: 单机性能有上限
  • 企业特性: 缺乏高可用等企业级功能
  • 功能简单: 高级功能相对较少

适用场景

  • 原型开发和测试
  • 小到中等规模应用
  • 教学和学习
  • 个人项目

5. FAISS

简介

FAISS(Facebook AI Similarity Search)是Meta开源的向量相似性搜索库。

技术特点

  • 架构: 库而非数据库
  • 语言: C++ (Python绑定)
  • 特色: 多种索引算法,GPU加速
  • 存储: 内存存储为主

优点

  • 性能极致: 高度优化的算法实现
  • 算法丰富: 支持多种索引算法
  • GPU加速: 优秀的GPU支持
  • 成熟稳定: Meta生产环境验证
  • 灵活性高: 可以嵌入到各种系统中
  • 无依赖: 作为库使用,依赖少

缺点

  • 非数据库: 需要自己实现持久化和并发
  • 学习门槛: 需要深入了解算法原理
  • 功能单一: 只做相似性搜索
  • 运维复杂: 需要自建完整的数据库功能

适用场景

  • 需要极致性能的场景
  • 自建向量搜索系统
  • 研究和算法开发
  • 嵌入式向量搜索

6. pgvector

简介

pgvector是PostgreSQL的向量扩展,将向量搜索能力添加到传统关系数据库中。

技术特点

  • 架构: PostgreSQL扩展
  • 语言: C
  • 特色: 与关系数据库无缝集成
  • 索引: IVFFlat, HNSW

优点

  • 无缝集成: 与现有PostgreSQL系统完美结合
  • 成熟稳定: 基于PostgreSQL的成熟生态
  • ACID特性: 完整的事务支持
  • 运维简单: 利用现有的PostgreSQL运维经验
  • 混合查询: 轻松结合向量和关系查询
  • 成本低: 无需额外的向量数据库

缺点

  • 性能限制: 向量搜索性能不如专门的向量数据库
  • 扩展性: 受限于PostgreSQL的扩展能力
  • 算法有限: 支持的索引算法较少
  • 内存消耗: 大量向量数据会消耗大量内存

适用场景

  • 已使用PostgreSQL的系统
  • 需要事务支持的应用
  • 混合数据查询场景
  • 成本敏感的项目

性能对比

根据最新的基准测试数据:

查询延迟 (1M-10M向量)

  • Milvus/Zilliz: 最低延迟领导者
  • Qdrant: 紧随其后
  • Pinecone: 与Qdrant相近
  • Chroma: ~20ms 中位数搜索时间
  • pgvector: 取决于数据量和硬件配置

吞吐量

  • FAISS: 极致性能(库级别)
  • Milvus: 高并发处理能力强
  • Qdrant: 单机高性能
  • Weaviate: 复杂查询处理好
  • Chroma: 适中
  • pgvector: 依赖PostgreSQL配置

选择建议

企业级生产环境

推荐: Milvus

  • 需要高可用、扩展性和企业级特性

中型项目,注重性能

推荐: Qdrant

  • 优秀的性能和过滤能力

需要知识图谱功能

推荐: Weaviate

  • 复杂语义查询和关系建模

快速原型和小项目

推荐: Chroma

  • 简单易用,快速上手

极致性能要求

推荐: FAISS

  • 自建系统,需要最高性能

已有PostgreSQL生态

推荐: pgvector

  • 无缝集成,降低架构复杂性


Python 安装和使用指南

1. Milvus

安装

bash

# 使用 Docker 启动 Milvus 服务
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh
bash standalone_embed.sh start

# 安装 Python SDK
pip install pymilvus
基础使用示例

python

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import numpy as np

# 连接到 Milvus
connections.connect("default", host="localhost", port="19530")

# 定义集合 schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=200)
]
schema = CollectionSchema(fields, "文档向量集合")

# 创建集合
collection = Collection("documents", schema)

# 插入数据
data = [
    [1, 2, 3],  # id
    [np.random.random(128).tolist() for _ in range(3)],  # embeddings
    ["文档1", "文档2", "文档3"]  # text
]
collection.insert(data)

# 创建索引
index_params = {
    "metric_type": "L2",
    "index_type": "HNSW",
    "params": {"M": 8, "efConstruction": 64}
}
collection.create_index("embedding", index_params)

# 加载集合到内存
collection.load()

# 搜索
search_params = {"metric_type": "L2", "params": {"ef": 10}}
query_vectors = [np.random.random(128).tolist()]
results = collection.search(
    data=query_vectors,
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["text"]
)

print(f"搜索结果: {results[0]}")

2. Qdrant

安装

bash

# 使用 Docker 启动 Qdrant
docker run -p 6333:6333 -p 6334:6334 qdrant/qdrant

# 安装 Python SDK
pip install qdrant-client
基础使用示例

python

from qdrant_client import QdrantClient
from qdrant_client.http import models
import numpy as np

# 连接到 Qdrant
client = QdrantClient("localhost", port=6333)

# 创建集合
collection_name = "documents"
client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(size=128, distance=models.Distance.COSINE),
)

# 插入向量数据
points = [
    models.PointStruct(
        id=1,
        vector=np.random.random(128).tolist(),
        payload={"text": "这是第一个文档", "category": "技术"}
    ),
    models.PointStruct(
        id=2,
        vector=np.random.random(128).tolist(),
        payload={"text": "这是第二个文档", "category": "生活"}
    )
]

client.upsert(collection_name=collection_name, points=points)

# 搜索相似向量
search_result = client.search(
    collection_name=collection_name,
    query_vector=np.random.random(128).tolist(),
    limit=5,
    with_payload=True
)

for result in search_result:
    print(f"ID: {result.id}, Score: {result.score}, Text: {result.payload['text']}")

# 使用过滤器搜索
filtered_result = client.search(
    collection_name=collection_name,
    query_vector=np.random.random(128).tolist(),
    query_filter=models.Filter(
        must=[
            models.FieldCondition(key="category", match=models.MatchValue(value="技术"))
        ]
    ),
    limit=5
)

3. Weaviate

安装

bash

# 使用 Docker Compose 启动 Weaviate
curl -o docker-compose.yml "https://configuration.weaviate.io/v2/docker-compose/docker-compose.yml?modules=standalone&runtime=docker-compose&weaviate_version=v1.22.4"
docker-compose up -d

# 安装 Python SDK
pip install weaviate-client
基础使用示例

python

import weaviate
from weaviate.classes.config import Configure
import numpy as np

# 连接到 Weaviate
client = weaviate.connect_to_local()

try:
    # 创建集合(类)
    collection = client.collections.create(
        name="Document",
        vectorizer_config=Configure.Vectorizer.none(),  # 手动提供向量
        properties=[
            weaviate.classes.config.Property(
                name="text",
                data_type=weaviate.classes.config.DataType.TEXT,
            ),
            weaviate.classes.config.Property(
                name="category",
                data_type=weaviate.classes.config.DataType.TEXT,
            ),
        ]
    )

    # 插入数据
    collection.data.insert_many([
        {
            "text": "这是关于AI的文档",
            "category": "技术",
            "vector": np.random.random(384).tolist()  # 假设使用384维向量
        },
        {
            "text": "这是关于生活的文档",
            "category": "生活",
            "vector": np.random.random(384).tolist()
        }
    ])

    # 向量搜索
    response = collection.query.near_vector(
        near_vector=np.random.random(384).tolist(),
        limit=5,
        return_metadata=weaviate.classes.query.MetadataQuery(score=True)
    )

    for item in response.objects:
        print(f"Text: {item.properties['text']}, Score: {item.metadata.score}")

    # 混合搜索(向量 + BM25)
    hybrid_response = collection.query.hybrid(
        query="AI技术",
        limit=5
    )

finally:
    client.close()

4. Chroma

安装

bash

# 安装 ChromaDB
pip install chromadb
基础使用示例

python

import chromadb
import numpy as np

# 创建持久化客户端
client = chromadb.PersistentClient(path="./chroma_db")

# 创建或获取集合
collection = client.get_or_create_collection(
    name="documents",
    metadata={"hnsw:space": "cosine"}
)

# 添加文档
documents = [
    "这是第一个关于AI的文档",
    "这是第二个关于机器学习的文档",
    "这是第三个关于深度学习的文档"
]

embeddings = [np.random.random(384).tolist() for _ in range(3)]

collection.add(
    documents=documents,
    embeddings=embeddings,
    metadatas=[
        {"category": "AI", "author": "张三"},
        {"category": "ML", "author": "李四"},
        {"category": "DL", "author": "王五"}
    ],
    ids=["doc1", "doc2", "doc3"]
)

# 查询相似文档
results = collection.query(
    query_embeddings=[np.random.random(384).tolist()],
    n_results=2,
    include=["documents", "distances", "metadatas"]
)

print("搜索结果:")
for i, doc in enumerate(results['documents'][0]):
    print(f"文档: {doc}")
    print(f"距离: {results['distances'][0][i]}")
    print(f"元数据: {results['metadatas'][0][i]}")

# 使用过滤器
filtered_results = collection.query(
    query_embeddings=[np.random.random(384).tolist()],
    n_results=5,
    where={"category": "AI"}
)

# 更新文档
collection.update(
    ids=["doc1"],
    documents=["更新后的AI文档内容"],
    embeddings=[np.random.random(384).tolist()]
)

5. FAISS

安装

bash

# CPU 版本
pip install faiss-cpu

# GPU 版本 (需要CUDA)
pip install faiss-gpu
基础使用示例

python

import faiss
import numpy as np
import pickle

# 生成示例数据
dimension = 128
n_vectors = 10000
np.random.seed(42)

# 创建随机向量数据
vectors = np.random.random((n_vectors, dimension)).astype('float32')

# 创建FAISS索引
index = faiss.IndexFlatL2(dimension)  # L2距离索引
print(f"索引是否训练: {index.is_trained}")

# 添加向量到索引
index.add(vectors)
print(f"索引中的向量数量: {index.ntotal}")

# 搜索最相似的向量
k = 5  # 返回top-5结果
query_vector = np.random.random((1, dimension)).astype('float32')
distances, indices = index.search(query_vector, k)

print(f"查询向量的最相似向量索引: {indices}")
print(f"对应的距离: {distances}")

# 使用更高效的HNSW索引
hnsw_index = faiss.IndexHNSWFlat(dimension, 32)  # 32是连接数
hnsw_index.add(vectors)

# HNSW搜索
distances, indices = hnsw_index.search(query_vector, k)
print(f"HNSW搜索结果: {indices}")

# 保存和加载索引
faiss.write_index(index, "vector_index.faiss")
loaded_index = faiss.read_index("vector_index.faiss")

# GPU加速示例 (如果有GPU)
try:
    gpu_index = faiss.index_cpu_to_gpu(faiss.StandardGpuResources(), 0, index)
    gpu_distances, gpu_indices = gpu_index.search(query_vector, k)
    print(f"GPU搜索结果: {gpu_indices}")
except:
    print("GPU不可用,跳过GPU示例")

# 使用IVF索引进行大规模搜索
nlist = 100  # 聚类中心数量
ivf_index = faiss.IndexIVFFlat(faiss.IndexFlatL2(dimension), dimension, nlist)
ivf_index.train(vectors[:5000])  # 使用部分数据训练
ivf_index.add(vectors)
ivf_distances, ivf_indices = ivf_index.search(query_vector, k)

6. pgvector

安装

bash

# 安装 PostgreSQL 和 pgvector 扩展
# Ubuntu/Debian:
sudo apt install postgresql-15-pgvector

# 安装 Python 客户端
pip install psycopg2-binary
基础使用示例

python

import psycopg2
import numpy as np
from psycopg2.extras import execute_values

# 连接到 PostgreSQL
conn = psycopg2.connect(
    host="localhost",
    database="vectordb",
    user="postgres",
    password="password"
)
cur = conn.cursor()

# 启用 pgvector 扩展
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")

# 创建表
cur.execute("""
CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding VECTOR(384),
    metadata JSONB
);
""")

# 创建向量索引
cur.execute("""
CREATE INDEX IF NOT EXISTS documents_embedding_idx 
ON documents USING hnsw (embedding vector_cosine_ops);
""")

# 插入向量数据
documents = [
    ("这是第一个文档", np.random.random(384).tolist(), {"category": "tech"}),
    ("这是第二个文档", np.random.random(384).tolist(), {"category": "life"}),
    ("这是第三个文档", np.random.random(384).tolist(), {"category": "science"})
]

execute_values(
    cur,
    "INSERT INTO documents (content, embedding, metadata) VALUES %s",
    documents,
    template=None,
    page_size=100
)

# 提交事务
conn.commit()

# 相似性搜索
query_embedding = np.random.random(384).tolist()
cur.execute("""
SELECT id, content, metadata, embedding <=> %s as distance 
FROM documents 
ORDER BY embedding <=> %s 
LIMIT 5;
""", (query_embedding, query_embedding))

results = cur.fetchall()
for row in results:
    print(f"ID: {row[0]}, Content: {row[1]}, Distance: {row[3]}")

# 使用过滤器的搜索
cur.execute("""
SELECT id, content, metadata, embedding <=> %s as distance 
FROM documents 
WHERE metadata->>'category' = 'tech'
ORDER BY embedding <=> %s 
LIMIT 3;
""", (query_embedding, query_embedding))

filtered_results = cur.fetchall()
print("过滤后的结果:", filtered_results)

# 使用不同的距离度量
# 余弦距离: <=>
# 内积: <#>
# L2距离: <->

# 关闭连接
cur.close()
conn.close()

通用工具函数

python

# 生成测试向量的通用函数
def generate_test_vectors(num_vectors, dimension):
    """生成测试用的随机向量"""
    np.random.seed(42)
    return np.random.random((num_vectors, dimension)).astype('float32')

def normalize_vectors(vectors):
    """向量归一化"""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / norms

# 批量操作辅助函数
def batch_insert(collection, vectors, batch_size=1000):
    """批量插入向量数据"""
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i+batch_size]
        # 这里根据具体的向量数据库实现插入逻辑
        collection.insert(batch)
        print(f"已插入 {min(i+batch_size, len(vectors))} / {len(vectors)} 向量")

# 性能测试函数
def benchmark_search(search_func, query_vectors, num_runs=10):
    """性能基准测试"""
    import time
    times = []
    
    for _ in range(num_runs):
        start_time = time.time()
        results = search_func(query_vectors)
        end_time = time.time()
        times.append(end_time - start_time)
    
    avg_time = sum(times) / len(times)
    print(f"平均搜索时间: {avg_time:.4f}s")
    return avg_time

总结

选择向量数据库需要考虑以下因素:

  1. 规模需求: 数据量和并发量
  2. 性能要求: 延迟和吞吐量需求
  3. 功能需求: 是否需要复杂过滤、混合搜索等
  4. 运维能力: 团队的技术栈和运维经验
  5. 成本考虑: 开发、部署和维护成本
  6. 生态集成: 与现有系统的集成难度

每个向量数据库都有其特定的优势和适用场景,上述代码示例可以帮助你快速上手各种向量数据库。建议先用小规模数据测试各个方案,然后根据实际需求选择最适合的解决方案。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐