01 — 前言

在上一篇文章中,我们已经盘点了市面上百花齐放的向量数据库生态。当视角从“方案选型”转向“落地开发”时,我们需要一个最具代表性的解剖对象来透视整个技术栈的底层逻辑。

我们特意选择以**Milvus为蓝本进行拆解。作为云原生与分布式架构的集大成者,Milvus的设计严谨且完备,它将向量数据库最核心的复杂性完整地呈现了出来。相比于一些封装极简的轻量级工具,Milvus 更像是一本详尽的教科书——**理解了它对分布式存储、索引构建与计算逻辑的处理方式,再去使用其他任何向量数据库,都将是即插即用的降维打击。

我们将目光聚焦于四个核心概念:集合(Collection)、索引(Index)、度量(Metric)与搜索(Search)。这并非 Milvus 独有的术语,而是贯穿整个向量检索领域的通用原语。无论是在其他数据库中被称为Table还是Class,无论底层算法如何变迁,这四个维度始终定义着数据如何被组织、精度与速度如何权衡、相似度如何被定义以及最终结果如何被交付。掌握这四个概念的实战逻辑,实际上就是掌握了驾驭所有向量数据库的底层心法。

这篇文章先介绍Collection(集合)相关的基本概念,下一篇我们再详细介绍索引、度量和搜索。本篇文章篇幅较长,干货满满,建议先收藏再阅读。

02 — 安装Milvus

安装 Milvus 主要有三种方式,取决于你的使用场景(是快速开发原型、本地测试,还是生产环境部署)。

  • 方案一:Milvus Lite (最轻量,适合 Python 开发/原型验证)

如果你只是想在 Python 脚本或 Jupyter Notebook 中快速试用 Milvus,或者用于 RAG 的 Demo 开发,这是最简单的方法。它不需要 Docker,直接作为一个 Python 库运行。

适用场景: 原型开发、本地测试、CI/CD 环境。

安装命令:

pip install milvus-lite

如何连接数据库:

from milvus_lite import MilvusLite
# 指定本地文件作为数据库存储
client = MilvusLite(uri="./milvus_demo.db")
  • 方案二:Milvus Standalone(最常用,适合本地开发/小规模生产)

这是标准的单机部署方式,适合在本地电脑(Mac/Windows/Linux)上完整体验 Milvus 的所有功能。

前置要求: 需已安装 Docker 和 Docker Compose。

安装步骤:

**1.下载配置文件:**下载官方的 docker-compose.yaml 文件。

wget https://github.com/milvus-io/milvus/releases/download/v2.3.5/milvus-standalone-docker-compose.yml -O docker-compose.yml

(注:如果无法使用 wget,可以直接在浏览器访问上述链接保存文件)

2.启动服务:

sudo docker-compose up -d

3.验证状态:

sudo docker-compose ps
  • 方案三:Milvus Distributed(适合大规模生产环境)

如果你需要在生产环境中处理海量向量数据(亿级以上),建议使用 K8s 集群部署(Milvus Cluster)。

前置要求: 一个 K8s 集群,已安装 Helm。

步骤:

  1. 添加 Milvus Helm 仓库:
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update
  1. 安装 Milvus:
helm install my-release milvus/milvus

无论你使用 Docker (方案二)还是 K8s(方案三) 安装服务端,你都需要客户端 SDK 来连接数据库。

pip install pymilvus

连接测试代码:

from pymilvus import connections
# 连接到本地 Docker 启动的 Milvus
connections.connect(
alias="default",
host='localhost',
port='19530'
)
print("连接成功!")

总结建议

  • 刚开始写代码/测试 RAG 流程:直接用 Milvus Lite (pip install milvus-lite)。
  • 在本地正式开发应用:推荐 Docker Compose
  • 上生产环境:使用 Kubernetes

03 — 基本概念

在Milvus(以及大多数现代向量数据库)中,其层级结构与 MySQL/PostgreSQL 这种传统 RDBMS(关系型数据库)是高度对应的。

你可以直接使用这个对照表来理解:

层级 传统 RDBMS (如 MySQL) 向量数据库 (Milvus) 解释
L1 Database (数据库) Database (数据库) 逻辑隔离容器。用于区分不同的业务项目或租户。
L2 Table (表) Collection (集合) 数据存储单元。这是你最常操作的对象,定义了Schema。
L3 Row (行/记录) Entity (实体) 一条具体的数据(包含向量和标量)。
L4 Column (列/字段) Field (字段) 具体的属性(如 vector, id, text)。
  • Database(数据库)

在 Milvus 中,数据库是组织和管理数据的逻辑单元。多租户创建多个数据库,为不同的应用程序或租户从逻辑上隔离数据。类似关系型数据库里表和数据库的关系,每个Collection(集合)都是创建在某一个数据库里的。

下面的代码demo包含了对Milvus数据库的基本操作:

from pymilvus import MilvusClient, Milestone
def database_management_demo():
"""
Milvus 数据库(Database)管理全流程演示
包含:连接、创建、列出、修改属性、切换、删除
"""
# =======================================================
# 1. 初始化连接 (默认连接到 'default' 数据库)
# =======================================================
# 这里的 uri 指向你 Docker 部署的本地 Milvus 地址
print("\n--- 1. 连接到 Milvus ---")
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
# 注意:如果你没有修改过配置,docker版默认是不开启鉴权的,token可以省略。
# 如果开启了鉴权,默认是 root:Milvus
# 定义我们要演示的数据库名称
demo_db_name = "my_demo_database"
# =======================================================
# 2. 查 (List): 查看当前所有数据库
# =======================================================
print(f"\n--- 2. 列出当前所有数据库 ---")
dbs = client.list_databases()
print(f"当前数据库列表: {dbs}")
# 为了演示顺利,如果之前存在同名数据库,先清理掉
if demo_db_name in dbs:
print(f"检测到 {demo_db_name} 已存在,正在清理以确保演示纯净...")
# 注意:删除数据库前,必须确保数据库内没有 Collection (集合)
# 这里假设它是空的直接删除,实际生产中要先循环 drop_collection
client.drop_database(demo_db_name)
# =======================================================
# 3. 增 (Create): 创建数据库
# =======================================================
print(f"\n--- 3. 创建新数据库: {demo_db_name} ---")
# 创建数据库。
# Milvus 的 Database 创建相对简单,不像 Collection 那样需要定义复杂的 Schema。
# 这里的 properties 是可选参数,用于存储一些自定义的元数据键值对。
client.create_database(
db_name=demo_db_name,
properties={"owner": "demo_user", "priority": "high"}
)
print(f"数据库 {demo_db_name} 创建成功!")
# =======================================================
# 4. 改 (Alter): 修改数据库属性
# =======================================================
# 注意:你不能直接“重命名”数据库。
# 这里的“改”通常指的是修改数据库的 Properties (元数据/配置属性)。
print(f"\n--- 4. 修改数据库属性 (Alter) ---")
# 比如我们想给这个数据库增加一个属性,或者修改之前的属性
client.alter_database(
db_name=demo_db_name,
properties={"priority": "critical", "description": "for_wechat_article_demo"}
)
# 验证修改结果(通过 describe_database 查看详细信息)
db_info = client.describe_database(demo_db_name)
print(f"修改后的数据库信息: {db_info}")
# =======================================================
# 5. 切换/使用数据库 (Use)
# =======================================================
print(f"\n--- 5. 切换/使用指定数据库 ---")
# 在 MilvusClient 中,要操作特定数据库,推荐的方式是实例化一个新的 Client 指向该 DB。
# 这样后续所有的 create_collection, search 等操作都会在这个 DB 下进行。
db_specific_client = MilvusClient(
uri="http://localhost:19530",
db_name=demo_db_name  # <--- 这里指定数据库
)
# 验证一下我们是否真的在用新数据库
# 我们在新数据库里建一个极简的 Collection 试试
# 这是一个“隐性”验证:如果在 list_collections 里能看到,说明切换成功
if not db_specific_client.list_collections():
print(f"当前连接的数据库 ({demo_db_name}) 为空,切换成功。")
# =======================================================
# 6. 删 (Drop): 删除数据库
# =======================================================
print(f"\n--- 6. 删除数据库: {demo_db_name} ---")
# 注意:Milvus 规定,删除数据库前,该数据库必须是空的(没有任何 Collection)。
# 如果里面有表,必须先 drop_collection。
# 因为我们刚才只是切过去看了一眼,没建表,所以可以直接删。
db_specific_client.close() # 先关闭针对该库的连接
client.drop_database(demo_db_name)
print(f"数据库 {demo_db_name} 已删除。")
# 最终确认
final_dbs = client.list_databases()
print(f"最终数据库列表: {final_dbs}")
if __name__ == "__main__":
try:
database_management_demo()
except Exception as e:
print(f"\n发生错误: {e}")
  • Collection(集合)

在Milvus的架构中,**Collection(集合)**是管理对象的最核心层级。对于初次接触向量数据库的开发者,可以从以下两个工程维度来理解它的定义:

1. 数据的逻辑隔离与检索边界:Collection是数据存储的逻辑命名空间。 Milvus 的计算引擎设计决定了所有的搜索请求(Search)都必须明确指定一个 Collection。系统不支持跨Collectio的联合检索。这意味着Collection构成了天然的业务隔离边界——你需要根据业务场景(如“问答库”、“商品库”)将数据划分到不同的Collection中,以此确立检索范围。

2. 资源加载的最小控制单元(核心机制):这是 Milvus 区别于许多传统数据库的关键特性。 由于向量检索(特别是索引搜索)是计算密集型任务,为了保证低延迟,系统要求活跃数据必须驻留内存(Memory Resident)。Collection 是管理这种“磁盘-内存”流转的最小单位。

  • Release状态(卸载):数据仅持久化存储在对象存储/硬盘中,不占用 Query Node 的内存资源,不可被检索。
  • Load状态(加载):数据被完整加载至内存中(Query Node),此时系统处于就绪状态,可随时响应查询。

在生产环境中,合理控制 Collection 的 Load/Release 状态,是进行容量规划和成本优化的关键手段。

下面的代码demo包含了对Milvus Collection的基本操作。重点展示如何创建集合,以及如何通过 loadrelease 接口控制计算资源的分配:

from pymilvus import MilvusClient
def collection_lifecycle_demo():
"""
Milvus 集合(Collection) 核心操作演示
展示集合的创建、数据写入以及“加载/释放”的资源管理机制。
"""
# 1. 初始化客户端连接
# -------------------------------------------------------
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
collection_name = "production_collection_demo"
# 环境清理:确保演示环境纯净
if client.has_collection(collection_name):
client.drop_collection(collection_name)
# 2. 创建集合 (Create)
# -------------------------------------------------------
# 在不显式定义 Schema 的快速模式下,只需指定向量维度。
# Milvus 会自动初始化主键字段 'id' 和向量字段 'vector'。
print(f"\n--- [Step 1] 创建集合: {collection_name} ---")
client.create_collection(
collection_name=collection_name,
dimension=768  # 指定向量维度为 768
)
print(f"集合已创建。当前状态:仅元数据存在,无数据实体。")
# 3. 写入数据 (Insert)
# -------------------------------------------------------
# 向集合中写入演示数据。
# 注意:此时数据已写入持久化存储(Disk/MinIO),但尚未加载到查询内存中。
print(f"\n--- [Step 2] 写入数据 ---")
data = [
{"id": 1001, "vector": [0.1] * 768, "source": "paper_A"},
{"id": 1002, "vector": [0.2] * 768, "source": "paper_B"}
]
res = client.insert(collection_name=collection_name, data=data)
print(f"写入完成,影响行数: {res['insert_count']}")
# 4. 加载集合 (Load) - 关键操作
# -------------------------------------------------------
# 将数据从持久化存储热加载到 Query Node 内存中。
# 只有执行了 Load 操作,集合才具备可检索性 (Searchable)。
print(f"\n--- [Step 3] 加载集合 (Load) ---")
client.load_collection(collection_name)
print("集合状态变更: Loaded (内存驻留,服务就绪)。")
# (此时可执行 search/query 操作...)
# 5. 释放集合 (Release) - 关键操作
# -------------------------------------------------------
# 将数据从内存中卸载,释放计算资源,但保留磁盘数据。
# 适用于冷备数据或非高频访问的业务场景。
print(f"\n--- [Step 4] 释放集合 (Release) ---")
client.release_collection(collection_name)
print("集合状态变更: Released (内存已释放,不可检索)。")
# 6. 删除集合 (Drop)
# -------------------------------------------------------
# 物理删除集合及其包含的所有数据。
print(f"\n--- [Step 5] 删除集合 ---")
client.drop_collection(collection_name)
print("集合已物理删除。")
if __name__ == "__main__":
collection_lifecycle_demo()
  • 集合的Schema(架构)

Collection和schema的关系就像是关系型数据库里表和表结构的关系,schema定义了一个collection的内部结构。一般创建collection时就要提前定义好它的schema。

集合架构包含一个主键、最多四个向量字段以及多个标量字段:

  • 向量字段(Vector Field)
  • 数量约束:支持 1 到 4 个
  • 核心作用:Milvus 2.4+ 版本支持多向量(Multi-Vector)特性,允许在一个实体中同时存储文本向量、图像向量或不同模型生成的向量(如稀疏向量 + 稠密向量)。
  • 技术要求:每个向量字段的维度(Dimension)在定义时必须静态锁定,不可变更。
  • 标量字段(Scalar Field)
  • 数量约束:支持多个(受限于系统总字段数限制,通常为 64 个)。
  • 核心作用:用于存储非向量的属性数据(如 Int64, VarChar, Bool, Array, JSON 等)。
  • RAG 场景价值:它们主要用于**混合检索(Hybrid Search)**中的前置过滤(Pre-filtering)。通过标量条件缩小检索范围,可以显著提升查询的准确率与性能。

在上述字段中,必须选定一个特殊的字段来承担“身份标识”的职责。

  • 主键(Primary Key)
  • 数量约束有且仅有 1 个
  • 定义方式:你需要从标量字段中指定一个(仅支持 Int64VarChar 类型)作为主键,用于在 Collection 内部唯一标识一个实体。
  • AutoID 机制(生成策略): 这是依附于主键的一个关键配置项,决定了主键值的来源:
  • 禁用 AutoID (auto_id=False):**(RAG 实战推荐)**主键生成权归属开发者。写入数据时,必须显式传入主键值。这种方式有利于保持向量数据库与外部业务库(如 MySQL)或原始语料库 ID 的严格一致。
  • 启用 AutoID (auto_id=True): 主键生成权移交系统。Milvus 会自动生成唯一的 ID(通常基于 Snowflake 算法),适用于不关注 ID 具体业务含义的流式写入场景。

下面这段demo代码创建了一个包含了两个向量和五个标量字段的Collection:

from pymilvus import MilvusClient, DataType
def schema_definition_demo():
# ——————————————
# 0. 连接 Milvus
# ——————————————
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
collection_name = "schema_structure_demo"
# 清理旧数据,确保演示环境纯净
if client.has_collection(collection_name):
client.drop_collection(collection_name)
# ——————————————
# 1. 创建 Schema 对象
# ——————————————
# auto_id=False: 表示我们将手动管理主键 ID
# enable_dynamic_field=True: 允许存储 Schema 定义之外的动态字段
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True,
description="包含多模态向量与丰富标量的全功能 Schema"
)
print("✓ 已创建基础 Schema 对象")
# ——————————————
# 2. 定义主键 (Primary Key)
# ——————————————
# 必须指定一个主键,此处使用 Int64 类型
schema.add_field(
field_name="doc_id",
datatype=DataType.INT64,
is_primary=True,
description="文档唯一标识 ID"
)
print("✓ 已添加主键字段: doc_id")
# ——————————————
# 3. 定义 2 种向量字段 (Vector Fields)
# ——————————————
# [向量 1] 稠密浮点向量 (Float Vector)
# 场景:用于存储文本 embedding,是最常用的向量类型
schema.add_field(
field_name="dense_vector",
datatype=DataType.FLOAT_VECTOR,
dim=768,
description="768维文本语义向量"
)
# [向量 2] 二进制向量 (Binary Vector)
# 场景:用于图像指纹或哈希去重
# 注意:dim 指的是比特位数 (Bits),128 bit 对应 16 字节
schema.add_field(
field_name="binary_hash",
datatype=DataType.BINARY_VECTOR,
dim=128,
description="128位二进制哈希向量"
)
print("✓ 已添加向量字段: dense_vector, binary_hash")
# ——————————————
# 4. 定义 5 种标量字段 (Scalar Fields)
# ——————————————
# [标量 1] 字符串 (VarChar)
schema.add_field(
field_name="title",
datatype=DataType.VARCHAR,
max_length=256,
description="文档标题"
)
# [标量 2] 整数 (Int32)
schema.add_field(
field_name="publish_year",
datatype=DataType.INT32,
description="发布年份"
)
# [标量 3] 布尔值 (Bool)
schema.add_field(
field_name="is_reviewed",
datatype=DataType.BOOL,
description="审核状态"
)
# [标量 4] JSON 对象 (JSON)
# 用于存储结构灵活的元数据,如 {"author": "Alex", "source": "web"}
schema.add_field(
field_name="meta_info",
datatype=DataType.JSON,
description="扩展元信息"
)
# [标量 5] 数组 (Array)
# 用于存储多值属性,如文章的多个标签 ["Tech", "AI"]
schema.add_field(
field_name="tags",
datatype=DataType.ARRAY,
element_type=DataType.VARCHAR, # 声明数组内部存的是字符串
max_capacity=10,               # 数组最多包含 10 个元素
max_length=50,                 # 每个元素最长 50 字符
description="标签列表"
)
print("✓ 已添加标量字段: title, publish_year, is_reviewed, meta_info, tags")
# ——————————————
# 5. 使用 Schema 创建 Collection
# ——————————————
print(f"\n--- 正在创建集合: {collection_name} ---")
# 将定义好的 Schema 传入,正式创建集合容器
client.create_collection(
collection_name=collection_name,
schema=schema
)
print("✓ 集合创建成功,Schema 结构已生效。")
# ——————————————
# 6. 查看 Collection 详情
# ——————————————
# 获取并打印集合的详细信息,验证字段结构是否符合预期
desc = client.describe_collection(collection_name)
print(f"\n[集合详情验证]:")
print(f" - 字段总数: {len(desc['fields'])} (包含自动生成的隐藏字段)")
for field in desc['fields']:
# 简单打印字段名和类型 ID
print(f" - Field: {field['name']}, Type: {field['type']}")
# 清理环境
client.drop_collection(collection_name)
print("\n✓ 演示结束,已清理测试集合。")
if __name__ == "__main__":
schema_definition_demo()
  • Index(索引)

如果说 Schema 定义了数据的“骨架”,那么 索引(Index) 就是让数据跑起来的“肌肉”。

在海量数据场景下,逐条扫描数据(Brute-force 暴力搜索)的效率是无法接受的。索引通过特定的数学结构重新组织数据,以空间换时间,从而极大提升检索效率。在 Milvus 的实战中,关于索引的配置有以下三条必须遵守的铁律:

**1. 向量索引:从“可选”变成“强制”:**在传统数据库中,不建索引可能只是查询慢一点。但在 Milvus 中,向量字段的索引是强制性的。如果你不为向量字段构建索引,系统在搜索时通常会默认退化为暴力搜索(FLAT),这在生产级数据规模下不仅性能极差,甚至可能因资源耗尽而导致查询失败。因此,数据写入完成后,第一件事就是构建向量索引。

2. 标量索引:建议“应建尽建”:对于用于过滤(Filter)的标量字段(如 year, category, user_id),建议为所有常用的过滤字段创建索引。 虽然标量索引不是强制的,但在混合检索(Hybrid Search)中,Milvus 会先利用标量索引快速圈定范围,再进行向量计算。如果标量字段缺乏索引,过滤阶段的性能瓶颈会严重拖累整体的 RAG 响应速度。

**3. 配置逻辑的差异:向量 vs 标量:**在代码层面,两者的配置深度不同:

  • 向量字段:需要同时定义 索引类型(Index Type)度量类型(Metric Type)。你不仅要告诉系统用什么算法(如 IVF, HNSW)来组织数据,还要告诉系统用什么数学公式(如 欧氏距离 L2, 余弦相似度 COSINE)来计算距离。

  • 标量字段:只需设置索引类型(Index Type)。系统通常会自动根据数据类型选择最优的倒排索引或 Trie 树结构,配置相对简单。

    下一篇文章我们会给大家详细介绍索引类型和度量类型。

构建向量索引有两种方式:

  • 自动构建:创建Collection时,指定Collection的字段的索引规则。这样后续插入的数据就会自动构建索引。这种方式适合初始数据不大,并且总体数据量较小的场景,Milvus完全可以自动管理数据的索引。
  • 先插数据,再手动创建索引:创建Collection时,不指定Collection的索引规则,也就是说这时候插入数据是不带索引的。先将所有数据全都插入到数据库中,然后再创建索引。创建完索引之后,后续再插入新的数据,Milvus数据库就会给新的数据自动构建索引了。

第二种方式的效率和效果比第一种更高:

原因一:索引质量(针对 IVF 类索引)

这是最关键的。像 IVF_FLAT 这种最常用的索引,它需要一个**“训练(Train)”**的过程。

  • 它需要观察数据的整体分布,算出 128 个(nlist)聚类中心点。
  • 如果边插边建:第一批数据进来时,系统只能根据这批数据算中心点。后面进来的数据分布可能完全不同,导致索引的划分非常不准,搜索精度下降。
  • 如果最后建:系统拥有了**“上帝视角”**,能根据所有数据的分布计算出最完美的聚类中心,检索效果最好。
原因二:系统开销(CPU 利用率)
  • 边插边建:每满一个 Segment(比如 500MB)就启动一次索引构建任务。如果你有 1TB 数据,就要启动 2000 次构建任务,频繁抢占 CPU,导致写入速度变慢。
  • 最后建:数据全部落盘后,启动一次大规模的计算任务,集中火力干完。虽然总计算量差不多,但省去了频繁的任务调度和上下文切换开销,整体耗时更短。

所以在初始数据量就很大的情况下,第二种方式显然是最佳方案。

下面两份demo代码分别演示了上面两种方案的核心代码:

# 方式一:创建Collection时定义索引规则
# 1. 准备 Schema
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field("doc_id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=768)
# 2. 准备索引配置 (Index Params)
# 关键点:我们在这里就定义好怎么建索引
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="IVF_FLAT",
metric_type="COSINE",
params={"nlist": 128}
)
# 3. 创建集合时,直接传入 index_params
# -------------------------------------------------------
# 这一步相当于告诉 Milvus:“这个集合从第一天起,所有数据都要按这个规则建索引”
client.create_collection(
collection_name=collection_name,
schema=schema,
index_params=index_params  # <--- 关键参数
)
# 方式二:先创建没有索引的Collection,把所有数据插入完成后再创建索引
# 数据导完了,现在开始根据所有数据计算索引,并立下“规矩”
print("... 开始构建索引...")
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="IVF_FLAT",
metric_type="COSINE",
params={"nlist": 128}
)
# 显式调用 create_index
client.create_index(
collection_name=collection_name,
index_params=index_params
)
  • Entity(实体)

在 Milvus 的架构中,**实体(Entity)**是Collection中承载数据的最小单位。如果把 Collection 比作一张 Excel 表,那么 Entity 就是表中的“一行数据”。

关于实体,在实战中需要掌握以下核心逻辑:

1. Schema 约束与动态字段(Dynamic Field):通常情况下,实体必须严格遵循 Schema 定义。如果插入的数据包含未定义的字段,系统会报错。但在 RAG 场景中,文档的元数据往往不固定(有的有作者,有的只有 URL)。 Milvus 提供了动态字段机制来解决这个问题:

  • 开启方式:在定义 Schema 时设置 enable_dynamic_field=True
  • 写入行为:一旦开启,当您插入实体时,如果数据中包含了 Schema 里没定义的额外字段,Milvus 不会报错,而是会自动将这些字段打包存储在一个特殊的内部 JSON 字段($meta)中。这赋予了 Milvus 类似 NoSQL 的灵活性。

**2. 分区路由(Partitioning):**实体进入 Collection 后,必须落脚于某个分区(Partition)。

  • 默认行为:若不指定分区,实体会自动进入名为 _default 的默认分区。
  • 指定路由:如果业务需要物理隔离(如按年份归档),可以在插入请求中显式指定 partition_name,将数据精准投递到特定分区。

**3. 智能更新(Upsert):**在数据维护中,我们常面临“不确定数据是否存在”的困境。此时,Upsert(Update + Insert)是最佳方案。

  • 机制:基于主键判断。如果主键不存在,则执行 Insert(插入);如果主键已存在,则执行 Update(覆盖更新)。
  • 强制要求:Upsert 请求中的实体数据必须包含主键,这是判断覆盖与否的唯一依据。

下面的demo代码包含了插入数据(包含动态字段)、指定分区插入、用upsert插入/更新数据的操作:

import random
from pymilvus import MilvusClient, DataType
def entity_operations_demo():
# 1. 初始化连接
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
collection_name = "entity_dynamic_demo"
partition_name = "vip_zone"
# 清理环境
if client.has_collection(collection_name):
client.drop_collection(collection_name)
# 2. 准备 Schema (开启动态字段)
# -------------------------------------------------------
# 关键点:enable_dynamic_field=True
# 这允许我们在后续插入数据时,带上 schema 里没定义的字段
schema = MilvusClient.create_schema(
auto_id=False,
enable_dynamic_field=True
)
schema.add_field("doc_id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=768)
# 注意:我们只定义了 doc_id 和 vector 两个字段
# 准备索引
index_params = client.prepare_index_params()
index_params.add_index(field_name="vector", index_type="IVF_FLAT", metric_type="COSINE", params={"nlist": 128})
# 创建集合
client.create_collection(collection_name=collection_name, schema=schema, index_params=index_params)
client.create_partition(collection_name=collection_name, partition_name=partition_name)
print(f"✓ 集合已创建,动态字段支持: Open")
# =======================================================
# 3. 场景一:插入包含“动态字段”的数据
# =======================================================
print(f"\n--- 场景一:插入动态字段 ---")
# 构造数据:
# 'source' 和 'publish_year' 根本没在 Schema 里定义!
# 但因为开了 dynamic_field,它们会被自动接纳并存入 $meta
data_dynamic = [
{
"doc_id": 1001,
"vector": [random.random() for _ in range(768)],
"source": "Wikipedia",      # <--- 动态字段
"publish_year": 2024        # <--- 动态字段
}
]
res1 = client.insert(collection_name=collection_name, data=data_dynamic)
print(f"动态字段插入成功,影响行数: {res1['insert_count']}")
# 验证一下,看看能不能把动态字段查出来
# 提示:在 output_fields 中可以使用通配符 * 或者指定动态字段名
check_res = client.query(
collection_name=collection_name,
filter="doc_id == 1001",
output_fields=["source", "publish_year"]
)
print(f"读取验证 -> source: {check_res[0].get('source')}, year: {check_res[0].get('publish_year')}")
# =======================================================
# 4. 场景二:指定分区插入 (Insert to Partition)
# =======================================================
print(f"\n--- 场景二:指定分区插入 ---")
data_partition = [
{"doc_id": 2001, "vector": [random.random() for _ in range(768)], "tag": "VIP_User"}
]
res2 = client.insert(
collection_name=collection_name,
data=data_partition,
partition_name=partition_name # <--- 指定落入 'vip_zone'
)
print(f"插入 '{partition_name}' 分区成功。")
# =======================================================
# 5. 场景三:Upsert (覆盖更新)
# =======================================================
print(f"\n--- 场景三:Upsert 操作 ---")
# 假设 1001 的 source 信息错了,需要修正
upsert_data = [
# 更新 1001 (存在则覆盖)
{"doc_id": 1001, "vector": [random.random() for _ in range(768)], "source": "Official Doc"},
# 插入 3001 (不存在则新增)
{"doc_id": 3001, "vector": [random.random() for _ in range(768)], "source": "Blog"}
]
res3 = client.upsert(collection_name=collection_name, data=upsert_data)
print(f"Upsert 操作成功,更新/插入行数: {res3['upsert_count']}")
# 验证 1001 更新结果
updated_res = client.query(collection_name=collection_name, filter="doc_id == 1001", output_fields=["source"])
print(f"验证 1001 更新后的 source: {updated_res[0]['source']} (预期: Official Doc)")
# 清理
client.drop_collection(collection_name)
if __name__ == "__main__":
entity_operations_demo()

04

核心配置

在掌握了集合的基础操作后,要构建高性能的 RAG 系统,我们还需要了解围绕 Collection 的五个核心配置:

1. 分区(Partition):逻辑加速

  • 定义:它是集合内部的逻辑子集。所有分区共享同一个 Schema,但数据是物理隔离的。
  • 实战价值查询剪枝(Pruning)。通过将数据按业务属性(如“2024年数据”、“法务部文档”)分配到不同分区,搜索时指定分区名称,Milvus 即可直接忽略其他分区的数据。这能在数据量巨大的情况下显著提升检索效率。

2. 分片(Shard):物理扩展

  • 定义:它是集合的水平物理切片,每个分片对应一个独立的数据写入通道(vChannel)。
  • 实战价值提升吞吐量。默认情况下集合只有 1 个分片。对于海量数据写入或高并发场景,适当增加分片数量,可以利用多节点的并行计算能力,突破单节点的写入瓶颈。
  • 注意:Partition 是逻辑隔离(为了搜得快),Shard 是物理分散(为了写得快)。

3. 别名(Alias):灵活切换

  • 定义:集合的“软链接”或昵称。一个集合可以有多个别名,但一个别名同一时间只能指向一个集合。
  • 实战价值零停机升级。在生产环境中,你可以创建一个新集合Collection_V2 并重建索引,数据准备好后,只需将别名 Prod_Latest 的指向从 V1 瞬间切换到 V2,业务代码完全无需修改。

4. 函数(Function):内部转换

  • 定义:Milvus 允许在创建集合时绑定自定义函数,用于字段间的自动转换。
  • 实战价值简化客户端逻辑。这是 Milvus 2.5+ 全文检索(Full Text Search)的核心机制。例如,你可以定义一个函数,自动将输入的文本字段(VarChar)在数据库内部转换为稀疏向量(Sparse Vector),从而让客户端免去复杂的 Embedding 预处理步骤。

5. 一致性级别(Consistency Level):权衡艺术

  • 定义:作为分布式数据库,Milvus 遵循 CAP 定理,允许用户定义跨节点的数据同步策略。
  • 实战价值速度与精度的平衡。你可以在创建集合或单次搜索时指定一致性。

Milvus 支持以下四种级别:

  • Strong(强一致性)
  • 定义:保证数据写入并收到成功响应后,随后的搜索一定能搜到这条数据。
  • 实战价值数据准确性最高,但性能损耗最大。适用于对数据实时性要求极高(如金融风控、实时订单检索)的场景,但在 RAG 场景中通常不推荐,因为会显著增加搜索延迟。
  • Bounded Staleness(有界旧一致性)—— Milvus 默认级别
  • 定义:允许搜索结果在一定的时间范围内(默认为 5秒)滞后于最新写入的数据。即:“我不要求立刻搜到刚插进去的那条,但 5 秒之前的必须能搜到”。
  • 实战价值最佳平衡点。它在保证搜索速度的同时,将数据不可见的时间控制在可接受范围内。
  • Session(会话一致性)
  • 定义:保证“自己写的数据,自己能立刻搜到”。即在同一个客户端会话中,写入后立刻搜索是可见的,但其他客户端可能稍有延迟。
  • 实战价值最适合 RAG 问答。当用户上传一篇文档并立刻提问时,Session 级别能确保他马上能搜到这篇文档的内容,用户体验极佳,且不会像 Strong 那样拖慢系统整体速度。
  • Eventually(最终一致性)
  • 定义:不保证写入后立刻能搜到,但保证数据“最终”会同步一致。
  • 实战价值搜索速度最快,吞吐量最高。适用于海量数据批量导入后的离线检索场景,或者对实时性完全不敏感的知识库。

在 RAG 系统开发中,如果场景是“用户上传文档即问答”,推荐使用 Session;如果是后台更新知识库供全员检索,推荐使用默认的 Bounded Staleness

05 — 小结

本文介绍了三种安装Milvus的方式,然后详细介绍了Collection的几个基础概念。下一篇我们将继续以Milvus为例,学习另外几个向量数据库重要的概念:索引类型、度量类型和搜索。敬请期待!

学AI大模型的正确顺序,千万不要搞错了

🤔2026年AI风口已来!各行各业的AI渗透肉眼可见,超多公司要么转型做AI相关产品,要么高薪挖AI技术人才,机遇直接摆在眼前!

有往AI方向发展,或者本身有后端编程基础的朋友,直接冲AI大模型应用开发转岗超合适!

就算暂时不打算转岗,了解大模型、RAG、Prompt、Agent这些热门概念,能上手做简单项目,也绝对是求职加分王🔋

在这里插入图片描述

📝给大家整理了超全最新的AI大模型应用开发学习清单和资料,手把手帮你快速入门!👇👇

学习路线:

✅大模型基础认知—大模型核心原理、发展历程、主流模型(GPT、文心一言等)特点解析
✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑
✅开发基础能力—Python进阶、API接口调用、大模型开发框架(LangChain等)实操
✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用
✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代
✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经

以上6大模块,看似清晰好上手,实则每个部分都有扎实的核心内容需要吃透!

我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐