飞书多维表格批量数据上传工具 - 技术文档
本工具是一个高性能、高可靠性的数据上传解决方案,专门用于将CSV格式的数据批量上传到飞书多维表格。通过多级优化策略,实现了在保证数据完整性的前提下最大化上传速度。数据流🎯 第一部分:数据结构对齐流程1.1 整体数据对齐架构#mermaid-svg-e3kjHsIxfvNwLYk6 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-
·
飞书多维表格批量数据上传工具 - 技术文档
📋 项目概述
本工具是一个高性能、高可靠性的数据上传解决方案,专门用于将CSV格式的数据批量上传到飞书多维表格。通过多级优化策略,实现了在保证数据完整性的前提下最大化上传速度。
🚀 核心特性
高性能
- 极速上传:支持200条/批的大批次上传
- 并行处理:多线程并发上传,最高3线程并行
- 连接复用:使用Session保持连接,减少TCP握手开销
- 智能降级:批量失败时自动切换到单条模式
高可靠性
- 三级保障:批量→小批→单条的智能降级机制
- 自动重试:指数退避重试策略(1秒、2秒、4秒)
- 错误隔离:批次间错误互不影响
- 数据保全:确保即使部分失败也不会丢失数据
易用性
- 自动映射:CSV字段与飞书表格字段自动按顺序匹配
- 进度显示:实时上传进度和统计信息
- 结果验证:上传前后记录数量对比验证
- 错误日志:详细的失败记录保存和报告
🛠 技术架构
核心组件
FeishuUploader
├── 身份认证 (get_access_token)
├── 数据读取 (read_csv_file)
├── 字段映射 (auto_create_field_mapping)
├── 批量上传 (ultra_batch_upload)
│ ├── 多线程处理 (ThreadPoolExecutor)
│ ├── 批次管理 (upload_batch_with_retry)
│ └── 降级处理 (fallback_to_single_upload)
└── 结果验证 (check_existing_records)
数据流
CSV文件 → 数据解析 → 字段映射 → 批次拆分 → 并行上传 → 结果验证
🎯 第一部分:数据结构对齐流程
1.1 整体数据对齐架构
1.2 字段映射详细流程
1.3 数据格式转换逻辑
flowchart LR
A[原始CSV数据] --> B{数据类型判断}
B --> C[文本类型]
C --> D[strvalue 直接转换]
B --> E[数字类型]
E --> F[移除元/¥等符号]
F --> G[清理千分位逗号]
G --> H[转换为float类型]
B --> I[日期类型]
I --> J[标准化日期格式]
J --> K[确保时间戳兼容]
D --> L[格式化后的数据]
H --> L
K --> L
style L fill:#c8e6c9
1.4 字段映射验证机制
flowchart TD
A[开始验证] --> B[测试单条数据上传]
B --> C{上传结果检查}
C --> D[✅ 记录ID正常返回]
D --> E[字段映射正确]
E --> F[继续批量上传]
C --> G[❌ 记录ID为None]
G --> H[检查错误信息]
H --> I{错误类型分析}
I --> J[字段名称错误]
J --> K[重新检查字段映射]
I --> L[数据类型错误]
L --> M[调整数据格式化逻辑]
I --> N[权限问题]
N --> O[检查应用权限配置]
K --> P[重新测试单条上传]
M --> P
O --> P
style F fill:#c8e6c9
⚡ 第二部分:性能优化架构
2.1 整体性能优化架构
graph TB
A[原始数据] --> B[分批处理]
B --> C[并行上传引擎]
C --> D[三级健壮性保障]
D --> E[结果汇总分析]
B --> F[批次大小优化]
C --> G[多线程并发]
D --> H[智能重试机制]
D --> I[自动降级策略]
F --> J[动态批次调整]
G --> K[连接池复用]
H --> L[指数退避算法]
I --> M[批量→小批→单条]
style E fill:#c8e6c9
2.2 批量上传核心流程
flowchart TD
A[开始批量上传] --> B[计算批次数量]
B --> C[初始化线程池]
C --> D[提交批次任务]
subgraph 并行处理引擎
D --> E[批次1上传]
D --> F[批次2上传]
D --> G[批次N上传]
end
E --> H{上传结果}
F --> H
G --> H
H --> I[✅ 成功]
I --> J[统计成功数量]
H --> K[❌ 失败]
K --> L[进入重试流程]
L --> M{重试结果}
M --> I
M --> N[重试仍失败]
N --> O[降级到单条上传]
O --> P[单条上传结果]
P --> Q[统计最终成功数]
J --> Q
Q --> R[生成上传报告]
style R fill:#c8e6c9
2.3 三级健壮性保障机制
flowchart TD
A[开始上传批次] --> B[第一级: 大批次上传]
B --> C{上传结果}
C --> D[✅ 成功]
D --> E[继续下一批次]
C --> F[❌ 失败]
F --> G[第二级: 小批次上传]
G --> H{上传结果}
H --> I[✅ 成功]
I --> E
H --> J[❌ 失败]
J --> K[第三级: 单条上传]
K --> L{上传结果}
L --> M[✅ 成功]
M --> E
L --> N[❌ 失败]
N --> O[记录失败信息]
O --> E
E --> P[所有批次完成]
style P fill:#c8e6c9
2.4 智能重试与降级策略
flowchart LR
A[上传请求] --> B{请求状态}
B --> C[成功]
C --> D[✅ 完成]
B --> E[失败]
E --> F{错误类型判断}
F --> G[网络超时]
G --> H[等待1秒重试]
F --> I[服务器错误 5xx]
I --> H
F --> J[频率限制 429]
J --> K[等待2秒重试]
F --> L[客户端错误 4xx]
L --> M[立即降级处理]
H --> N[重试计数+1]
K --> N
N --> O{重试次数检查}
O --> P[≤最大重试次数]
P --> A
O --> Q[>最大重试次数]
Q --> M
M --> R[降级到小批次]
R --> S{小批次结果}
S --> T[✅ 成功]
T --> D
S --> U[❌ 失败]
U --> V[降级到单条上传]
V --> W{单条上传结果}
W --> X[✅ 成功]
X --> D
W --> Y[❌ 失败]
Y --> Z[记录最终失败]
style D fill:#c8e6c9
style Z fill:#ffcdd2
📊 性能基准
上传速度对比(619条记录)
| 方案 | 批次大小 | 并行度 | 预计时间 | 速度(条/秒) | 适用场景 |
|---|---|---|---|---|---|
| 安全模式 | 30条 | 1线程 | 60-90秒 | 7-10条/秒 | 网络不稳定环境 |
| 平衡模式 | 90条 | 1线程 | 45-75秒 | 8-14条/秒 | 一般网络环境 |
| 极速模式 | 200条 | 3线程 | 20-35秒 | 18-30条/秒 | 稳定网络环境 |
资源消耗
- 内存使用:约50-100MB(取决于CSV文件大小)
- 网络带宽:单个请求最大2-3MB(200条记录)
- CPU使用:多线程时约10-30%
🔧 环境要求
系统要求
- Python 3.7+
- 网络连接(可访问飞书API)
依赖包
pip install pandas requests python-dotenv
📝 使用指南
快速开始
配置应用信息
app_id = "your_app_id"
app_secret = "your_app_secret"
base_id = "your_base_id" # 飞书表格URL中的base_id
table_id = "your_table_id" # 飞书表格URL中的table_id
准备CSV文件
- 确保CSV文件与飞书表格字段顺序一致
- 建议先使用小批量数据测试字段映射
运行上传
uploader = FeishuUploader()
uploader.get_access_token()
df = uploader.read_csv_file("your_data.csv")
uploader.auto_create_field_mapping(df.columns.tolist())
success_count, failed_batches = uploader.ultra_batch_upload(df)
配置说明
批次大小选择
# 网络稳定,追求速度
batch_size=200, max_workers=3
# 网络一般,平衡速度稳定性
batch_size=90, max_workers=1
# 网络较差,保证稳定性
batch_size=30, max_workers=1
重试策略配置
# 默认重试策略
max_retries=3
retry_delays=[1, 2, 4] # 指数退避
# 激进重试策略(网络不稳定时)
max_retries=5
retry_delays=[1, 1, 2, 2, 4]
🔍 故障排除
常见问题
1. 认证失败
症状:获取access_token失败
解决方案:
- 检查app_id和app_secret是否正确
- 确认应用权限已开通
- 验证网络连接
2. 字段映射错误
症状:API返回"字段名称不存在"
解决方案:
- 确认CSV字段与飞书表格字段顺序一致
- 检查字段名称中的特殊字符和空格
- 使用get_table_fields_with_types()验证字段信息
3. 上传超时
症状:请求超时或连接中断
解决方案:
- 减小批次大小(200→100→50)
- 增加超时时间(timeout=60→120)
- 启用单线程模式(max_workers=1)
4. 数据格式错误
症状:上传成功但记录ID为None
解决方案:
- 检查数字字段格式(移除"元"、"¥"等符号)
- 验证日期字段格式
- 确认单选字段值在可选范围内
调试模式
启用详细日志:
# 在代码中添加调试输出
print(f"请求数据: {json.dumps(payload, ensure_ascii=False)}")
print(f"响应状态: {response.status_code}")
print(f"响应内容: {response.text}")
⚡ 性能优化建议
网络优化
- 使用有线网络:避免WiFi不稳定性
- 关闭VPN:减少网络跳转
- 选择优质网络:企业宽带或5G网络
代码优化
- 预处理数据:上传前清理和验证数据
- 分批策略:根据数据量动态调整批次大小
- 连接复用:使用Session对象保持长连接
系统优化
- 调整线程数:根据CPU核心数调整max_workers
- 内存管理:大文件分批读取,避免内存溢出
- 错误处理:设置合理的超时和重试策略
📈 扩展功能
数据预处理
def preprocess_data(df):
"""数据预处理函数"""
# 清理空值
df = df.fillna('')
# 格式化数字
df['金额'] = df['金额'].str.replace('元', '').str.replace(',', '')
# 日期格式化
df['交易时间'] = pd.to_datetime(df['交易时间'])
return df
增量上传
def incremental_upload(df, key_field='交易单号'):
"""增量上传,避免重复数据"""
existing_records = get_existing_records()
existing_keys = set(record[key_field] for record in existing_records)
new_data = df[~df[key_field].isin(existing_keys)]
return upload_records(new_data)
监控告警
def send_alert(success_rate, error_count):
"""发送上传结果告警"""
if success_rate < 0.95 or error_count > 10:
# 发送邮件、钉钉、企业微信通知
pass
🔒 安全建议
敏感信息保护
- 使用环境变量:不要硬编码app_secret
- 配置文件加密:敏感配置信息加密存储
- 访问日志审计:记录所有API调用
权限控制
- 最小权限原则:应用只申请必要权限
- 定期轮换密钥:定期更新app_secret
- IP白名单:限制API调用来源IP
import pandas as pd # 用于数据处理和CSV文件读取
import requests # 用于发送HTTP请求,与飞书API交互
import json # 用于处理JSON数据格式
import time # 用于添加延迟和计时
import os # 用于处理文件路径等系统操作
import concurrent.futures # 用于实现多线程并行处理
from dotenv import load_dotenv # 用于加载环境变量(虽然本代码未直接使用,但保留用于扩展)
# 加载环境变量(如果需要从.env文件读取配置可启用)
load_dotenv()
class FeishuUltraUploader:
"""飞书多维表格极速批量上传器
该类实现了将CSV数据高效上传到飞书多维表格的功能,
包含自动字段映射、批量上传、多线程并行处理、失败重试和降级策略等特性。
"""
def __init__(self):
"""初始化上传器配置参数"""
# 飞书应用认证信息(实际使用时建议从环境变量或配置文件读取)
self.app_id = "cli....................." # 飞书应用ID
self.app_secret = "ycuY....................." # 飞书应用密钥
# 飞书多维表格信息(从表格URL中获取)
self.base_id = "PmkIbe8....................." # 应用ID
self.table_id = "tbl6....................." # 表格ID
self.base_url = "https://open.feishu.cn/open-apis" # 飞书API基础地址
self.access_token = None # 访问令牌,用于API认证
self.field_mapping = {} # 字段映射关系(CSV字段 -> 飞书表格字段)
self.field_types = {} # 飞书表格字段类型映射(字段名 -> 类型)
def get_access_token(self):
"""获取飞书API访问令牌
飞书API调用需要先通过app_id和app_secret获取访问令牌,
该令牌有一定有效期,过期后需要重新获取。
返回:
bool: 获取成功返回True,失败返回False
"""
# 构建获取访问令牌的API地址
url = f"{self.base_url}/auth/v3/app_access_token/internal"
# 请求参数(包含认证信息)
payload = {
"app_id": self.app_id,
"app_secret": self.app_secret
}
try:
# 发送POST请求获取令牌
response = requests.post(url, json=payload, timeout=30)
response.raise_for_status() # 检查HTTP请求是否成功
result = response.json() # 解析JSON响应
self.access_token = result.get("app_access_token") # 提取访问令牌
print("✅ 访问令牌获取成功")
return True
except Exception as e:
print(f"❌ 获取访问令牌失败: {e}")
return False
def read_csv_file(self, file_path):
"""读取CSV文件并返回数据框
读取指定路径的CSV文件,解析为pandas数据框,方便后续处理。
参数:
file_path (str): CSV文件路径
返回:
pandas.DataFrame: 解析后的数据集;失败则返回None
"""
try:
# 读取CSV文件(自动处理表头)
df = pd.read_csv(file_path)
# 输出读取结果信息
print(f"✅ CSV文件读取成功,共{len(df)}行,{len(df.columns)}列")
return df
except Exception as e:
print(f"❌ 读取CSV文件失败: {e}")
return None
def get_table_fields_with_types(self):
"""获取飞书表格的字段信息及类型
调用飞书API获取目标表格的所有字段名称和类型,
用于后续的数据格式转换和字段映射验证。
返回:
list: 包含字段信息的字典列表;失败则返回None
"""
# 检查访问令牌是否已获取
if not self.access_token:
return None
# 构建获取字段信息的API地址
url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/fields"
# 请求头(包含认证信息)
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
try:
# 发送GET请求获取字段信息
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status() # 检查请求是否成功
result = response.json() # 解析响应
fields = result.get("data", {}).get("items", []) # 提取字段列表
# 存储字段类型信息(用于后续数据格式化)
self.field_types = {}
for field in fields:
field_name = field.get("field_name", "") # 字段名称
field_type = field.get("type", "") # 字段类型(如文本、数字、日期等)
self.field_types[field_name] = field_type
return fields
except Exception as e:
print(f"❌ 获取表格字段失败: {e}")
return None
def auto_create_field_mapping(self, csv_columns):
"""自动创建CSV字段与飞书表格字段的映射关系
按照字段顺序自动匹配CSV列和飞书表格字段,
这是简化操作的方式,适用于字段顺序一致的场景。
参数:
csv_columns (list): CSV文件的列名列表
返回:
dict: 字段映射关系(CSV列名 -> 飞书字段名)
"""
# 获取飞书表格字段信息
feishu_fields = self.get_table_fields_with_types()
if not feishu_fields:
return {}
# 提取飞书表格的字段名称列表
feishu_field_names = [field["field_name"] for field in feishu_fields]
mapping = {}
# 按顺序匹配CSV字段和飞书字段(取两者中数量较少的进行匹配)
min_fields = min(len(csv_columns), len(feishu_field_names))
for i in range(min_fields):
mapping[csv_columns[i]] = feishu_field_names[i]
print("✅ 字段映射完成")
return mapping
def format_value_by_type(self, value, field_type):
"""根据飞书字段类型格式化数据值
不同类型的字段需要特定的数据格式,如数字字段需要去除货币符号等。
参数:
value: 原始数据值
field_type: 飞书表格字段类型
返回:
格式化后的值;空值返回None
"""
# 处理空值情况
if pd.isna(value) or value == "":
return None
# 处理数字类型字段(飞书类型标识为2)
if field_type == 2:
try:
# 如果是字符串,先清理货币符号和千分位逗号
if isinstance(value, str):
value = str(value).replace('元', '').replace('¥', '').replace(',', '').strip()
# 转换为浮点数
return float(value)
except (ValueError, TypeError):
# 转换失败时返回原始字符串(避免整个记录失败)
return str(value)
# 其他类型(文本、日期等)直接转换为字符串
else:
return str(value)
def ultra_batch_upload(self, df, batch_size=200, max_workers=3):
"""极速批量上传主方法
核心方法,实现大批次上传、多线程并行处理、失败重试和降级策略,
最大化上传效率的同时保证可靠性。
参数:
df (pandas.DataFrame): 要上传的数据框
batch_size (int): 每批上传的记录数(最大200,飞书API限制)
max_workers (int): 并行线程数
返回:
tuple: (成功上传的记录数, 失败的批次信息列表)
"""
# 检查访问令牌是否有效
if not self.access_token:
return 0, []
# 输出上传任务基本信息
print(f"🚀 极速批量上传启动,共{len(df)}条记录")
print(f"📊 批次大小: {batch_size}条,并行线程: {max_workers}")
# 计算总批次数并拆分数据为多个批次
total_batches = (len(df) + batch_size - 1) // batch_size # 向上取整计算总批次
batches = []
for batch_num in range(total_batches):
# 计算当前批次的起始和结束索引
start_idx = batch_num * batch_size
end_idx = min((batch_num + 1) * batch_size, len(df))
batch_df = df.iloc[start_idx:end_idx] # 提取当前批次数据
batches.append((batch_num + 1, batch_df)) # 存储批次编号和数据(编号从1开始)
# 初始化统计变量
success_count = 0 # 成功上传的总记录数
failed_batches = [] # 存储失败的批次信息
# 使用线程池实现并行上传
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有批次任务到线程池
future_to_batch = {
executor.submit(self.upload_batch_with_retry, batch_num, batch_df): batch_num
for batch_num, batch_df in batches
}
# 遍历所有完成的任务,收集结果
for future in concurrent.futures.as_completed(future_to_batch):
batch_num = future_to_batch[future] # 获取当前任务对应的批次编号
try:
# 获取任务结果(成功数和失败数)
batch_success, batch_failed = future.result()
success_count += batch_success # 累计成功数
# 记录失败批次信息
if batch_failed:
failed_batches.append({
"batch": batch_num,
"failed_count": batch_failed
})
print(f"✅ 批次 {batch_num} 完成: 成功 {batch_success} 条")
except Exception as e:
print(f"❌ 批次 {batch_num} 异常: {e}")
failed_batches.append({
"batch": batch_num,
"error": str(e)
})
return success_count, failed_batches
def upload_batch_with_retry(self, batch_num, batch_df, max_retries=2):
"""上传单个批次数据(带重试机制)
对单个批次进行上传,并在失败时进行重试,
多次重试失败后会降级为单条上传模式。
参数:
batch_num (int): 批次编号
batch_df (pandas.DataFrame): 该批次的数据
max_retries (int): 最大重试次数
返回:
tuple: (该批次成功上传的记录数, 失败的记录数)
"""
# 准备当前批次的上传数据(转换为飞书API要求的格式)
records_data = self.prepare_batch_data(batch_df)
# 重试循环
for attempt in range(max_retries):
try:
print(f" 批次 {batch_num} 尝试第 {attempt + 1} 次上传 ({len(records_data)}条)...")
# 构建批量创建记录的API地址
url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records/batch_create"
# 请求头
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
# 请求体(包含要上传的记录数据)
payload = {
"records": records_data
}
# 发送POST请求上传批次数据
response = requests.post(url, headers=headers, json=payload, timeout=60)
# 处理响应结果
if response.status_code == 200:
result = response.json()
# 提取成功创建的记录数
created_records = result.get("data", {}).get("records", [])
return len(created_records), 0
else:
error_msg = f"HTTP {response.status_code}"
print(f" 批次 {batch_num} 上传失败: {error_msg}")
# 如果还有重试次数,等待后重试
if attempt < max_retries - 1:
wait_time = 1 # 重试等待时间(秒)
print(f" {wait_time}秒后重试...")
time.sleep(wait_time)
continue
# 所有重试都失败,降级为单条上传
return self.fallback_to_single_upload(batch_df)
except Exception as e:
print(f" 批次 {batch_num} 异常: {e}")
# 如果还有重试次数,等待后重试
if attempt < max_retries - 1:
wait_time = 1 # 重试等待时间(秒)
print(f" {wait_time}秒后重试...")
time.sleep(wait_time)
continue
# 所有重试都失败,降级为单条上传
return self.fallback_to_single_upload(batch_df)
# 所有尝试失败(理论上不会走到这里,因为重试次数已耗尽)
return 0, len(batch_df)
def prepare_batch_data(self, batch_df):
"""准备批次上传的数据格式
将pandas数据框转换为飞书API要求的记录格式,
包含字段映射和数据类型转换。
参数:
batch_df (pandas.DataFrame): 批次数据
返回:
list: 符合飞书API格式的记录列表
"""
records_data = []
# 遍历批次中的每一行数据
for _, row in batch_df.iterrows():
fields_data = {} # 存储当前行的字段数据
# 遍历字段映射关系,处理每个字段
for csv_field, feishu_field_name in self.field_mapping.items():
# 检查当前字段是否存在且不为空
if csv_field in row and pd.notna(row[csv_field]):
value = row[csv_field] # 获取原始值
# 获取飞书字段类型(默认为文本类型)
field_type = self.field_types.get(feishu_field_name, 1)
# 根据字段类型格式化值
formatted_value = self.format_value_by_type(value, field_type)
if formatted_value is not None:
fields_data[feishu_field_name] = formatted_value # 添加到字段数据
# 如果有有效字段数据,添加到记录列表
if fields_data:
records_data.append({"fields": fields_data})
return records_data
def fallback_to_single_upload(self, batch_df):
"""降级策略:单条上传模式
当批次上传失败时,自动降级为单条上传,
确保尽可能多的记录能成功上传,减少数据丢失。
参数:
batch_df (pandas.DataFrame): 批次数据
返回:
tuple: (单条上传成功的记录数, 失败的记录数)
"""
success_count = 0 # 成功数
failed_count = 0 # 失败数
print(" 🐌 降级到极速单条上传...")
# 使用会话保持连接,减少TCP握手开销,提高单条上传效率
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
})
# 单条创建记录的API地址
url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records"
# 遍历批次中的每一行数据
for idx, row in batch_df.iterrows():
try:
fields_data = {} # 当前行的字段数据
# 处理每个字段
for csv_field, feishu_field_name in self.field_mapping.items():
if csv_field in row and pd.notna(row[csv_field]):
value = row[csv_field]
field_type = self.field_types.get(feishu_field_name, 1)
formatted_value = self.format_value_by_type(value, field_type)
if formatted_value is not None:
fields_data[feishu_field_name] = formatted_value
# 如果没有有效字段数据,跳过当前行
if not fields_data:
continue
# 发送单条记录上传请求
payload = {"fields": fields_data}
response = session.post(url, json=payload, timeout=10)
# 检查上传结果
if response.status_code == 200:
success_count += 1
else:
failed_count += 1
# 每成功50条输出一次进度(避免输出过于频繁)
if success_count % 50 == 0:
print(f" 单条上传进度: {success_count}/{len(batch_df)}")
except Exception:
# 任何异常都视为失败
failed_count += 1
# 关闭会话释放资源
session.close()
# 输出单条上传结果
print(f" ✅ 单条上传完成: {success_count} 条成功, {failed_count} 条失败")
return success_count, failed_count
def check_existing_records(self):
"""检查表格中现有记录数量
用于上传前后的数量对比,验证上传效果。
返回:
int: 现有记录数量;失败返回0
"""
if not self.access_token:
return 0
# 构建获取记录列表的API地址
url = f"{self.base_url}/bitable/v1/apps/{self.base_id}/tables/{self.table_id}/records"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
try:
# 发送GET请求获取记录数量(默认只返回部分记录,这里仅用计数)
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
result = response.json()
records = result.get("data", {}).get("items", [])
return len(records)
return 0
except:
return 0
def main():
"""主函数:执行飞书表格批量上传流程"""
# 1. 初始化上传器实例
uploader = FeishuUltraUploader()
# 2. 获取飞书API访问令牌(必须第一步成功,否则后续操作无法进行)
if not uploader.get_access_token():
return
# 3. 读取CSV数据文件
csv_file = "data/output/merged_bills.csv" # CSV文件路径(根据实际情况修改)
df = uploader.read_csv_file(csv_file)
if df is None: # 读取失败则退出
return
# 4. 自动创建CSV字段与飞书表格字段的映射关系
csv_columns = df.columns.tolist() # 获取CSV的列名列表
uploader.field_mapping = uploader.auto_create_field_mapping(csv_columns)
# 检查映射是否成功创建
if not uploader.field_mapping:
print("❌ 字段映射创建失败")
return
# 5. 上传前检查表格中的记录数量(用于后续对比)
print("🔍 检查上传前的记录数量...")
before_count = uploader.check_existing_records()
print(f"上传前表格中的记录数量: {before_count}")
# 6. 执行极速批量上传
start_time = time.time() # 记录开始时间
# 调用上传方法(大批次+多线程组合最大化效率)
success_count, failed_batches = uploader.ultra_batch_upload(
df,
batch_size=200, # 批次大小(飞书API最大支持200条/批)
max_workers=3 # 并行线程数(根据网络情况调整)
)
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time # 计算总耗时
# 7. 上传后检查表格中的记录数量(验证实际新增数量)
print("\n🔍 检查上传后的记录数量...")
after_count = uploader.check_existing_records()
# 8. 输出上传结果统计信息
print(f"\n🎉 极速上传完成!")
print(f"⏱️ 总耗时: {elapsed_time:.2f}秒")
print(f"📊 平均速度: {success_count/elapsed_time:.2f} 条/秒")
print(f"✅ 成功: {success_count} 条")
print(f"📈 实际新增: {after_count - before_count} 条") # 实际新增 = 上传后总数 - 上传前总数
# 处理失败批次(如果有)
if failed_batches:
print(f"❌ 失败批次: {len(failed_batches)} 个")
# 将失败信息保存到JSON文件,方便后续处理
with open("upload_failed_batches.json", "w", encoding="utf-8") as f:
json.dump(failed_batches, f, ensure_ascii=False, indent=2)
print("失败批次信息已保存到 upload_failed_batches.json")
# 程序入口:当直接运行该脚本时执行main函数
if __name__ == "__main__":
main()
更多推荐


所有评论(0)