场景:一端是探索性很强的文本理解与指标设计,另一端是可重复的批量抓取、抽取、编码、建模与输出报告。本地 + 批处理的优势是可控与低成本,但挑战是复现、追踪与长期维护。下面我把图片里的七条原则逐条落到一套可执行的蓝图:哪些步骤写成模块、哪些用 Notebook、哪些做成工作流节点,以及如何在本地实现复现与可追踪


0. 总体目标:把你的研究流程变成“可重跑、可对比、可审计”的批处理管线

你最终需要的不是“一个能跑的脚本”,而是一套可以回答这些问题的系统:

  • 这次项目追踪的数据来自哪些页面、抓取时间是什么、是否有缺失?
  • 文本抽取与编码使用了哪个版本的规则/提示词/模型?
  • QCA 数据集每个变量的赋值依据是什么(证据链)?
  • 报告里的结论对应哪次运行、哪份数据、哪组参数?
  • 换一个阈值/换一个编码规则时,结果差异能否被比较与复现?

实现这些目标的核心不是堆工具,而是把探索性与可重复性分层:探索用 Notebook,稳定流程用模块 + 工作流节点,执行由调度器统一编排,所有产物版本化。


1)原则一:开发环境同时支持“原型闭环”与“生产交互” —— 在本地怎么做?

你的落地方式

  • 原型闭环(写/评估/分析):以 Notebook 为中心做变量定义、编码规则试验、抽样验证、可视化诊断。
  • 生产交互(部署/监控/调试):在本地用“工作流运行器 + 日志 + 运行档案”替代云上的运维体系;每次运行都生成一个可追踪的 run 目录与元数据。

具体机制(本地也能做到“像生产一样”)

  • 统一入口:python -m pipeline run --config configs/run.yaml
  • 每次运行生成:runs/2026-01-28_010203/,包含
    • manifest.json(代码版本、参数、数据快照、模型与提示词版本)
    • logs/(按节点日志)
    • artifacts/(中间产物与最终报告)
    • qc/(抽样核查记录、评测分数)

这就是“与生产部署交互”在本地的等价实现:你不是把东西部署到集群,而是把一次运行当作可审计的“发布单元”


2)原则二:本地开发,但在“类生产环境”评估 —— 本地基础设施的替代方案

你是本地环境,依然可以用“类生产一致性”降低漂移:

  • conda/uv/poetry 锁依赖(强烈建议 uvpoetry)。
  • Docker(可选) 固化运行环境:不是为了上云,而是为了让结果能复现、换机器能跑。
  • 固定数据快照:抓取原始 HTML、解析后的 JSON、最终表格分别存档,任何一步可回放。

最关键的一条:评估必须基于“固定快照”而不是每次都从网上重新抓。否则你无法判断差异来自“规则变化”还是“网页变化”。


3)原则三:Notebook 必要但不充分 —— 哪些必须 Notebook,哪些必须模块化?

适合 Notebook 的任务(探索性、解释性强)

  1. 变量体系设计与 operationalization
    • 例如:政策创新“执行保障(IG)”的指标维度、可观测文本证据、赋值规则与阈值讨论
  2. 编码规则/提示词试验
    • 小样本对比:规则 A vs 规则 B 的一致性、歧义点分析
  3. 抽样质检与误差分析
    • 检查“抽取字段缺失”“同名项目合并错误”“编码偏差”
  4. fsQCA 的模型探索
    • 条件组合敏感性分析、覆盖度/一致性诊断可视化

Notebook 的定位:做“定义问题—试验—解释”,产出规则、阈值与最终模块的需求说明。

必须写成模块(可重复、可批处理、可测试)

  1. 抓取与更新(网页抓取、接口访问、下载、缓存)
  2. 解析与规范化(HTML→结构化 JSON;字段清洗、时间格式统一、去重)
  3. 文本切分与证据抽取(段落定位、关键词/模式匹配、引用片段保存)
  4. 编码与赋值(把证据→变量值;含 fsQCA 校准)
  5. 数据集构建(宽表、长表、字典表、变量说明表)
  6. 报告生成(模板化渲染:Markdown/Word/PDF)
  7. AI 辅助分析的调用封装(提示词版本、重试、限流、缓存、输出校验)

模块的定位:把“能解释清楚的流程”固化成“能稳定重复的函数/命令”


4)原则四:工作流是抽象 —— 你的工作流节点应该怎么切?

把你的全链路拆成“少而稳”的节点,节点之间用文件工件(artifact)连接,便于追踪与复现。

工作流 DAG(本地批处理版本)

Flow 0:初始化与快照

  • Node A0:init_run():生成 run_id、写 manifest、冻结配置
  • Node A1:snapshot_inputs():记录项目清单版本、关键词表版本、规则版本

Flow 1:政策项目追踪数据获取

  • Node B1:fetch_sources():抓取原始网页/附件 → raw/html/raw/files/
  • Node B2:parse_sources():解析成结构化 → interim/records.jsonl
  • Node B3:dedupe_linkage():去重、同名归并、建立项目 ID → interim/projects.parquet

Flow 2:证据抽取与变量编码(面向 QCA)

  • Node C1:extract_evidence():针对每个项目抽取证据片段 → interim/evidence.jsonl
  • Node C2:ai_assist_label()(可选):LLM 辅助候选标签/摘要/疑点 → interim/ai_labels.jsonl
  • Node C3:human_review_queue()(可选):生成待复核清单(高风险/低置信)→ qc/review.csv
  • Node C4:encode_conditions():规则+(可选)LLM结果 → 条件变量赋值 → processed/qca_conditions.csv
  • Node C5:calibrate_fsQCA():把原始赋值校准为集合隶属度 → processed/fsqca_matrix.csv

Flow 3:分析与报告

  • Node D1:run_qca():生成解与指标 → results/qca_results.json
  • Node D2:generate_report():输出 Markdown/Docx/PDF → artifacts/report.*
  • Node D3:publish_bundle():打包运行产物与索引 → runs/<run_id>.zip(可选)

这套节点划分的原则是:每个节点都有清晰输入输出、能单独重跑、能保存中间结果


5)原则五:工作流框架怎么选 —— 本地的“优秀人机工程学”标准与推荐

你是本地批处理,我的建议是两层:

  • 第一层(轻量且够用)prefectdagster 任选其一
  • 第二层(更轻但可追踪差些)make + pythoninvoke/typer(适合极简)

结合你之前讨论过 Prefect,我更倾向 Prefect(本地 profile),原因是:

  • 节点化、重试、日志、运行记录(run)天然支持
  • 你可以先不引入复杂部署,仅用本地 agent/worker 执行
  • 对“探索→固化→可追踪”迁移友好

若你更想强调数据版本、资产化(asset)与强类型依赖,Dagster也很好,但上手心智略重。

选型底线(本地也要满足)

  • 能从命令行一键跑全流程
  • 能单节点重跑、失败可定位
  • 能保存运行元数据与工件索引
  • 能把参数、代码、数据快照写入 manifest

6)原则六:调度器负责执行 —— 本地调度怎么做才能“可伸缩+高可用”?

你是本地,不需要上 Kubernetes,也能做出“足够稳”的调度体系。推荐三层递进:

方案 A(起步最佳):Prefect 本地 worker + OS 定时任务

  • 调度:Windows 任务计划程序 / macOS launchd / Linux cron
  • 执行:Prefect flow run(本机并发可控)
  • 优点:最少运维、可追踪性强
  • 适用:每天/每周批处理、任务量中等

方案 B(更稳健):Prefect + SQLite/Postgres(本地)+ 失败告警

  • 用本地数据库保存运行状态
  • 失败自动重试 + 邮件/企业微信(如果你需要)
  • 适用:你开始依赖“稳定按时产出”

方案 C(并发提升):Prefect + 多进程/线程 + 队列策略

  • 对“抓取/解析/抽取”分区并行
  • 注意:并行要绑定缓存与限速,避免网站封禁与结果不一致

你当前阶段建议从 A 起步,确保“每次运行都有档案”,这是最关键的生产化收益。


7)原则七:复现与可追踪 —— 你需要哪几张“底层账本”?

在本地实现复现与追踪,关键是建立三类账本:版本账本、数据账本、决策账本

7.1 版本账本(Version Ledger)

每次 run 写入:

  • git commit(或打包时写入当前代码 hash)
  • 依赖锁文件 hash(poetry.lock/uv.lock
  • 编码规则版本(例如 rules/ig_v3.yaml
  • prompt 版本(例如 prompts/extract_evidence_v5.md
  • 模型与参数(如温度、top_p、max_tokens)

7.2 数据账本(Data Ledger)

  • 原始抓取:HTML/附件按来源与时间存档(不可覆盖)
  • 解析结果:结构化 records(JSONL/Parquet)
  • QCA 数据:conditions、校准矩阵、变量字典(含单位、阈值、缺失处理)
  • 每一步产物都带上上游输入的 hash(或至少记录文件路径与生成时间)

7.3 决策账本(Decision Ledger,尤其对 AI 辅助分析)

对每条 AI 参与的编码/摘要记录:

  • 输入文本片段的来源指针(URL/文件、段落位置)
  • LLM 输出
  • 你最终采用的标签/赋值
  • 置信度或规则触发原因(为什么这样赋值)

这样你在写论文、做审计或同行复核时能回答:“这个变量值不是凭感觉,是有证据链的。”


8)建议的目录结构(可直接照抄实施)

id: local-qca-blueprint-structure
name: 本地批处理工作流目录结构(政策追踪-QCA-报告-AI辅助)
type: markdown
content: |-
  project/
  ├─ configs/
  │  ├─ run.yaml                  # 本次运行参数:日期范围、项目清单、阈值、模型开关等
  │  └─ sources.yaml              # 数据源列表、抓取规则、限速策略
  ├─ rules/
  │  ├─ variables.yaml            # 变量字典:定义、取值、证据类型、缺失处理
  │  ├─ calibration.yaml          # fsQCA 校准阈值与方法
  │  └─ ig_rules.yaml             # IG 等关键变量的规则化编码规范(版本化)
  ├─ prompts/
  │  ├─ extract_evidence.md
  │  ├─ label_conditions.md
  │  └─ report_summary.md
  ├─ src/
  │  ├─ pipeline/
  │  │  ├─ flow.py                # 工作流定义(Prefect/Dagster)
  │  │  ├─ io.py                  # 读写工件、hash、manifest
  │  │  ├─ fetch.py               # 抓取与缓存
  │  │  ├─ parse.py               # 解析与规范化
  │  │  ├─ evidence.py            # 证据抽取
  │  │  ├─ encode.py              # 变量编码(规则+AI结果)
  │  │  ├─ calibrate.py           # fsQCA 校准
  │  │  ├─ qca.py                 # QCA 计算与导出
  │  │  └─ report.py              # 报告生成
  │  └─ cli.py                    # 命令行入口:run/step/replay/compare
  ├─ notebooks/
  │  ├─ 01_variable_design.ipynb   # 变量设计与样本讨论
  │  ├─ 02_prompt_experiments.ipynb# 提示词/抽取策略试验
  │  ├─ 03_qca_exploration.ipynb   # QCA 探索与敏感性分析
  │  └─ 04_error_analysis.ipynb    # 误差分析与质检
  ├─ data/
  │  ├─ external/                 # 手工输入:项目清单、行政区划、机构名单等
  │  ├─ reference/                # 词表、同义词、实体对齐表
  │  └─ cache/                    # 可清理缓存
  ├─ runs/
  │  ├─ 2026-01-28_010203/
  │  │  ├─ manifest.json
  │  │  ├─ raw/
  │  │  ├─ interim/
  │  │  ├─ processed/
  │  │  ├─ results/
  │  │  ├─ artifacts/
  │  │  ├─ qc/
  │  │  └─ logs/
  └─ README.md

这套结构的用意是:notebooks 只做探索;src 只放可重复模块;runs 是“证据与复现”的唯一真相来源


9)把“AI 辅助分析”放在正确位置:辅助、可回退、可审计

在你的场景里,AI 最适合做三类“加速器”,但要严格可控:

  1. 候选证据提取/段落定位:提高人工阅读效率
  2. 候选标签与摘要:帮助形成初步编码建议
  3. 报告语言润色与结构化摘要:减少写作成本

但 AI 不应该直接“最终定值”,除非你有明确的门控机制。建议采用:

  • 规则优先:能规则化的先规则化(例如关键词、模式、字段缺失处理)
  • AI 仅补盲区:对模糊文本、长文摘要、跨段推断才调用
  • 低置信/高影响进入人工复核队列:例如影响 QCA 关键条件的编码
  • 全量留痕:prompt、输入片段、输出、最终采用情况写入 decision ledger

这样 AI 既提升效率,又不会破坏学术可复核性。


10)一键运行与日常工作方式(你能“马上用起来”的节奏)

日常节奏建议(批处理本地)

  • 每日/每周一次:跑 Flow 1 更新追踪数据(抓取→解析→归并)
  • 每次重要更新后:跑 Flow 2 生成最新 QCA 变量赋值与校准矩阵
  • 写阶段性成果时:跑 Flow 3 输出报告(含图表、解、变量解释与证据附录)

你的工作动作如何分配

  • Notebook:每当变量定义、阈值、编码规则要改,先在小样本 Notebook 中验证
  • 模块:规则稳定后固化到 rules/*.yamlsrc/pipeline/*.py
  • 工作流:每次发布/汇报前跑一次全流程,生成可追踪的 run 包

如果你愿意,我可以在你确认“使用 Prefect 还是 Dagster(或你想纯 Python + Make)”之后,把上面蓝图进一步细化成节点级别的伪代码与配置样例(例如 run.yaml、变量字典 variables.yaml、IG 编码规则示例、以及 manifest.json 字段设计),保证你能直接照着搭起来。你只需要回复两点:
1)你倾向 Prefect 还是 Dagster?
2)AI 辅助分析你打算用哪类模型/接口(本地模型、OpenAI 类、还是暂不接入)?

Prefect + 千问(Qwen)在本地批处理场景的可执行工作流方案(含节点设计、配置样例、代码骨架、复现与追踪机制)

你已确定:Prefect + 本地批处理 + 接入千问。下面我把之前“七条原则”进一步落成可以直接开工的工程蓝图:包含目录、节点、配置文件样例、Prefect Flow 代码骨架、千问调用封装、缓存/追踪/复现与“证据链(decision ledger)”设计。


1. 目标与边界:你要实现的“可执行”到底是什么?

你的管线要做到三件事(这三件事决定设计):

  1. 可重跑:同一份数据快照 + 同一份规则/提示词 + 同一版本依赖 ⇒ 可复现同样输出
  2. 可追踪:报告结论能回到:哪条证据、哪次抓取、哪次模型调用、哪版规则
  3. 可控 AI:千问只做“辅助加速器”,且可回退、可门控、可审计

2. 推荐目录结构(Prefect + 本地 runs 归档)

沿用之前结构,新增 qwen/templates/,并明确 runs/ 为唯一事实来源。

id: prefect-qwen-local-blueprint
name: Prefect+千问(本地批处理)目录结构与关键文件
type: markdown
content: |-
  project/
  ├─ configs/
  │  ├─ run.yaml                  # 本次运行参数(日期、来源、阈值、AI开关、并发等)
  │  ├─ sources.yaml              # 数据源配置(URL、抓取策略、频率、解析器)
  │  └─ qwen.yaml                 # 千问接入配置(模型名、温度、重试、限速、缓存)
  ├─ rules/
  │  ├─ variables.yaml            # 变量字典(定义/取值/证据要求/缺失处理)
  │  ├─ calibration.yaml          # fsQCA 校准阈值、隶属度映射
  │  └─ encoding_rules.yaml       # 规则化编码(优先规则、触发条件、冲突处理)
  ├─ prompts/
  │  ├─ extract_evidence.md       # 证据抽取提示词(版本化)
  │  ├─ label_conditions.md       # 条件变量候选标签提示词(版本化)
  │  └─ report_summary.md         # 报告生成提示词(版本化,可选)
  ├─ templates/
  │  ├─ report.md.j2              # 报告模板(Jinja2)
  │  └─ appendix_evidence.md.j2   # 证据附录模板(引用链)
  ├─ src/
  │  ├─ pipeline/
  │  │  ├─ flow.py                # Prefect flow 定义(节点编排)
  │  │  ├─ tasks_fetch.py         # 抓取与缓存
  │  │  ├─ tasks_parse.py         # 解析与规范化
  │  │  ├─ tasks_evidence.py      # 证据抽取(规则+可选AI定位)
  │  │  ├─ tasks_encode.py        # 变量编码(规则优先 + AI辅助)
  │  │  ├─ tasks_calibrate.py     # fsQCA 校准
  │  │  ├─ tasks_qca.py           # QCA 计算/导出
  │  │  ├─ tasks_report.py        # 报告生成(模板化)
  │  │  ├─ ledger.py              # decision ledger:AI调用与最终采纳记录
  │  │  ├─ manifest.py            # 运行manifest(版本/参数/输入输出索引)
  │  │  ├─ io.py                  # 工件读写、hash、路径管理
  │  │  └─ qc.py                  # 质检抽样、低置信触发人工复核清单
  │  ├─ qwen/
  │  │  ├─ client.py              # 千问 API 封装(重试/限速/超时/日志)
  │  │  ├─ cache.py               # 调用缓存(prompt+input hash)
  │  │  └─ schemas.py             # 结构化输出schema(JSON校验)
  │  └─ cli.py                    # 命令行入口:run/replay/compare/step
  ├─ notebooks/                   # 只放探索:变量设计、提示词试验、误差分析
  ├─ data/
  │  ├─ external/                 # 手工输入:项目清单、实体对齐表等
  │  └─ reference/                # 词表/同义词/行政区划等
  ├─ runs/
  │  ├─ 2026-01-28_010203/
  │  │  ├─ manifest.json
  │  │  ├─ raw/                   # 原始HTML/附件快照(不可覆盖)
  │  │  ├─ interim/               # 解析中间结构化数据
  │  │  ├─ processed/             # QCA条件表、校准矩阵、字典
  │  │  ├─ results/               # QCA结果、统计、图表数据
  │  │  ├─ artifacts/             # report.md/pdf/docx 等最终产物
  │  │  ├─ qc/                    # 质检样本、人工复核队列
  │  │  ├─ ledger/                # AI调用记录与采纳记录(证据链)
  │  │  └─ logs/
  ├─ pyproject.toml / poetry.lock(或 uv.lock)
  └─ README.md

3. 工作流节点(Prefect Flow)如何切:少而稳、可重跑、可追踪

Flow 主干(建议就按这个执行顺序)

A. 初始化与冻结(复现基础)

  • init_run:创建 run_id 目录、写 manifest.json 初稿、复制 configs/rules/prompts 的“快照副本”

B. 政策项目追踪数据获取

  • fetch_sources:抓取并落盘 raw/(含 URL、抓取时间、响应头、失败原因)
  • parse_sources:解析 HTML/PDF/表格 ⇒ interim/records.jsonl
  • dedupe_linkage:去重、归并同名、生成稳定 project_idinterim/projects.parquet

C. 证据抽取与变量编码(QCA核心)

  • extract_evidence:规则抽取 +(可选)AI辅助“定位段落/摘要”
  • label_with_qwen(可选):千问输出候选标签/理由/引用片段(结构化 JSON)
  • qc_gate:低置信/冲突/缺证 ⇒ 输出 qc/review.csv(人工复核队列)
  • encode_conditions:规则优先 + 采纳/回退 AI 建议 ⇒ processed/qca_conditions.csv
  • calibrate_fsqca:校准阈值 ⇒ processed/fsqca_matrix.csv

D. 分析与报告

  • run_qca:生成解、覆盖度、一致性等 ⇒ results/qca_results.json
  • generate_report:模板化生成 report + 证据附录 ⇒ artifacts/report.md(再转 pdf/docx 可选)
  • finalize_manifest:把每步输入输出、hash、统计摘要写回 manifest

4. 配置文件样例(你可以直接复制再改)

4.1 configs/run.yaml

id: run-yaml-sample
name: configs/run.yaml(样例)
type: markdown
content: |-
  run:
    name: "policy-qca-batch"
    timezone: "Asia/Shanghai"
    output_root: "runs"
    snapshot_inputs: true

  scope:
    date_start: "2025-01-01"
    date_end: "2026-01-28"
    regions: ["北京市", "厦门市"]          # 可选
    project_keywords_file: "data/external/project_keywords.csv"

  processing:
    dedupe:
      enabled: true
      fuzzy_threshold: 0.92
    evidence:
      max_chars_per_record: 6000
      keep_html_snapshot: true

  ai:
    enabled: true
    provider: "qwen"
    task_extract_evidence: true
    task_label_conditions: true
    cache_enabled: true
    require_citations: true              # 要求返回引用片段/段落位置
    low_confidence_threshold: 0.65

  qc:
    sample_rate: 0.08
    always_review_when:
      - "conflict_between_rule_and_ai"
      - "missing_required_evidence"
      - "ai_confidence_below_threshold"

  execution:
    max_workers: 4
    retries: 2
    retry_delay_seconds: 10

4.2 configs/qwen.yaml

id: qwen-yaml-sample
name: configs/qwen.yaml(样例)
type: markdown
content: |-
  qwen:
    base_url: "https://dashscope.aliyuncs.com/api/v1"
    api_key_env: "DASHSCOPE_API_KEY"
    model: "qwen-plus"
    temperature: 0.2
    top_p: 0.8
    max_tokens: 1200
    timeout_seconds: 60

    rate_limit:
      qps: 2
      burst: 2

    retry:
      max_attempts: 3
      backoff_seconds: 2

    output:
      format: "json"           # 你将强制模型输出JSON以便落盘与校验
      schema_version: "v1"

    cache:
      dir: "data/cache/qwen"
      key_strategy: "sha256(prompt+input+model+params)"

5. Manifest(运行清单)与 Ledger(决策账本):复现与可追踪的关键

5.1 manifest.json 建议字段(每次 run 自动写入)

  • run_id、时间戳、操作者(可选)
  • git_commit(若无 git,也可写 code_hash
  • 依赖锁文件 hash
  • configs/rules/prompts 的快照 hash
  • 每个节点的输入/输出路径 + 文件 hash + 行数/条数统计
  • 千问调用统计:调用次数、总 token(若可获取)、失败率、缓存命中率

5.2 decision ledger(每条 AI 参与都留痕)

每条记录至少包括:

  • record_id / project_id
  • evidence_pointer:来源文件、段落位置、URL、抓取时间
  • prompt_version
  • model、参数(temperature 等)
  • ai_output_json
  • final_decision:最终采纳值(可能是规则值/人工值/AI值)
  • decision_reason:采纳/拒绝原因
  • confidence(来自 AI 或规则置信)
  • review_status:是否进入人工复核队列

这套 ledger 是你写论文方法部分、以及做同行复核时的“证据链”。


6. Prefect Flow 代码骨架(最小可用版本)

下面给你一个能跑起来的骨架:包含 run 初始化、几个节点、以及在节点中写入工件与 manifest/ledger 的接口位置。

id: prefect-flow-skeleton
name: src/pipeline/flow.py(Prefect Flow骨架)
type: code.python
content: |-
  from __future__ import annotations

  from pathlib import Path
  from datetime import datetime
  import json

  from prefect import flow, task, get_run_logger
  from prefect.tasks import task_input_hash
  from prefect.runtime import flow_run

  from pipeline.manifest import Manifest
  from pipeline.io import RunPaths
  from pipeline.ledger import DecisionLedger

  from pipeline.tasks_fetch import fetch_sources
  from pipeline.tasks_parse import parse_sources, dedupe_linkage
  from pipeline.tasks_evidence import extract_evidence, qwen_extract_or_label
  from pipeline.tasks_encode import encode_conditions
  from pipeline.tasks_calibrate import calibrate_fsqca
  from pipeline.tasks_qca import run_qca
  from pipeline.tasks_report import generate_report


  @task
  def init_run(config_path: str) -> dict:
      logger = get_run_logger()
      run_id = datetime.now().strftime("%Y-%m-%d_%H%M%S")
      base = Path("runs") / run_id
      paths = RunPaths(base)

      paths.ensure_dirs()

      manifest = Manifest.new(run_id=run_id)
      manifest.write(paths.manifest_path)

      # 复制/快照配置(简单起见:直接记录路径与hash,正式可把文件copy到runs/<id>/snapshot/)
      manifest.add_config_ref(config_path=config_path)

      logger.info(f"Initialized run at {base}")
      return {"run_id": run_id, "run_dir": str(base)}


  @flow(name="policy-qca-local-batch")
  def policy_qca_flow(config_path: str = "configs/run.yaml"):
      logger = get_run_logger()

      init = init_run(config_path)
      run_dir = Path(init["run_dir"])
      paths = RunPaths(run_dir)

      manifest = Manifest.load(paths.manifest_path)
      ledger = DecisionLedger(paths.ledger_dir)

      # B: 抓取与解析
      raw_index = fetch_sources(config_path=config_path, out_dir=str(paths.raw_dir))
      records_path = parse_sources(raw_index=raw_index, out_dir=str(paths.interim_dir))
      projects_path = dedupe_linkage(records_path=records_path, out_dir=str(paths.interim_dir))

      # C: 证据抽取 + 千问辅助(可选)
      evidence_path = extract_evidence(projects_path=projects_path, out_dir=str(paths.interim_dir))

      ai_labels_path = qwen_extract_or_label(
          evidence_path=evidence_path,
          out_dir=str(paths.interim_dir),
          ledger_dir=str(paths.ledger_dir),
          config_path=config_path,
      )

      # 编码 + 校准
      conditions_path = encode_conditions(
          projects_path=projects_path,
          evidence_path=evidence_path,
          ai_labels_path=ai_labels_path,
          out_dir=str(paths.processed_dir),
          qc_dir=str(paths.qc_dir),
          ledger_dir=str(paths.ledger_dir),
          config_path=config_path,
      )

      matrix_path = calibrate_fsqca(
          conditions_path=conditions_path,
          out_dir=str(paths.processed_dir),
          config_path=config_path,
      )

      # D: QCA + 报告
      qca_results_path = run_qca(matrix_path=matrix_path, out_dir=str(paths.results_dir))
      report_path = generate_report(
          projects_path=projects_path,
          conditions_path=conditions_path,
          qca_results_path=qca_results_path,
          out_dir=str(paths.artifacts_dir),
          templates_dir="templates",
      )

      # Finalize manifest
      manifest.add_artifact("projects", projects_path)
      manifest.add_artifact("evidence", evidence_path)
      manifest.add_artifact("ai_labels", ai_labels_path)
      manifest.add_artifact("conditions", conditions_path)
      manifest.add_artifact("fsqca_matrix", matrix_path)
      manifest.add_artifact("qca_results", qca_results_path)
      manifest.add_artifact("report", report_path)

      manifest.write(paths.manifest_path)
      logger.info("Flow completed successfully.")


  if __name__ == "__main__":
      policy_qca_flow()

上面只展示编排骨架。你接下来要做的是把每个 tasks_*.py 写成“纯函数式的可重跑节点”:输入是文件路径与 config,输出是新的文件路径,并且每一步都落盘


7. 千问接入:调用封装、结构化输出、缓存、重试、限速(关键)

7.1 你需要的调用策略(强烈建议)

  1. 结构化 JSON 输出:强制模型只返回 JSON,便于校验与落盘
  2. 缓存:同一 prompt+input+params 不重复花钱/不引入随机漂移
  3. 限速与重试:本地并发跑节点时尤为重要
  4. 可追踪:每次调用写入 ledger(含输入片段指针、prompt版本、输出、采纳情况)

7.2 千问输出(建议 schema)

对“条件变量候选标签”任务,建议输出:

  • project_id
  • conditions: {var_name: {value, confidence, rationale, citations[]}}
  • citations: 每条含 source_fileparagraph_indexchar_rangequote

这让你把 QCA 的变量赋值与“文本证据”绑定起来,满足学术可复核。


8. 调度方式(本地)怎么落地:让 Prefect 像“生产”一样跑

你是本地批处理,推荐最简且稳的方式:

  • 日常手动:python -m src.pipeline.flowprefect run ...
  • 定时任务:用 Windows 任务计划程序 / cron 每晚跑一次
  • 并发策略:Prefect 任务层面控制 max_workers,千问调用层面控制 QPS

关键是:每次定时跑都会产生一个新的 run 目录,你可以对比两次运行的差异(数据变化 vs 规则变化 vs AI变化)。


9. 你现在可以按“最小闭环”落地的实施顺序(建议 3 天可跑通)

第 1 阶段:不用 AI,先跑通可复现管线

  • fetch → parse → dedupe → extract_evidence → encode_rules_only → calibrate → qca → report
  • 把 manifest + runs 归档机制做对

第 2 阶段:接入千问做“候选建议”,不直接定值

  • 增加 qwen_extract_or_label 节点
  • 输出 ai_labels.jsonl
  • encode 阶段:规则优先,AI 只补缺失/模糊项;低置信进入 qc/review.csv

第 3 阶段:把“证据附录”固化成报告模板的一部分

  • 报告中每个变量给出引用片段与来源指针(从 ledger/evidence 汇总)

10. 我需要你补充两点信息,才能把“千问调用封装代码”写到可直接运行

千问接入通常有两种方式(你选一个,我就按那个给你可运行的 qwen/client.pytasks_evidence.py 实现):

  1. DashScope 官方接口(常见):你有 DASHSCOPE_API_KEY
  2. 你们单位/网关封装后的千问接口:有自定义 base_url 与鉴权方式

你回复我这两项即可:

  • 你使用的是 DashScope 吗?(是/否;如果否,给我 base_url 鉴权说明)
  • 你想用的模型名:qwen-plusqwen-max,还是别的?

一旦确认,我会把以下文件补齐为“可直接拷贝运行”的版本,并把你需要改的地方用加粗标注(按你的偏好):

  • src/qwen/client.py(限速/重试/超时/结构化输出)
  • src/qwen/cache.py(sha256 缓存键)
  • src/pipeline/tasks_evidence.py(调用千问并写 ledger)
  • src/pipeline/ledger.py(decision ledger JSONL 规范)
  • src/pipeline/manifest.py(manifest 规范与自动hash)

你说得对:如果一上来就按“理想生产级”把 manifest / ledger / cache / rules / prompts / templates 全铺开,文件多、对齐成本高,很容易把研究推进卡死在工程细节上。

下面我把方案降维成“本地批处理可落地、3 天可跑通”的最小版本:先把“能跑 + 可复现 + 可追踪”做出来,再逐步加你需要的东西。你会发现:并不需要一开始就那么多文件


一句话策略:先做“最小闭环”,再做“可审计增强”

  • 阶段 1(最小闭环):只保证“可重跑 + 产物落盘 + 可定位失败”
  • 阶段 2(证据链):再补“ledger(AI调用留痕/引用片段)”
  • 阶段 3(治理化):再补“规则版本、prompt版本、质检门控、报告模板化”

阶段 1:只用 4 个文件,就能跑通 Prefect 本地批处理(不谈对齐)

你先只保留下面这 4 个文件(其余都不要):

  1. flow.py:Prefect 编排(全流程一键跑)
  2. steps.py:所有步骤函数(抓取/解析/编码/输出)
  3. qwen.py:千问调用封装(先最简)
  4. config.yaml:运行参数(日期范围、开关、输出目录)

输出目录固定为:runs/<run_id>/...
每步把结果写成文件(csv/jsonl),这就已经具备“可重跑”的骨架了。

你此阶段“必须对齐”的只有一件事

只对齐“输入→输出文件名”,例如:

  • raw_sources.jsonl
  • records.jsonl
  • projects.csv
  • evidence.jsonl
  • qca_conditions.csv
  • report.md

只要这些产物名固定,你就能稳定串起来;其他“版本/账本/模板/字典”都可以后补。


阶段 2:只新增 1 个文件,把“AI 可追踪”补齐(不影响主流程)

当你觉得流程跑顺了,再加:

  1. ledger.jsonl(不是代码文件,是输出文件)

每次千问调用,就往 runs/<run_id>/ledger.jsonl 追加一行 JSON,记录:

  • project_id
  • 输入片段指针(来自 evidence.jsonl 的 id)
  • prompt 名称(先写死也行)
  • 千问输出
  • 你是否采纳(先默认采纳/默认不采纳都行)

到这里,你已经能回答“这条变量值来自哪段文本、哪次AI输出”,论文方法复核也够用。


阶段 3:你觉得“确实需要”了再加——而不是一开始就加

下面这些确实有价值,但可以等你痛点出现再加

  • rules/variables.yaml:当变量多、规则复杂、频繁改动时再引入
  • prompts/*.md:当你需要做 prompt A/B 对比与版本回溯时再引入
  • templates/*.j2:当报告结构稳定、需要自动化批量生成时再引入
  • manifest.json:当你需要严格复现(代码版本/依赖锁/数据hash)时再引入
  • qc/review.csv:当你开始在意“低置信必须人工复核”时再引入

换句话说:先跑通,再治理化,否则你会被“工程完备性”拖慢研究节奏。


你刚才的“还要对齐 还要_____________”空格,我想确认你最烦的是哪一种?

你补一句就行,我就按你的痛点继续“减文件/减对齐”:

A. 还要对齐 字段/列名(records、projects、conditions 的列经常变)
B. 还要对齐 变量定义/阈值(IG 等指标口径老改)
C. 还要对齐 证据片段与变量值的映射(引用链很费)
D. 还要对齐 千问输出格式(JSON不稳定、解析麻烦)
E. 还要对齐 报告模板(每次都要手工改版式)

你选一个(或直接把空格补完),我就给你一套更“懒人”的最小工程结构:该省的省掉、该自动对齐的自动对齐,而且保证 Prefect + 千问仍然能稳定跑。

能,但要满足两个前提:规格先行 + 机器可验证的对齐方式

像 Claude Code、Cursor、Windsurf、Aider 这类“代码代理/IDE 代理”,确实可以一次性生成多个文件、自动补齐引用、创建目录结构、甚至在本地运行测试并修复——前提是:

  1. 你给它足够明确的“接口契约”(哪些文件、哪些函数签名、哪些输入输出格式、哪些产物路径);
  2. 你有一套可自动验证的约束(例如:make check / pytest / JSON schema 校验 / 样例数据 golden file)。
    否则“对齐”往往只是表面上的:文件都有了,但字段名不一致、路径不一致、输出格式漂移,跑起来还是断。

这些工具“对齐”到什么程度?

1)目录 + 多文件生成:能

  • 自动创建 src/pipeline/*.pyconfigs/*.yamltemplates/*README
  • 自动把 import 路径补齐、把配置读写串起来

2)跨文件一致性(函数签名、字段名、路径名):在“有契约”时能

最有效的方法是让它遵守一份单一事实来源(Single Source of Truth),例如:

  • schemas.py / schema.json:定义所有中间表字段
  • paths.py:定义所有产物路径常量
  • contracts.md:规定每个节点输入输出文件名与格式

代理再去生成其他文件时,会围绕这些契约自动对齐。

3)可运行性(能跑通 Prefect flow):能,但通常要它“执行—报错—修复”迭代几轮

这取决于你是否允许它:

  • 读取你的本地工程
  • 运行命令(prefect/python/pytest
  • 根据报错自动修复

如果只能“静态写代码”,成功率会下降。

4)千问接口细节:能,但需要你明确“使用哪种 SDK/HTTP 方式”

千问(DashScope)不同接入方式差异很大:SDK 版本、endpoint、鉴权环境变量、返回结构。你给出确定的调用方式,对齐会非常顺。


你这种“Prefect + 本地批处理 + 千问 + QCA/报告”最容易卡在什么“对齐”上?

  1. 中间数据结构漂移records.jsonl / projects.csv / evidence.jsonl / conditions.csv 字段不一致
  2. 产物路径不统一:任务写到 data/,Flow 以为在 runs/<id>/
  3. AI 输出不稳定:模型没按 JSON 输出、字段缺失、citation 格式变
  4. 规则/阈值版本:变量口径变化导致历史 run 不可比

这些都可以靠“契约 + 校验”解决,让代理真正做到“自动对齐”。


建议你用的最省心打法:让工具生成,但由你提供 3 个“硬约束文件”

只要先写(或让我帮你写)这 3 个东西,Claude Code 等工具生成多文件并对齐的成功率会大幅提升:

  1. contracts.md:列出每个 Prefect 节点
    • 输入文件名/格式
    • 输出文件名/格式
    • 必填字段
  2. schemas.pyschemas.json:把关键表结构固定(至少四张:records/projects/evidence/conditions)
  3. make check(或 python -m pipeline.check:自动校验
    • JSON schema 校验
    • CSV 列名校验
    • 样例跑通(用 3 条样本数据)

然后你对 Claude Code 下指令:“按 contracts + schemas 生成全部文件,确保 make check 通过;若失败则迭代修复。”


你问“能直接生成并对齐吗”的直接回答

  • 能生成各个文件:能。
  • 能自动对齐:在你提供契约/校验的情况下基本能;没有契约就很难保证。

我需要你确认两点,我就能把“契约文件 + 工具指令”给你一套可直接复制用的

  1. 你千问接入是 DashScope(DASHSCOPE_API_KEY) 吗?还是单位网关?
  2. 你希望最小闭环先跑哪些节点?
  • 选项 A:抓取→解析→证据抽取→条件表→报告(先不做 QCA)
  • 选项 B:抓取→解析→证据→条件表→校准→QCA→报告(全链路)

你回复这两点后,我会给你两样东西(尽量少文件、但强对齐):

  • 一份 contracts.md + schemas.json(你的场景专用)
  • 一段给 Claude Code(或 Cursor/Aider)用的**“一键生成并自检”的指令模板**(包含迭代修复策略)

“契约/校验”到底是什么?为什么你需要它?

你担心“生成很多文件还要对齐”,本质是缺少一套机器可验证的统一标准

  • 契约(Contract):规定“每个步骤输入/输出是什么、叫什么、长什么样”。
    例如:parse_sources 的输出必须是 records.jsonl,每行必须包含 project_id,title,source_url,published_at,text
  • 校验(Validation):把上述规定变成一键检查,让工具(Claude Code 等)自己跑、自己改直到通过。
    例如:make check 会检查文件是否存在、字段是否齐、JSON 是否符合 schema、样例流程是否跑通。

没有契约/校验,代理当然能“生成文件”,但很难保证跨文件一致;有了契约/校验,它就能做到“生成→运行→失败→修复→直到对齐”。


一、怎么“找”契约/校验?其实主要有 3 个来源

1)从你现有工作中“抽”出来(最靠谱)

把你现在已经在做的事情梳理成几条硬规则:

  • 你每次跑完会得到哪些中间文件?(records、projects、evidence、conditions…)
  • 哪些列/字段是必须的?(例如 QCA 的条件变量列、项目唯一 ID)
  • 哪些字段类型/格式固定?(日期格式、URL、0-1 隶属度)
  • 哪些约束必须满足?(project_id 唯一;membership 必须在 0-1)

这就是契约的内容。你并不需要“发明标准”,只是把你现在隐含在脑子里的约束写出来。


2)从“失败/返工点”反推(最快)

回忆你最常遇到的断裂点,然后把它写成可检查规则:

  • “字段名又变了导致下游报错” → 校验 CSV 列名必须一致
  • “AI 输出不是 JSON” → 校验必须能解析为 JSON 且包含指定键
  • “网页更新导致结果漂移” → 规定必须用快照文件,不允许直接在线抓取用于分析
  • “同名项目合并错” → 规定 project_id 生成规则与去重阈值,并输出冲突清单

你越痛的地方,越应该先写校验。


3)借用通用行业模板(省事但要本地化)

常用“契约/校验”模板一般包括:

  • 数据契约:JSON Schema / CSV schema
  • 运行契约:每个节点的输入输出路径
  • AI 契约:结构化输出 schema + 引用(citations)字段
  • 回归契约:用小样本 golden data 做“结果不意外变化”检查

二、你这个场景(政策追踪—QCA—报告—千问)最小要写哪些契约?

建议只写 4 份“表/文件契约”(够用且不重):

  1. records.jsonl:解析后的原子记录(每条公告/网页一条)
  2. projects.csv:归并后的项目级表(每个项目一行)
  3. evidence.jsonl:证据片段表(可追溯引用)
  4. conditions.csv:QCA 条件变量表(最终用于 fsQCA)

并且再写 1 份“流程契约”:

  1. 每个 Prefect 节点:输入文件名 → 输出文件名(固定在 runs/<run_id>/...

三、校验怎么做?给你一套“从轻到重”的可执行方案

方案 A(最轻、立刻能用):纯 Python 校验列名 + 存在性

你写一个 python -m pipeline.check --run runs/<id>,检查:

  • 必须文件存在
  • CSV 列名包含必需列
  • JSONL 每行可解析为 JSON 且必含关键字段
  • QCA 数值范围(如 0~1)

这种最适合你现在:本地、批处理、想快速落地。

方案 B(更正规):JSON Schema + 数据框校验

  • JSONL 用 jsonschema 校验结构
  • CSV/Parquet 用 panderapydantic 校验类型与约束

方案 C(最强):golden sample 回归测试

准备一个小样本输入(比如 3 个项目、10 条记录),跑完整流程,校验输出与预期一致(或在允许范围内一致)。这能防“悄悄对齐错了”。


四、你可以直接复制的“契约文件”写法(示例)

1)流程契约 contracts.md(人读 + 机器也能照做)

# Workflow Contracts (Local Prefect Batch)

## Paths
All outputs MUST be under: runs/<run_id>/

## Node: parse_sources
Input: runs/<run_id>/raw/raw_index.json
Output: runs/<run_id>/interim/records.jsonl

records.jsonl: JSON Lines, one record per source item
Required keys: record_id, source_url, fetched_at, title, published_at, text

## Node: dedupe_linkage
Input: runs/<run_id>/interim/records.jsonl
Output: runs/<run_id>/interim/projects.csv

projects.csv required columns:
project_id, project_name, region, start_date, latest_update, source_urls

## Node: extract_evidence
Input: runs/<run_id>/interim/projects.csv + records.jsonl
Output: runs/<run_id>/interim/evidence.jsonl

evidence.jsonl required keys:
evidence_id, project_id, source_url, quote, char_start, char_end

## Node: label_conditions_with_qwen (optional)
Input: evidence.jsonl
Output: runs/<run_id>/interim/ai_labels.jsonl
ai_labels.jsonl required keys:
project_id, conditions, citations, confidence

## Node: encode_conditions
Input: projects.csv + evidence.jsonl + (optional) ai_labels.jsonl
Output: runs/<run_id>/processed/conditions.csv

conditions.csv required columns:
project_id, IG, ... (other variables)
All membership values MUST be in [0,1] (if already calibrated).

2)结构契约 schemas.json(给工具“硬对齐”的依据)

你可以先只写最关键的几个字段(不必一次写全)。

{
  "records": {
    "required": ["record_id", "source_url", "fetched_at", "title", "published_at", "text"]
  },
  "projects": {
    "required": ["project_id", "project_name", "region", "latest_update"]
  },
  "evidence": {
    "required": ["evidence_id", "project_id", "source_url", "quote", "char_start", "char_end"]
  },
  "conditions": {
    "required": ["project_id", "IG"]
  }
}

五、最关键的问题:你问“怎么找提供契约/校验”——我的建议是按这 3 步拿到它

Step 1:先选“最终交付件”

对你来说是:conditions.csv + report.md(以及能追溯的 evidence)

Step 2:倒推“必需中间件”

为了让 conditions.csv 能稳定生成,你最少需要:projects.csvevidence.jsonl

Step 3:把“你最不能接受的错误”写成校验规则

例如:

  • project_id 不唯一 → 直接失败
  • 缺少 source_urlquote → 直接失败
  • IG 赋值没有证据引用 → 进入 qc/review.csv 或直接失败(看你策略)
  • AI 输出无法解析 JSON → 该条回退规则编码,且写入 ledger

契约不是越多越好,而是先写“会让你返工的那几条”。


六、你只要回答两个问题,我就能把“最小契约 + 最小校验脚本”按你项目定制出来

  1. 你的 conditions.csv 里,除了 project_idIG,你现在至少还需要哪几个变量列名?(列个清单即可)
  2. 你更希望校验失败时:
  • A:直接中止(保证严谨)
  • B:降级回退(例如 AI 不合格则不用 AI,继续出结果并标红)

你给出这两点,我就把:

  • contracts.md(你的变量列与节点路径)
  • schemas.json(按你的字段)
  • python -m pipeline.check 的最小实现逻辑(不引入太多依赖)

一次性给你,且让它特别适合 Claude Code 这种工具去“生成文件并对齐”。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐