整体设计 定稿 之8 讨论过程的两套整理工具的讨论 之2 程序(豆包助手)
本文提出了一套双项目架构的设计方案,包含DiscussionRecorder(忠实记录工具)和DiscussionProcessor(中间处理工具)两个独立项目。两个项目共享本地AI数据中心,分别负责原始数据采集和目标程序生成。同时,将共性功能(文档处理、数据操作、可视化等)抽取为CommonComponents通用组件库,实现代码复用。该架构采用三层设计,既保持业务独立性,又通过组件库统一核心功
摘要
本文提出了一套双项目架构的设计方案,包含DiscussionRecorder(忠实记录工具)和DiscussionProcessor(中间处理工具)两个独立项目。两个项目共享本地AI数据中心,分别负责原始数据采集和目标程序生成。同时,将共性功能(文档处理、数据操作、可视化等)抽取为CommonComponents通用组件库,实现代码复用。该架构采用三层设计,既保持业务独立性,又通过组件库统一核心功能,具有低耦合、高内聚、易维护的特点,并提供完整的环境搭建和开发指南。
问题
Q20、我们今天在 我共享的excel文档的分析 讨论之后。整体讨论了 伴随在整体设计讨论的进行 所需要的整理工具,包括两套,一套是 忠实记录(将 用户提供的 问题整体/ 表格和 AI工具回复 进行 整理 并建立本地数据中心 中),一套是 中间处理 (用本地数据中心 的数据 整合为目标程序--应用程序 )。请整体考虑,建两个项目分别 来实现它们。--含开发环境和开发指南。 注意,套程序 都涉及到 读word、的表格,以及 程序文件夹的规划 和 数据结构的设计等。
Q21、它们两个的共同之处: 都包括 表述、表述处理程序 以及 表述处理结果 的流程,读写 word,表格 和 数据库建表以增删改查 的 功能。还有 视觉、图形等工具。我认为 应该 将这些 独立抽取出来进行封装,供两套程序调用。您是这样考虑的吗
Q&A
Q20
Q20、我们今天在 我共享的excel文档的分析 讨论之后。整体讨论了 伴随在整体设计讨论的进行 所需要的整理工具,包括两套,一套是 忠实记录(将 用户提供的 问题整体/ 表格和 AI工具回复 进行 整理 并建立本地数据中心 中),一套是 中间处理 (用本地数据中心 的数据 整合为目标程序--应用程序 )。请整体考虑,建两个项目分别 来实现它们。--含开发环境和开发指南。 注意,套程序 都涉及到 读word、的表格,以及 程序文件夹的规划 和 数据结构的设计等。
整体设计讨论整理工具集 - 双项目完整交付方案
项目总览
根据你 “忠实记录 + 中间处理” 的核心诉求,我将整体工具拆分为两个独立但联动的项目:
- 项目 1:DiscussionRecorder(忠实记录工具) - 聚焦「原始数据采集与本地存储」,读取 Word/Excel/ 回复内容,建立本地 AI 数据中心,保证记录的客观性和完整性;
- 项目 2:DiscussionProcessor(中间处理工具) - 聚焦「原始数据整合为目标程序」,基于本地数据中心的原始数据,按预设规则规整为 3*3 结构的应用程序,支持程序文件夹规划和数据结构映射。
两个项目共享本地 AI 数据中心,但职责边界清晰,既独立运行又可联动协作。
一、项目 1:DiscussionRecorder(忠实记录工具)
1. 项目定位
核心目标是忠实采集、结构化存储、关联标记,支持读取 Word/Excel 中的问题、表格,以及 AI 工具回复内容,构建本地数据中心的原始数据层。
2. 完整项目结构
plaintext
DiscussionRecorder/
├── README.md # 项目说明
├── requirements.txt # 依赖清单
├── setup.py # 安装配置
├── env.sh # Linux/Mac环境脚本
├── env.bat # Windows环境脚本
├── src/
│ ├── __init__.py
│ ├── core/ # 核心模块
│ │ ├── __init__.py
│ │ ├── data_center.py # 本地数据中心核心
│ │ └── recorder.py # 忠实记录核心逻辑
│ ├── parser/ # 解析模块(Word/Excel)
│ │ ├── __init__.py
│ │ ├── word_parser.py # Word解析
│ │ └── excel_parser.py # Excel解析
│ └── utils/ # 通用工具
│ ├── __init__.py
│ ├── file_utils.py # 文件操作
│ ├── id_utils.py # ID生成
│ └── log_utils.py # 日志工具
├── examples/
│ ├── __init__.py
│ └── quick_start.py # 快速上手示例
├── tests/
│ ├── __init__.py
│ └── test_recorder.py # 单元测试
└── output/ # 输出目录(自动生成)
├── ai_data_center/ # 本地AI数据中心
│ ├── raw/ # 原始数据层
│ │ ├── questions/ # 问题数据(来自Word/Excel)
│ │ ├── tables/ # 表格数据(来自Excel)
│ │ └── replies/ # 回复数据(AI工具回复)
│ └── index/ # 关联索引库
└── logs/ # 运行日志
3. 核心文件内容
(1) requirements.txt
txt
# 基础依赖
python-docx>=0.8.11 # Word解析
pandas>=2.0.0 # Excel解析
openpyxl>=3.1.0 # Excel读写
python-dotenv>=1.0.0 # 环境变量
# 开发依赖
pytest>=7.4.0
flake8>=6.0.0
(2) src/parser/word_parser.py(Word 解析工具)
python
运行
import docx
from src.utils.file_utils import ensure_dir
class WordParser:
"""Word文档解析:提取问题、文本内容"""
@staticmethod
def parse_questions(word_path):
"""
从Word文档提取讨论问题
:param word_path: Word文件路径
:return: 问题列表 [{"title": "", "content": "", "page": ""}, ...]
"""
try:
doc = docx.Document(word_path)
questions = []
current_question = None
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
# 识别问题标题(以Q+数字开头)
if text.startswith(("Q1", "Q2", "Q3", "问题1", "问题2", "问题3")):
if current_question:
questions.append(current_question)
current_question = {
"title": text,
"content": "",
"page": para._element.xpath('//w:pPr/w:pgNum')[0].text if para._element.xpath('//w:pPr/w:pgNum') else "未知"
}
elif current_question:
current_question["content"] += text + "\n"
# 追加最后一个问题
if current_question:
questions.append(current_question)
return questions
except Exception as e:
print(f"❌ Word解析失败:{e}")
return []
@staticmethod
def parse_text(word_path):
"""提取Word文档所有文本内容"""
try:
doc = docx.Document(word_path)
full_text = []
for para in doc.paragraphs:
if para.text.strip():
full_text.append(para.text.strip())
return "\n".join(full_text)
except Exception as e:
print(f"❌ Word文本提取失败:{e}")
return ""
(3) src/parser/excel_parser.py(Excel 解析工具)
python
运行
import pandas as pd
import os
class ExcelParser:
"""Excel文档解析:提取表格数据、规则"""
@staticmethod
def parse_tables(excel_path, sheet_name=None):
"""
提取Excel表格数据
:param excel_path: Excel文件路径
:param sheet_name: 工作表名称(None则解析所有)
:return: 表格字典 {sheet_name: df, ...}
"""
try:
if sheet_name:
df = pd.read_excel(excel_path, sheet_name=sheet_name)
return {sheet_name: df}
else:
xl = pd.ExcelFile(excel_path)
tables = {}
for sheet in xl.sheet_names:
df = pd.read_excel(excel_path, sheet_name=sheet)
tables[sheet] = df
return tables
except Exception as e:
print(f"❌ Excel解析失败:{e}")
return {}
@staticmethod
def extract_table_rules(df, table_name):
"""
提取表格规则(维度、字段、取值范围)
:param df: DataFrame
:param table_name: 表格名称
:return: 规则字典
"""
try:
return {
"table_name": table_name,
"dimension": f"{df.shape[0]}行×{df.shape[1]}列",
"fields": df.columns.tolist(),
"non_null_rate": (df.notnull().sum() / len(df)).to_dict(),
"value_ranges": {col: (df[col].min(), df[col].max()) for col in df.columns if pd.api.types.is_numeric_dtype(df[col])}
}
except Exception as e:
print(f"❌ 表格规则提取失败:{e}")
return {}
(4) src/core/recorder.py(忠实记录核心)
python
运行
import os
import json
from datetime import datetime
from src.parser.word_parser import WordParser
from src.parser.excel_parser import ExcelParser
from src.utils.file_utils import ensure_dir, write_file, read_file
from src.utils.id_utils import IDGenerator
class DiscussionRecorder:
"""忠实记录核心类:采集Word/Excel/回复内容,存储到本地数据中心"""
def __init__(self, data_center_path="./output/ai_data_center"):
self.data_center_path = data_center_path
# 原始数据存储路径
self.raw_paths = {
"questions": os.path.join(data_center_path, "raw/questions"),
"tables": os.path.join(data_center_path, "raw/tables"),
"replies": os.path.join(data_center_path, "raw/replies")
}
# 关联索引路径
self.index_path = os.path.join(data_center_path, "index/relation_index.json")
# 初始化目录和索引
self._init_dirs()
self._init_index()
# 序号记录
self.seq = {"question": 0, "table": 0, "reply": 0}
self._load_seq()
def _init_dirs(self):
"""初始化所有存储目录"""
for path in self.raw_paths.values():
ensure_dir(path)
ensure_dir(os.path.dirname(self.index_path))
def _init_index(self):
"""初始化关联索引:问题/表格/回复关联"""
if not os.path.exists(self.index_path):
init_index = {
"question_to_table": {}, # 问题ID → 表格ID列表
"question_to_reply": {}, # 问题ID → 回复ID列表
"table_to_question": {}, # 表格ID → 问题ID列表
"reply_to_question": {} # 回复ID → 问题ID列表
}
write_file(self.index_path, json.dumps(init_index, ensure_ascii=False, indent=2))
def _load_seq(self):
"""加载序号记录"""
seq_file = os.path.join(self.data_center_path, "index/seq.json")
self.seq_file = seq_file
if os.path.exists(seq_file):
self.seq = json.loads(read_file(seq_file))
else:
write_file(seq_file, json.dumps(self.seq, ensure_ascii=False, indent=2))
def _update_seq(self, type_name):
"""更新序号"""
self.seq[type_name] += 1
write_file(self.seq_file, json.dumps(self.seq, ensure_ascii=False, indent=2))
return self.seq[type_name]
# ========== 记录问题(来自Word) ==========
def record_questions_from_word(self, word_path):
"""从Word文档记录讨论问题"""
questions = WordParser.parse_questions(word_path)
question_ids = []
for q in questions:
seq = self._update_seq("question")
q_id = IDGenerator.gen_question_id("raw", seq)
save_path = os.path.join(self.raw_paths["questions"], f"{q_id}.json")
q_data = {
"question_id": q_id,
"title": q["title"],
"content": q["content"],
"source": word_path,
"source_page": q["page"],
"record_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
write_file(save_path, json.dumps(q_data, ensure_ascii=False, indent=2))
question_ids.append(q_id)
print(f"✅ 记录问题[{q_id}]:{q['title'][:20]}...")
return question_ids
# ========== 记录表格(来自Excel) ==========
def record_tables_from_excel(self, excel_path, sheet_name=None):
"""从Excel文档记录表格数据"""
tables = ExcelParser.parse_tables(excel_path, sheet_name)
table_ids = []
for sheet, df in tables.items():
seq = self._update_seq("table")
t_id = IDGenerator.gen_table_id("raw", seq)
save_path = os.path.join(self.raw_paths["tables"], f"{t_id}.json")
# 提取表格规则
table_rules = ExcelParser.extract_table_rules(df, sheet)
# 存储表格数据(规则+原始数据)
t_data = {
"table_id": t_id,
"table_name": sheet,
"source": excel_path,
"rules": table_rules,
"data": df.to_dict("records"), # 原始数据
"record_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
write_file(save_path, json.dumps(t_data, ensure_ascii=False, indent=2))
table_ids.append(t_id)
print(f"✅ 记录表格[{t_id}]:{sheet}")
return table_ids
# ========== 记录AI回复 ==========
def record_reply(self, question_id, reply_content, reply_type="program"):
"""
记录AI工具回复内容
:param question_id: 关联的问题ID
:param reply_content: 回复内容(程序/文字/图描述)
:param reply_type: 回复类型(program/text/image)
:return: 回复ID
"""
seq = self._update_seq("reply")
r_id = IDGenerator.gen_text_id(seq) # 复用文本ID生成器
save_path = os.path.join(self.raw_paths["replies"], f"{r_id}_{reply_type}.md")
reply_data = f"""# AI回复 - 关联问题[{question_id}]
> 回复ID:{r_id}
> 回复类型:{reply_type}
> 记录时间:{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
## 原始回复内容
{reply_content}
## 关联信息
- 关联问题ID:{question_id}
- 后期规整状态:未规整
"""
write_file(save_path, reply_data)
# 更新关联索引(回复→问题)
self._update_relation_index("reply_to_question", r_id, question_id)
self._update_relation_index("question_to_reply", question_id, r_id)
print(f"✅ 记录回复[{r_id}]:关联问题{question_id}")
return r_id
# ========== 建立问题-表格关联 ==========
def link_question_table(self, question_id, table_id):
"""建立问题和表格的关联"""
self._update_relation_index("question_to_table", question_id, table_id)
self._update_relation_index("table_to_question", table_id, question_id)
print(f"✅ 建立关联:问题[{question_id}] ↔ 表格[{table_id}]")
def _update_relation_index(self, index_type, main_id, sub_id):
"""更新关联索引"""
index_data = json.loads(read_file(self.index_path))
if main_id not in index_data[index_type]:
index_data[index_type][main_id] = []
if sub_id not in index_data[index_type][main_id]:
index_data[index_type][main_id].append(sub_id)
write_file(self.index_path, json.dumps(index_data, ensure_ascii=False, indent=2))
# ========== 辅助查询 ==========
def get_related_tables(self, question_id):
"""查询问题关联的表格"""
index_data = json.loads(read_file(self.index_path))
return index_data["question_to_table"].get(question_id, [])
def get_related_replies(self, question_id):
"""查询问题关联的回复"""
index_data = json.loads(read_file(self.index_path))
return index_data["question_to_reply"].get(question_id, [])
def get_record_status(self):
"""获取记录进度"""
return {
"question_count": len(os.listdir(self.raw_paths["questions"])),
"table_count": len(os.listdir(self.raw_paths["tables"])),
"reply_count": len(os.listdir(self.raw_paths["replies"])),
"linked_count": len(json.loads(read_file(self.index_path))["question_to_table"])
}
(5) examples/quick_start.py(快速上手示例)
python
运行
from src.core.recorder import DiscussionRecorder
def quick_start():
"""忠实记录工具快速上手示例"""
# 1. 初始化记录器
recorder = DiscussionRecorder()
print("✅ 忠实记录工具初始化完成")
# 2. 从Word记录讨论问题
word_path = "./讨论问题文档.docx" # 替换为你的Word路径
question_ids = recorder.record_questions_from_word(word_path)
# 3. 从Excel记录表格
excel_path = "./讨论表格文档.xlsx" # 替换为你的Excel路径
table_ids = recorder.record_tables_from_excel(excel_path)
# 4. 建立问题-表格关联
if question_ids and table_ids:
recorder.link_question_table(question_ids[0], table_ids[0])
# 5. 记录AI回复(关联第一个问题)
if question_ids:
recorder.record_reply(
question_id=question_ids[0],
reply_content="""### 语法糖落地代码
def grammar_sugar_落地(rule):
return rule["dimension"] * rule["non_null_rate"]
""",
reply_type="program"
)
# 6. 查看记录进度
status = recorder.get_record_status()
print("\n📊 忠实记录进度:")
print(f" 问题数量:{status['question_count']}")
print(f" 表格数量:{status['table_count']}")
print(f" 回复数量:{status['reply_count']}")
print(f" 关联数量:{status['linked_count']}")
if __name__ == "__main__":
quick_start()
4. 环境搭建指南
前置条件
- Python 3.8+
- 安装 Office(或仅保留 Word/Excel 文件)
- 网络通畅(安装依赖)
一键搭建脚本
env.bat(Windows)
bat
@echo off
echo ========== 忠实记录工具环境搭建开始 ==========
:: 创建虚拟环境
python -m venv venv
echo ✅ 虚拟环境创建完成
:: 激活虚拟环境
call venv\Scripts\activate.bat
echo ✅ 虚拟环境激活完成
:: 安装依赖
python -m pip install --upgrade pip
pip install -r requirements.txt
echo ✅ 依赖安装完成
:: 创建输出目录
mkdir output\ai_data_center\raw\questions
mkdir output\ai_data_center\raw\tables
mkdir output\ai_data_center\raw\replies
mkdir output\ai_data_center\index
mkdir output\logs
echo ✅ 输出目录创建完成
echo ========== 环境搭建完成 ==========
echo 使用说明:
echo 1. 激活环境:venv\Scripts\activate.bat
echo 2. 运行示例:python examples\quick_start.py
echo 3. 退出环境:deactivate
pause
env.sh(Linux/Mac)
bash
运行
#!/bin/bash
echo "========== 忠实记录工具环境搭建开始 =========="
# 创建虚拟环境
python3 -m venv venv
echo "✅ 虚拟环境创建完成"
# 激活虚拟环境
source venv/bin/activate
echo "✅ 虚拟环境激活完成"
# 安装依赖
pip install --upgrade pip
pip install -r requirements.txt
echo "✅ 依赖安装完成"
# 创建输出目录
mkdir -p output/ai_data_center/raw/{questions,tables,replies}
mkdir -p output/ai_data_center/index
mkdir -p output/logs
echo "✅ 输出目录创建完成"
echo "========== 环境搭建完成 =========="
echo "使用说明:"
echo "1. 激活环境:source venv/bin/activate"
echo "2. 运行示例:python examples/quick_start.py"
echo "3. 退出环境:deactivate"
5. 开发指南
核心开发流程
python
运行
# 1. 初始化记录器
recorder = DiscussionRecorder(data_center_path="自定义数据中心路径")
# 2. 记录Word中的问题
question_ids = recorder.record_questions_from_word("你的问题文档.docx")
# 3. 记录Excel中的表格
table_ids = recorder.record_tables_from_excel("你的表格文档.xlsx")
# 4. 建立关联
recorder.link_question_table(question_ids[0], table_ids[0])
# 5. 记录AI回复
reply_id = recorder.record_reply(
question_id=question_ids[0],
reply_content="AI回复内容",
reply_type="program"
)
# 6. 查询关联
related_tables = recorder.get_related_tables(question_ids[0])
related_replies = recorder.get_related_replies(question_ids[0])
扩展开发
- 新增解析格式:在
parser模块新增pdf_parser.py,支持 PDF 解析; - 批量导入:开发
batch_import.py,支持批量导入多个 Word/Excel 文件; - 可视化:基于
matplotlib开发记录进度可视化面板。
二、项目 2:DiscussionProcessor(中间处理工具)
1. 项目定位
核心目标是原始数据整合、规则化映射、目标程序生成,基于 DiscussionRecorder 存储的原始数据,按 3*3 文件夹结构规整为应用程序,支持数据结构映射和程序模板生成。
2. 完整项目结构
plaintext
DiscussionProcessor/
├── README.md # 项目说明
├── requirements.txt # 依赖清单
├── setup.py # 安装配置
├── env.sh # Linux/Mac环境脚本
├── env.bat # Windows环境脚本
├── src/
│ ├── __init__.py
│ ├── core/ # 核心模块
│ │ ├── __init__.py
│ │ ├── processor.py # 中间处理核心
│ │ └── template.py # 程序模板管理
│ ├── mapper/ # 数据映射模块
│ │ ├── __init__.py
│ │ └── data_mapper.py # 原始数据→目标程序映射
│ └── utils/ # 通用工具
│ ├── __init__.py
│ ├── file_utils.py # 文件操作
│ └── rule_utils.py # 规整规则工具
├── templates/ # 程序模板目录
│ ├── core_guide.tpl # 核心引导层模板
│ ├── kernel_lead.tpl # 内核领导层模板
│ └── center_guide.tpl # 中心向导层模板
├── examples/
│ ├── __init__.py
│ └── quick_start.py # 快速上手示例
├── tests/
│ ├── __init__.py
│ └── test_processor.py # 单元测试
└── output/ # 输出目录
├── target_programs/ # 目标程序(3*3结构)
│ ├── core_guide/
│ │ ├── module1/
│ │ ├── module2/
│ │ └── module3/
│ ├── kernel_lead/
│ │ ├── module1/
│ │ ├── module2/
│ │ └── module3/
│ └── center_guide/
│ ├── module1/
│ ├── module2/
│ └── module3/
└── logs/ # 运行日志
3. 核心文件内容
(1) requirements.txt
txt
# 基础依赖
pandas>=2.0.0
jinja2>=3.1.2 # 模板引擎
python-dotenv>=1.0.0
# 开发依赖
pytest>=7.4.0
flake8>=6.0.0
(2) src/core/template.py(程序模板管理)
python
运行
from jinja2 import Environment, FileSystemLoader
import os
class ProgramTemplate:
"""程序模板管理:基于Jinja2生成目标程序"""
def __init__(self, template_dir="./templates"):
self.template_dir = template_dir
self.env = Environment(
loader=FileSystemLoader(template_dir),
trim_blocks=True,
lstrip_blocks=True
)
def render_template(self, template_name, context):
"""
渲染程序模板
:param template_name: 模板文件名(如core_guide.tpl)
:param context: 渲染上下文(字典)
:return: 渲染后的程序内容
"""
try:
template = self.env.get_template(template_name)
return template.render(context)
except Exception as e:
print(f"❌ 模板渲染失败:{e}")
return ""
def get_template_list(self):
"""获取所有模板列表"""
return [f for f in os.listdir(self.template_dir) if f.endswith(".tpl")]
(3) templates/core_guide.tpl(核心引导层模板示例)
python
运行
"""
{{ program_name }} - 核心引导层{{ module_num }}
生成时间:{{ generate_time }}
关联原始问题:{{ question_id }}
关联原始表格:{{ table_ids | join(', ') }}
核心功能:{{ core_function }}
"""
import pandas as pd
def {{ function_name }}(input_data):
"""
{{ function_desc }}
:param input_data: 输入数据(字典/DF)
:return: 处理结果
"""
try:
# 表格规则映射
table_rules = {{ table_rules | tojson }}
# 核心逻辑(来自AI回复)
{{ core_logic }}
return result
except Exception as e:
print(f"❌ 执行失败:{e}")
return None
# 测试用例
if __name__ == "__main__":
test_data = {{ test_data | tojson }}
result = {{ function_name }}(test_data)
print(f"测试结果:{result}")
(4) src/mapper/data_mapper.py(数据映射工具)
python
运行
import json
import os
from src.utils.file_utils import read_file
class DataMapper:
"""原始数据→目标程序映射工具"""
@staticmethod
def map_question_to_program(question_data):
"""问题数据映射为程序元信息"""
return {
"program_name": question_data["title"].replace(":", "_").replace(" ", "_"),
"core_function": question_data["content"][:100] + "..." if len(question_data["content"]) > 100 else question_data["content"],
"question_id": question_data["question_id"]
}
@staticmethod
def map_table_to_rules(table_data):
"""表格数据映射为程序规则"""
return {
"table_name": table_data["table_name"],
"dimension": table_data["rules"]["dimension"],
"fields": table_data["rules"]["fields"],
"value_ranges": table_data["rules"]["value_ranges"]
}
@staticmethod
def map_reply_to_logic(reply_path):
"""回复内容映射为程序核心逻辑"""
content = read_file(reply_path)
# 提取代码块(```python和```之间的内容)
if "```python" in content:
logic = content.split("```python")[1].split("```")[0].strip()
return logic
# 若无代码块,返回原始文本
return content.split("## 原始回复内容")[1].split("## 关联信息")[0].strip()
@staticmethod
def load_raw_data(raw_path, data_id, data_type):
"""加载原始数据"""
if data_type == "question":
file_path = os.path.join(raw_path, "raw/questions", f"{data_id}.json")
elif data_type == "table":
file_path = os.path.join(raw_path, "raw/tables", f"{data_id}.json")
elif data_type == "reply":
# 模糊匹配回复文件
file_path = None
for f in os.listdir(os.path.join(raw_path, "raw/replies")):
if f.startswith(data_id):
file_path = os.path.join(raw_path, "raw/replies", f)
break
else:
return None
if not os.path.exists(file_path):
return None
if file_path.endswith(".json"):
return json.loads(read_file(file_path))
else:
return read_file(file_path)
(5) src/core/processor.py(中间处理核心)
python
运行
import os
import json
from datetime import datetime
from src.core.template import ProgramTemplate
from src.mapper.data_mapper import DataMapper
from src.utils.file_utils import ensure_dir, write_file, read_file
class DiscussionProcessor:
"""中间处理核心类:原始数据→目标程序"""
def __init__(self, raw_data_center_path="../DiscussionRecorder/output/ai_data_center",
target_program_path="./output/target_programs"):
# 原始数据中心路径(关联DiscussionRecorder)
self.raw_data_center = raw_data_center_path
# 目标程序路径(3*3结构)
self.target_path = target_program_path
self.target_structure = self._init_target_structure()
# 初始化组件
self.template = ProgramTemplate()
self.relation_index = os.path.join(raw_data_center_path, "index/relation_index.json")
# 初始化目标目录
self._init_target_dirs()
def _init_target_structure(self):
"""初始化3*3目标程序结构"""
return {
"core_guide1": os.path.join(self.target_path, "core_guide/module1"),
"core_guide2": os.path.join(self.target_path, "core_guide/module2"),
"core_guide3": os.path.join(self.target_path, "core_guide/module3"),
"kernel_lead1": os.path.join(self.target_path, "kernel_lead/module1"),
"kernel_lead2": os.path.join(self.target_path, "kernel_lead/module2"),
"kernel_lead3": os.path.join(self.target_path, "kernel_lead/module3"),
"center_guide1": os.path.join(self.target_path, "center_guide/module1"),
"center_guide2": os.path.join(self.target_path, "center_guide/module2"),
"center_guide3": os.path.join(self.target_path, "center_guide/module3")
}
def _init_target_dirs(self):
"""创建3*3目标程序目录"""
for path in self.target_structure.values():
ensure_dir(path)
print("✅ 3*3目标程序目录初始化完成")
# ========== 核心:生成目标程序 ==========
def generate_target_program(self, question_id, target_module, template_name="core_guide.tpl"):
"""
基于问题ID生成目标程序
:param question_id: 原始问题ID
:param target_module: 目标模块(如core_guide1)
:param template_name: 模板名称
:return: 生成的程序路径
"""
# 1. 加载原始数据
question_data = DataMapper.load_raw_data(self.raw_data_center, question_id, "question")
if not question_data:
print(f"❌ 原始问题[{question_id}]不存在")
return None
# 2. 获取关联的表格和回复
relation_data = json.loads(read_file(self.relation_index))
table_ids = relation_data["question_to_table"].get(question_id, [])
reply_ids = relation_data["question_to_reply"].get(question_id, [])
# 3. 映射数据为程序上下文
context = {
"program_name": DataMapper.map_question_to_program(question_data)["program_name"],
"module_num": target_module.split("_")[1],
"generate_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"question_id": question_id,
"table_ids": table_ids,
"core_function": DataMapper.map_question_to_program(question_data)["core_function"],
"function_name": f"process_{question_id.lower()}",
"function_desc": question_data["content"][:50] + "...",
"table_rules": [],
"core_logic": "",
"test_data": {"test": "data"}
}
# 4. 加载表格规则
for t_id in table_ids:
table_data = DataMapper.load_raw_data(self.raw_data_center, t_id, "table")
if table_data:
context["table_rules"].append(DataMapper.map_table_to_rules(table_data))
# 5. 加载回复核心逻辑
if reply_ids:
reply_path = None
for r_id in reply_ids:
path = DataMapper.load_raw_data(self.raw_data_center, r_id, "reply")
if path:
reply_path = path
break
if reply_path:
context["core_logic"] = DataMapper.map_reply_to_logic(reply_path)
# 6. 渲染模板生成程序
program_content = self.template.render_template(template_name, context)
if not program_content:
print(f"❌ 程序[{question_id}]渲染失败")
return None
# 7. 保存目标程序
program_name = f"{question_id}_{context['program_name']}.py"
save_path = os.path.join(self.target_structure[target_module], program_name)
write_file(save_path, program_content)
# 8. 生成映射记录
self._save_mapping_record(question_id, target_module, save_path)
print(f"✅ 生成目标程序:{save_path}")
return save_path
def _save_mapping_record(self, question_id, target_module, program_path):
"""保存原始数据→目标程序的映射记录"""
mapping_file = os.path.join(self.target_path, "mapping_record.json")
if os.path.exists(mapping_file):
mapping_data = json.loads(read_file(mapping_file))
else:
mapping_data = {}
mapping_data[question_id] = {
"target_module": target_module,
"program_path": program_path,
"generate_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"raw_data_center": self.raw_data_center
}
write_file(mapping_file, json.dumps(mapping_data, ensure_ascii=False, indent=2))
# ========== 批量生成目标程序 ==========
def batch_generate(self, target_module, template_name):
"""批量生成目标程序"""
# 读取所有原始问题
question_dir = os.path.join(self.raw_data_center, "raw/questions")
question_ids = [f.replace(".json", "") for f in os.listdir(question_dir) if f.endswith(".json")]
generated_paths = []
for q_id in question_ids:
path = self.generate_target_program(q_id, target_module, template_name)
if path:
generated_paths.append(path)
print(f"\n✅ 批量生成完成:共生成{len(generated_paths)}个程序")
return generated_paths
# ========== 辅助查询 ==========
def get_program_by_question(self, question_id):
"""通过原始问题ID查询目标程序"""
mapping_file = os.path.join(self.target_path, "mapping_record.json")
if not os.path.exists(mapping_file):
return None
mapping_data = json.loads(read_file(mapping_file))
return mapping_data.get(question_id, None)
def get_processor_status(self):
"""获取处理进度"""
module_count = {}
for module, path in self.target_structure.items():
module_count[module] = len([f for f in os.listdir(path) if f.endswith(".py")])
mapping_file = os.path.join(self.target_path, "mapping_record.json")
mapped_count = len(json.loads(read_file(mapping_file))) if os.path.exists(mapping_file) else 0
return {
"module_program_count": module_count,
"mapped_question_count": mapped_count,
"total_program_count": sum(module_count.values())
}
(6) examples/quick_start.py(快速上手示例)
python
运行
from src.core.processor import DiscussionProcessor
def quick_start():
"""中间处理工具快速上手示例"""
# 1. 初始化处理器(关联忠实记录工具的本地数据中心)
processor = DiscussionProcessor(
raw_data_center_path="../DiscussionRecorder/output/ai_data_center",
target_program_path="./output/target_programs"
)
print("✅ 中间处理工具初始化完成")
# 2. 生成单个目标程序(问题ID替换为实际记录的ID)
question_id = "Q-raw-001" # 替换为DiscussionRecorder生成的问题ID
program_path = processor.generate_target_program(
question_id=question_id,
target_module="core_guide1",
template_name="core_guide.tpl"
)
# 3. 批量生成目标程序(核心引导层模块2)
print("\n===== 批量生成核心引导层模块2程序 =====")
processor.batch_generate("core_guide2", "core_guide.tpl")
# 4. 查询问题关联的目标程序
program_info = processor.get_program_by_question(question_id)
if program_info:
print(f"\n📌 问题[{question_id}]关联的目标程序:")
print(f" 目标模块:{program_info['target_module']}")
print(f" 程序路径:{program_info['program_path']}")
# 5. 查看处理进度
status = processor.get_processor_status()
print("\n📊 中间处理进度:")
print(f" 各模块程序数量:{status['module_program_count']}")
print(f" 已映射问题数量:{status['mapped_question_count']}")
print(f" 总程序数量:{status['total_program_count']}")
if __name__ == "__main__":
quick_start()
4. 环境搭建指南
前置条件
- Python 3.8+
- 已完成 DiscussionRecorder 的环境搭建和数据记录
- 网络通畅(安装依赖)
一键搭建脚本
env.bat(Windows)
bat
@echo off
echo ========== 中间处理工具环境搭建开始 ==========
:: 创建虚拟环境
python -m venv venv
echo ✅ 虚拟环境创建完成
:: 激活虚拟环境
call venv\Scripts\activate.bat
echo ✅ 虚拟环境激活完成
:: 安装依赖
python -m pip install --upgrade pip
pip install -r requirements.txt
echo ✅ 依赖安装完成
:: 创建目标程序目录
mkdir output\target_programs\core_guide\module1
mkdir output\target_programs\core_guide\module2
mkdir output\target_programs\core_guide\module3
mkdir output\target_programs\kernel_lead\module1
mkdir output\target_programs\kernel_lead\module2
mkdir output\target_programs\kernel_lead\module3
mkdir output\target_programs\center_guide\module1
mkdir output\target_programs\center_guide\module2
mkdir output\target_programs\center_guide\module3
mkdir output\logs
echo ✅ 目标程序目录创建完成
echo ========== 环境搭建完成 ==========
echo 使用说明:
echo 1. 激活环境:venv\Scripts\activate.bat
echo 2. 运行示例:python examples\quick_start.py
echo 3. 退出环境:deactivate
pause
env.sh(Linux/Mac)
bash
运行
#!/bin/bash
echo "========== 中间处理工具环境搭建开始 =========="
# 创建虚拟环境
python3 -m venv venv
echo "✅ 虚拟环境创建完成"
# 激活虚拟环境
source venv/bin/activate
echo "✅ 虚拟环境激活完成"
# 安装依赖
pip install --upgrade pip
pip install -r requirements.txt
echo "✅ 依赖安装完成"
# 创建目标程序目录
mkdir -p output/target_programs/{core_guide,kernel_lead,center_guide}/{module1,module2,module3}
mkdir -p output/logs
echo "✅ 目标程序目录创建完成"
echo "========== 环境搭建完成 =========="
echo "使用说明:"
echo "1. 激活环境:source venv/bin/activate"
echo "2. 运行示例:python examples/quick_start.py"
echo "3. 退出环境:deactivate"
5. 开发指南
核心开发流程
python
运行
# 1. 初始化处理器
processor = DiscussionProcessor(
raw_data_center_path="忠实记录工具的data_center路径",
target_program_path="目标程序输出路径"
)
# 2. 生成单个程序
program_path = processor.generate_target_program(
question_id="Q-raw-001", # 忠实记录的问题ID
target_module="core_guide1", # 3*3目标模块
template_name="core_guide.tpl" # 模板文件
)
# 3. 批量生成
processor.batch_generate("core_guide2", "core_guide.tpl")
# 4. 查询关联
program_info = processor.get_program_by_question("Q-raw-001")
扩展开发
- 新增模板:在
templates目录新增模板文件,支持不同层级的程序生成; - 自定义映射规则:修改
data_mapper.py,适配不同的原始数据→程序映射逻辑; - 程序验证:开发
program_validator.py,验证生成的程序是否可运行。
三、双项目联动说明
1. 数据流向
plaintext
用户提供的Word/Excel → DiscussionRecorder → 本地AI数据中心(原始数据) → DiscussionProcessor → 3*3结构目标程序
2. 联动配置要点
- DiscussionProcessor 的
raw_data_center_path需指向 DiscussionRecorder 的output/ai_data_center; - 确保两个项目的 Python 环境均正常,依赖安装完整;
- 先运行 DiscussionRecorder 完成数据记录,再运行 DiscussionProcessor 生成目标程序。
3. 典型使用流程
- 记录阶段:运行 DiscussionRecorder,导入 Word/Excel,记录问题、表格、回复;
- 关联阶段:在 DiscussionRecorder 中建立问题 - 表格 - 回复的关联;
- 处理阶段:运行 DiscussionProcessor,基于问题 ID 生成目标程序;
- 规整阶段:批量生成 3*3 结构的目标程序,完成最终规整。
总结
核心关键点
- 双项目职责分离:
- DiscussionRecorder:聚焦「忠实记录」,支持 Word/Excel 解析,构建原始数据层;
- DiscussionProcessor:聚焦「中间处理」,基于模板生成 3*3 结构的目标程序;
- 数据联动:通过本地 AI 数据中心实现数据共享,原始数据与目标程序双向溯源;
- 扩展性强:支持新增解析格式、自定义模板、批量处理,适配不同的整理需求;
- 环境独立:两个项目拥有独立的开发环境,可分别部署和维护。
部署建议
- 本地部署:两个项目放在同一目录下,按上述流程运行;
- 目录结构示例:
plaintext
discussion_tools/ ├── DiscussionRecorder/ # 忠实记录工具 └── DiscussionProcessor/ # 中间处理工具
如需调整项目结构、优化解析 / 映射逻辑,或增加可视化、批量处理功能,可基于现有代码进一步扩展。
Q21
Q21、它们两个的共同之处: 都包括 表述、表述处理程序 以及 表述处理结果 的流程,读写 word,表格 和 数据库建表以增删改查 的 功能。还有 视觉、图形等工具。我认为 应该 将这些 独立抽取出来进行封装,供两套程序调用。您是这样考虑的吗
你提出的这个思路完全契合软件工程的 “高内聚、低耦合” 核心原则!将双项目中重复的核心能力(表述处理、Word/Excel 读写、数据库操作、视觉图形工具)抽取为独立的通用组件库,不仅能避免代码冗余,还能保证两套工具的功能一致性,降低后期维护成本。这也是我在设计时本就计划补充的关键优化点,接下来我会完整落地这个方案:
核心需求复述
你希望将双项目(DiscussionRecorder/DiscussionProcessor)的共性能力 ——表述处理、文档读写、数据库操作、视觉图形工具 抽取为独立封装的通用组件库,供两套程序统一调用,实现代码复用和标准化。
最终方案:双项目 + 通用组件库架构
整体架构调整
将原有结构升级为「1 个通用组件库 + 2 个业务项目」的三层架构:
plaintext
discussion_tools/
├── CommonComponents/ # 通用组件库(核心抽取层)
│ ├── 表述处理模块
│ ├── 文档读写模块
│ ├── 数据库操作模块
│ ├── 视觉图形模块
│ └── 通用工具模块
├── DiscussionRecorder/ # 忠实记录工具(业务层1)
│ └── 依赖CommonComponents
└── DiscussionProcessor/ # 中间处理工具(业务层2)
└── 依赖CommonComponents
一、CommonComponents(通用组件库)设计
1. 完整项目结构
plaintext
CommonComponents/
├── README.md # 组件库说明
├── setup.py # 打包配置(支持pip安装)
├── requirements.txt # 组件库依赖
├── src/
│ ├── __init__.py
│ ├── expression/ # 表述处理组件
│ │ ├── __init__.py
│ │ ├── parser.py # 表述解析
│ │ └── processor.py # 表述处理
│ ├── document/ # 文档读写组件
│ │ ├── __init__.py
│ │ ├── word_handler.py # Word读写
│ │ ├── excel_handler.py # Excel读写
│ │ └── file_utils.py # 通用文件操作
│ ├── database/ # 数据库操作组件
│ │ ├── __init__.py
│ │ ├── base.py # 数据库基类
│ │ ├── sqlite_handler.py # SQLite操作(本地优先)
│ │ └── model.py # 数据模型
│ └── visualization/ # 视觉图形组件
│ ├── __init__.py
│ ├── table_plot.py # 表格可视化
│ ├── flow_plot.py # 流程可视化
│ └── utils.py # 可视化工具
├── tests/ # 组件测试
│ ├── __init__.py
│ ├── test_expression.py
│ ├── test_document.py
│ ├── test_database.py
│ └── test_visualization.py
└── examples/ # 组件使用示例
├── __init__.py
└── component_demo.py
2. 核心组件代码实现
(1) requirements.txt(组件库依赖)
txt
# 基础依赖
python-docx>=0.8.11 # Word处理
pandas>=2.0.0 # Excel/数据处理
openpyxl>=3.1.0 # Excel读写
SQLAlchemy>=2.0.0 # 数据库ORM
matplotlib>=3.7.0 # 可视化
seaborn>=0.12.0 # 可视化美化
pillow>=10.0.0 # 图像处理
python-dotenv>=1.0.0 # 环境变量
# 开发依赖
pytest>=7.4.0
flake8>=6.0.0
(2) setup.py(组件库打包配置)
python
运行
from setuptools import setup, find_packages
setup(
name="discussion_common_components",
version="1.0.0",
author="Your Name",
author_email="your_email@example.com",
description="通用组件库:表述处理/文档读写/数据库/可视化",
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
url="https://github.com/your/repo",
packages=find_packages(where="src"),
package_dir={"": "src"},
install_requires=open("requirements.txt", encoding="utf-8").read().splitlines(),
classifiers=[
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires=">=3.8",
# 安装后可直接导入
include_package_data=True,
)
(3) src/expression/processor.py(表述处理核心)
python
运行
import re
from typing import Dict, List, Any
class ExpressionProcessor:
"""表述处理组件:统一解析/标准化/提取关键信息"""
def __init__(self):
# 通用正则规则
self.question_pattern = re.compile(r"(Q\d+|问题\d+|【问题】).*")
self.table_pattern = re.compile(r"(表格\d+|表\d+|【表格】).*")
self.code_pattern = re.compile(r"```(python|java|json)?\n(.*?)```", re.DOTALL)
def standardize_expression(self, raw_text: str) -> str:
"""标准化表述:去冗余/统一格式"""
# 去空格/换行/特殊符号
standardized = re.sub(r"\s+", " ", raw_text.strip())
standardized = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9\(\)\[\]\{\}:;,.?!]", "", standardized)
return standardized
def extract_key_info(self, text: str) -> Dict[str, Any]:
"""提取关键信息:问题/表格/代码块"""
key_info = {
"questions": [],
"tables": [],
"code_blocks": [],
"pure_text": text
}
# 提取问题
questions = self.question_pattern.findall(text)
key_info["questions"] = [q for q in questions if q]
# 提取表格
tables = self.table_pattern.findall(text)
key_info["tables"] = [t for t in tables if t]
# 提取代码块
code_blocks = self.code_pattern.findall(text)
for lang, code in code_blocks:
key_info["code_blocks"].append({
"language": lang if lang else "unknown",
"content": code.strip()
})
# 从纯文本中移除代码块
key_info["pure_text"] = key_info["pure_text"].replace(f"```{lang}\n{code}```", "")
return key_info
def map_expression_to_struct(self, text: str) -> Dict[str, Any]:
"""将自然语言表述映射为结构化数据"""
standardized = self.standardize_expression(text)
key_info = self.extract_key_info(standardized)
return {
"raw_text": text,
"standardized_text": standardized,
"key_info": key_info,
"struct_type": self._judge_struct_type(key_info)
}
def _judge_struct_type(self, key_info: Dict[str, Any]) -> str:
"""判断表述类型:问题/表格/程序/混合"""
if key_info["questions"] and not key_info["tables"] and not key_info["code_blocks"]:
return "question"
elif key_info["tables"] and not key_info["questions"] and not key_info["code_blocks"]:
return "table"
elif key_info["code_blocks"] and not key_info["questions"] and not key_info["tables"]:
return "program"
else:
return "mixed"
(4) src/document/excel_handler.py(Excel 读写组件)
python
运行
import pandas as pd
import os
from typing import Dict, List, Any
from src.document.file_utils import ensure_dir
class ExcelHandler:
"""Excel读写组件:统一封装Excel操作"""
@staticmethod
def read_excel(file_path: str, sheet_name: str = None) -> Dict[str, pd.DataFrame]:
"""读取Excel文件,支持多sheet"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"Excel文件不存在:{file_path}")
if sheet_name:
df = pd.read_excel(file_path, sheet_name=sheet_name)
return {sheet_name: df}
else:
xl = pd.ExcelFile(file_path)
result = {}
for sheet in xl.sheet_names:
result[sheet] = pd.read_excel(file_path, sheet_name=sheet)
return result
except Exception as e:
raise Exception(f"读取Excel失败:{str(e)}")
@staticmethod
def write_excel(data: Dict[str, pd.DataFrame], save_path: str, index: bool = False) -> str:
"""写入Excel文件,支持多sheet"""
try:
ensure_dir(os.path.dirname(save_path))
with pd.ExcelWriter(save_path, engine="openpyxl") as writer:
for sheet, df in data.items():
df.to_excel(writer, sheet_name=sheet, index=index)
return save_path
except Exception as e:
raise Exception(f"写入Excel失败:{str(e)}")
@staticmethod
def extract_table_rules(df: pd.DataFrame, table_name: str) -> Dict[str, Any]:
"""提取表格规则(通用方法,供双项目调用)"""
try:
return {
"table_name": table_name,
"dimension": f"{df.shape[0]}行×{df.shape[1]}列",
"fields": df.columns.tolist(),
"non_null_rate": (df.notnull().sum() / len(df)).to_dict(),
"data_types": df.dtypes.astype(str).to_dict(),
"value_ranges": {
col: (df[col].min(), df[col].max())
for col in df.columns if pd.api.types.is_numeric_dtype(df[col])
}
}
except Exception as e:
raise Exception(f"提取表格规则失败:{str(e)}")
(5) src/database/sqlite_handler.py(数据库操作组件)
python
运行
import os
from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from src.document.file_utils import ensure_dir
# 基础模型
Base = declarative_base()
# 通用数据模型(适配双项目)
class DiscussionData(Base):
"""讨论数据通用表:存储问题/表格/回复/程序"""
__tablename__ = "discussion_data"
id = Column(String(50), primary_key=True, comment="唯一ID")
data_type = Column(String(20), comment="数据类型:question/table/reply/program")
content = Column(Text, comment="原始内容")
structured_content = Column(JSON, comment="结构化内容")
source = Column(String(255), comment="数据来源")
relation_ids = Column(JSON, comment="关联ID列表")
create_time = Column(DateTime, default=datetime.now, comment="创建时间")
update_time = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment="更新时间")
class SQLiteHandler:
"""SQLite数据库操作组件:封装增删改查"""
def __init__(self, db_path: str = "./data/discussion.db"):
self.db_path = db_path
ensure_dir(os.path.dirname(db_path))
# 创建引擎和会话
self.engine = create_engine(f"sqlite:///{db_path}", echo=False)
Base.metadata.create_all(self.engine) # 创建表
self.Session = sessionmaker(bind=self.engine)
def add_data(self, data_id: str, data_type: str, content: str, structured_content: dict = None,
source: str = "", relation_ids: list = None) -> bool:
"""新增数据"""
try:
session = self.Session()
data = DiscussionData(
id=data_id,
data_type=data_type,
content=content,
structured_content=structured_content or {},
source=source,
relation_ids=relation_ids or []
)
session.add(data)
session.commit()
session.close()
return True
except Exception as e:
session.rollback()
session.close()
raise Exception(f"新增数据失败:{str(e)}")
def get_data(self, data_id: str) -> dict:
"""查询单条数据"""
try:
session = self.Session()
data = session.query(DiscussionData).filter_by(id=data_id).first()
session.close()
if not data:
return None
return {
"id": data.id,
"data_type": data.data_type,
"content": data.content,
"structured_content": data.structured_content,
"source": data.source,
"relation_ids": data.relation_ids,
"create_time": data.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"update_time": data.update_time.strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
raise Exception(f"查询数据失败:{str(e)}")
def update_data(self, data_id: str, **kwargs) -> bool:
"""更新数据(支持部分字段)"""
try:
session = self.Session()
data = session.query(DiscussionData).filter_by(id=data_id).first()
if not data:
session.close()
raise Exception(f"数据不存在:{data_id}")
# 支持更新的字段
allowed_fields = ["content", "structured_content", "source", "relation_ids"]
for key, value in kwargs.items():
if key in allowed_fields:
setattr(data, key, value)
session.commit()
session.close()
return True
except Exception as e:
session.rollback()
session.close()
raise Exception(f"更新数据失败:{str(e)}")
def delete_data(self, data_id: str) -> bool:
"""删除数据"""
try:
session = self.Session()
data = session.query(DiscussionData).filter_by(id=data_id).first()
if not data:
session.close()
raise Exception(f"数据不存在:{data_id}")
session.delete(data)
session.commit()
session.close()
return True
except Exception as e:
session.rollback()
session.close()
raise Exception(f"删除数据失败:{str(e)}")
def query_data(self, data_type: str = None, relation_id: str = None) -> list:
"""高级查询:按类型/关联ID"""
try:
session = self.Session()
query = session.query(DiscussionData)
if data_type:
query = query.filter_by(data_type=data_type)
if relation_id:
query = query.filter(DiscussionData.relation_ids.contains([relation_id]))
results = []
for data in query.all():
results.append({
"id": data.id,
"data_type": data.data_type,
"content": data.content,
"structured_content": data.structured_content,
"source": data.source,
"relation_ids": data.relation_ids,
"create_time": data.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"update_time": data.update_time.strftime("%Y-%m-%d %H:%M:%S")
})
session.close()
return results
except Exception as e:
raise Exception(f"高级查询失败:{str(e)}")
(6) src/visualization/table_plot.py(表格可视化组件)
python
运行
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
from src.document.file_utils import ensure_dir
# 设置中文字体
plt.rcParams["font.sans-serif"] = ["SimHei"] # 支持中文
plt.rcParams["axes.unicode_minus"] = False # 支持负号
class TableVisualizer:
"""表格可视化组件:通用绘图方法"""
def __init__(self, save_dir: str = "./output/plots"):
self.save_dir = save_dir
ensure_dir(save_dir)
sns.set_style("whitegrid")
def plot_table_dimension(self, df: pd.DataFrame, table_name: str) -> str:
"""绘制表格维度可视化图"""
try:
fig, ax = plt.subplots(figsize=(8, 4))
ax.bar(["行数", "列数"], [df.shape[0], df.shape[1]], color=["#1f77b4", "#ff7f0e"])
ax.set_title(f"{table_name} - 表格维度", fontsize=14)
ax.set_ylabel("数量", fontsize=12)
# 添加数值标签
for i, v in enumerate([df.shape[0], df.shape[1]]):
ax.text(i, v + 1, str(v), ha="center", va="bottom", fontsize=10)
save_path = os.path.join(self.save_dir, f"{table_name}_dimension.png")
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.close()
return save_path
except Exception as e:
raise Exception(f"绘制维度图失败:{str(e)}")
def plot_null_rate(self, df: pd.DataFrame, table_name: str) -> str:
"""绘制字段非空率可视化图"""
try:
null_rate = (df.notnull().sum() / len(df)).sort_values(ascending=False)
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x=null_rate.index, y=null_rate.values, ax=ax, palette="Blues_d")
ax.set_title(f"{table_name} - 字段非空率", fontsize=14)
ax.set_ylabel("非空率", fontsize=12)
ax.set_xlabel("字段名", fontsize=12)
ax.tick_params(axis="x", rotation=45)
# 添加数值标签
for i, v in enumerate(null_rate.values):
ax.text(i, v + 0.01, f"{v:.2f}", ha="center", va="bottom", fontsize=8)
save_path = os.path.join(self.save_dir, f"{table_name}_null_rate.png")
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.close()
return save_path
except Exception as e:
raise Exception(f"绘制非空率图失败:{str(e)}")
def plot_value_distribution(self, df: pd.DataFrame, table_name: str, numeric_col: str) -> str:
"""绘制数值字段分布可视化图"""
try:
if not pd.api.types.is_numeric_dtype(df[numeric_col]):
raise ValueError(f"字段{numeric_col}非数值类型")
fig, ax = plt.subplots(figsize=(8, 5))
sns.histplot(df[numeric_col].dropna(), kde=True, ax=ax, color="#2ca02c")
ax.set_title(f"{table_name} - {numeric_col} 分布", fontsize=14)
ax.set_xlabel(numeric_col, fontsize=12)
ax.set_ylabel("频次", fontsize=12)
save_path = os.path.join(self.save_dir, f"{table_name}_{numeric_col}_distribution.png")
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches="tight")
plt.close()
return save_path
except Exception as e:
raise Exception(f"绘制分布直方图失败:{str(e)}")
3. 组件库安装与使用
(1) 本地安装组件库
bash
运行
# 进入CommonComponents目录
cd CommonComponents
# 安装到Python环境(支持编辑模式)
pip install -e .
(2) 组件使用示例(examples/component_demo.py)
python
运行
from src.expression.processor import ExpressionProcessor
from src.document.excel_handler import ExcelHandler
from src.database.sqlite_handler import SQLiteHandler
from src.visualization.table_plot import TableVisualizer
def component_demo():
"""通用组件库使用示例"""
# 1. 表述处理组件
expr_processor = ExpressionProcessor()
raw_text = """Q1: 语法糖如何落地?
表格1:语法糖规则表
```python
def test():
return 123
"""struct_data = expr_processor.map_expression_to_struct (raw_text)print ("✅ 表述处理结果:", struct_data ["struct_type"])
2. Excel 读写组件
excel_handler = ExcelHandler ()excel_path = "./test.xlsx"tables = excel_handler.read_excel (excel_path)for sheet, df in tables.items ():rules = excel_handler.extract_table_rules (df, sheet)print (f"✅ 表格 [{sheet}] 规则:", rules ["dimension"])
3. 数据库操作组件
db_handler = SQLiteHandler ("./data/test.db")db_handler.add_data (data_id="TEST-001",data_type="question",content=raw_text,structured_content=struct_data,source="test",relation_ids=["TABLE-001"])data = db_handler.get_data ("TEST-001")print ("✅ 数据库查询结果:", data ["id"])
4. 可视化组件
visualizer = TableVisualizer ("./output/plots")if tables:df = list (tables.values ())[0]visualizer.plot_table_dimension (df, "测试表格")visualizer.plot_null_rate (df, "测试表格")if any (pd.api.types.is_numeric_dtype (col) for col in df.columns):numeric_col = [col for col in df.columns if pd.api.types.is_numeric_dtype (col)][0]visualizer.plot_value_distribution (df, "测试表格", numeric_col)print ("✅ 可视化完成")
if name == "main":component_demo()
plaintext
## 二、双项目改造(接入通用组件库)
### 1. DiscussionRecorder改造示例
以`src/core/recorder.py`为例,替换原有冗余代码为组件库调用:
```python
# 替换原有导入
# from src.parser.word_parser import WordParser
# from src.parser.excel_parser import ExcelParser
from discussion_common_components.expression.processor import ExpressionProcessor
from discussion_common_components.document.excel_handler import ExcelHandler
from discussion_common_components.document.word_handler import WordHandler
from discussion_common_components.database.sqlite_handler import SQLiteHandler
from discussion_common_components.visualization.table_plot import TableVisualizer
class DiscussionRecorder:
def __init__(self, data_center_path="./output/ai_data_center"):
# 初始化通用组件
self.expr_processor = ExpressionProcessor()
self.excel_handler = ExcelHandler()
self.word_handler = WordHandler()
self.db_handler = SQLiteHandler(os.path.join(data_center_path, "discussion.db"))
self.visualizer = TableVisualizer(os.path.join(data_center_path, "plots"))
# ... 其他初始化逻辑
def record_questions_from_word(self, word_path):
"""改造为调用通用组件"""
# 调用Word组件解析
questions = self.word_handler.parse_questions(word_path)
for q in questions:
# 调用表述处理组件标准化
struct_data = self.expr_processor.map_expression_to_struct(q["content"])
# 调用数据库组件存储
self.db_handler.add_data(
data_id=q_id,
data_type="question",
content=q["content"],
structured_content=struct_data,
source=word_path
)
# ... 其他逻辑
2. DiscussionProcessor 改造示例
以src/mapper/data_mapper.py为例:
python
运行
from discussion_common_components.expression.processor import ExpressionProcessor
from discussion_common_components.document.excel_handler import ExcelHandler
class DataMapper:
@staticmethod
def map_question_to_program(question_data):
"""调用通用表述处理组件"""
expr_processor = ExpressionProcessor()
struct_data = expr_processor.map_expression_to_struct(question_data["content"])
return {
"program_name": struct_data["key_info"]["questions"][0] if struct_data["key_info"]["questions"] else "未知程序",
"core_function": struct_data["standardized_text"][:100],
"question_id": question_data["question_id"]
}
@staticmethod
def map_table_to_rules(table_data):
"""调用通用Excel组件"""
excel_handler = ExcelHandler()
# 直接复用组件的表格规则提取方法
return excel_handler.extract_table_rules(pd.DataFrame(table_data["data"]), table_data["table_name"])
三、环境搭建与开发指南
1. 通用组件库环境搭建
bash
运行
# 1. 进入组件库目录
cd CommonComponents
# 2. 创建虚拟环境(可选,建议与业务项目隔离)
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate.bat # Windows
# 3. 安装依赖
pip install --upgrade pip
pip install -r requirements.txt
# 4. 本地安装组件库(支持开发时实时更新)
pip install -e .
# 5. 验证安装
python examples/component_demo.py
2. 业务项目改造指南
(1) 修改业务项目的 requirements.txt
添加通用组件库依赖:
txt
# 依赖通用组件库(本地安装)
discussion_common_components>=1.0.0
# 其他原有依赖...
(2) 核心改造步骤
- 移除业务项目中与通用组件重复的代码(如 Word/Excel 解析、数据库操作、可视化等);
- 导入通用组件库的类 / 方法,替换原有逻辑;
- 保留业务项目的核心逻辑(记录 / 处理的业务规则);
- 测试业务项目功能是否正常。
(3) 开发规范
- 通用能力优先在组件库实现,业务项目仅关注业务逻辑;
- 组件库的 API 保持稳定,新增功能通过版本迭代;
- 双项目共用组件库的版本保持一致,避免兼容性问题。
四、核心优势总结
1. 架构层面
- 低耦合:通用能力与业务逻辑分离,业务项目仅依赖组件库接口;
- 高内聚:同类能力集中在组件库,便于统一维护和升级;
- 可复用:双项目共用一套核心组件,避免重复开发。
2. 开发层面
- 降低维护成本:组件库的 bug 修复 / 功能升级,双项目同步受益;
- 标准化:统一的文档读写、数据库操作、可视化规范,避免双项目风格不一致;
- 扩展性强:新增通用能力(如 PDF 解析、更多可视化类型),只需在组件库实现,双项目直接调用。
3. 运维层面
- 环境统一:组件库提供标准化依赖,双项目环境搭建更简单;
- 版本可控:组件库版本独立管理,可按需升级;
- 调试高效:通用能力的问题只需在组件库调试,无需在双项目中分别排查。
关键点回顾
- 核心抽取:将表述处理、文档读写、数据库操作、可视化等共性能力抽取为独立组件库;
- 标准化封装:组件库提供统一的 API,支持增删改查、可视化、文档解析等通用操作;
- 双项目接入:业务项目移除冗余代码,通过导入组件库实现功能,聚焦核心业务逻辑;
- 环境独立:组件库可本地安装,支持开发模式,便于迭代升级。
这个架构既满足了你 “统一封装通用能力” 的核心诉求,又保持了双项目的业务独立性,是适配当前需求的最优方案。如需进一步优化组件库的 API 设计、补充更多通用能力(如 PDF 解析、流程图生成),可基于现有框架扩展。
- 如何确保通用组件库的可扩展性?
- 如何保证通用组件库的性能?
- 如何对通用组件库进行版本控制?
更多推荐



所有评论(0)