前一篇《大模型实战:专门为 RAG 设计一套「检索 / 解释 / 评估」Agent 组合》,已经为 RAG 系统拆出了三类专用 Agent:

  • 检索 Agent:专门负责「问题 → 文档片段」;
  • 解释 Agent:专门负责「文档片段 → 最终答案」;
  • 评估 Agent:专门负责给每次回答打分、贴标签。

但如果评估结果只是写进日志、可视化一下图表,而不会驱动系统“自动变好”,那这套评估其实只完成了 50% 的价值。

这一篇要做的,就是在这三类 Agent 外,再搭一圈:

质量保障闭环
评估 Agent 输出 → 样本分类 → 触发修复策略 → 结果反馈 / 迭代

整篇依然只讲工程实现和代码骨架,不涉及任何敏感语境。


一、整体闭环长什么样?

先用一行文字把要做的事讲清楚:

评估 Agent 输出 → 样本分类 → 触发修复策略 → 结果反馈 + 模型/配置迭代

核心目标是:

  • 每次问答都有一个结构化的评估结果(score + labels);
  • 低分 / 高风险样本不要“悄悄滑过去”,而是进入一个质量队列;
  • 根据标签自动触发不同修复动作(重检索、自我反思、换 Prompt……);
  • 修复后把效果记录下来,为后续调参/微调提供证据。

简单理解为:给 RAG 系统接上了一条“自动质检 + 返修流水线”


二、第一步:捕获评估事件

前一篇里,你的 EvaluatorAgent 输出结构类似:

{
    "score": 0.63,
    "labels": ["相关性一般", "轻微遗漏"],
    "checks": {
        "relevance": 0.6,
        "factuality": 0.8,
        "completeness": 0.5,
        "overconfidence": 0.2
    },
    "suggestion": "可适当增加文档上下文,提高完整性。"
}

现在要做的第一件事,是把这些评估结果打包成“质量事件”,塞进一个队列,用于后续异步处理。

from datetime import datetime

class QualityEvent:
    def __init__(self, request_id, question, docs, answer, evaluation, timestamp=None):
        self.request_id = request_id
        self.question = question
        self.docs = docs          # 本次用到的文档片段
        self.answer = answer      # 返回给用户的答案
        self.evaluation = evaluation  # EvaluatorAgent 的输出
        self.timestamp = timestamp or datetime.utcnow()

在你的 RAG Orchestrator 里,增加一个质量队列(示例用内存 Queue,实际建议换成 Redis / MQ):

from queue import Queue

quality_queue = Queue()

RAGOrchestrator.run() 的末尾加一段钩子逻辑:

class RAGOrchestrator:
    def __init__(self, retriever, explainer, evaluator, quality_queue=None):
        self.retriever = retriever
        self.explainer = explainer
        self.evaluator = evaluator
        self.quality_queue = quality_queue

    def run(self, request_id: str, question: str, context: dict | None = None) -> dict:
        context = context or {}

        # 1. 检索 + 2. 解释(略)
        docs_res = self.retriever.handle(question, context)
        docs = docs_res["docs"]
        expl_res = self.explainer.handle(question, docs, context)
        answer = expl_res["answer"]

        # 3. 评估
        evaluation = self.evaluator.handle(question, docs, answer, context)

        # 4. 把质量事件投进队列(只针对低于阈值的样本)
        if self.quality_queue and evaluation.get("score", 1.0) < 0.7:
            event = QualityEvent(
                request_id=request_id,
                question=question,
                docs=docs,
                answer=answer,
                evaluation=evaluation
            )
            self.quality_queue.put(event)

        return {
            "answer": answer,
            "meta": {
                "retrieval": docs_res.get("retrieval_logs", {}),
                "used_doc_ids": expl_res.get("used_doc_ids", []),
                "evaluation": evaluation,
            }
        }

到这里,每个“低分回答”都会变成一个 QualityEvent,进入统一队列,等待后续自动处理。


三、第二步:按标签把样本分桶

评估结果里已经有 labels,比如:

  • ["相关性低", "回答不完整"]
  • ["事实性错误"]
  • ["可能过度自信"]

我们可以按标签把事件分到不同“样本桶(SampleBucket)”,方便后續针对性处理。

from collections import defaultdict

class SampleBucket:
    def __init__(self):
        # label -> List[QualityEvent]
        self.buckets = defaultdict(list)

    def add(self, event: QualityEvent):
        labels = event.evaluation.get("labels", [])
        for label in labels:
            self.buckets[label].append(event)

    def get_samples(self, label: str, limit: int = 50):
        return self.buckets.get(label, [])[:limit]

再预先定义几类你关心的错误标签(可根据自己的 EvaluatorAgent 调整):

TARGET_LABELS = [
    "相关性低",
    "事实性错误",
    "回答不完整",
    "可能过度自信",
]

等下后台 worker 会从队列取出事件,然后调用 bucket.add(event) 完成分类。


四、第三步:为不同错误类型定义“修复策略”

闭环的核心,就是“发现问题之后做什么”。可以先从最常见的几类问题入手:

标签 典型问题 修复方向举例
相关性低 检索出的文档跟问题不太贴合 Query 重写 / 调整检索参数
事实性错误 数字、名称与文档不一致 增加召回、强调事实一致性约束
回答不完整 文档里有信息但答案只说了一半 增加上下文 / 重写回答 Prompt
可能过度自信 没文档支撑却用绝对语气断言 启用自我反思 / 调整语气策略

先写一个 FixStrategy 类,把“标签 → 修复逻辑”收拢在一起:

class FixStrategy:
    def __init__(self, retriever_agent, explainer_agent):
        self.retriever = retriever_agent
        self.explainer = explainer_agent

    def _rewrite_query(self, query: str) -> str:
        prompt = (
            "请将下面的问题改写成更适合检索的简洁查询语句,保留所有关键信息:\n"
            f"{query}\n"
            "只输出改写后的语句。"
        )
        return self.retriever.llm(prompt).strip()  # 这里假设 retriever 有 llm 属性

    def apply(self, label: str, event: QualityEvent) -> dict:
        """
        :return: 可选返回 {"new_score": float, "new_answer": str, ...}
        """
        if label == "相关性低":
            # 1)重写 Query
            new_query = self._rewrite_query(event.question)
            # 2)重新检索
            new_docs_res = self.retriever.handle(new_query, {})
            new_docs = new_docs_res["docs"]
            event.evaluation["rewritten_query"] = new_query
            event.evaluation["new_docs"] = new_docs
            # 可选:立即重答并评估一遍,计算 new_score
            return {}

        elif label == "事实性错误":
            # 1)尝试更严格的相似度限制或增加 top_k
            new_docs_res = self.retriever.handle(
                event.question,
                {"boost_similarity": True, "top_k": 8}
            )
            event.evaluation["new_docs"] = new_docs_res["docs"]
            return {}

        elif label == "回答不完整":
            # 标记需要上下文增强,后续可用 Prompt / 检索策略处理
            event.evaluation["need_context_enrich"] = True
            return {}

        elif label == "可能过度自信":
            # 标记需要自我反思流程
            event.evaluation["need_self_reflection"] = True
            return {}

        # 默认不做处理
        return {}

为了方便扩展,再加一个策略注册表:

class FixStrategyRegistry:
    def __init__(self):
        self._registry = {}

    def register(self, label: str, func):
        self._registry[label] = func

    def get(self, label: str):
        return self._registry.get(label)

初始化时注册:

fix_strategy = FixStrategy(retriever_agent, explainer_agent)
strategy_registry = FixStrategyRegistry()

for lbl in ["相关性低", "事实性错误", "回答不完整", "可能过度自信"]:
    strategy_registry.register(lbl, fix_strategy.apply)

五、第四步:后台 Worker 串起“分类 → 修复 → 反馈记录”

现在只缺一个后台线程:不停从 quality_queue 里取事件,分桶、套策略、记录效果。

5.1 反馈数据表

先用一个 SQLite 做示例(实际你可以换成 MySQL / ClickHouse 等):

import sqlite3

class FeedbackDB:
    def __init__(self, db_path: str = "quality_feedback.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_table()

    def _init_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS feedback (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                request_id TEXT,
                original_score REAL,
                new_score REAL,
                fix_label TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def record(self, request_id: str, original_score: float,
               new_score: float | None, fix_label: str):
        self.conn.execute(
            "INSERT INTO feedback (request_id, original_score, new_score, fix_label)"
            "VALUES (?, ?, ?, ?)",
            (request_id, original_score, new_score, fix_label)
        )
        self.conn.commit()

5.2 Worker 逻辑

import threading

feedback_db = FeedbackDB()
sample_bucket = SampleBucket()

def quality_worker():
    while True:
        event: QualityEvent = quality_queue.get()
        eval_res = event.evaluation
        original_score = eval_res.get("score", 0.0)
        labels = eval_res.get("labels", [])

        # 1)加入样本桶
        sample_bucket.add(event)

        # 2)按标签逐个触发策略
        for label in labels:
            strategy = strategy_registry.get(label)
            if not strategy:
                continue
            new_info = strategy(label, event) or {}
            new_score = new_info.get("new_score")  # 可选字段
            # 3)记录反馈
            feedback_db.record(event.request_id, original_score, new_score, label)

# 启动后台线程
t = threading.Thread(target=quality_worker, daemon=True)
t.start()

到这里,一个完整的“评估→分类→修复→反馈”闭环就跑起来了。
后续你可以在离线脚本里分析 feedback 表,看:

  • 哪些标签出现最频繁;
  • 哪些策略“修复成功率”较高(new_score > original_score 的比例);
  • 哪些策略几乎没用(需要重写 / 删除)。

六、怎么和现有监控 & Agent 体系结合?

你前面已经有:

  • 调用日志(LLMCallLog);
  • 工作流 Trace;
  • RAG 专用的检索 / 解释 / 评估 Agent;
  • 可视化看板。

现在多出的只是几块:

  1. 质量事件队列
    • 可以替换为 Redis List、Kafka Topic 等,方便多实例消费;
  2. 样本桶 + 策略注册表
    • 按标签管理样本和修复逻辑;
  3. 反馈表
    • 为后续“参数调优 / 模型微调 / Prompt 优化”提供数据支持。

在看板层,你可以新增几张图:

  • 按标签统计最近 N 天的低分样本数量(柱状图 / 堆叠图);
  • 按策略统计“修复成功率”(基于 feedback 表);
  • 低分样本的 TOP 问题类型(帮助你优先做哪一类优化)。

七、如何小成本地先试起来?

不建议一上来就做得很复杂,可以按下面的节奏渐进式落地:

  1. 第一步:只接队列 + 样本桶

    • 把所有 score < 阈值的事件放进 quality_queue
    • Worker 只做分类,不做策略;
    • 定期从 SampleBucket 或 DB 导出样本,让你 offline 分析。
  2. 第二步:给“相关性低”加一个最简单的策略

    • 做 Query 重写 + 重新检索;
    • 对比重检索结果的相关度 & 用户反馈。
  3. 第三步:给“事实性错误”“过度自信”等再加策略

    • 结合你前一篇的自我反思 Agent;
    • 在 EvaluatorAgent 判定高风险时强制走一轮反思。
  4. 第四步:把修复数据接入可视化 & 自动调参

    • 比如平均修复后 score 提高了多少;
    • 哪个策略的 ROI 最高(少量开销换来大量质量提升)。

八、小结

这一篇做的事情,是把已有的:

  • 检索 / 解释 / 评估三类 Agent;
  • 调用监控与日志体系;
  • 图式工作流 Orchestrator;

再往前推了一步,补上了RAG 的自动化“质检 + 返修”闭环

  1. 对每次问答做结构化评估;
  2. 对低分 / 高风险样本进入质量事件队列;
  3. 按标签分桶,触发对应修复策略;
  4. 把修复效果记录下来,支持后续分析与优化。

你可以先在一个不那么敏感的小场景试跑,比如内部文档问答或技术 FAQ,然后再逐步扩展到更重要的链路。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐