到目前为止,已经有了:

  • 为 RAG 拆出来的三类专用 Agent:
    • 检索 Agent(Retriever);
    • 解释 Agent(Explainer);
    • 评估 Agent(Evaluator);
  • 基于评估结果的质量保障闭环
    每次问答后会产出结构化的 evaluation(score + labels + checks),低分样本会被打进 QualityEvent 队列。

但如果这些质量事件只用来看报表,不进一步“喂回给模型”,那系统的能力顶多是“会自我监控”,谈不上“会自我变强”。

这篇就做一件事:

在已有质量闭环的基础上,
自动把低质量回答转成「可微调的数据集」​
接到你的微调/对齐管道里,让模型真正做到“越用越懂你的场景”。


一、整体思路:从事件到数据集的四个阶段

从工程角度,把整条链路拆成四段,你后面可以一段段落地:

  1. 采集:把每次 RAG 调用的「问题 + 文档 + 答案 + 评估结果」结构化存下来;
  2. 清洗与标注:基于评估标签,自动识别错误类型、提取上下文、生成“纠正指令”;
  3. 样本构造与格式化:生成 (prompt, completion, label, confidence) 这种训练样本,导出成 JSONL 等格式;
  4. 微调与回流:把数据喂给微调管道,得到新版本模型,再通过 A/B 测试逐步上线。

可以用一个文字版管线来理解:

RAG 请求
  └─► RAG Orchestrator(检索 + 解释 + 评估)
         └─► QualityEvent(仅低分样本)
                └─► 数据清洗 & 标注
                       └─► 训练样本构造(JSONL)
                              └─► 微调脚本 / 平台
                                     └─► 新模型版本
                                            └─► A/B 测试 & 回流

这篇重点讲前 3 段,也就是**“从 QualityEvent 到可用训练数据”**这块。


二、采集层:把质量事件可靠落盘

前一篇里你已经有了 QualityEvent 结构,建议统一成类似这样(Python 表示):

class QualityEvent:
    def __init__(self, request_id, question, docs, answer, evaluation, timestamp=None):
        self.request_id = request_id           # 本次请求唯一ID
        self.question = question               # 用户问题
        self.docs = docs                       # 检索到的文档片段列表
        self.answer = answer                   # 返回给用户的答案
        self.evaluation = evaluation           # EvaluatorAgent 的结构化评估
        self.timestamp = timestamp or datetime.utcnow()

采集要求:​

  • 只对低于某阈值(比如 score < 0.7)的样本生成 QualityEvent
  • 用一个统一通道(Redis / Kafka / DB 表)来收集,避免散落在不同日志里;
  • 每个事件至少有:
    • request_id
    • question
    • docs(含 idcontent
    • answer
    • evaluation = {"score": float, "labels": [...], "checks": {...}}

可以在 RAG Orchestrator 的最后一段加一个钩子:

if evaluation["score"] < 0.7:
    event = {
        "request_id": request_id,
        "question": question,
        "docs": docs,              # list[{"id":..., "content":..., "score":...}]
        "answer": answer,
        "evaluation": evaluation
    }
    redis.rpush("quality_events", json.dumps(event, ensure_ascii=False))

然后由一个独立的 ingester 进程周期性拉取并写入数据库(SQLite/MySQL 都行):

CREATE TABLE raw_events (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  request_id TEXT UNIQUE,
  question TEXT,
  docs TEXT,        -- JSON
  answer TEXT,
  evaluation TEXT,  -- JSON
  timestamp DATETIME,
  processed BOOLEAN DEFAULT FALSE
);

这一步的核心目的只有一个:稳定地把“有问题的样本”存起来,供后续加工。​


三、清洗与标注:从 raw event 变成“带标签的样本骨架”

有了 raw_events 表之后,下一步就是对事件做结构化的清洗与标注,包括:

  1. 确定错误类型(label);
  2. 选取合适的文档片段做上下文;
  3. 生成一段“纠正型提示语”(correction_prompt);
  4. 给每条样本算一个置信度(confidence)。

3.1 错误类型识别

你前面 EvaluatorAgent 已经会给出如:

  • "相关性低" / "low_relevance"
  • "事实性错误" / "factuality_error"
  • "回答不完整" / "incomplete_answer"
  • "可能过度自信" / "overconfidence"

可以定义一个简单的优先级规则:

def detect_label(evaluation: dict) -> str:
    labels = evaluation.get("labels", [])
    if not labels:
        return "unspecified"
    priority = ["factuality_error", "incomplete_answer", "low_relevance", "overconfidence"]
    for p in priority:
        if p in labels:
            return p
    return labels[0]

3.2 上下文选择

文档片段在 docs 列表中,一般已经带有相似度得分;这里可以简单取 top3:

def select_context(docs: list[dict], top_k: int = 3) -> list[dict]:
    # 如果未排序,可以按 score 排一下
    docs_sorted = sorted(docs, key=lambda d: d.get("score", 0), reverse=True)
    return docs_sorted[:top_k]

3.3 生成“纠正型 Prompt”

根据不同错误类型给出不同的说明,这个说明就是未来微调时的“指令部分”:

def build_correction_instruction(label: str) -> str:
    if label == "factuality_error":
        return (
            "请根据以下文档内容重新回答问题,注意:\n"
            "1. 必须严格依据文档中的事实,避免任何事实性错误;\n"
            "2. 如果文档中没有相关信息,请明确回答“资料中未提及”。"
        )
    if label == "incomplete_answer":
        return (
            "请根据以下文档内容重新回答问题,注意:\n"
            "1. 尽可能覆盖文档中的所有相关要点;\n"
            "2. 可以增加必要的解释,使回答更完整。"
        )
    if label == "low_relevance":
        return (
            "请先改写问题,使其更贴近文档主题,再基于文档重新回答;\n"
            "回答时要确保内容与文档高度相关。"
        )
    if label == "overconfidence":
        return (
            "请根据以下文档内容重新回答问题,注意:\n"
            "1. 使用更谨慎、中性的语气;\n"
            "2. 避免使用“肯定”“必须”“一定”等绝对化表达。"
        )
    return "请根据以下文档内容重新回答问题。"

3.4 组合成标注样本骨架

综合上面步骤,对一条 raw event 生成一个“标注样本骨架”:

def annotate_event(raw: dict) -> dict:
    label = detect_label(raw["evaluation"])
    context_docs = select_context(raw["docs"], top_k=3)
    instruction = build_correction_instruction(label)
    score = raw["evaluation"].get("score", 0.0)
    confidence = min(1.0, score * 1.5)  # 简单放大一点,高分样本权重大

    return {
        "sample_id": f"{raw['request_id']}_{label}",
        "question": raw["question"],
        "context_docs": context_docs,
        "original_answer": raw["answer"],
        "label": label,
        "instruction": instruction,
        "score": score,
        "confidence": confidence
    }

到这一步,你有了一个结构化的“样本骨架”,里面包含:

  • 问题;
  • 文档上下文;
  • 原始答案;
  • 错误类型;
  • 一段解释清楚期望行为的指令。

四、样本构造:生成 (prompt, completion) 训练对

接下来要做的是把这些骨架,变成能直接喂给微调框架的 (prompt, completion)

这里有两种做法:

  1. 自动生成修正答案(完全自动化,适合快速迭代 / 先做一版);
  2. 半自动(给出 prompt,让人工或更强模型生成高质量答案,再作为金标)。

4.1 Prompt 构造:把文档、指令和问题拼在一起

建议 Prompt 大致结构:

[文档1] ...
[文档2] ...
[文档3] ...

<纠正型指令>

# 问题
{question}

代码示例:

def build_prompt(annotated: dict) -> str:
    ctx_parts = []
    for i, d in enumerate(annotated["context_docs"], start=1):
        ctx_parts.append(f"[文档{i}] {d['content'][:500]}")
    ctx_block = "\n".join(ctx_parts)

    prompt = f"{ctx_block}\n\n{annotated['instruction']}\n\n# 问题\n{annotated['question']}"
    return prompt

4.2 Completion 构造:如何得到“更好的答案”

简单模式(适合作为示例 / 过渡方案)​
不真的生成新答案,而是把“纠正提示 + 旧答案”拼在一起:

def build_naive_completion(annotated: dict) -> str:
    if annotated["label"] == "factuality_error":
        prefix = "【纠正】根据文档内容,更正之前回答中的事实性错误:"
    elif annotated["label"] == "incomplete_answer":
        prefix = "【补充】在之前回答的基础上补充更多关键信息:"
    elif annotated["label"] == "overconfidence":
        prefix = "【调整】用更谨慎的语气重新表述:"
    else:
        prefix = "【重述】根据文档重新回答:"
    return f"{prefix}{annotated['original_answer']}"

更推荐的模式
用一个质量更好的 LLM(可以是你已有的“审稿 Agent”或外部高性能模型)基于 prompt 再生成一遍“理想答案”,作为 completion

def build_llm_completion(llm, annotated: dict) -> str:
    prompt = build_prompt(annotated)
    # llm 是你封装好的聊天函数,内部可带 system 提示
    return llm(prompt).strip()

你可以先用简单模式跑一轮,验证整个数据管线是否通;
再逐步替换成“高质量 LLM 自动修正”,甚至后续引入人工审核。


五、格式化与导出:产出微调数据集

大多数微调工具链都乐于接受 JSONL 形式的数据:

{"prompt": "...", "completion": "...", "label": "...", "confidence": 0.85}

你可以写一个简单的 DatasetBuilder,把前面所有步骤串起来:

class DatasetBuilder:
    def __init__(self, conn, output_path="rag_quality_tuning.jsonl"):
        self.conn = conn  # raw_events 所在的 DB 连接
        self.output_path = output_path

    def fetch_pending_events(self, limit=100) -> list[dict]:
        cur = self.conn.cursor()
        cur.execute("""
          SELECT request_id, question, docs, answer, evaluation
          FROM raw_events
          WHERE processed = FALSE
          ORDER BY timestamp DESC
          LIMIT ?
        """, (limit,))
        rows = cur.fetchall()
        events = []
        for r in rows:
            events.append({
                "request_id": r[0],
                "question": r[1],
                "docs": json.loads(r[2]),
                "answer": r[3],
                "evaluation": json.loads(r[4])
            })
        return events

    def mark_processed(self, request_ids: list[str]):
        if not request_ids:
            return
        cur = self.conn.cursor()
        cur.executemany(
            "UPDATE raw_events SET processed = TRUE WHERE request_id = ?",
            [(rid,) for rid in request_ids]
        )
        self.conn.commit()

    def process_batch(self, llm=None, min_confidence=0.6):
        events = self.fetch_pending_events(limit=50)
        if not events:
            return 0

        with open(self.output_path, "a", encoding="utf-8") as f:
            processed_ids = []
            for ev in events:
                annotated = annotate_event(ev)
                if annotated["confidence"] < min_confidence:
                    continue

                prompt = build_prompt(annotated)
                completion = (
                    build_llm_completion(llm, annotated)
                    if llm is not None
                    else build_naive_completion(annotated)
                )

                record = {
                    "prompt": prompt,
                    "completion": completion,
                    "label": annotated["label"],
                    "confidence": annotated["confidence"],
                    "score": annotated["score"]
                }
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
                processed_ids.append(ev["request_id"])

        self.mark_processed(processed_ids)
        return len(processed_ids)

定时任务里跑就行:

builder = DatasetBuilder(conn)
while True:
    n = builder.process_batch(llm=high_quality_llm)
    print(f"本轮生成训练样本 {n} 条")
    time.sleep(60)

六、如何接入你已有的微调管道

这一部分不展开具体平台(HuggingFace / SageMaker / 自建训练集群都可以),给两个落地建议:

  1. 按版本管理数据

    • 每次导出的 JSONL 带版本号,比如 rag_quality_tuning_v001.jsonl
    • 用 Git + DVC 或对象存储版本化管理,便于回溯。
  2. 做成一个固定的训练 recipe

    • 约定:新数据积累到 N 条(比如 1000 条)就触发一轮 SFT(LoRA 优先);
    • 训练完生成一个 model_rag_qa_vXYZ 版本;
    • 接到你之前的 A/B 测试与工作流调度框架上,灰度一部分流量验证效果。

七、实践中的几个小建议

  1. 不要一开始就“全自动大规模微调”

    • 建议先小批量(几百条)跑一遍,人工 spot check 一部分样本;
    • 确认数据质量 OK,再扩大规模。
  2. 可以先只用在“辅助模型”上

    • 比如只微调用来做 Query 重写的“小模型”,或者只给 Rerank 模型用;
    • 主回答模型可以后置一两个版本,等效果验证稳了再切。
  3. 样本权重要用好 confidence

    • 在训练脚本里对高 confidence 样本赋予更高 loss 权重;
    • 对 score 很低但被标为“高风险”的样本,可以留给人工先看一轮。
  4. 和可观测看板打通

    • 在看板上加一个「训练样本产出速率」图;
    • 再加一个「各 label 类型占比」饼图,一眼就能看到系统主要问题集中在哪。

八、小结

这一篇做的事情,是把之前搭好的:

  • RAG 专用 Agent(检索 / 解释 / 评估);
  • 质量评估与事件队列;
  • 工作流和监控体系,

进一步往前推了一步,形成一个**“从质量事件到微调数据集”的自动化闭环**。

关键步骤回顾:

  1. 为低分/高风险回答生成 QualityEvent,可靠落盘;
  2. 基于评估标签自动识别错误类型,提取上下文,生成“纠正型指令”;
  3. 构造 (prompt, completion, label, confidence) 样本,导出为 JSONL;
  4. 把数据喂给你现有的微调管道,通过小步快跑 + A/B 测试逐步提升模型效果。

可以先在一个非核心场景试跑,比如内部知识库问答或产品 FAQ:

  • 先只用简单的“拼接式 completion”;
  • 验证数据流和训练流程畅通;
  • 再逐步引入更强 LLM 做自动修正、引入人工 spot check、放大数据规模。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐