面向“政策项目追踪—QCA 数据集—报告—AI 辅助分析”的本地批处理工作流蓝图
产出规则、阈值与最终模块的需求说明。
场景:一端是探索性很强的文本理解与指标设计,另一端是可重复的批量抓取、抽取、编码、建模与输出报告。本地 + 批处理的优势是可控与低成本,但挑战是复现、追踪与长期维护。下面我把图片里的七条原则逐条落到一套可执行的蓝图:哪些步骤写成模块、哪些用 Notebook、哪些做成工作流节点,以及如何在本地实现复现与可追踪。
0. 总体目标:把你的研究流程变成“可重跑、可对比、可审计”的批处理管线
你最终需要的不是“一个能跑的脚本”,而是一套可以回答这些问题的系统:
- 这次项目追踪的数据来自哪些页面、抓取时间是什么、是否有缺失?
- 文本抽取与编码使用了哪个版本的规则/提示词/模型?
- QCA 数据集每个变量的赋值依据是什么(证据链)?
- 报告里的结论对应哪次运行、哪份数据、哪组参数?
- 换一个阈值/换一个编码规则时,结果差异能否被比较与复现?
实现这些目标的核心不是堆工具,而是把探索性与可重复性分层:探索用 Notebook,稳定流程用模块 + 工作流节点,执行由调度器统一编排,所有产物版本化。
1)原则一:开发环境同时支持“原型闭环”与“生产交互” —— 在本地怎么做?
你的落地方式
- 原型闭环(写/评估/分析):以 Notebook 为中心做变量定义、编码规则试验、抽样验证、可视化诊断。
- 生产交互(部署/监控/调试):在本地用“工作流运行器 + 日志 + 运行档案”替代云上的运维体系;每次运行都生成一个可追踪的 run 目录与元数据。
具体机制(本地也能做到“像生产一样”)
- 统一入口:
python -m pipeline run --config configs/run.yaml - 每次运行生成:
runs/2026-01-28_010203/,包含manifest.json(代码版本、参数、数据快照、模型与提示词版本)logs/(按节点日志)artifacts/(中间产物与最终报告)qc/(抽样核查记录、评测分数)
这就是“与生产部署交互”在本地的等价实现:你不是把东西部署到集群,而是把一次运行当作可审计的“发布单元”。
2)原则二:本地开发,但在“类生产环境”评估 —— 本地基础设施的替代方案
你是本地环境,依然可以用“类生产一致性”降低漂移:
- 用 conda/uv/poetry 锁依赖(强烈建议
uv或poetry)。 - 用 Docker(可选) 固化运行环境:不是为了上云,而是为了让结果能复现、换机器能跑。
- 用 固定数据快照:抓取原始 HTML、解析后的 JSON、最终表格分别存档,任何一步可回放。
最关键的一条:评估必须基于“固定快照”而不是每次都从网上重新抓。否则你无法判断差异来自“规则变化”还是“网页变化”。
3)原则三:Notebook 必要但不充分 —— 哪些必须 Notebook,哪些必须模块化?
适合 Notebook 的任务(探索性、解释性强)
- 变量体系设计与 operationalization
- 例如:政策创新“执行保障(IG)”的指标维度、可观测文本证据、赋值规则与阈值讨论
- 编码规则/提示词试验
- 小样本对比:规则 A vs 规则 B 的一致性、歧义点分析
- 抽样质检与误差分析
- 检查“抽取字段缺失”“同名项目合并错误”“编码偏差”
- fsQCA 的模型探索
- 条件组合敏感性分析、覆盖度/一致性诊断可视化
Notebook 的定位:做“定义问题—试验—解释”,产出规则、阈值与最终模块的需求说明。
必须写成模块(可重复、可批处理、可测试)
- 抓取与更新(网页抓取、接口访问、下载、缓存)
- 解析与规范化(HTML→结构化 JSON;字段清洗、时间格式统一、去重)
- 文本切分与证据抽取(段落定位、关键词/模式匹配、引用片段保存)
- 编码与赋值(把证据→变量值;含 fsQCA 校准)
- 数据集构建(宽表、长表、字典表、变量说明表)
- 报告生成(模板化渲染:Markdown/Word/PDF)
- AI 辅助分析的调用封装(提示词版本、重试、限流、缓存、输出校验)
模块的定位:把“能解释清楚的流程”固化成“能稳定重复的函数/命令”。
4)原则四:工作流是抽象 —— 你的工作流节点应该怎么切?
把你的全链路拆成“少而稳”的节点,节点之间用文件工件(artifact)连接,便于追踪与复现。
工作流 DAG(本地批处理版本)
Flow 0:初始化与快照
- Node A0:
init_run():生成 run_id、写 manifest、冻结配置 - Node A1:
snapshot_inputs():记录项目清单版本、关键词表版本、规则版本
Flow 1:政策项目追踪数据获取
- Node B1:
fetch_sources():抓取原始网页/附件 →raw/html/、raw/files/ - Node B2:
parse_sources():解析成结构化 →interim/records.jsonl - Node B3:
dedupe_linkage():去重、同名归并、建立项目 ID →interim/projects.parquet
Flow 2:证据抽取与变量编码(面向 QCA)
- Node C1:
extract_evidence():针对每个项目抽取证据片段 →interim/evidence.jsonl - Node C2:
ai_assist_label()(可选):LLM 辅助候选标签/摘要/疑点 →interim/ai_labels.jsonl - Node C3:
human_review_queue()(可选):生成待复核清单(高风险/低置信)→qc/review.csv - Node C4:
encode_conditions():规则+(可选)LLM结果 → 条件变量赋值 →processed/qca_conditions.csv - Node C5:
calibrate_fsQCA():把原始赋值校准为集合隶属度 →processed/fsqca_matrix.csv
Flow 3:分析与报告
- Node D1:
run_qca():生成解与指标 →results/qca_results.json - Node D2:
generate_report():输出 Markdown/Docx/PDF →artifacts/report.* - Node D3:
publish_bundle():打包运行产物与索引 →runs/<run_id>.zip(可选)
这套节点划分的原则是:每个节点都有清晰输入输出、能单独重跑、能保存中间结果。
5)原则五:工作流框架怎么选 —— 本地的“优秀人机工程学”标准与推荐
你是本地批处理,我的建议是两层:
- 第一层(轻量且够用):
prefect或dagster任选其一 - 第二层(更轻但可追踪差些):
make + python或invoke/typer(适合极简)
结合你之前讨论过 Prefect,我更倾向 Prefect(本地 profile),原因是:
- 节点化、重试、日志、运行记录(run)天然支持
- 你可以先不引入复杂部署,仅用本地 agent/worker 执行
- 对“探索→固化→可追踪”迁移友好
若你更想强调数据版本、资产化(asset)与强类型依赖,Dagster也很好,但上手心智略重。
选型底线(本地也要满足):
- 能从命令行一键跑全流程
- 能单节点重跑、失败可定位
- 能保存运行元数据与工件索引
- 能把参数、代码、数据快照写入 manifest
6)原则六:调度器负责执行 —— 本地调度怎么做才能“可伸缩+高可用”?
你是本地,不需要上 Kubernetes,也能做出“足够稳”的调度体系。推荐三层递进:
方案 A(起步最佳):Prefect 本地 worker + OS 定时任务
- 调度:Windows 任务计划程序 / macOS launchd / Linux cron
- 执行:Prefect flow run(本机并发可控)
- 优点:最少运维、可追踪性强
- 适用:每天/每周批处理、任务量中等
方案 B(更稳健):Prefect + SQLite/Postgres(本地)+ 失败告警
- 用本地数据库保存运行状态
- 失败自动重试 + 邮件/企业微信(如果你需要)
- 适用:你开始依赖“稳定按时产出”
方案 C(并发提升):Prefect + 多进程/线程 + 队列策略
- 对“抓取/解析/抽取”分区并行
- 注意:并行要绑定缓存与限速,避免网站封禁与结果不一致
你当前阶段建议从 A 起步,确保“每次运行都有档案”,这是最关键的生产化收益。
7)原则七:复现与可追踪 —— 你需要哪几张“底层账本”?
在本地实现复现与追踪,关键是建立三类账本:版本账本、数据账本、决策账本。
7.1 版本账本(Version Ledger)
每次 run 写入:
- git commit(或打包时写入当前代码 hash)
- 依赖锁文件 hash(
poetry.lock/uv.lock) - 编码规则版本(例如
rules/ig_v3.yaml) - prompt 版本(例如
prompts/extract_evidence_v5.md) - 模型与参数(如温度、top_p、max_tokens)
7.2 数据账本(Data Ledger)
- 原始抓取:HTML/附件按来源与时间存档(不可覆盖)
- 解析结果:结构化 records(JSONL/Parquet)
- QCA 数据:conditions、校准矩阵、变量字典(含单位、阈值、缺失处理)
- 每一步产物都带上上游输入的 hash(或至少记录文件路径与生成时间)
7.3 决策账本(Decision Ledger,尤其对 AI 辅助分析)
对每条 AI 参与的编码/摘要记录:
- 输入文本片段的来源指针(URL/文件、段落位置)
- LLM 输出
- 你最终采用的标签/赋值
- 置信度或规则触发原因(为什么这样赋值)
这样你在写论文、做审计或同行复核时能回答:“这个变量值不是凭感觉,是有证据链的。”
8)建议的目录结构(可直接照抄实施)
id: local-qca-blueprint-structure
name: 本地批处理工作流目录结构(政策追踪-QCA-报告-AI辅助)
type: markdown
content: |-
project/
├─ configs/
│ ├─ run.yaml # 本次运行参数:日期范围、项目清单、阈值、模型开关等
│ └─ sources.yaml # 数据源列表、抓取规则、限速策略
├─ rules/
│ ├─ variables.yaml # 变量字典:定义、取值、证据类型、缺失处理
│ ├─ calibration.yaml # fsQCA 校准阈值与方法
│ └─ ig_rules.yaml # IG 等关键变量的规则化编码规范(版本化)
├─ prompts/
│ ├─ extract_evidence.md
│ ├─ label_conditions.md
│ └─ report_summary.md
├─ src/
│ ├─ pipeline/
│ │ ├─ flow.py # 工作流定义(Prefect/Dagster)
│ │ ├─ io.py # 读写工件、hash、manifest
│ │ ├─ fetch.py # 抓取与缓存
│ │ ├─ parse.py # 解析与规范化
│ │ ├─ evidence.py # 证据抽取
│ │ ├─ encode.py # 变量编码(规则+AI结果)
│ │ ├─ calibrate.py # fsQCA 校准
│ │ ├─ qca.py # QCA 计算与导出
│ │ └─ report.py # 报告生成
│ └─ cli.py # 命令行入口:run/step/replay/compare
├─ notebooks/
│ ├─ 01_variable_design.ipynb # 变量设计与样本讨论
│ ├─ 02_prompt_experiments.ipynb# 提示词/抽取策略试验
│ ├─ 03_qca_exploration.ipynb # QCA 探索与敏感性分析
│ └─ 04_error_analysis.ipynb # 误差分析与质检
├─ data/
│ ├─ external/ # 手工输入:项目清单、行政区划、机构名单等
│ ├─ reference/ # 词表、同义词、实体对齐表
│ └─ cache/ # 可清理缓存
├─ runs/
│ ├─ 2026-01-28_010203/
│ │ ├─ manifest.json
│ │ ├─ raw/
│ │ ├─ interim/
│ │ ├─ processed/
│ │ ├─ results/
│ │ ├─ artifacts/
│ │ ├─ qc/
│ │ └─ logs/
└─ README.md
这套结构的用意是:notebooks 只做探索;src 只放可重复模块;runs 是“证据与复现”的唯一真相来源。
9)把“AI 辅助分析”放在正确位置:辅助、可回退、可审计
在你的场景里,AI 最适合做三类“加速器”,但要严格可控:
- 候选证据提取/段落定位:提高人工阅读效率
- 候选标签与摘要:帮助形成初步编码建议
- 报告语言润色与结构化摘要:减少写作成本
但 AI 不应该直接“最终定值”,除非你有明确的门控机制。建议采用:
- 规则优先:能规则化的先规则化(例如关键词、模式、字段缺失处理)
- AI 仅补盲区:对模糊文本、长文摘要、跨段推断才调用
- 低置信/高影响进入人工复核队列:例如影响 QCA 关键条件的编码
- 全量留痕:prompt、输入片段、输出、最终采用情况写入 decision ledger
这样 AI 既提升效率,又不会破坏学术可复核性。
10)一键运行与日常工作方式(你能“马上用起来”的节奏)
日常节奏建议(批处理本地)
- 每日/每周一次:跑
Flow 1更新追踪数据(抓取→解析→归并) - 每次重要更新后:跑
Flow 2生成最新 QCA 变量赋值与校准矩阵 - 写阶段性成果时:跑
Flow 3输出报告(含图表、解、变量解释与证据附录)
你的工作动作如何分配
- Notebook:每当变量定义、阈值、编码规则要改,先在小样本 Notebook 中验证
- 模块:规则稳定后固化到
rules/*.yaml与src/pipeline/*.py - 工作流:每次发布/汇报前跑一次全流程,生成可追踪的 run 包
如果你愿意,我可以在你确认“使用 Prefect 还是 Dagster(或你想纯 Python + Make)”之后,把上面蓝图进一步细化成节点级别的伪代码与配置样例(例如 run.yaml、变量字典 variables.yaml、IG 编码规则示例、以及 manifest.json 字段设计),保证你能直接照着搭起来。你只需要回复两点:
1)你倾向 Prefect 还是 Dagster?
2)AI 辅助分析你打算用哪类模型/接口(本地模型、OpenAI 类、还是暂不接入)?
Prefect + 千问(Qwen)在本地批处理场景的可执行工作流方案(含节点设计、配置样例、代码骨架、复现与追踪机制)
你已确定:Prefect + 本地批处理 + 接入千问。下面我把之前“七条原则”进一步落成可以直接开工的工程蓝图:包含目录、节点、配置文件样例、Prefect Flow 代码骨架、千问调用封装、缓存/追踪/复现与“证据链(decision ledger)”设计。
1. 目标与边界:你要实现的“可执行”到底是什么?
你的管线要做到三件事(这三件事决定设计):
- 可重跑:同一份数据快照 + 同一份规则/提示词 + 同一版本依赖 ⇒ 可复现同样输出
- 可追踪:报告结论能回到:哪条证据、哪次抓取、哪次模型调用、哪版规则
- 可控 AI:千问只做“辅助加速器”,且可回退、可门控、可审计
2. 推荐目录结构(Prefect + 本地 runs 归档)
沿用之前结构,新增 qwen/ 与 templates/,并明确 runs/ 为唯一事实来源。
id: prefect-qwen-local-blueprint
name: Prefect+千问(本地批处理)目录结构与关键文件
type: markdown
content: |-
project/
├─ configs/
│ ├─ run.yaml # 本次运行参数(日期、来源、阈值、AI开关、并发等)
│ ├─ sources.yaml # 数据源配置(URL、抓取策略、频率、解析器)
│ └─ qwen.yaml # 千问接入配置(模型名、温度、重试、限速、缓存)
├─ rules/
│ ├─ variables.yaml # 变量字典(定义/取值/证据要求/缺失处理)
│ ├─ calibration.yaml # fsQCA 校准阈值、隶属度映射
│ └─ encoding_rules.yaml # 规则化编码(优先规则、触发条件、冲突处理)
├─ prompts/
│ ├─ extract_evidence.md # 证据抽取提示词(版本化)
│ ├─ label_conditions.md # 条件变量候选标签提示词(版本化)
│ └─ report_summary.md # 报告生成提示词(版本化,可选)
├─ templates/
│ ├─ report.md.j2 # 报告模板(Jinja2)
│ └─ appendix_evidence.md.j2 # 证据附录模板(引用链)
├─ src/
│ ├─ pipeline/
│ │ ├─ flow.py # Prefect flow 定义(节点编排)
│ │ ├─ tasks_fetch.py # 抓取与缓存
│ │ ├─ tasks_parse.py # 解析与规范化
│ │ ├─ tasks_evidence.py # 证据抽取(规则+可选AI定位)
│ │ ├─ tasks_encode.py # 变量编码(规则优先 + AI辅助)
│ │ ├─ tasks_calibrate.py # fsQCA 校准
│ │ ├─ tasks_qca.py # QCA 计算/导出
│ │ ├─ tasks_report.py # 报告生成(模板化)
│ │ ├─ ledger.py # decision ledger:AI调用与最终采纳记录
│ │ ├─ manifest.py # 运行manifest(版本/参数/输入输出索引)
│ │ ├─ io.py # 工件读写、hash、路径管理
│ │ └─ qc.py # 质检抽样、低置信触发人工复核清单
│ ├─ qwen/
│ │ ├─ client.py # 千问 API 封装(重试/限速/超时/日志)
│ │ ├─ cache.py # 调用缓存(prompt+input hash)
│ │ └─ schemas.py # 结构化输出schema(JSON校验)
│ └─ cli.py # 命令行入口:run/replay/compare/step
├─ notebooks/ # 只放探索:变量设计、提示词试验、误差分析
├─ data/
│ ├─ external/ # 手工输入:项目清单、实体对齐表等
│ └─ reference/ # 词表/同义词/行政区划等
├─ runs/
│ ├─ 2026-01-28_010203/
│ │ ├─ manifest.json
│ │ ├─ raw/ # 原始HTML/附件快照(不可覆盖)
│ │ ├─ interim/ # 解析中间结构化数据
│ │ ├─ processed/ # QCA条件表、校准矩阵、字典
│ │ ├─ results/ # QCA结果、统计、图表数据
│ │ ├─ artifacts/ # report.md/pdf/docx 等最终产物
│ │ ├─ qc/ # 质检样本、人工复核队列
│ │ ├─ ledger/ # AI调用记录与采纳记录(证据链)
│ │ └─ logs/
├─ pyproject.toml / poetry.lock(或 uv.lock)
└─ README.md
3. 工作流节点(Prefect Flow)如何切:少而稳、可重跑、可追踪
Flow 主干(建议就按这个执行顺序)
A. 初始化与冻结(复现基础)
init_run:创建 run_id 目录、写manifest.json初稿、复制 configs/rules/prompts 的“快照副本”
B. 政策项目追踪数据获取
fetch_sources:抓取并落盘raw/(含 URL、抓取时间、响应头、失败原因)parse_sources:解析 HTML/PDF/表格 ⇒interim/records.jsonldedupe_linkage:去重、归并同名、生成稳定project_id⇒interim/projects.parquet
C. 证据抽取与变量编码(QCA核心)
extract_evidence:规则抽取 +(可选)AI辅助“定位段落/摘要”label_with_qwen(可选):千问输出候选标签/理由/引用片段(结构化 JSON)qc_gate:低置信/冲突/缺证 ⇒ 输出qc/review.csv(人工复核队列)encode_conditions:规则优先 + 采纳/回退 AI 建议 ⇒processed/qca_conditions.csvcalibrate_fsqca:校准阈值 ⇒processed/fsqca_matrix.csv
D. 分析与报告
run_qca:生成解、覆盖度、一致性等 ⇒results/qca_results.jsongenerate_report:模板化生成 report + 证据附录 ⇒artifacts/report.md(再转 pdf/docx 可选)finalize_manifest:把每步输入输出、hash、统计摘要写回 manifest
4. 配置文件样例(你可以直接复制再改)
4.1 configs/run.yaml
id: run-yaml-sample
name: configs/run.yaml(样例)
type: markdown
content: |-
run:
name: "policy-qca-batch"
timezone: "Asia/Shanghai"
output_root: "runs"
snapshot_inputs: true
scope:
date_start: "2025-01-01"
date_end: "2026-01-28"
regions: ["北京市", "厦门市"] # 可选
project_keywords_file: "data/external/project_keywords.csv"
processing:
dedupe:
enabled: true
fuzzy_threshold: 0.92
evidence:
max_chars_per_record: 6000
keep_html_snapshot: true
ai:
enabled: true
provider: "qwen"
task_extract_evidence: true
task_label_conditions: true
cache_enabled: true
require_citations: true # 要求返回引用片段/段落位置
low_confidence_threshold: 0.65
qc:
sample_rate: 0.08
always_review_when:
- "conflict_between_rule_and_ai"
- "missing_required_evidence"
- "ai_confidence_below_threshold"
execution:
max_workers: 4
retries: 2
retry_delay_seconds: 10
4.2 configs/qwen.yaml
id: qwen-yaml-sample
name: configs/qwen.yaml(样例)
type: markdown
content: |-
qwen:
base_url: "https://dashscope.aliyuncs.com/api/v1"
api_key_env: "DASHSCOPE_API_KEY"
model: "qwen-plus"
temperature: 0.2
top_p: 0.8
max_tokens: 1200
timeout_seconds: 60
rate_limit:
qps: 2
burst: 2
retry:
max_attempts: 3
backoff_seconds: 2
output:
format: "json" # 你将强制模型输出JSON以便落盘与校验
schema_version: "v1"
cache:
dir: "data/cache/qwen"
key_strategy: "sha256(prompt+input+model+params)"
5. Manifest(运行清单)与 Ledger(决策账本):复现与可追踪的关键
5.1 manifest.json 建议字段(每次 run 自动写入)
run_id、时间戳、操作者(可选)git_commit(若无 git,也可写code_hash)- 依赖锁文件 hash
- configs/rules/prompts 的快照 hash
- 每个节点的输入/输出路径 + 文件 hash + 行数/条数统计
- 千问调用统计:调用次数、总 token(若可获取)、失败率、缓存命中率
5.2 decision ledger(每条 AI 参与都留痕)
每条记录至少包括:
record_id / project_idevidence_pointer:来源文件、段落位置、URL、抓取时间prompt_versionmodel、参数(temperature 等)ai_output_jsonfinal_decision:最终采纳值(可能是规则值/人工值/AI值)decision_reason:采纳/拒绝原因confidence(来自 AI 或规则置信)review_status:是否进入人工复核队列
这套 ledger 是你写论文方法部分、以及做同行复核时的“证据链”。
6. Prefect Flow 代码骨架(最小可用版本)
下面给你一个能跑起来的骨架:包含 run 初始化、几个节点、以及在节点中写入工件与 manifest/ledger 的接口位置。
id: prefect-flow-skeleton
name: src/pipeline/flow.py(Prefect Flow骨架)
type: code.python
content: |-
from __future__ import annotations
from pathlib import Path
from datetime import datetime
import json
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.runtime import flow_run
from pipeline.manifest import Manifest
from pipeline.io import RunPaths
from pipeline.ledger import DecisionLedger
from pipeline.tasks_fetch import fetch_sources
from pipeline.tasks_parse import parse_sources, dedupe_linkage
from pipeline.tasks_evidence import extract_evidence, qwen_extract_or_label
from pipeline.tasks_encode import encode_conditions
from pipeline.tasks_calibrate import calibrate_fsqca
from pipeline.tasks_qca import run_qca
from pipeline.tasks_report import generate_report
@task
def init_run(config_path: str) -> dict:
logger = get_run_logger()
run_id = datetime.now().strftime("%Y-%m-%d_%H%M%S")
base = Path("runs") / run_id
paths = RunPaths(base)
paths.ensure_dirs()
manifest = Manifest.new(run_id=run_id)
manifest.write(paths.manifest_path)
# 复制/快照配置(简单起见:直接记录路径与hash,正式可把文件copy到runs/<id>/snapshot/)
manifest.add_config_ref(config_path=config_path)
logger.info(f"Initialized run at {base}")
return {"run_id": run_id, "run_dir": str(base)}
@flow(name="policy-qca-local-batch")
def policy_qca_flow(config_path: str = "configs/run.yaml"):
logger = get_run_logger()
init = init_run(config_path)
run_dir = Path(init["run_dir"])
paths = RunPaths(run_dir)
manifest = Manifest.load(paths.manifest_path)
ledger = DecisionLedger(paths.ledger_dir)
# B: 抓取与解析
raw_index = fetch_sources(config_path=config_path, out_dir=str(paths.raw_dir))
records_path = parse_sources(raw_index=raw_index, out_dir=str(paths.interim_dir))
projects_path = dedupe_linkage(records_path=records_path, out_dir=str(paths.interim_dir))
# C: 证据抽取 + 千问辅助(可选)
evidence_path = extract_evidence(projects_path=projects_path, out_dir=str(paths.interim_dir))
ai_labels_path = qwen_extract_or_label(
evidence_path=evidence_path,
out_dir=str(paths.interim_dir),
ledger_dir=str(paths.ledger_dir),
config_path=config_path,
)
# 编码 + 校准
conditions_path = encode_conditions(
projects_path=projects_path,
evidence_path=evidence_path,
ai_labels_path=ai_labels_path,
out_dir=str(paths.processed_dir),
qc_dir=str(paths.qc_dir),
ledger_dir=str(paths.ledger_dir),
config_path=config_path,
)
matrix_path = calibrate_fsqca(
conditions_path=conditions_path,
out_dir=str(paths.processed_dir),
config_path=config_path,
)
# D: QCA + 报告
qca_results_path = run_qca(matrix_path=matrix_path, out_dir=str(paths.results_dir))
report_path = generate_report(
projects_path=projects_path,
conditions_path=conditions_path,
qca_results_path=qca_results_path,
out_dir=str(paths.artifacts_dir),
templates_dir="templates",
)
# Finalize manifest
manifest.add_artifact("projects", projects_path)
manifest.add_artifact("evidence", evidence_path)
manifest.add_artifact("ai_labels", ai_labels_path)
manifest.add_artifact("conditions", conditions_path)
manifest.add_artifact("fsqca_matrix", matrix_path)
manifest.add_artifact("qca_results", qca_results_path)
manifest.add_artifact("report", report_path)
manifest.write(paths.manifest_path)
logger.info("Flow completed successfully.")
if __name__ == "__main__":
policy_qca_flow()
上面只展示编排骨架。你接下来要做的是把每个
tasks_*.py写成“纯函数式的可重跑节点”:输入是文件路径与 config,输出是新的文件路径,并且每一步都落盘。
7. 千问接入:调用封装、结构化输出、缓存、重试、限速(关键)
7.1 你需要的调用策略(强烈建议)
- 结构化 JSON 输出:强制模型只返回 JSON,便于校验与落盘
- 缓存:同一
prompt+input+params不重复花钱/不引入随机漂移 - 限速与重试:本地并发跑节点时尤为重要
- 可追踪:每次调用写入 ledger(含输入片段指针、prompt版本、输出、采纳情况)
7.2 千问输出(建议 schema)
对“条件变量候选标签”任务,建议输出:
project_idconditions:{var_name: {value, confidence, rationale, citations[]}}citations: 每条含source_file、paragraph_index或char_range、quote
这让你把 QCA 的变量赋值与“文本证据”绑定起来,满足学术可复核。
8. 调度方式(本地)怎么落地:让 Prefect 像“生产”一样跑
你是本地批处理,推荐最简且稳的方式:
- 日常手动:
python -m src.pipeline.flow或prefect run ... - 定时任务:用 Windows 任务计划程序 / cron 每晚跑一次
- 并发策略:Prefect 任务层面控制
max_workers,千问调用层面控制 QPS
关键是:每次定时跑都会产生一个新的 run 目录,你可以对比两次运行的差异(数据变化 vs 规则变化 vs AI变化)。
9. 你现在可以按“最小闭环”落地的实施顺序(建议 3 天可跑通)
第 1 阶段:不用 AI,先跑通可复现管线
fetch → parse → dedupe → extract_evidence → encode_rules_only → calibrate → qca → report- 把 manifest + runs 归档机制做对
第 2 阶段:接入千问做“候选建议”,不直接定值
- 增加
qwen_extract_or_label节点 - 输出
ai_labels.jsonl - encode 阶段:规则优先,AI 只补缺失/模糊项;低置信进入
qc/review.csv
第 3 阶段:把“证据附录”固化成报告模板的一部分
- 报告中每个变量给出引用片段与来源指针(从 ledger/evidence 汇总)
10. 我需要你补充两点信息,才能把“千问调用封装代码”写到可直接运行
千问接入通常有两种方式(你选一个,我就按那个给你可运行的 qwen/client.py 与 tasks_evidence.py 实现):
- DashScope 官方接口(常见):你有
DASHSCOPE_API_KEY - 你们单位/网关封装后的千问接口:有自定义 base_url 与鉴权方式
你回复我这两项即可:
- 你使用的是 DashScope 吗?(是/否;如果否,给我 base_url 鉴权说明)
- 你想用的模型名:
qwen-plus、qwen-max,还是别的?
一旦确认,我会把以下文件补齐为“可直接拷贝运行”的版本,并把你需要改的地方用加粗标注(按你的偏好):
src/qwen/client.py(限速/重试/超时/结构化输出)src/qwen/cache.py(sha256 缓存键)src/pipeline/tasks_evidence.py(调用千问并写 ledger)src/pipeline/ledger.py(decision ledger JSONL 规范)src/pipeline/manifest.py(manifest 规范与自动hash)
你说得对:如果一上来就按“理想生产级”把 manifest / ledger / cache / rules / prompts / templates 全铺开,文件多、对齐成本高,很容易把研究推进卡死在工程细节上。
下面我把方案降维成“本地批处理可落地、3 天可跑通”的最小版本:先把“能跑 + 可复现 + 可追踪”做出来,再逐步加你需要的东西。你会发现:并不需要一开始就那么多文件。
一句话策略:先做“最小闭环”,再做“可审计增强”
- 阶段 1(最小闭环):只保证“可重跑 + 产物落盘 + 可定位失败”
- 阶段 2(证据链):再补“ledger(AI调用留痕/引用片段)”
- 阶段 3(治理化):再补“规则版本、prompt版本、质检门控、报告模板化”
阶段 1:只用 4 个文件,就能跑通 Prefect 本地批处理(不谈对齐)
你先只保留下面这 4 个文件(其余都不要):
flow.py:Prefect 编排(全流程一键跑)steps.py:所有步骤函数(抓取/解析/编码/输出)qwen.py:千问调用封装(先最简)config.yaml:运行参数(日期范围、开关、输出目录)
输出目录固定为:runs/<run_id>/...
每步把结果写成文件(csv/jsonl),这就已经具备“可重跑”的骨架了。
你此阶段“必须对齐”的只有一件事
只对齐“输入→输出文件名”,例如:
raw_sources.jsonlrecords.jsonlprojects.csvevidence.jsonlqca_conditions.csvreport.md
只要这些产物名固定,你就能稳定串起来;其他“版本/账本/模板/字典”都可以后补。
阶段 2:只新增 1 个文件,把“AI 可追踪”补齐(不影响主流程)
当你觉得流程跑顺了,再加:
ledger.jsonl(不是代码文件,是输出文件)
每次千问调用,就往 runs/<run_id>/ledger.jsonl 追加一行 JSON,记录:
- project_id
- 输入片段指针(来自 evidence.jsonl 的 id)
- prompt 名称(先写死也行)
- 千问输出
- 你是否采纳(先默认采纳/默认不采纳都行)
到这里,你已经能回答“这条变量值来自哪段文本、哪次AI输出”,论文方法复核也够用。
阶段 3:你觉得“确实需要”了再加——而不是一开始就加
下面这些确实有价值,但可以等你痛点出现再加:
rules/variables.yaml:当变量多、规则复杂、频繁改动时再引入prompts/*.md:当你需要做 prompt A/B 对比与版本回溯时再引入templates/*.j2:当报告结构稳定、需要自动化批量生成时再引入manifest.json:当你需要严格复现(代码版本/依赖锁/数据hash)时再引入qc/review.csv:当你开始在意“低置信必须人工复核”时再引入
换句话说:先跑通,再治理化,否则你会被“工程完备性”拖慢研究节奏。
你刚才的“还要对齐 还要_____________”空格,我想确认你最烦的是哪一种?
你补一句就行,我就按你的痛点继续“减文件/减对齐”:
A. 还要对齐 字段/列名(records、projects、conditions 的列经常变)
B. 还要对齐 变量定义/阈值(IG 等指标口径老改)
C. 还要对齐 证据片段与变量值的映射(引用链很费)
D. 还要对齐 千问输出格式(JSON不稳定、解析麻烦)
E. 还要对齐 报告模板(每次都要手工改版式)
你选一个(或直接把空格补完),我就给你一套更“懒人”的最小工程结构:该省的省掉、该自动对齐的自动对齐,而且保证 Prefect + 千问仍然能稳定跑。
能,但要满足两个前提:规格先行 + 机器可验证的对齐方式
像 Claude Code、Cursor、Windsurf、Aider 这类“代码代理/IDE 代理”,确实可以一次性生成多个文件、自动补齐引用、创建目录结构、甚至在本地运行测试并修复——前提是:
- 你给它足够明确的“接口契约”(哪些文件、哪些函数签名、哪些输入输出格式、哪些产物路径);
- 你有一套可自动验证的约束(例如:
make check/pytest/ JSON schema 校验 / 样例数据 golden file)。
否则“对齐”往往只是表面上的:文件都有了,但字段名不一致、路径不一致、输出格式漂移,跑起来还是断。
这些工具“对齐”到什么程度?
1)目录 + 多文件生成:能
- 自动创建
src/pipeline/*.py、configs/*.yaml、templates/*、README等 - 自动把 import 路径补齐、把配置读写串起来
2)跨文件一致性(函数签名、字段名、路径名):在“有契约”时能
最有效的方法是让它遵守一份单一事实来源(Single Source of Truth),例如:
schemas.py/schema.json:定义所有中间表字段paths.py:定义所有产物路径常量contracts.md:规定每个节点输入输出文件名与格式
代理再去生成其他文件时,会围绕这些契约自动对齐。
3)可运行性(能跑通 Prefect flow):能,但通常要它“执行—报错—修复”迭代几轮
这取决于你是否允许它:
- 读取你的本地工程
- 运行命令(
prefect/python/pytest) - 根据报错自动修复
如果只能“静态写代码”,成功率会下降。
4)千问接口细节:能,但需要你明确“使用哪种 SDK/HTTP 方式”
千问(DashScope)不同接入方式差异很大:SDK 版本、endpoint、鉴权环境变量、返回结构。你给出确定的调用方式,对齐会非常顺。
你这种“Prefect + 本地批处理 + 千问 + QCA/报告”最容易卡在什么“对齐”上?
- 中间数据结构漂移:
records.jsonl / projects.csv / evidence.jsonl / conditions.csv字段不一致 - 产物路径不统一:任务写到
data/,Flow 以为在runs/<id>/ - AI 输出不稳定:模型没按 JSON 输出、字段缺失、citation 格式变
- 规则/阈值版本:变量口径变化导致历史 run 不可比
这些都可以靠“契约 + 校验”解决,让代理真正做到“自动对齐”。
建议你用的最省心打法:让工具生成,但由你提供 3 个“硬约束文件”
只要先写(或让我帮你写)这 3 个东西,Claude Code 等工具生成多文件并对齐的成功率会大幅提升:
contracts.md:列出每个 Prefect 节点- 输入文件名/格式
- 输出文件名/格式
- 必填字段
schemas.py或schemas.json:把关键表结构固定(至少四张:records/projects/evidence/conditions)make check(或python -m pipeline.check):自动校验- JSON schema 校验
- CSV 列名校验
- 样例跑通(用 3 条样本数据)
然后你对 Claude Code 下指令:“按 contracts + schemas 生成全部文件,确保 make check 通过;若失败则迭代修复。”
你问“能直接生成并对齐吗”的直接回答
- 能生成各个文件:能。
- 能自动对齐:在你提供契约/校验的情况下基本能;没有契约就很难保证。
我需要你确认两点,我就能把“契约文件 + 工具指令”给你一套可直接复制用的
- 你千问接入是 DashScope(DASHSCOPE_API_KEY) 吗?还是单位网关?
- 你希望最小闭环先跑哪些节点?
- 选项 A:抓取→解析→证据抽取→条件表→报告(先不做 QCA)
- 选项 B:抓取→解析→证据→条件表→校准→QCA→报告(全链路)
你回复这两点后,我会给你两样东西(尽量少文件、但强对齐):
- 一份
contracts.md+schemas.json(你的场景专用) - 一段给 Claude Code(或 Cursor/Aider)用的**“一键生成并自检”的指令模板**(包含迭代修复策略)
“契约/校验”到底是什么?为什么你需要它?
你担心“生成很多文件还要对齐”,本质是缺少一套机器可验证的统一标准:
- 契约(Contract):规定“每个步骤输入/输出是什么、叫什么、长什么样”。
例如:parse_sources的输出必须是records.jsonl,每行必须包含project_id,title,source_url,published_at,text。 - 校验(Validation):把上述规定变成一键检查,让工具(Claude Code 等)自己跑、自己改直到通过。
例如:make check会检查文件是否存在、字段是否齐、JSON 是否符合 schema、样例流程是否跑通。
没有契约/校验,代理当然能“生成文件”,但很难保证跨文件一致;有了契约/校验,它就能做到“生成→运行→失败→修复→直到对齐”。
一、怎么“找”契约/校验?其实主要有 3 个来源
1)从你现有工作中“抽”出来(最靠谱)
把你现在已经在做的事情梳理成几条硬规则:
- 你每次跑完会得到哪些中间文件?(records、projects、evidence、conditions…)
- 哪些列/字段是必须的?(例如 QCA 的条件变量列、项目唯一 ID)
- 哪些字段类型/格式固定?(日期格式、URL、0-1 隶属度)
- 哪些约束必须满足?(
project_id唯一;membership必须在 0-1)
这就是契约的内容。你并不需要“发明标准”,只是把你现在隐含在脑子里的约束写出来。
2)从“失败/返工点”反推(最快)
回忆你最常遇到的断裂点,然后把它写成可检查规则:
- “字段名又变了导致下游报错” → 校验 CSV 列名必须一致
- “AI 输出不是 JSON” → 校验必须能解析为 JSON 且包含指定键
- “网页更新导致结果漂移” → 规定必须用快照文件,不允许直接在线抓取用于分析
- “同名项目合并错” → 规定
project_id生成规则与去重阈值,并输出冲突清单
你越痛的地方,越应该先写校验。
3)借用通用行业模板(省事但要本地化)
常用“契约/校验”模板一般包括:
- 数据契约:JSON Schema / CSV schema
- 运行契约:每个节点的输入输出路径
- AI 契约:结构化输出 schema + 引用(citations)字段
- 回归契约:用小样本 golden data 做“结果不意外变化”检查
二、你这个场景(政策追踪—QCA—报告—千问)最小要写哪些契约?
建议只写 4 份“表/文件契约”(够用且不重):
records.jsonl:解析后的原子记录(每条公告/网页一条)projects.csv:归并后的项目级表(每个项目一行)evidence.jsonl:证据片段表(可追溯引用)conditions.csv:QCA 条件变量表(最终用于 fsQCA)
并且再写 1 份“流程契约”:
- 每个 Prefect 节点:输入文件名 → 输出文件名(固定在
runs/<run_id>/...)
三、校验怎么做?给你一套“从轻到重”的可执行方案
方案 A(最轻、立刻能用):纯 Python 校验列名 + 存在性
你写一个 python -m pipeline.check --run runs/<id>,检查:
- 必须文件存在
- CSV 列名包含必需列
- JSONL 每行可解析为 JSON 且必含关键字段
- QCA 数值范围(如 0~1)
这种最适合你现在:本地、批处理、想快速落地。
方案 B(更正规):JSON Schema + 数据框校验
- JSONL 用
jsonschema校验结构 - CSV/Parquet 用
pandera或pydantic校验类型与约束
方案 C(最强):golden sample 回归测试
准备一个小样本输入(比如 3 个项目、10 条记录),跑完整流程,校验输出与预期一致(或在允许范围内一致)。这能防“悄悄对齐错了”。
四、你可以直接复制的“契约文件”写法(示例)
1)流程契约 contracts.md(人读 + 机器也能照做)
# Workflow Contracts (Local Prefect Batch)
## Paths
All outputs MUST be under: runs/<run_id>/
## Node: parse_sources
Input: runs/<run_id>/raw/raw_index.json
Output: runs/<run_id>/interim/records.jsonl
records.jsonl: JSON Lines, one record per source item
Required keys: record_id, source_url, fetched_at, title, published_at, text
## Node: dedupe_linkage
Input: runs/<run_id>/interim/records.jsonl
Output: runs/<run_id>/interim/projects.csv
projects.csv required columns:
project_id, project_name, region, start_date, latest_update, source_urls
## Node: extract_evidence
Input: runs/<run_id>/interim/projects.csv + records.jsonl
Output: runs/<run_id>/interim/evidence.jsonl
evidence.jsonl required keys:
evidence_id, project_id, source_url, quote, char_start, char_end
## Node: label_conditions_with_qwen (optional)
Input: evidence.jsonl
Output: runs/<run_id>/interim/ai_labels.jsonl
ai_labels.jsonl required keys:
project_id, conditions, citations, confidence
## Node: encode_conditions
Input: projects.csv + evidence.jsonl + (optional) ai_labels.jsonl
Output: runs/<run_id>/processed/conditions.csv
conditions.csv required columns:
project_id, IG, ... (other variables)
All membership values MUST be in [0,1] (if already calibrated).
2)结构契约 schemas.json(给工具“硬对齐”的依据)
你可以先只写最关键的几个字段(不必一次写全)。
{
"records": {
"required": ["record_id", "source_url", "fetched_at", "title", "published_at", "text"]
},
"projects": {
"required": ["project_id", "project_name", "region", "latest_update"]
},
"evidence": {
"required": ["evidence_id", "project_id", "source_url", "quote", "char_start", "char_end"]
},
"conditions": {
"required": ["project_id", "IG"]
}
}
五、最关键的问题:你问“怎么找提供契约/校验”——我的建议是按这 3 步拿到它
Step 1:先选“最终交付件”
对你来说是:conditions.csv + report.md(以及能追溯的 evidence)
Step 2:倒推“必需中间件”
为了让 conditions.csv 能稳定生成,你最少需要:projects.csv 和 evidence.jsonl
Step 3:把“你最不能接受的错误”写成校验规则
例如:
project_id不唯一 → 直接失败- 缺少
source_url或quote→ 直接失败 - IG 赋值没有证据引用 → 进入
qc/review.csv或直接失败(看你策略) - AI 输出无法解析 JSON → 该条回退规则编码,且写入 ledger
契约不是越多越好,而是先写“会让你返工的那几条”。
六、你只要回答两个问题,我就能把“最小契约 + 最小校验脚本”按你项目定制出来
- 你的 conditions.csv 里,除了
project_id和IG,你现在至少还需要哪几个变量列名?(列个清单即可) - 你更希望校验失败时:
- A:直接中止(保证严谨)
- B:降级回退(例如 AI 不合格则不用 AI,继续出结果并标红)
你给出这两点,我就把:
contracts.md(你的变量列与节点路径)schemas.json(按你的字段)python -m pipeline.check的最小实现逻辑(不引入太多依赖)
一次性给你,且让它特别适合 Claude Code 这种工具去“生成文件并对齐”。
更多推荐

所有评论(0)