飞书多维表格批量数据上传工具 - 技术文档

📋 项目概述

本工具是一个高性能、高可靠性的数据上传解决方案,专门用于将CSV格式的数据批量上传到飞书多维表格。通过多级优化策略,实现了在保证数据完整性的前提下最大化上传速度。

🚀 核心特性

高性能

  • 极速上传:支持200条/批的大批次上传
  • 并行处理:多线程并发上传,最高3线程并行
  • 连接复用:使用Session保持连接,减少TCP握手开销
  • 智能降级:批量失败时自动切换到单条模式

高可靠性

  • 三级保障:批量→小批→单条的智能降级机制
  • 自动重试:指数退避重试策略(1秒、2秒、4秒)
  • 错误隔离:批次间错误互不影响
  • 数据保全:确保即使部分失败也不会丢失数据

易用性

  • 自动映射:CSV字段与飞书表格字段自动按顺序匹配
  • 进度显示:实时上传进度和统计信息
  • 结果验证:上传前后记录数量对比验证
  • 错误日志:详细的失败记录保存和报告

🛠 技术架构

核心组件

FeishuUploader
├── 身份认证 (get_access_token)
├── 数据读取 (read_csv_file)
├── 字段映射 (auto_create_field_mapping)
├── 批量上传 (ultra_batch_upload)
│   ├── 多线程处理 (ThreadPoolExecutor)
│   ├── 批次管理 (upload_batch_with_retry)
│   └── 降级处理 (fallback_to_single_upload)
└── 结果验证 (check_existing_records)

数据流

CSV文件 → 数据解析 → 字段映射 → 批次拆分 → 并行上传 → 结果验证

🎯 第一部分:数据结构对齐流程

1.1 整体数据对齐架构

CSV文件
读取CSV结构
获取飞书表格字段
字段映射策略
自动顺序映射
手动选择映射
验证字段对应关系
数据格式化处理
生成上传数据

1.2 字段映射详细流程

开始字段映射
读取CSV文件头
获取飞书表格字段列表
选择映射模式
自动顺序映射
按1:1顺序对应字段
验证字段数量匹配
生成映射关系表
手动选择映射
显示所有可用字段
为每个CSV字段选择对应项
确认映射关系
数据格式转换
文本类型: 直接转换
数字类型: 清理符号转换
日期类型: 格式化处理
生成最终上传结构
字段映射完成

1.3 数据格式转换逻辑

flowchart LR
    A[原始CSV数据] --> B{数据类型判断}
    
    B --> C[文本类型]
    C --> D[strvalue 直接转换]
    
    B --> E[数字类型]
    E --> F[移除元/¥等符号]
    F --> G[清理千分位逗号]
    G --> H[转换为float类型]
    
    B --> I[日期类型]
    I --> J[标准化日期格式]
    J --> K[确保时间戳兼容]
    
    D --> L[格式化后的数据]
    H --> L
    K --> L
    
    style L fill:#c8e6c9

1.4 字段映射验证机制

flowchart TD
    A[开始验证] --> B[测试单条数据上传]
    B --> C{上传结果检查}
    
    C --> D[✅ 记录ID正常返回]
    D --> E[字段映射正确]
    E --> F[继续批量上传]
    
    C --> G[❌ 记录ID为None]
    G --> H[检查错误信息]
    H --> I{错误类型分析}
    
    I --> J[字段名称错误]
    J --> K[重新检查字段映射]
    
    I --> L[数据类型错误]
    L --> M[调整数据格式化逻辑]
    
    I --> N[权限问题]
    N --> O[检查应用权限配置]
    
    K --> P[重新测试单条上传]
    M --> P
    O --> P
    
    style F fill:#c8e6c9

⚡ 第二部分:性能优化架构

2.1 整体性能优化架构

graph TB
    A[原始数据] --> B[分批处理]
    B --> C[并行上传引擎]
    C --> D[三级健壮性保障]
    D --> E[结果汇总分析]
    
    B --> F[批次大小优化]
    C --> G[多线程并发]
    D --> H[智能重试机制]
    D --> I[自动降级策略]
    
    F --> J[动态批次调整]
    G --> K[连接池复用]
    H --> L[指数退避算法]
    I --> M[批量→小批→单条]
    
    style E fill:#c8e6c9

2.2 批量上传核心流程

flowchart TD
    A[开始批量上传] --> B[计算批次数量]
    B --> C[初始化线程池]
    C --> D[提交批次任务]
    
    subgraph 并行处理引擎
        D --> E[批次1上传]
        D --> F[批次2上传]
        D --> G[批次N上传]
    end
    
    E --> H{上传结果}
    F --> H
    G --> H
    
    H --> I[✅ 成功]
    I --> J[统计成功数量]
    
    H --> K[❌ 失败]
    K --> L[进入重试流程]
    L --> M{重试结果}
    
    M --> I
    M --> N[重试仍失败]
    N --> O[降级到单条上传]
    O --> P[单条上传结果]
    
    P --> Q[统计最终成功数]
    J --> Q
    Q --> R[生成上传报告]
    
    style R fill:#c8e6c9

2.3 三级健壮性保障机制

flowchart TD
    A[开始上传批次] --> B[第一级: 大批次上传]
    B --> C{上传结果}
    
    C --> D[✅ 成功]
    D --> E[继续下一批次]
    
    C --> F[❌ 失败]
    F --> G[第二级: 小批次上传]
    G --> H{上传结果}
    
    H --> I[✅ 成功]
    I --> E
    
    H --> J[❌ 失败]
    J --> K[第三级: 单条上传]
    K --> L{上传结果}
    
    L --> M[✅ 成功]
    M --> E
    
    L --> N[❌ 失败]
    N --> O[记录失败信息]
    O --> E
    
    E --> P[所有批次完成]
    
    style P fill:#c8e6c9

2.4 智能重试与降级策略

flowchart LR
    A[上传请求] --> B{请求状态}
    
    B --> C[成功]
    C --> D[✅ 完成]
    
    B --> E[失败]
    E --> F{错误类型判断}
    
    F --> G[网络超时]
    G --> H[等待1秒重试]
    
    F --> I[服务器错误 5xx]
    I --> H
    
    F --> J[频率限制 429]
    J --> K[等待2秒重试]
    
    F --> L[客户端错误 4xx]
    L --> M[立即降级处理]
    
    H --> N[重试计数+1]
    K --> N
    N --> O{重试次数检查}
    
    O --> P[≤最大重试次数]
    P --> A
    
    O --> Q[>最大重试次数]
    Q --> M
    
    M --> R[降级到小批次]
    R --> S{小批次结果}
    
    S --> T[✅ 成功]
    T --> D
    
    S --> U[❌ 失败]
    U --> V[降级到单条上传]
    V --> W{单条上传结果}
    
    W --> X[✅ 成功]
    X --> D
    
    W --> Y[❌ 失败]
    Y --> Z[记录最终失败]
    
    style D fill:#c8e6c9
    style Z fill:#ffcdd2

📊 性能基准

上传速度对比(619条记录)

方案 批次大小 并行度 预计时间 速度(条/秒) 适用场景
安全模式 30条 1线程 60-90秒 7-10条/秒 网络不稳定环境
平衡模式 90条 1线程 45-75秒 8-14条/秒 一般网络环境
极速模式 200条 3线程 20-35秒 18-30条/秒 稳定网络环境

资源消耗

  • 内存使用:约50-100MB(取决于CSV文件大小)
  • 网络带宽:单个请求最大2-3MB(200条记录)
  • CPU使用:多线程时约10-30%

🔧 环境要求

系统要求

  • Python 3.7+
  • 网络连接(可访问飞书API)

依赖包

pip install pandas requests python-dotenv

📝 使用指南

快速开始

配置应用信息
app_id = "your_app_id"
app_secret = "your_app_secret"
base_id = "your_base_id"  # 飞书表格URL中的base_id
table_id = "your_table_id"  # 飞书表格URL中的table_id
准备CSV文件
  • 确保CSV文件与飞书表格字段顺序一致
  • 建议先使用小批量数据测试字段映射
运行上传
uploader = FeishuUploader()
uploader.get_access_token()
df = uploader.read_csv_file("your_data.csv")
uploader.auto_create_field_mapping(df.columns.tolist())
success_count, failed_batches = uploader.ultra_batch_upload(df)

配置说明

批次大小选择
# 网络稳定,追求速度
batch_size=200, max_workers=3

# 网络一般,平衡速度稳定性  
batch_size=90, max_workers=1

# 网络较差,保证稳定性
batch_size=30, max_workers=1
重试策略配置
# 默认重试策略
max_retries=3
retry_delays=[1, 2, 4]  # 指数退避

# 激进重试策略(网络不稳定时)
max_retries=5
retry_delays=[1, 1, 2, 2, 4]

🔍 故障排除

常见问题

1. 认证失败

症状:获取access_token失败
解决方案

  • 检查app_id和app_secret是否正确
  • 确认应用权限已开通
  • 验证网络连接
2. 字段映射错误

症状:API返回"字段名称不存在"
解决方案

  • 确认CSV字段与飞书表格字段顺序一致
  • 检查字段名称中的特殊字符和空格
  • 使用get_table_fields_with_types()验证字段信息
3. 上传超时

症状:请求超时或连接中断
解决方案

  • 减小批次大小(200→100→50)
  • 增加超时时间(timeout=60→120)
  • 启用单线程模式(max_workers=1)
4. 数据格式错误

症状:上传成功但记录ID为None
解决方案

  • 检查数字字段格式(移除"元"、"¥"等符号)
  • 验证日期字段格式
  • 确认单选字段值在可选范围内

调试模式

启用详细日志:

# 在代码中添加调试输出
print(f"请求数据: {json.dumps(payload, ensure_ascii=False)}")
print(f"响应状态: {response.status_code}")
print(f"响应内容: {response.text}")

⚡ 性能优化建议

网络优化

  • 使用有线网络:避免WiFi不稳定性
  • 关闭VPN:减少网络跳转
  • 选择优质网络:企业宽带或5G网络

代码优化

  • 预处理数据:上传前清理和验证数据
  • 分批策略:根据数据量动态调整批次大小
  • 连接复用:使用Session对象保持长连接

系统优化

  • 调整线程数:根据CPU核心数调整max_workers
  • 内存管理:大文件分批读取,避免内存溢出
  • 错误处理:设置合理的超时和重试策略

📈 扩展功能

数据预处理

def preprocess_data(df):
    """数据预处理函数"""
    # 清理空值
    df = df.fillna('')
    # 格式化数字
    df['金额'] = df['金额'].str.replace('元', '').str.replace(',', '')
    # 日期格式化
    df['交易时间'] = pd.to_datetime(df['交易时间'])
    return df

增量上传

def incremental_upload(df, key_field='交易单号'):
    """增量上传,避免重复数据"""
    existing_records = get_existing_records()
    existing_keys = set(record[key_field] for record in existing_records)
    new_data = df[~df[key_field].isin(existing_keys)]
    return upload_records(new_data)

监控告警

def send_alert(success_rate, error_count):
    """发送上传结果告警"""
    if success_rate < 0.95 or error_count > 10:
        # 发送邮件、钉钉、企业微信通知
        pass

🔒 安全建议

敏感信息保护

  • 使用环境变量:不要硬编码app_secret
  • 配置文件加密:敏感配置信息加密存储
  • 访问日志审计:记录所有API调用

权限控制

  • 最小权限原则:应用只申请必要权限
  • 定期轮换密钥:定期更新app_secret
  • IP白名单:限制API调用来源IP
import pandas as pd  # 用于数据处理和CSV文件读取
import requests  # 用于发送HTTP请求,与飞书API交互
import json  # 用于处理JSON数据格式
import time  # 用于添加延迟和计时
import os  # 用于处理文件路径等系统操作
import concurrent.futures  # 用于实现多线程并行处理
from dotenv import load_dotenv  # 用于加载环境变量(虽然本代码未直接使用,但保留用于扩展)

# 加载环境变量(如果需要从.env文件读取配置可启用)
load_dotenv()

class FeishuUltraUploader:
    """飞书多维表格极速批量上传器
    
    该类实现了将CSV数据高效上传到飞书多维表格的功能,
    包含自动字段映射、批量上传、多线程并行处理、失败重试和降级策略等特性。
    """
    def __init__(self):
        """初始化上传器配置参数"""
        # 飞书应用认证信息(实际使用时建议从环境变量或配置文件读取)
        self.app_id = "cli....................."  # 飞书应用ID
        self.app_secret = "ycuY....................."  # 飞书应用密钥
        # 飞书多维表格信息(从表格URL中获取)
        self.base_id = "PmkIbe8....................."  # 应用ID
        self.table_id = "tbl6....................."  # 表格ID
        self.base_url = "https://open.feishu.cn/open-apis"  # 飞书API基础地址
        self.access_token = None  # 访问令牌,用于API认证
        self.field_mapping = {}  # 字段映射关系(CSV字段 -> 飞书表格字段)
        self.field_types = {}  # 飞书表格字段类型映射(字段名 -> 类型)
        
    def get_access_token(self):
        """获取飞书API访问令牌
        
        飞书API调用需要先通过app_id和app_secret获取访问令牌,
        该令牌有一定有效期,过期后需要重新获取。
        
        返回:
            bool: 获取成功返回True,失败返回False
        """
        # 构建获取访问令牌的API地址
        url = f"{self.base_url}/auth/v3/app_access_token/internal"
        # 请求参数(包含认证信息)
        payload = {
            "app_id": self.app_id,
            "app_secret": self.app_secret
        }
        
        try:
            # 发送POST请求获取令牌
            response = requests.post(url, json=payload, timeout=30)
            response.raise_for_status()  # 检查HTTP请求是否成功
            result = response.json()  # 解析JSON响应
            self.access_token = result.get("app_access_token")  # 提取访问令牌
            print("✅ 访问令牌获取成功")
            return True
        except Exception as e:
            print(f"❌ 获取访问令牌失败: {e}")
            return False
    
    def read_csv_file(self, file_path):
        """读取CSV文件并返回数据框
        
        读取指定路径的CSV文件,解析为pandas数据框,方便后续处理。
        
        参数:
            file_path (str): CSV文件路径
            
        返回:
            pandas.DataFrame: 解析后的数据集;失败则返回None
        """
        try:
            # 读取CSV文件(自动处理表头)
            df = pd.read_csv(file_path)
            # 输出读取结果信息
            print(f"✅ CSV文件读取成功,共{len(df)}行,{len(df.columns)}列")
            return df
        except Exception as e:
            print(f"❌ 读取CSV文件失败: {e}")
            return None
    
    def get_table_fields_with_types(self):
        """获取飞书表格的字段信息及类型
        
        调用飞书API获取目标表格的所有字段名称和类型,
        用于后续的数据格式转换和字段映射验证。
        
        返回:
            list: 包含字段信息的字典列表;失败则返回None
        """
        # 检查访问令牌是否已获取
        if not self.access_token:
            return None
            
        # 构建获取字段信息的API地址
        url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/fields"
        # 请求头(包含认证信息)
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        
        try:
            # 发送GET请求获取字段信息
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()  # 检查请求是否成功
            result = response.json()  # 解析响应
            fields = result.get("data", {}).get("items", [])  # 提取字段列表
            
            # 存储字段类型信息(用于后续数据格式化)
            self.field_types = {}
            for field in fields:
                field_name = field.get("field_name", "")  # 字段名称
                field_type = field.get("type", "")  # 字段类型(如文本、数字、日期等)
                self.field_types[field_name] = field_type
            
            return fields
        except Exception as e:
            print(f"❌ 获取表格字段失败: {e}")
            return None
    
    def auto_create_field_mapping(self, csv_columns):
        """自动创建CSV字段与飞书表格字段的映射关系
        
        按照字段顺序自动匹配CSV列和飞书表格字段,
        这是简化操作的方式,适用于字段顺序一致的场景。
        
        参数:
            csv_columns (list): CSV文件的列名列表
            
        返回:
            dict: 字段映射关系(CSV列名 -> 飞书字段名)
        """
        # 获取飞书表格字段信息
        feishu_fields = self.get_table_fields_with_types()
        if not feishu_fields:
            return {}
        
        # 提取飞书表格的字段名称列表
        feishu_field_names = [field["field_name"] for field in feishu_fields]
        mapping = {}
        
        # 按顺序匹配CSV字段和飞书字段(取两者中数量较少的进行匹配)
        min_fields = min(len(csv_columns), len(feishu_field_names))
        for i in range(min_fields):
            mapping[csv_columns[i]] = feishu_field_names[i]
        
        print("✅ 字段映射完成")
        return mapping
    
    def format_value_by_type(self, value, field_type):
        """根据飞书字段类型格式化数据值
        
        不同类型的字段需要特定的数据格式,如数字字段需要去除货币符号等。
        
        参数:
            value: 原始数据值
            field_type: 飞书表格字段类型
            
        返回:
            格式化后的值;空值返回None
        """
        # 处理空值情况
        if pd.isna(value) or value == "":
            return None
            
        # 处理数字类型字段(飞书类型标识为2)
        if field_type == 2:  
            try:
                # 如果是字符串,先清理货币符号和千分位逗号
                if isinstance(value, str):
                    value = str(value).replace('元', '').replace('¥', '').replace(',', '').strip()
                # 转换为浮点数
                return float(value)
            except (ValueError, TypeError):
                # 转换失败时返回原始字符串(避免整个记录失败)
                return str(value)
        # 其他类型(文本、日期等)直接转换为字符串
        else:
            return str(value)
    
    def ultra_batch_upload(self, df, batch_size=200, max_workers=3):
        """极速批量上传主方法
        
        核心方法,实现大批次上传、多线程并行处理、失败重试和降级策略,
        最大化上传效率的同时保证可靠性。
        
        参数:
            df (pandas.DataFrame): 要上传的数据框
            batch_size (int): 每批上传的记录数(最大200,飞书API限制)
            max_workers (int): 并行线程数
            
        返回:
            tuple: (成功上传的记录数, 失败的批次信息列表)
        """
        # 检查访问令牌是否有效
        if not self.access_token:
            return 0, []
        
        # 输出上传任务基本信息
        print(f"🚀 极速批量上传启动,共{len(df)}条记录")
        print(f"📊 批次大小: {batch_size}条,并行线程: {max_workers}")
        
        # 计算总批次数并拆分数据为多个批次
        total_batches = (len(df) + batch_size - 1) // batch_size  # 向上取整计算总批次
        batches = []
        
        for batch_num in range(total_batches):
            # 计算当前批次的起始和结束索引
            start_idx = batch_num * batch_size
            end_idx = min((batch_num + 1) * batch_size, len(df))
            batch_df = df.iloc[start_idx:end_idx]  # 提取当前批次数据
            batches.append((batch_num + 1, batch_df))  # 存储批次编号和数据(编号从1开始)
        
        # 初始化统计变量
        success_count = 0  # 成功上传的总记录数
        failed_batches = []  # 存储失败的批次信息
        
        # 使用线程池实现并行上传
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 提交所有批次任务到线程池
            future_to_batch = {
                executor.submit(self.upload_batch_with_retry, batch_num, batch_df): batch_num 
                for batch_num, batch_df in batches
            }
            
            # 遍历所有完成的任务,收集结果
            for future in concurrent.futures.as_completed(future_to_batch):
                batch_num = future_to_batch[future]  # 获取当前任务对应的批次编号
                try:
                    # 获取任务结果(成功数和失败数)
                    batch_success, batch_failed = future.result()
                    success_count += batch_success  # 累计成功数
                    # 记录失败批次信息
                    if batch_failed:
                        failed_batches.append({
                            "batch": batch_num,
                            "failed_count": batch_failed
                        })
                    print(f"✅ 批次 {batch_num} 完成: 成功 {batch_success} 条")
                except Exception as e:
                    print(f"❌ 批次 {batch_num} 异常: {e}")
                    failed_batches.append({
                        "batch": batch_num,
                        "error": str(e)
                    })
        
        return success_count, failed_batches
    
    def upload_batch_with_retry(self, batch_num, batch_df, max_retries=2):
        """上传单个批次数据(带重试机制)
        
        对单个批次进行上传,并在失败时进行重试,
        多次重试失败后会降级为单条上传模式。
        
        参数:
            batch_num (int): 批次编号
            batch_df (pandas.DataFrame): 该批次的数据
            max_retries (int): 最大重试次数
            
        返回:
            tuple: (该批次成功上传的记录数, 失败的记录数)
        """
        # 准备当前批次的上传数据(转换为飞书API要求的格式)
        records_data = self.prepare_batch_data(batch_df)
        
        # 重试循环
        for attempt in range(max_retries):
            try:
                print(f"  批次 {batch_num} 尝试第 {attempt + 1} 次上传 ({len(records_data)}条)...")
                
                # 构建批量创建记录的API地址
                url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records/batch_create"
                # 请求头
                headers = {
                    "Authorization": f"Bearer {self.access_token}",
                    "Content-Type": "application/json"
                }
                # 请求体(包含要上传的记录数据)
                payload = {
                    "records": records_data
                }
                
                # 发送POST请求上传批次数据
                response = requests.post(url, headers=headers, json=payload, timeout=60)
                
                # 处理响应结果
                if response.status_code == 200:
                    result = response.json()
                    # 提取成功创建的记录数
                    created_records = result.get("data", {}).get("records", [])
                    return len(created_records), 0
                else:
                    error_msg = f"HTTP {response.status_code}"
                    print(f"  批次 {batch_num} 上传失败: {error_msg}")
                    
                    # 如果还有重试次数,等待后重试
                    if attempt < max_retries - 1:
                        wait_time = 1  # 重试等待时间(秒)
                        print(f"  {wait_time}秒后重试...")
                        time.sleep(wait_time)
                        continue
                    
                    # 所有重试都失败,降级为单条上传
                    return self.fallback_to_single_upload(batch_df)
                    
            except Exception as e:
                print(f"  批次 {batch_num} 异常: {e}")
                # 如果还有重试次数,等待后重试
                if attempt < max_retries - 1:
                    wait_time = 1  # 重试等待时间(秒)
                    print(f"  {wait_time}秒后重试...")
                    time.sleep(wait_time)
                    continue
                
                # 所有重试都失败,降级为单条上传
                return self.fallback_to_single_upload(batch_df)
        
        # 所有尝试失败(理论上不会走到这里,因为重试次数已耗尽)
        return 0, len(batch_df)
    
    def prepare_batch_data(self, batch_df):
        """准备批次上传的数据格式
        
        将pandas数据框转换为飞书API要求的记录格式,
        包含字段映射和数据类型转换。
        
        参数:
            batch_df (pandas.DataFrame): 批次数据
            
        返回:
            list: 符合飞书API格式的记录列表
        """
        records_data = []
        # 遍历批次中的每一行数据
        for _, row in batch_df.iterrows():
            fields_data = {}  # 存储当前行的字段数据
            # 遍历字段映射关系,处理每个字段
            for csv_field, feishu_field_name in self.field_mapping.items():
                # 检查当前字段是否存在且不为空
                if csv_field in row and pd.notna(row[csv_field]):
                    value = row[csv_field]  # 获取原始值
                    # 获取飞书字段类型(默认为文本类型)
                    field_type = self.field_types.get(feishu_field_name, 1)
                    # 根据字段类型格式化值
                    formatted_value = self.format_value_by_type(value, field_type)
                    
                    if formatted_value is not None:
                        fields_data[feishu_field_name] = formatted_value  # 添加到字段数据
            
            # 如果有有效字段数据,添加到记录列表
            if fields_data:
                records_data.append({"fields": fields_data})
        
        return records_data
    
    def fallback_to_single_upload(self, batch_df):
        """降级策略:单条上传模式
        
        当批次上传失败时,自动降级为单条上传,
        确保尽可能多的记录能成功上传,减少数据丢失。
        
        参数:
            batch_df (pandas.DataFrame): 批次数据
            
        返回:
            tuple: (单条上传成功的记录数, 失败的记录数)
        """
        success_count = 0  # 成功数
        failed_count = 0   # 失败数
        
        print("  🐌 降级到极速单条上传...")
        
        # 使用会话保持连接,减少TCP握手开销,提高单条上传效率
        session = requests.Session()
        session.headers.update({
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        })
        
        # 单条创建记录的API地址
        url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records"
        
        # 遍历批次中的每一行数据
        for idx, row in batch_df.iterrows():
            try:
                fields_data = {}  # 当前行的字段数据
                # 处理每个字段
                for csv_field, feishu_field_name in self.field_mapping.items():
                    if csv_field in row and pd.notna(row[csv_field]):
                        value = row[csv_field]
                        field_type = self.field_types.get(feishu_field_name, 1)
                        formatted_value = self.format_value_by_type(value, field_type)
                        
                        if formatted_value is not None:
                            fields_data[feishu_field_name] = formatted_value
                
                # 如果没有有效字段数据,跳过当前行
                if not fields_data:
                    continue
                
                # 发送单条记录上传请求
                payload = {"fields": fields_data}
                response = session.post(url, json=payload, timeout=10)
                
                # 检查上传结果
                if response.status_code == 200:
                    success_count += 1
                else:
                    failed_count += 1
                
                # 每成功50条输出一次进度(避免输出过于频繁)
                if success_count % 50 == 0:
                    print(f"    单条上传进度: {success_count}/{len(batch_df)}")
                    
            except Exception:
                # 任何异常都视为失败
                failed_count += 1
        
        # 关闭会话释放资源
        session.close()
        # 输出单条上传结果
        print(f"    ✅ 单条上传完成: {success_count} 条成功, {failed_count} 条失败")
        return success_count, failed_count
    
    def check_existing_records(self):
        """检查表格中现有记录数量
        
        用于上传前后的数量对比,验证上传效果。
        
        返回:
            int: 现有记录数量;失败返回0
        """
        if not self.access_token:
            return 0
        
        # 构建获取记录列表的API地址
        url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        
        try:
            # 发送GET请求获取记录数量(默认只返回部分记录,这里仅用计数)
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 200:
                result = response.json()
                records = result.get("data", {}).get("items", [])
                return len(records)
            return 0
        except:
            return 0

def main():
    """主函数:执行飞书表格批量上传流程"""
    # 1. 初始化上传器实例
    uploader = FeishuUltraUploader()
    
    # 2. 获取飞书API访问令牌(必须第一步成功,否则后续操作无法进行)
    if not uploader.get_access_token():
        return
    
    # 3. 读取CSV数据文件
    csv_file = "data/output/merged_bills.csv"  # CSV文件路径(根据实际情况修改)
    df = uploader.read_csv_file(csv_file)
    if df is None:  # 读取失败则退出
        return
    
    # 4. 自动创建CSV字段与飞书表格字段的映射关系
    csv_columns = df.columns.tolist()  # 获取CSV的列名列表
    uploader.field_mapping = uploader.auto_create_field_mapping(csv_columns)
    
    # 检查映射是否成功创建
    if not uploader.field_mapping:
        print("❌ 字段映射创建失败")
        return
    
    # 5. 上传前检查表格中的记录数量(用于后续对比)
    print("🔍 检查上传前的记录数量...")
    before_count = uploader.check_existing_records()
    print(f"上传前表格中的记录数量: {before_count}")
    
    # 6. 执行极速批量上传
    start_time = time.time()  # 记录开始时间
    
    # 调用上传方法(大批次+多线程组合最大化效率)
    success_count, failed_batches = uploader.ultra_batch_upload(
        df, 
        batch_size=200,  # 批次大小(飞书API最大支持200条/批)
        max_workers=3    # 并行线程数(根据网络情况调整)
    )
    
    end_time = time.time()  # 记录结束时间
    elapsed_time = end_time - start_time  # 计算总耗时
    
    # 7. 上传后检查表格中的记录数量(验证实际新增数量)
    print("\n🔍 检查上传后的记录数量...")
    after_count = uploader.check_existing_records()
    
    # 8. 输出上传结果统计信息
    print(f"\n🎉 极速上传完成!")
    print(f"⏱️  总耗时: {elapsed_time:.2f}秒")
    print(f"📊 平均速度: {success_count/elapsed_time:.2f} 条/秒")
    print(f"✅ 成功: {success_count} 条")
    print(f"📈 实际新增: {after_count - before_count} 条")  # 实际新增 = 上传后总数 - 上传前总数
    
    # 处理失败批次(如果有)
    if failed_batches:
        print(f"❌ 失败批次: {len(failed_batches)} 个")
        # 将失败信息保存到JSON文件,方便后续处理
        with open("upload_failed_batches.json", "w", encoding="utf-8") as f:
            json.dump(failed_batches, f, ensure_ascii=False, indent=2)
        print("失败批次信息已保存到 upload_failed_batches.json")

# 程序入口:当直接运行该脚本时执行main函数
if __name__ == "__main__":
    main()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐