前言

我最近完成了一个有趣的项目——法律文档智能分析系统的批量分析功能扩展。这个项目让我深刻体会到了从单一功能到批量处理的系统设计思路,今天想和大家分享一下这个开发过程中的经验和思考。
场景1
场景2

项目背景

原有系统的局限性

最初的系统只能处理单个法律文档,主要功能包括:

  • 关键条款提取(如回购权、董事会席位等)
  • 智能问答
  • 报告生成

但在实际使用中,会有两个痛点:

  1. 项目综合分析需求:一个投资项目通常包含SPA(股份认购协议)、SHA(股东协议)、MAA(章程)三份文档,需要综合分析
  2. 横向对比分析需求:需要对比多个项目的特定条款(如回购权),进行市场研究

技术栈选择

  • 后端:Python + gpt-4.1-mini
  • 文档处理:python-docx
  • Web界面:Streamlit
  • 架构模式:模块化设计,单一职责原则

核心设计思路

1. 架构设计:从单一到批量

# 原有单文档分析器
class DocumentAnalyzer:
    def __init__(self, app_id: str):
        self.app_id = app_id
        self.document = None
        self.document_text = ""

    def load_document(self, file_path: str) -> bool:
        # 单文档加载逻辑
        pass

# 新增批量分析器
class BatchDocumentAnalyzer:
    def __init__(self, app_id: str):
        self.app_id = app_id
        self.analyzers = {}  # 存储多个文档分析器
        self.documents_info = {}  # 存储文档信息

    def load_documents(self, file_paths: List[str], labels: List[str]) -> Dict[str, bool]:
        # 批量文档加载逻辑
        pass

设计亮点

  • 复用原有的DocumentAnalyzer,避免重复开发
  • 使用组合模式,BatchDocumentAnalyzer管理多个DocumentAnalyzer实例
  • 保持接口一致性,降低学习成本

2. 场景化设计:针对性解决问题

场景1:项目综合分析
def scenario_1_project_comprehensive_analysis(self,
                                            spa_file: str,
                                            sha_file: str,
                                            maa_file: str,
                                            project_name: str = "投资项目") -> Dict:
    """
    针对单个项目的SPA、SHA、MAA进行综合分析
    """
    # 定义每种文档类型的重点条款
    spa_focus_clauses = [
        "投资概述", "董事会席位", "保护性条款", "信息及检查权",
        "优先认购权", "反稀释", "拖售权", "优先清算权"
    ]

    sha_focus_clauses = [
        "董事会席位", "保护性条款", "限制转股人的股权转让限制",
        "投资方自由转股", "优先认购权", "优先购买权&共同出售权",
        "拖售权", "回购权", "最惠国"
    ]

    maa_focus_clauses = [
        "董事会席位", "保护性条款", "优先分红", "优先清算权", "回购权"
    ]

    # 分别分析每个文档
    # ...
场景2:多项目条款对比
def scenario_2_multi_project_clause_comparison(self,
                                              file_paths: List[str],
                                              project_names: List[str],
                                              target_clause: str = "回购权") -> Dict:
    """
    对多个项目的特定条款进行对比分析
    """
    # 批量加载文档
    load_results = self.load_documents(file_paths, project_names)

    # 分析每个项目的目标条款
    clause_results = {}
    for project_name in project_names:
        if project_name in self.analyzers:
            clause_result = self.analyzers[project_name].extract_single_clause(target_clause)
            clause_results[project_name] = clause_result

    # 生成对比报告
    # ...

设计亮点

  • 场景化设计,针对具体业务需求
  • 灵活的条款配置,不同文档类型关注不同重点
  • 统一的报告生成格式

3. 报告生成:结构化输出

def _generate_comprehensive_report(self,
                                 project_name: str,
                                 analysis_results: Dict,
                                 load_results: Dict) -> str:
    """生成综合分析报告"""

    report_lines = []
    report_lines.append(f"# {project_name} 综合分析报告\n")
    report_lines.append("---\n")

    # 分析概览
    report_lines.append("## 📊 分析概览\n")
    report_lines.append(f"- **项目名称**: {project_name}\n")
    report_lines.append(f"- **分析时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

    # 核心条款汇总
    report_lines.append("\n## 🎯 核心条款汇总\n")

    # 按条款类型汇总
    all_clauses = set()
    for doc_results in analysis_results.values():
        all_clauses.update(doc_results.keys())

    for clause_type in sorted(all_clauses):
        report_lines.append(f"### {clause_type}\n")

        for doc_label, doc_results in analysis_results.items():
            if clause_type in doc_results:
                clauses = doc_results[clause_type]
                if clauses and len(clauses) > 0:
                    first_clause = clauses[0]
                    if isinstance(first_clause, dict) and 'summary' in first_clause:
                        summary = first_clause['summary']
                        report_lines.append(f"**{doc_label}**:\n")
                        report_lines.append(f"{summary}\n\n")

    return ''.join(report_lines)

技术难点与解决方案

1. 内存管理问题

问题:批量处理多个大文档时,内存占用过高

解决方案

def load_documents(self, file_paths: List[str], document_labels: List[str] = None) -> Dict[str, bool]:
    """批量加载文档,优化内存使用"""
    results = {}

    for i, (file_path, label) in enumerate(zip(file_paths, document_labels)):
        print(f"🔄 正在加载文档 {i+1}/{len(file_paths)}: {label}")

        analyzer = DocumentAnalyzer(self.app_id)
        success = analyzer.load_document(file_path)

        if success:
            self.analyzers[label] = analyzer
            # 只保存必要的文档信息,不保存完整文档对象
            self.documents_info[label] = {
                'file_path': file_path,
                'file_name': os.path.basename(file_path),
                'char_count': len(analyzer.document_text),
                'paragraph_count': len(analyzer.document.paragraphs)
            }
            results[label] = True
        else:
            results[label] = False

    return results

2. API调用频率控制

问题:批量分析时API调用过于频繁,可能触发限流

解决方案

import time
from functools import wraps

def rate_limit(calls_per_minute=30):
    """API调用频率限制装饰器"""
    min_interval = 60.0 / calls_per_minute
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator

@rate_limit(calls_per_minute=20)
def _call_meituan_api(self, messages: List[Dict], max_tokens: int = 2000) -> Optional[str]:
    # API调用逻辑
    pass

3. 错误处理与容错机制

问题:批量处理时,单个文档失败不应影响整体流程

解决方案

def scenario_1_project_comprehensive_analysis(self, spa_file: str, sha_file: str, maa_file: str, project_name: str = "投资项目") -> Dict:
    """项目综合分析,带容错机制"""

    # 加载文档
    file_paths = [spa_file, sha_file, maa_file]
    labels = [f"{project_name}-SPA", f"{project_name}-SHA", f"{project_name}-MAA"]

    load_results = self.load_documents(file_paths, labels)

    # 检查加载结果,记录失败但不中断
    failed_docs = [label for label, success in load_results.items() if not success]
    if failed_docs:
        print(f"⚠️ 以下文档加载失败: {', '.join(failed_docs)}")

    # 继续处理成功加载的文档
    analysis_results = {}

    for label, clauses in document_clause_mapping.items():
        if label in self.analyzers:
            print(f"🔍 正在分析 {label}")
            doc_results = {}

            for clause_type in clauses:
                try:
                    clause_result = self.analyzers[label].extract_single_clause(clause_type)
                    doc_results[clause_type] = clause_result
                except Exception as e:
                    print(f"⚠️ 条款提取失败 {label}-{clause_type}: {str(e)}")
                    doc_results[clause_type] = []  # 失败时返回空结果

            analysis_results[label] = doc_results

    return {
        'project_name': project_name,
        'load_results': load_results,
        'analysis_results': analysis_results,
        'comprehensive_report': self._generate_comprehensive_report(project_name, analysis_results, load_results)
    }

Web界面设计

场景选择界面

def main():
    # 场景选择
    st.header("📋 选择分析场景")

    col1, col2 = st.columns(2)

    with col1:
        scenario1_selected = st.button(
            "🎯 场景1:项目综合分析",
            help="上传单个投资项目的SPA、SHA、MAA,生成综合分析报告",
            use_container_width=True,
            type="primary" if st.session_state.selected_scenario == "scenario1" else "secondary"
        )

    with col2:
        scenario2_selected = st.button(
            "🔍 场景2:多项目条款对比",
            help="上传多个投资项目的文档,对比分析特定条款",
            use_container_width=True,
            type="primary" if st.session_state.selected_scenario == "scenario2" else "secondary"
        )

动态文件上传

def show_scenario1_interface():
    """显示场景1的界面"""
    st.header("🎯 场景1:项目综合分析")

    # 项目名称输入
    project_name = st.text_input(
        "📝 项目名称",
        value="投资项目",
        placeholder="请输入项目名称,如:某某科技A轮投资"
    )

    # 文件上传
    col1, col2, col3 = st.columns(3)

    with col1:
        st.subheader("📄 SPA (股份认购协议)")
        spa_file = st.file_uploader(
            "上传SPA文档",
            type=['docx'],
            key="spa_upload",
            help="Stock Purchase Agreement - 股份认购协议"
        )

    # 类似地处理SHA和MAA文件上传

部署与分发

创建便携式安装包

# create_package.py
import zipfile
import os
import shutil

def create_distribution_package():
    """创建分发包"""

    # 核心文件列表
    core_files = [
        'document_analyzer.py',
        'batch_analyzer.py',
        'batch_web_interface.py',
        'batch_demo.py',
        'requirements_core.txt',  # 核心依赖,不包含streamlit
        'requirements.txt',       # 完整依赖
        'start_batch_system.bat',
        'README.md',
        'BATCH_ANALYSIS_GUIDE.md'
    ]

    package_name = f"legal_analyzer_{datetime.now().strftime('%Y%m%d_%H%M')}.zip"

    with zipfile.ZipFile(package_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in core_files:
            if os.path.exists(file):
                zipf.write(file)
                print(f"✅ 添加文件: {file}")

    print(f"📦 分发包创建完成: {package_name}")

解决依赖安装问题

针对streamlit安装失败的问题,我采用了多层次的解决方案:

  1. 核心依赖分离
# requirements_core.txt - 只包含核心功能
python-docx>=0.8.11
requests>=2.31.0
  1. 多镜像源支持
# 安装脚本提供多个选择
pip install -r requirements.txt
# 如果失败,尝试官方源
pip install -r requirements.txt -i https://pypi.org/simple/
# 如果还失败,只安装核心依赖
pip install -r requirements_core.txt
  1. 降级方案
# 在代码中检测streamlit是否可用
try:
    import streamlit as st
    WEB_INTERFACE_AVAILABLE = True
except ImportError:
    WEB_INTERFACE_AVAILABLE = False
    print("⚠️ Streamlit未安装,Web界面不可用,请使用命令行版本")

if __name__ == "__main__":
    if WEB_INTERFACE_AVAILABLE:
        # 启动Web界面
        pass
    else:
        # 启动命令行版本
        from batch_demo import main
        main()

性能优化

1. 并发处理

import concurrent.futures
from typing import List, Dict

def parallel_clause_extraction(self, analyzers: Dict[str, DocumentAnalyzer], clause_types: List[str]) -> Dict[str, Dict]:
    """并行提取条款"""

    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # 为每个分析器创建任务
        future_to_analyzer = {}

        for label, analyzer in analyzers.items():
            future = executor.submit(self._extract_clauses_for_analyzer, analyzer, clause_types)
            future_to_analyzer[future] = label

        # 收集结果
        for future in concurrent.futures.as_completed(future_to_analyzer):
            label = future_to_analyzer[future]
            try:
                results[label] = future.result()
            except Exception as e:
                print(f"❌ {label} 分析失败: {str(e)}")
                results[label] = {}

    return results

def _extract_clauses_for_analyzer(self, analyzer: DocumentAnalyzer, clause_types: List[str]) -> Dict:
    """为单个分析器提取条款"""
    doc_results = {}
    for clause_type in clause_types:
        try:
            clause_result = analyzer.extract_single_clause(clause_type)
            doc_results[clause_type] = clause_result
        except Exception as e:
            print(f"⚠️ 条款提取失败 {clause_type}: {str(e)}")
            doc_results[clause_type] = []
    return doc_results

2. 缓存机制

import hashlib
import pickle
import os

class CachedBatchAnalyzer(BatchDocumentAnalyzer):
    """带缓存的批量分析器"""

    def __init__(self, app_id: str, cache_dir: str = ".cache"):
        super().__init__(app_id)
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_key(self, file_path: str, clause_type: str) -> str:
        """生成缓存键"""
        with open(file_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        return f"{file_hash}_{clause_type}_{self.app_id}"

    def extract_single_clause_cached(self, analyzer: DocumentAnalyzer, clause_type: str, file_path: str) -> List[Dict]:
        """带缓存的条款提取"""
        cache_key = self._get_cache_key(file_path, clause_type)
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")

        # 尝试从缓存读取
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'rb') as f:
                    cached_result = pickle.load(f)
                print(f"📋 使用缓存结果: {clause_type}")
                return cached_result
            except Exception as e:
                print(f"⚠️ 缓存读取失败: {str(e)}")

        # 执行实际分析
        result = analyzer.extract_single_clause(clause_type)

        # 保存到缓存
        try:
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
            print(f"💾 结果已缓存: {clause_type}")
        except Exception as e:
            print(f"⚠️ 缓存保存失败: {str(e)}")

        return result

测试与验证

自动化测试

# test_batch_system.py
def test_batch_analyzer_initialization():
    """测试批量分析器初始化"""
    try:
        analyzer = BatchDocumentAnalyzer("test_app_id")
        summary = analyzer.get_analysis_summary()
        assert summary['total_documents'] == 0
        assert summary['analyzers_count'] == 0
        return True
    except Exception as e:
        print(f"❌ 初始化测试失败: {str(e)}")
        return False

def test_document_loading_simulation():
    """测试文档加载功能"""
    try:
        analyzer = BatchDocumentAnalyzer("test_app_id")

        # 使用不存在的文件测试错误处理
        test_files = ["test_spa.docx", "test_sha.docx", "test_maa.docx"]
        test_labels = ["测试项目-SPA", "测试项目-SHA", "测试项目-MAA"]

        results = analyzer.load_documents(test_files, test_labels)

        # 验证错误处理
        assert all(not success for success in results.values())
        return True
    except Exception as e:
        print(f"❌ 文档加载测试失败: {str(e)}")
        return False

演示系统

# demo_batch_analysis.py
def create_demo_documents():
    """创建演示用的文档副本"""
    sample_file = "sample_investment_agreement.docx"
    if not os.path.exists(sample_file):
        return None

    # 创建三个副本,模拟SPA、SHA、MAA
    demo_files = {
        "demo_spa.docx": "演示项目-SPA",
        "demo_sha.docx": "演示项目-SHA",
        "demo_maa.docx": "演示项目-MAA"
    }

    created_files = []
    for demo_file, label in demo_files.items():
        try:
            shutil.copy2(sample_file, demo_file)
            created_files.append(demo_file)
            print(f"✅ 创建演示文档: {demo_file} ({label})")
        except Exception as e:
            print(f"❌ 创建文档失败 {demo_file}: {str(e)}")
      return created_files

项目总结

技术收获

  1. 架构设计:学会了如何从单一功能扩展到批量处理,保持代码的可维护性
  2. 错误处理:在批量处理中,容错机制的重要性
  3. 用户体验:场景化设计让复杂功能变得易用
  4. 性能优化:并发处理和缓存机制的实际应用

业务价值

  1. 效率提升:从单文档分析到批量处理,效率提升10倍以上
  2. 场景覆盖:满足了项目综合分析和横向对比两大核心需求
  3. 易用性:Web界面和命令行双重支持,适应不同用户习惯

未来规划

  1. 功能扩展

    • 支持PDF格式文档
    • 增加更多条款类型的专项分析
    • 支持条款变化趋势分析
  2. 性能优化

    • 引入异步处理框架
    • 优化内存使用
    • 增加分布式处理能力

结语

这个项目让我深刻体会到,好的软件设计不仅要解决技术问题,更要深入理解用户需求。从单文档到批量处理的演进,不仅仅是功能的叠加,更是对业务场景的深度思考和系统性解决方案的体现。

希望这篇文章能对正在做类似项目的朋友有所帮助。如果你有任何问题或建议,欢迎在评论区交流讨论!


项目地址:https://agreement-analyzer.streamlit.app/
技术栈:Python, Streamlit, API
关键词:批量处理, 文档分析, 系统架构, Python开发

如果这篇文章对你有帮助,请点赞👍收藏⭐关注🔔,你的支持是我继续创作的动力!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐