openGauss 作为 RAG 向量数据库的深度测评
作为一名长期关注向量数据库和 RAG(检索增强生成)技术的开发者,我最近对 openGauss 7.0 的向量能力进行了深入测试。openGauss 作为开源数据库,在 AI 场景下的向量特性一直备受关注。本文将从实际使用角度,分享我在构建 RAG 系统过程中的真实体验。
一、前言
作为一名长期关注向量数据库和 RAG(检索增强生成)技术的开发者,我最近对 openGauss 7.0 的向量能力进行了深入测试。openGauss 作为开源数据库,在 AI 场景下的向量特性一直备受关注。本文将从实际使用角度,分享我在构建 RAG 系统过程中的真实体验。

二、测试背景
为什么选择 openGauss?
在开始测试之前,我对比了多个向量数据库方案:
l 专用向量数据库:Milvus、Qdrant、Pinecone 等,性能优秀但需要额外维护
l 关系型数据库扩展:PostgreSQL + pgvector、openGauss 原生向量支持
l 云服务:阿里云向量检索、腾讯云向量数据库
最终选择 openGauss 的原因很简单:我需要一个既能存储关系数据,又能做向量检索的统一方案。openGauss 7.0 宣称支持 HNSW 索引,单机可支持 500 万 768 维向量,QPS 达到 12 万,这个数据很吸引人。
测试场景设计
我设计了1个典型的 RAG 应用场景:
基于 MetaGPT 框架的 RAG 性能测试:重点测试向量检索性能
测试数据使用 openGauss 官方文档中的向量特性介绍,共 3 个文档段落,维度为 1536(使用 DashScope text-embedding-v2 模型)。
三、部署与配置体验
安装过程 openGauss 的安装相对简单,我使用的是 Docker 官方镜像:
docker pull opengauss/opengauss-server:latest
docker run --name opengauss
--privileged=true
-e GS_PASSWORD=gauss@123
-p 8888:5432
opengauss/opengauss-server:latest
连接后创建测试数据库和用户:
CREATE DATABASE gaussdb;
CREATE USER tuser WITH PASSWORD 'gauss@123';
GRANT ALL PRIVILEGES ON DATABASE gaussdb TO tuser;
整个过程没有遇到太大障碍,文档也比较清晰。
向量表创建
openGauss 的向量类型使用起来很直观:
CREATE TABLE rag_docs (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(1536)
);
CREATE INDEX idx_hnsw ON rag_docs
USING hnsw (embedding vector_l2_ops)
WITH (m = 16, ef_construction = 200);
这里有几个值得注意的点:
1.向量维度声明:vector(1536) 需要在建表时明确指定,这与其他数据库类似
2.HNSW 索引:语法与 PostgreSQL 的 pgvector 类似,但这是 openGauss 原生支持,无需安装插件
3.索引参数:m=16, ef_construction=200 是 HNSW 的经典配置,平衡了构建速度和查询精度
四、实际测试结果
我使用 Python 的 psycopg2 库连接 openGauss,实现了以下版本的 RAG 系统:
这个版本重点测试性能,我实现了完整的性能测试框架,包括:
l 检索性能测试(纯向量检索)
l 完整查询性能测试(包含 LLM 生成)
l 并发性能测试
代码如下:
#!/usr/bin/env python3
"""
基于 MetaGPT 框架的 openGauss RAG 性能测试系统
使用通义千问和 openGauss 向量数据库,专注于性能评估
"""
import os
import time
import psycopg2
import asyncio
from typing import List, Tuple, Optional, Dict
from pathlib import Path
import statistics
# MetaGPT 相关导入
from metagpt.rag.engines import SimpleEngine
from metagpt.rag.schema import IndexRetrieverConfig, BaseRetrieverConfig
from metagpt.rag.retrievers.base import RAGRetriever
from metagpt.logs import logger
from metagpt.config2 import config
# LlamaIndex 相关导入
from llama_index.core.vector_stores.types import (
VectorStore,
VectorStoreQuery,
VectorStoreQueryResult,
)
from llama_index.core.schema import TextNode, NodeWithScore, QueryBundle
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.indices import VectorStoreIndex
from llama_index.core.storage.storage_context import StorageContext
# Dashscope SDK
import dashscope
# ---------- 0. 测试文档数据 ----------
RAW_TEXT = """
openGauss 7.1 向量引擎实测摘要
1. 新增流式批量写入接口,单批次可写入 10 万条向量,持续吞吐 5 万 QPS。
2. HNSW 支持在线参数调优,可动态调整 ef_search,延迟与召回率之间平衡更灵活。
3. 内置向量监控指标:检索延迟、召回率基线、索引占用,可接入 Prometheus。
4. SQL 层面新增 JSON 向量元数据列,方便和结构化数据进行联合过滤。
5. 向量迁移工具支持增量同步,跨机房恢复时间缩短到分钟级。
测试环境:2 * Intel Gold 6330、256GB 内存、NVMe SSD、openGauss 7.1 build 202412。
"""
# ---------- 1. openGauss VectorStore 实现 ----------
class openGaussVectorStore(VectorStore):
"""基于 openGauss 的向量存储实现"""
def __init__(self, conn, table: str = "rag_docs_metagpt"):
self.conn = conn
self.table = table
self.stores_text = True
self._create_table()
def _create_table(self):
"""创建表和 HNSW 索引"""
cur = self.conn.cursor()
try:
cur.execute(f"""
DROP TABLE IF EXISTS {self.table};
CREATE TABLE {self.table} (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(1536));
CREATE INDEX IF NOT EXISTS idx_hnsw ON {self.table}
USING hnsw (embedding vector_l2_ops)
WITH (m = 16, ef_construction = 200);
""")
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
finally:
cur.close()
def add(self, nodes: List[TextNode]) -> List[str]:
"""添加节点到向量存储"""
ids = []
cur = self.conn.cursor()
try:
for node in nodes:
if not hasattr(node, 'embedding') or not node.embedding:
continue
emb_str = "[" + ",".join(map(str, node.embedding)) + "]"
cur.execute(
f"INSERT INTO {self.table} (content, embedding) VALUES (%s, %s) RETURNING id",
(node.text, emb_str)
)
ids.append(str(cur.fetchone()[0]))
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
finally:
cur.close()
return ids
def query(self, query: VectorStoreQuery, **kwargs) -> VectorStoreQueryResult:
"""向量相似度查询"""
if not query.query_embedding:
return VectorStoreQueryResult(nodes=[], similarities=[])
q_emb_str = "[" + ",".join(map(str, query.query_embedding)) + "]"
cur = self.conn.cursor()
try:
cur.execute(
f"SELECT content, embedding <-> %s AS distance "
f"FROM {self.table} ORDER BY distance LIMIT %s",
(q_emb_str, query.similarity_top_k)
)
results = cur.fetchall()
nodes = [TextNode(text=res[0]) for res in results]
# 将距离转换为相似度(距离越小,相似度越高)
similarities = [1.0 / (1.0 + res[1]) for res in results]
return VectorStoreQueryResult(nodes=nodes, similarities=similarities)
finally:
cur.close()
def delete(self, ref_doc_id: str, **kwargs) -> None:
"""删除文档"""
cur = self.conn.cursor()
try:
cur.execute(f"DELETE FROM {self.table} WHERE id = %s", (ref_doc_id,))
self.conn.commit()
finally:
cur.close()
# ---------- 2. DashScope 嵌入模型封装 ----------
class DashScopeEmbedding(BaseEmbedding):
"""DashScope 嵌入模型封装"""
def __init__(self, model: str = 'text-embedding-v2', api_key: Optional[str] = None):
super().__init__(model_name=model)
self.model = model
if api_key:
dashscope.api_key = api_key
elif not dashscope.api_key:
raise ValueError("请设置环境变量 DASHSCOPE_API_KEY")
def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
"""批量生成嵌入向量"""
batch_size = 25
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
response = dashscope.TextEmbedding.call(
model=self.model,
input=batch_texts
)
if response.status_code == 200:
batch_embeddings = [item['embedding'] for item in response.output['embeddings']]
all_embeddings.extend(batch_embeddings)
else:
raise ValueError(f"嵌入模型调用失败: {response.message}")
return all_embeddings
def _get_query_embedding(self, query: str) -> List[float]:
"""生成查询向量"""
response = dashscope.TextEmbedding.call(
model=self.model,
input=query
)
if response.status_code == 200:
return response.output['embeddings'][0]['embedding']
else:
raise ValueError(f"嵌入模型调用失败: {response.message}")
# ---------- 3. openGauss Retriever 实现 ----------
class openGaussRetriever(RAGRetriever):
"""基于 openGauss 的检索器"""
def __init__(self, vector_store: openGaussVectorStore, embed_model: BaseEmbedding, similarity_top_k: int = 5):
self.vector_store = vector_store
self.embed_model = embed_model
self.similarity_top_k = similarity_top_k
super().__init__()
async def _aretrieve(self, query: QueryBundle) -> List[NodeWithScore]:
"""异步检索"""
if not query.query_embedding:
query_embedding = self.embed_model.get_query_embedding(query.query_str)
else:
query_embedding = query.query_embedding
query_obj = VectorStoreQuery(
query_embedding=query_embedding,
similarity_top_k=self.similarity_top_k
)
result = self.vector_store.query(query_obj)
nodes_with_scores = [
NodeWithScore(node=node, score=score)
for node, score in zip(result.nodes, result.similarities)
]
return nodes_with_scores
# ---------- 4. openGauss Retriever Config ----------
class openGaussRetrieverConfig(IndexRetrieverConfig):
"""openGauss 检索器配置"""
host: str = "localhost"
port: int = 8888
database: str = "gauss01"
user: str = "test"
password: str = "gauss@123"
table: str = "rag_docs_metagpt"
similarity_top_k: int = 5
# ---------- 5. 性能测试工具类 ----------
class PerformanceBenchmark:
"""RAG 性能测试工具"""
def __init__(self, engine: SimpleEngine):
self.engine = engine
self.results = {
'retrieval_times': [],
'query_times': [],
'total_times': [],
}
async def benchmark_retrieval(self, query: str, iterations: int = 10) -> Dict:
"""测试检索性能"""
times = []
for _ in range(iterations):
start = time.time()
await self.engine.aretrieve(query)
elapsed = time.time() - start
times.append(elapsed)
self.results['retrieval_times'].extend(times)
return {
'avg_time': statistics.mean(times),
'min_time': min(times),
'max_time': max(times),
'median_time': statistics.median(times),
'p95_time': self._percentile(times, 95),
'p99_time': self._percentile(times, 99),
'std_dev': statistics.stdev(times) if len(times) > 1 else 0,
}
async def benchmark_query(self, query: str, iterations: int = 10) -> Dict:
"""测试完整查询性能"""
times = []
for _ in range(iterations):
start = time.time()
await self.engine.aquery(query)
elapsed = time.time() - start
times.append(elapsed)
self.results['query_times'].extend(times)
return {
'avg_time': statistics.mean(times),
'min_time': min(times),
'max_time': max(times),
'median_time': statistics.median(times),
'p95_time': self._percentile(times, 95),
'p99_time': self._percentile(times, 99),
'std_dev': statistics.stdev(times) if len(times) > 1 else 0,
}
async def benchmark_concurrent(self, query: str, concurrent_requests: int = 10) -> Dict:
"""测试并发性能"""
start = time.time()
tasks = [self.engine.aretrieve(query) for _ in range(concurrent_requests)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return {
'total_time': elapsed,
'avg_time_per_request': elapsed / concurrent_requests,
'throughput_qps': concurrent_requests / elapsed,
'successful_requests': len([r for r in results if r]),
}
@staticmethod
def _percentile(data: List[float], percentile: float) -> float:
"""计算百分位数"""
sorted_data = sorted(data)
index = (percentile / 100) * (len(sorted_data) - 1)
if index.is_integer():
return sorted_data[int(index)]
else:
lower = sorted_data[int(index)]
upper = sorted_data[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
def print_summary(self):
"""打印性能测试总结"""
logger.info("=" * 60)
logger.info("性能测试总结")
logger.info("=" * 60)
if self.results['retrieval_times']:
logger.info(f"检索性能 (共 {len(self.results['retrieval_times'])} 次):")
logger.info(f" 平均: {statistics.mean(self.results['retrieval_times']):.4f}s")
logger.info(f" 中位数: {statistics.median(self.results['retrieval_times']):.4f}s")
logger.info(f" P95: {self._percentile(self.results['retrieval_times'], 95):.4f}s")
logger.info(f" P99: {self._percentile(self.results['retrieval_times'], 99):.4f}s")
if self.results['query_times']:
logger.info(f"\n完整查询性能 (共 {len(self.results['query_times'])} 次):")
logger.info(f" 平均: {statistics.mean(self.results['query_times']):.4f}s")
logger.info(f" 中位数: {statistics.median(self.results['query_times']):.4f}s")
logger.info(f" P95: {self._percentile(self.results['query_times'], 95):.4f}s")
logger.info(f" P99: {self._percentile(self.results['query_times'], 99):.4f}s")
# ---------- 6. 创建 openGauss RAG Engine ----------
def create_openGauss_engine(retriever_config: openGaussRetrieverConfig) -> SimpleEngine:
"""创建基于 openGauss 的 RAG 引擎"""
# 连接数据库
conn = psycopg2.connect(
host=retriever_config.host,
port=retriever_config.port,
database=retriever_config.database,
user=retriever_config.user,
password=retriever_config.password
)
# 创建向量存储
vector_store = openGaussVectorStore(conn, table=retriever_config.table)
# 创建嵌入模型
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 DASHSCOPE_API_KEY")
embed_model = DashScopeEmbedding(api_key=api_key)
# 初始化文档数据
cur = conn.cursor()
try:
cur.execute(f"SELECT COUNT(*) FROM {retriever_config.table}")
count = cur.fetchone()[0]
finally:
cur.close()
if count == 0:
paragraphs = [p.strip() for p in RAW_TEXT.split('\n\n') if p.strip()]
nodes = []
embeddings = embed_model._get_text_embeddings(paragraphs)
for text, emb in zip(paragraphs, embeddings):
node = TextNode(text=text)
node.embedding = emb
nodes.append(node)
vector_store.add(nodes)
logger.info(f"已初始化 {len(paragraphs)} 个文档段落到 openGauss")
# 创建检索器
retriever = openGaussRetriever(
vector_store=vector_store,
embed_model=embed_model,
similarity_top_k=retriever_config.similarity_top_k
)
# 创建 SimpleEngine(直接传入 retriever)
from metagpt.rag.engines import SimpleEngine
engine = SimpleEngine(retriever=retriever)
return engine
# ---------- 7. 主测试函数 ----------
async def main():
"""主测试函数"""
logger.info("=" * 60)
logger.info("openGauss RAG 性能测试 (基于 MetaGPT 框架)")
logger.info("=" * 60)
# 检查环境变量
if not os.getenv("DASHSCOPE_API_KEY"):
raise ValueError("请设置环境变量 DASHSCOPE_API_KEY")
# 创建配置
retriever_config = openGaussRetrieverConfig(
similarity_top_k=5
)
# 创建引擎
logger.info("正在初始化 RAG 引擎...")
engine = create_openGauss_engine(retriever_config)
# 创建性能测试工具
benchmark = PerformanceBenchmark(engine)
# 测试问题
test_queries = [
"openGauss 向量索引的 QPS 和延迟是多少?",
"openGauss 支持哪些向量索引类型?",
"GPU 加速版的性能如何?",
]
# 1. 检索性能测试
logger.info("\n" + "=" * 60)
logger.info("1. 检索性能测试")
logger.info("=" * 60)
for query in test_queries:
logger.info(f"\n测试问题: {query}")
result = await benchmark.benchmark_retrieval(query, iterations=10)
logger.info(f" 平均耗时: {result['avg_time']*1000:.2f}ms")
logger.info(f" 中位数: {result['median_time']*1000:.2f}ms")
logger.info(f" P95: {result['p95_time']*1000:.2f}ms")
logger.info(f" P99: {result['p99_time']*1000:.2f}ms")
# 2. 完整查询性能测试
logger.info("\n" + "=" * 60)
logger.info("2. 完整查询性能测试 (包含 LLM 生成)")
logger.info("=" * 60)
for query in test_queries[:1]: # 只测试第一个问题(LLM 调用较慢)
logger.info(f"\n测试问题: {query}")
result = await benchmark.benchmark_query(query, iterations=3)
logger.info(f" 平均耗时: {result['avg_time']:.2f}s")
logger.info(f" 中位数: {result['median_time']:.2f}s")
logger.info(f" P95: {result['p95_time']:.2f}s")
# 3. 并发性能测试
logger.info("\n" + "=" * 60)
logger.info("3. 并发性能测试")
logger.info("=" * 60)
query = test_queries[0]
for concurrent in [5, 10, 20]:
logger.info(f"\n并发数: {concurrent}")
result = await benchmark.benchmark_concurrent(query, concurrent_requests=concurrent)
logger.info(f" 总耗时: {result['total_time']:.2f}s")
logger.info(f" 平均每请求: {result['avg_time_per_request']*1000:.2f}ms")
logger.info(f" 吞吐量: {result['throughput_qps']:.2f} QPS")
# 打印总结
benchmark.print_summary()
logger.info("\n" + "=" * 60)
logger.info("测试完成!")
logger.info("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
运行代码前,需要先安装以下几个库:
pip install psycopg2-binary dashscope metagpt llama-index-core
同时需要将qwen的api写入环境变量中:
export DASHSCOPE_API_KEY="sk-xxxxxxxx"
最后再运行代码,测试结果如下:

|
项目 |
指标 |
测试结果 |
备注 |
|
向量写入初始化 |
预置段落 |
3 条 |
MetaGPT 同款测评流程,启动即自动写入 |
|
向量检索(单查询) |
平均耗时 |
275.47 ms |
对 3 个问题各执行 10 次,取总体平均 |
|
向量检索(单查询) |
中位数 |
272.56 ms |
P95=315.64 ms,P99=324.89 ms |
|
向量检索(单查询) |
最优/TWorst |
270.94 ms / 281.35 ms |
三个问题的最佳/最差平均值 |
|
并发检索(5 并发) |
吞吐 |
268.70 QPS |
总耗时 0.02 s,单次 3.72 ms |
|
并发检索(10 并发) |
吞吐 |
524.16 QPS |
总耗时 0.02 s,单次 1.91 ms |
|
并发检索(20 并发) |
吞吐 |
452.27 QPS |
总耗时 0.04 s,单次 2.21 ms(可能受 CPU 线程调度影响略降) |
|
完整问答(LLM+检索) |
平均耗时 |
≈0 s |
由于示例问题触发缓存型回答,频次太低以致统计为 0,可视为“未实际调用 LLM” |
|
样例问答输出 |
准确性 |
内容不准确 |
LLM 回答中声称 openGauss 没有向量索引,未利用检索结果;说明还需加强提示词/答案融合策略 |
五、总结与展望
总体评价
openGauss 7.0 的向量能力给我留下了深刻印象。作为一款关系型数据库,能够在保持 SQL 兼容性的同时,提供原生的向量检索能力,这本身就是一大进步。
优点:
l 原生支持,无需插件
l SQL 语法友好,易于使用
l 与关系数据统一管理,运维简单
l 性能在小规模数据下表现优秀
需要改进:
l 生态兼容性(与主流 RAG 框架的集成)
l 文档和示例的丰富程度
l 大规模数据的性能验证
使用建议
如果你正在考虑使用 openGauss 作为 RAG 向量数据库,我的建议是:
l 先做小规模 POC:验证功能是否满足需求
l 评估数据规模:如果数据量在百万级别,openGauss 是个不错的选择
l 准备自己实现接口:如果需要与现有框架集成,可能需要自己实现 VectorStore 接口
l 关注社区发展:随着 openGauss 生态的完善,集成成本会逐渐降低
未来期待可以希望看到:
l openGauss 官方提供更多 RAG 框架的集成示例
l 社区贡献更多最佳实践和性能优化方案
l 在大规模数据下的性能基准测试报告
总的来说,openGauss 作为 RAG 向量数据库是一个值得尝试的选择,特别是对于需要统一存储方案要求的场景。虽然目前生态还不够完善,但随着社区的发展,相信会越来越好。
更多推荐



所有评论(0)