大模型实战:构建 RAG 的自动化质量保障闭环
本文提出为RAG系统构建质量保障闭环,在原有检索、解释、评估三类Agent基础上,增加自动质检和返修流程。核心流程包括:捕获低分回答作为质量事件,按错误标签分类,触发针对性修复策略(如重检索、调整Prompt等),并记录修复效果。通过SQLite存储反馈数据,后台Worker异步处理质量队列,实现评估→分类→修复→反馈的完整闭环。建议从小规模试点开始,逐步扩展策略复杂度,最终将修复数据用于系统优化
前一篇《大模型实战:专门为 RAG 设计一套「检索 / 解释 / 评估」Agent 组合》,已经为 RAG 系统拆出了三类专用 Agent:
- 检索 Agent:专门负责「问题 → 文档片段」;
- 解释 Agent:专门负责「文档片段 → 最终答案」;
- 评估 Agent:专门负责给每次回答打分、贴标签。
但如果评估结果只是写进日志、可视化一下图表,而不会驱动系统“自动变好”,那这套评估其实只完成了 50% 的价值。
这一篇要做的,就是在这三类 Agent 外,再搭一圈:
质量保障闭环:
评估 Agent 输出 → 样本分类 → 触发修复策略 → 结果反馈 / 迭代
整篇依然只讲工程实现和代码骨架,不涉及任何敏感语境。
一、整体闭环长什么样?
先用一行文字把要做的事讲清楚:
评估 Agent 输出 → 样本分类 → 触发修复策略 → 结果反馈 + 模型/配置迭代
核心目标是:
- 每次问答都有一个结构化的评估结果(score + labels);
- 低分 / 高风险样本不要“悄悄滑过去”,而是进入一个质量队列;
- 根据标签自动触发不同修复动作(重检索、自我反思、换 Prompt……);
- 修复后把效果记录下来,为后续调参/微调提供证据。
简单理解为:给 RAG 系统接上了一条“自动质检 + 返修流水线”。
二、第一步:捕获评估事件
前一篇里,你的 EvaluatorAgent 输出结构类似:
{
"score": 0.63,
"labels": ["相关性一般", "轻微遗漏"],
"checks": {
"relevance": 0.6,
"factuality": 0.8,
"completeness": 0.5,
"overconfidence": 0.2
},
"suggestion": "可适当增加文档上下文,提高完整性。"
}
现在要做的第一件事,是把这些评估结果打包成“质量事件”,塞进一个队列,用于后续异步处理。
from datetime import datetime
class QualityEvent:
def __init__(self, request_id, question, docs, answer, evaluation, timestamp=None):
self.request_id = request_id
self.question = question
self.docs = docs # 本次用到的文档片段
self.answer = answer # 返回给用户的答案
self.evaluation = evaluation # EvaluatorAgent 的输出
self.timestamp = timestamp or datetime.utcnow()
在你的 RAG Orchestrator 里,增加一个质量队列(示例用内存 Queue,实际建议换成 Redis / MQ):
from queue import Queue
quality_queue = Queue()
在 RAGOrchestrator.run() 的末尾加一段钩子逻辑:
class RAGOrchestrator:
def __init__(self, retriever, explainer, evaluator, quality_queue=None):
self.retriever = retriever
self.explainer = explainer
self.evaluator = evaluator
self.quality_queue = quality_queue
def run(self, request_id: str, question: str, context: dict | None = None) -> dict:
context = context or {}
# 1. 检索 + 2. 解释(略)
docs_res = self.retriever.handle(question, context)
docs = docs_res["docs"]
expl_res = self.explainer.handle(question, docs, context)
answer = expl_res["answer"]
# 3. 评估
evaluation = self.evaluator.handle(question, docs, answer, context)
# 4. 把质量事件投进队列(只针对低于阈值的样本)
if self.quality_queue and evaluation.get("score", 1.0) < 0.7:
event = QualityEvent(
request_id=request_id,
question=question,
docs=docs,
answer=answer,
evaluation=evaluation
)
self.quality_queue.put(event)
return {
"answer": answer,
"meta": {
"retrieval": docs_res.get("retrieval_logs", {}),
"used_doc_ids": expl_res.get("used_doc_ids", []),
"evaluation": evaluation,
}
}
到这里,每个“低分回答”都会变成一个 QualityEvent,进入统一队列,等待后续自动处理。
三、第二步:按标签把样本分桶
评估结果里已经有 labels,比如:
["相关性低", "回答不完整"]["事实性错误"]["可能过度自信"]
我们可以按标签把事件分到不同“样本桶(SampleBucket)”,方便后續针对性处理。
from collections import defaultdict
class SampleBucket:
def __init__(self):
# label -> List[QualityEvent]
self.buckets = defaultdict(list)
def add(self, event: QualityEvent):
labels = event.evaluation.get("labels", [])
for label in labels:
self.buckets[label].append(event)
def get_samples(self, label: str, limit: int = 50):
return self.buckets.get(label, [])[:limit]
再预先定义几类你关心的错误标签(可根据自己的 EvaluatorAgent 调整):
TARGET_LABELS = [
"相关性低",
"事实性错误",
"回答不完整",
"可能过度自信",
]
等下后台 worker 会从队列取出事件,然后调用 bucket.add(event) 完成分类。
四、第三步:为不同错误类型定义“修复策略”
闭环的核心,就是“发现问题之后做什么”。可以先从最常见的几类问题入手:
| 标签 | 典型问题 | 修复方向举例 |
|---|---|---|
| 相关性低 | 检索出的文档跟问题不太贴合 | Query 重写 / 调整检索参数 |
| 事实性错误 | 数字、名称与文档不一致 | 增加召回、强调事实一致性约束 |
| 回答不完整 | 文档里有信息但答案只说了一半 | 增加上下文 / 重写回答 Prompt |
| 可能过度自信 | 没文档支撑却用绝对语气断言 | 启用自我反思 / 调整语气策略 |
先写一个 FixStrategy 类,把“标签 → 修复逻辑”收拢在一起:
class FixStrategy:
def __init__(self, retriever_agent, explainer_agent):
self.retriever = retriever_agent
self.explainer = explainer_agent
def _rewrite_query(self, query: str) -> str:
prompt = (
"请将下面的问题改写成更适合检索的简洁查询语句,保留所有关键信息:\n"
f"{query}\n"
"只输出改写后的语句。"
)
return self.retriever.llm(prompt).strip() # 这里假设 retriever 有 llm 属性
def apply(self, label: str, event: QualityEvent) -> dict:
"""
:return: 可选返回 {"new_score": float, "new_answer": str, ...}
"""
if label == "相关性低":
# 1)重写 Query
new_query = self._rewrite_query(event.question)
# 2)重新检索
new_docs_res = self.retriever.handle(new_query, {})
new_docs = new_docs_res["docs"]
event.evaluation["rewritten_query"] = new_query
event.evaluation["new_docs"] = new_docs
# 可选:立即重答并评估一遍,计算 new_score
return {}
elif label == "事实性错误":
# 1)尝试更严格的相似度限制或增加 top_k
new_docs_res = self.retriever.handle(
event.question,
{"boost_similarity": True, "top_k": 8}
)
event.evaluation["new_docs"] = new_docs_res["docs"]
return {}
elif label == "回答不完整":
# 标记需要上下文增强,后续可用 Prompt / 检索策略处理
event.evaluation["need_context_enrich"] = True
return {}
elif label == "可能过度自信":
# 标记需要自我反思流程
event.evaluation["need_self_reflection"] = True
return {}
# 默认不做处理
return {}
为了方便扩展,再加一个策略注册表:
class FixStrategyRegistry:
def __init__(self):
self._registry = {}
def register(self, label: str, func):
self._registry[label] = func
def get(self, label: str):
return self._registry.get(label)
初始化时注册:
fix_strategy = FixStrategy(retriever_agent, explainer_agent)
strategy_registry = FixStrategyRegistry()
for lbl in ["相关性低", "事实性错误", "回答不完整", "可能过度自信"]:
strategy_registry.register(lbl, fix_strategy.apply)
五、第四步:后台 Worker 串起“分类 → 修复 → 反馈记录”
现在只缺一个后台线程:不停从 quality_queue 里取事件,分桶、套策略、记录效果。
5.1 反馈数据表
先用一个 SQLite 做示例(实际你可以换成 MySQL / ClickHouse 等):
import sqlite3
class FeedbackDB:
def __init__(self, db_path: str = "quality_feedback.db"):
self.conn = sqlite3.connect(db_path)
self._init_table()
def _init_table(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id TEXT,
original_score REAL,
new_score REAL,
fix_label TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
def record(self, request_id: str, original_score: float,
new_score: float | None, fix_label: str):
self.conn.execute(
"INSERT INTO feedback (request_id, original_score, new_score, fix_label)"
"VALUES (?, ?, ?, ?)",
(request_id, original_score, new_score, fix_label)
)
self.conn.commit()
5.2 Worker 逻辑
import threading
feedback_db = FeedbackDB()
sample_bucket = SampleBucket()
def quality_worker():
while True:
event: QualityEvent = quality_queue.get()
eval_res = event.evaluation
original_score = eval_res.get("score", 0.0)
labels = eval_res.get("labels", [])
# 1)加入样本桶
sample_bucket.add(event)
# 2)按标签逐个触发策略
for label in labels:
strategy = strategy_registry.get(label)
if not strategy:
continue
new_info = strategy(label, event) or {}
new_score = new_info.get("new_score") # 可选字段
# 3)记录反馈
feedback_db.record(event.request_id, original_score, new_score, label)
# 启动后台线程
t = threading.Thread(target=quality_worker, daemon=True)
t.start()
到这里,一个完整的“评估→分类→修复→反馈”闭环就跑起来了。
后续你可以在离线脚本里分析 feedback 表,看:
- 哪些标签出现最频繁;
- 哪些策略“修复成功率”较高(new_score > original_score 的比例);
- 哪些策略几乎没用(需要重写 / 删除)。
六、怎么和现有监控 & Agent 体系结合?
你前面已经有:
- 调用日志(LLMCallLog);
- 工作流 Trace;
- RAG 专用的检索 / 解释 / 评估 Agent;
- 可视化看板。
现在多出的只是几块:
- 质量事件队列:
- 可以替换为 Redis List、Kafka Topic 等,方便多实例消费;
- 样本桶 + 策略注册表:
- 按标签管理样本和修复逻辑;
- 反馈表:
- 为后续“参数调优 / 模型微调 / Prompt 优化”提供数据支持。
在看板层,你可以新增几张图:
- 按标签统计最近 N 天的低分样本数量(柱状图 / 堆叠图);
- 按策略统计“修复成功率”(基于 feedback 表);
- 低分样本的 TOP 问题类型(帮助你优先做哪一类优化)。
七、如何小成本地先试起来?
不建议一上来就做得很复杂,可以按下面的节奏渐进式落地:
-
第一步:只接队列 + 样本桶
- 把所有 score < 阈值的事件放进
quality_queue; - Worker 只做分类,不做策略;
- 定期从
SampleBucket或 DB 导出样本,让你 offline 分析。
- 把所有 score < 阈值的事件放进
-
第二步:给“相关性低”加一个最简单的策略
- 做 Query 重写 + 重新检索;
- 对比重检索结果的相关度 & 用户反馈。
-
第三步:给“事实性错误”“过度自信”等再加策略
- 结合你前一篇的自我反思 Agent;
- 在 EvaluatorAgent 判定高风险时强制走一轮反思。
-
第四步:把修复数据接入可视化 & 自动调参
- 比如平均修复后 score 提高了多少;
- 哪个策略的 ROI 最高(少量开销换来大量质量提升)。
八、小结
这一篇做的事情,是把已有的:
- 检索 / 解释 / 评估三类 Agent;
- 调用监控与日志体系;
- 图式工作流 Orchestrator;
再往前推了一步,补上了RAG 的自动化“质检 + 返修”闭环:
- 对每次问答做结构化评估;
- 对低分 / 高风险样本进入质量事件队列;
- 按标签分桶,触发对应修复策略;
- 把修复效果记录下来,支持后续分析与优化。
你可以先在一个不那么敏感的小场景试跑,比如内部文档问答或技术 FAQ,然后再逐步扩展到更重要的链路。
更多推荐



所有评论(0)