【实战教程】基于 Milvus + 通义千问构建食品安全智能问答系统(RAG 问答篇)(二)
本文完整拆解了 RAG 流程的「检索 + 生成」核心环节,实现了从用户问题 → 向量检索 → 大模型回答的端到端链路。代码与入库阶段形成闭环,兼顾工程性与易用性,可直接作为食品安全知识库问答系统的核心模块。后续可进一步优化交互形式(如 Web 界面)、扩展知识库类型(如 Word/Excel),完善 RAG 应用落地能力。
·
上一篇我们完成了 PDF 知识库的向量入库,本文将聚焦RAG 问答核心流程—— 通过 Milvus 相似检索获取上下文,结合通义千问 API 生成精准回答,完整实现 “用户提问→向量检索→智能回答” 的闭环。
一、代码拆分
步骤 1:环境配置与依赖导入
import os
import dashscope
from dashscope import Generation
from dotenv import load_dotenv
from pymilvus import connections, Collection, utility
from sentence_transformers import SentenceTransformer
关键说明:
os + dotenv:加载环境变量(Milvus 配置、通义千问 API Key),解耦配置与代码;pymilvus:Milvus 检索核心依赖,实现数据库连接、相似性搜索;sentence_transformers:生成查询文本向量,与入库阶段模型保持一致;dashscope:阿里云通义千问 SDK,调用大模型生成回答;
步骤 2:加载嵌入模型(与入库阶段统一)
# 从本地路径加载嵌入模型
local_model_path = 'bge-small-zh-v1.5'
try:
embedding_model = SentenceTransformer(local_model_path)
print(f"✅ 成功从本地路径加载嵌入模型: {local_model_path}")
except Exception as e:
print(f"❌ 加载嵌入模型失败: {e}")
exit() # 如果模型加载失败,程序无法运行,直接退出
关键说明:
- 模型路径与入库阶段保持一致(bge-small-zh-v1.5),确保查询向量与入库向量维度 / 语义一致;
- 模型加载失败直接退出(向量生成是检索的核心,无模型则无法继续);
步骤 3:加载环境变量与 Milvus 配置
# 加载环境变量
load_dotenv()
# Milvus 配置
MILVUS_HOST = os.getenv("MILVUS_HOST", "localhost")
MILVUS_PORT = os.getenv("MILVUS_PORT", "19530")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "food_safety_kb")
# 全局集合对象
collection = None
关键说明:
- 环境变量默认值兜底,与入库阶段配置对齐,避免跨环节配置不一致;
- 定义全局
collection对象,减少重复初始化开销;
步骤 4:初始化 Milvus 连接
def init_milvus():
"""初始化 Milvus 连接并返回集合对象"""
global collection
try:
# 连接 Milvus
if "default" not in connections.list_connections():
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
print(f"✅ 成功连接 Milvus: {MILVUS_HOST}:{MILVUS_PORT}")
# 检查集合是否存在
if not utility.has_collection(COLLECTION_NAME):
print(f"❌ 集合 '{COLLECTION_NAME}' 不存在")
exit()
# 获取集合对象
collection = Collection(COLLECTION_NAME)
print(f"✅ 成功获取集合: {COLLECTION_NAME}")
return collection
except Exception as e:
print(f"❌ 初始化 Milvus 失败: {e}")
exit()
核心知识点:
- 连接校验:先检查
default连接是否存在,避免重复连接; - 集合校验:确保入库阶段创建的
food_safety_kb集合存在,否则退出; - 全局赋值:将集合对象赋值给全局变量,供检索函数调用;
步骤 5:Milvus 相似性检索
# 搜索相似文本
def search_similar(query, top_k=3):
"""
使用 SentenceTransformer 模型对查询进行编码,
然后在 Milvus 中搜索相似的文本片段。
"""
print(f"正在为问题 '{query}' 搜索相关信息...")
# 使用 embedding_model 对查询文本进行编码
query_embedding = embedding_model.encode([query])[0]
# 使用余弦相似度(与入库索引 metric_type 一致)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=top_k,
output_fields=["text"]
)
context = ""
for hit in results[0]:
context += hit.entity.get("text") + "\n\n"
return context.strip()
核心知识点:
- 向量编码:查询文本编码为 512 维向量(与入库阶段模型输出维度一致);
- 检索参数:
metric_type="L2":与入库索引的距离度量方式一致(保证相似度计算逻辑统一);nprobe=10:检索时遍历的聚类桶数(trade-off:nprobe 越大,检索精度越高、速度略降);limit=top_k:返回 Top3 最相似文本块,兼顾相关性与回答简洁性;
- 上下文拼接:将相似文本块拼接为字符串,作为大模型回答的参考依据;
步骤 6:调用通义千问生成回答
# 使用通义千问 API 生成回答
def generate_answer_with_qwen(query, context):
"""
调用通义千问 API 生成回答
"""
# 从环境变量中获取 API Key
api_key = os.getenv("DASHSCOPE_API_KEY")
# 检查 API Key 是否成功获取
if not api_key:
return "错误:无法从环境变量 DASHSCOPE_API_KEY 中获取 API Key。请确保你已经正确设置了环境变量。"
# 配置通义千问 API Key
dashscope.api_key = api_key
# 构建提示词
messages = [
{
"role": "system",
"content": "你是一个食品安全领域的专家。请根据下面提供的上下文信息来回答用户的问题。如果上下文信息不足以回答,请说“抱歉,我无法回答这个问题。”请不要编造信息。"
},
{
"role": "user",
"content": f"上下文: {context}\n\n问题: {query}"
}
]
try:
# 调用通义千问 API
response = Generation.call(
model="qwen-turbo",
messages=messages,
result_format="message",
stream=False,
temperature=0.7,
top_p=0.8
)
if response.status_code == 200:
return response.output.choices[0].message.content.strip()
else:
return f"调用 API 失败: {response.code} - {response.message}"
except Exception as e:
return f"调用 API 时发生未知错误: {str(e)}"
核心知识点:
- API Key 校验:从环境变量读取
DASHSCOPE_API_KEY,避免硬编码泄露; - 提示词工程:
system角色:限定大模型身份(食品安全专家),约束回答规则(不编造、无信息则拒答);user角色:拼接检索到的上下文 + 用户问题,让大模型基于知识库回答;
- 大模型参数:
model="qwen-turbo":通义千问轻量版,兼顾速度与效果;temperature=0.7:回答随机性(0~1,值越小越严谨);top_p=0.8:采样策略,控制回答多样性;
步骤 7:RAG 问答主逻辑
# 问答主函数
def rag_qa(query):
context = search_similar(query)
if not context:
return "抱歉,我无法回答这个问题。"
print("正在调用通义千问 API 生成回答...")
answer = generate_answer_with_qwen(query, context)
return answer
关键说明:
- 检索结果校验:若未检索到相似文本(context 为空),直接返回拒答提示;
- 链路串联:检索 → 上下文拼接 → 大模型回答,完成完整 RAG 流程;
步骤 8:问答测试
# 测试
if __name__ == "__main__":
# 初始化 Milvus
init_milvus()
# 加载集合
print("正在加载 Milvus 集合...")
collection.load()
print("Milvus 集合加载成功,可以开始问答。")
while True:
user_input = input("\n请输入问题(输入 'exit' 退出):")
if user_input.lower() == "exit":
# 在退出前,可以选择卸载集合以释放资源(可选)
collection.release()
print("已卸载 Milvus 集合,程序退出。")
break
answer = rag_qa(user_input)
print("\n回答:", answer)
print("-" * 50)
核心知识点:
collection.load():将 Milvus 集合加载到内存,提升检索速度(首次检索前必须执行);- 交互式循环:持续接收用户输入,输入
exit时卸载集合并退出; collection.release():卸载集合释放内存,避免资源占用;
二、常见问题与解决方案
| 问题现象 | 原因分析 | 解决方案 |
|---|---|---|
| 检索结果为空 | 1. 集合无数据;2. 查询向量不匹配 | 1. 确认已执行入库流程;2. 检查模型是否与入库阶段一致 |
| 通义千问调用失败 | 1. API Key 错误;2. 网络问题 | 1. 核对 DASHSCOPE_API_KEY;2. 确保网络可访问阿里云接口 |
| Milvus 加载集合失败 | 集合未创建 / 连接地址错误 | 1. 执行入库代码创建集合;2. 检查 MILVUS_HOST/MILVUS_PORT |
| 回答偏离知识库 | 检索 top_k 过小 / 提示词不严谨 | 1. 调大 top_k(如 5);2. 强化 system 角色的约束 |
三、扩展优化方向
- 检索优化:改用余弦相似度(COSINE)替代 L2,更贴合文本语义检索场景;
- 上下文优化:对检索结果进行重排序 / 去重,避免冗余信息;
- 流式回答:开启
stream=True,实现通义千问流式输出回答; - 多轮对话:增加对话历史记忆,支持上下文连续问答;
- 异常重试:为 Milvus 检索、API 调用添加重试机制(tenacity 库);
四、总结
本文完整拆解了 RAG 流程的「检索 + 生成」核心环节,实现了从用户问题 → 向量检索 → 大模型回答的端到端链路。代码与入库阶段形成闭环,兼顾工程性与易用性,可直接作为食品安全知识库问答系统的核心模块。后续可进一步优化交互形式(如 Web 界面)、扩展知识库类型(如 Word/Excel),完善 RAG 应用落地能力。
如果觉得本文有帮助,欢迎点赞 + 收藏,后续将更新《RAG 系统进阶优化》系列内容~
更多推荐

所有评论(0)