fastapi-异步
❌ 同步函数(阻塞)data = db.query(...) # 等待数据库# ✅ 异步函数(非阻塞)data = await db.query(...) # 释放CPU给其他请求# 你的项目async def get_huawei_resources(...): # ⭐ async 关键字# 函数体特点async def定义的函数是协程函数调用时返回一个协程对象,而不是直接执行FastAPI 会
·
📚 项目中的异步实现详解
🎯 项目中的异步/并发模式概览
你的项目使用了 3种不同的异步/并发模式:
┌─────────────────────────────────────────────────────────────┐
│ 项目中的异步/并发架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1️⃣ FastAPI async/await ← ASGI异步I/O │
│ └─ 处理HTTP请求/响应 │
│ │
│ 2️⃣ threading.Thread ← 后台线程 │
│ └─ 长任务异步执行(华为云同步) │
│ │
│ 3️⃣ ThreadPoolExecutor ← 线程池 │
│ └─ 并发查询云API(标签批量查询) │
│ │
│ 4️⃣ APScheduler ← 定时任务调度 │
│ └─ BackgroundScheduler(后台线程运行) │
│ │
└─────────────────────────────────────────────────────────────┘
🎯 1. FastAPI 的 async/await(异步 I/O)
什么是 async/await?
# ❌ 同步函数(阻塞)
def get_data():
data = db.query(...) # 等待数据库
return data
# ✅ 异步函数(非阻塞)
async def get_data():
data = await db.query(...) # 释放CPU给其他请求
return data
你的项目中的使用
@app.get("/api/huawei/resources", response_model=ResourceListResponse)
async def get_huawei_resources(
db: Session = Depends(get_db),
expire_days: int = Query(default=30, description="到期天数范围"),
resource_type: Optional[str] = Query(default=None, description="资源类型"),
region: Optional[str] = Query(default=None, description="区域"),
status: Optional[str] = Query(default=None, description="状态筛选"),
search: Optional[str] = Query(default=None, description="搜索关键词"),
auto_renew: Optional[bool] = Query(default=None, description="自动续费筛选,True=已开启,False=未开启"),
page: int = Query(default=1, ge=1, description="页码"),
page_size: int = Query(default=20, ge=1, le=10000, description="每页数量(导出时可设置为10000)"),
sort_by: str = Query(default="expire_time", description="排序字段"),
sort_order: str = Query(default="asc", description="排序方向")
):
知识点讲解
1️⃣ async def - 定义异步函数
# 你的项目
@app.get("/api/huawei/resources")
async def get_huawei_resources(...): # ⭐ async 关键字
# 函数体
特点:
async def定义的函数是协程函数- 调用时返回一个协程对象,而不是直接执行
- FastAPI 会自动管理协程的执行
2️⃣ 同步代码在 async 函数中(你的项目)
@app.get("/api/huawei/resources")
async def get_huawei_resources(db: Session = Depends(get_db)):
# ⚠️ 注意:这里没有使用 await,因为 SQLAlchemy 的查询是同步的
query = db.query(ExpiringResource) # 同步数据库查询
resources = query.all() # 阻塞式查询
return resources
重要概念:
- 你的项目中大部分异步函数内部实际上是同步代码
- 这是因为 SQLAlchemy(你用的ORM)是同步库
- FastAPI 会在线程池中执行这些同步代码,避免阻塞事件循环
3️⃣ async vs def 的区别
# 方式1:异步函数(你的项目使用)
@app.get("/api/resources")
async def get_resources(db: Session = Depends(get_db)):
data = db.query(Resource).all() # 同步查询
return data
# 方式2:同步函数
@app.get("/api/resources")
def get_resources(db: Session = Depends(get_db)):
data = db.query(Resource).all() # 同步查询
return data
对比:
| 特性 | async def | def |
|---|---|---|
| FastAPI处理方式 | 在线程池中运行同步代码 | 直接在主线程运行 |
| 并发能力 | 高(可以处理多个请求) | 低(阻塞主线程) |
| 适用场景 | I/O密集型(数据库、API) | CPU密集型(计算) |
| 你的项目 | ✅ 使用 | ❌ 未使用 |
FastAPI 的智能处理:
async def endpoint():
# FastAPI 检测到内部是同步代码
db.query(...) # 同步查询
# FastAPI 自动在 ThreadPoolExecutor 中运行
# 相当于:
# await run_in_threadpool(lambda: db.query(...))
🎯 2. threading.Thread - 后台线程(长任务异步)
这是你的项目中最核心的异步实现!
实现原理
@app.post("/api/huawei/sync", response_model=SyncResponse)
async def sync_huawei_resources(
db: Session = Depends(get_db),
expire_days: int = Query(default=30, description="到期天数范围")
):
"""同步华为云到期资源(异步模式:立即返回,后台执行)"""
import threading
# 创建同步日志
sync_log = ResourceSyncLog(
cloud_provider="huawei",
sync_type=SyncTypeEnum.MANUAL.value, # "手动"
status=SyncStatusEnum.IN_PROGRESS.value, # "进行中"
start_time=datetime.now()
)
db.add(sync_log)
db.commit()
db.refresh(sync_log)
sync_log_id = sync_log.id
def background_sync():
"""后台同步任务"""
db_thread = SessionLocal()
try:
logger.info("🚀 [华为云] 开始后台同步到期资源...")
# 获取同步日志
log = db_thread.query(ResourceSyncLog).filter(ResourceSyncLog.id == sync_log_id).first()
if not log:
logger.error(f"❌ [华为云] 同步日志不存在: ID={sync_log_id}")
return
logger.info(f"✅ [华为云] 成功获取同步日志: ID={log.id}, 当前状态={log.status}")
# 获取BSS客户端
bss_client = get_huawei_bss_client()
# 获取到期资源
logger.info("📡 正在从华为云BSS API获取资源列表...")
resources = bss_client.fetch_expiring_resources(expire_days=expire_days)
logger.info(f"✅ 成功获取 {len(resources)} 个资源,开始保存到数据库...")
# 统计信息
stats = bss_client.get_resource_statistics(resources)
# 更新数据库 - 先将所有华为云资源标记为非活跃
logger.info("🔄 更新现有资源状态...")
db_thread.query(ExpiringResource).filter(
ExpiringResource.cloud_provider == "huawei"
).update({"is_active": False})
# 保存新资源
logger.info(f"💾 开始保存资源到数据库 (共 {len(resources)} 个)...")
for resource_data in resources:
existing = db_thread.query(ExpiringResource).filter(
ExpiringResource.cloud_provider == "huawei",
ExpiringResource.resource_id == resource_data["resource_id"]
).first()
if existing:
# 更新现有资源
for key, value in resource_data.items():
if key != "cloud_provider" and key != "project_id" and hasattr(existing, key):
setattr(existing, key, value)
existing.is_active = True
existing.last_synced_at = datetime.now()
else:
# 新增资源
new_resource = ExpiringResource(
cloud_provider="huawei",
region=resource_data.get("region", ""),
region_name=resource_data.get("region_name", ""),
resource_id=resource_data.get("resource_id", ""),
resource_name=resource_data.get("resource_name", ""),
resource_type=resource_data.get("resource_type", ""),
resource_type_name=resource_data.get("resource_type_name", ""),
resource_spec=resource_data.get("resource_spec", ""),
enterprise_project_id=resource_data.get("enterprise_project_id", ""),
enterprise_project_name=resource_data.get("enterprise_project_name", ""),
tag_product_line=resource_data.get("tag_product_line"),
tag_env=resource_data.get("tag_env"),
tag_name=resource_data.get("tag_name"),
create_time=resource_data.get("create_time"),
expire_time=resource_data.get("expire_time"),
status=resource_data.get("status", ""),
payment_type=resource_data.get("payment_type", ""),
auto_renew=resource_data.get("auto_renew", False),
is_active=True,
last_synced_at=datetime.now()
)
db_thread.add(new_resource)
logger.info(f" ✅ 资源保存完成")
# 更新同步日志为成功
logger.info(f"🔄 [华为云] 准备更新同步日志状态... (log.id={log.id})")
# ⭐ 直接使用枚举的字符串值
log.status = SyncStatusEnum.SUCCESS.value # "成功"
log.end_time = datetime.now()
log.total_resources = len(resources)
log.expired = stats.get("expired", 0)
log.expiring_7_days = stats.get("expiring_7_days", 0)
log.expiring_15_days = stats.get("expiring_15_days", 0)
log.expiring_30_days = stats.get("expiring_30_days", 0)
logger.info(f" 状态已设置为: {log.status} (字符串), total={log.total_resources}")
logger.info(f"🔄 [华为云] 开始提交事务...")
try:
db_thread.commit()
logger.info(f"✅ [华为云] 事务提交成功!状态已更新为: {log.status}")
except Exception as commit_error:
logger.error(f"❌ [华为云] 事务提交失败: {commit_error}")
import traceback
logger.error(traceback.format_exc())
db_thread.rollback()
raise
logger.info(f"✅ [华为云] 资源同步完成!共 {len(resources)} 个资源")
logger.info(f" 📊 统计: 已过期 {stats.get('expired', 0)} 个, 7天内到期 {stats.get('expiring_7_days', 0)} 个, 15天内到期 {stats.get('expiring_15_days', 0)} 个, 30天内到期 {stats.get('expiring_30_days', 0)} 个")
except Exception as e:
logger.error(f"[华为云] 后台同步失败: {e}")
import traceback
logger.error(traceback.format_exc())
# 更新同步日志为失败
try:
log = db_thread.query(ResourceSyncLog).filter(ResourceSyncLog.id == sync_log_id).first()
if log:
log.status = SyncStatusEnum.FAILED.value # "失败"
log.end_time = datetime.now()
log.error_message = str(e)[:500] # 限制错误信息长度
db_thread.commit()
except Exception as commit_error:
logger.error(f"更新同步日志失败: {commit_error}")
finally:
db_thread.close()
# 启动后台线程
thread = threading.Thread(target=background_sync, daemon=True)
thread.start()
# 立即返回,告诉前端同步已启动
logger.info(f"✅ 华为云同步任务已启动(后台执行中),同步日志ID: {sync_log_id}")
return SyncResponse(
success=True,
message=f"华为云同步任务已启动,正在后台执行中。请等待自动刷新。",
total_resources=0, # 还不知道总数
sync_time=datetime.now(),
statistics=None,
sync_log_id=sync_log_id # ⭐ 返回同步日志ID
)
知识点讲解
1️⃣ 为什么需要后台线程?
# ❌ 问题:如果不使用后台线程(同步执行)
@app.post("/api/huawei/sync")
async def sync_huawei_resources():
# 同步执行,需要1小时
resources = bss_client.fetch_expiring_resources() # 阻塞1小时
db.bulk_insert(resources)
return {"success": True}
# 结果:
# - HTTP请求超时(Nginx默认60秒)
# - 用户等待1小时没有响应
# - 前端显示"同步失败"
# ✅ 解决:使用后台线程
@app.post("/api/huawei/sync")
async def sync_huawei_resources():
# 1. 立即返回(< 1秒)
thread = threading.Thread(target=background_sync, daemon=True)
thread.start()
return {"success": True, "sync_log_id": 123}
# 2. 后台线程继续执行(1小时)
# 3. 前端通过轮询查询状态
2️⃣ threading.Thread 核心参数
thread = threading.Thread(
target=background_sync, # ⭐ 要执行的函数
daemon=True # ⭐ 守护线程
)
thread.start()
daemon=True 详解:
# daemon=True(守护线程)
thread = threading.Thread(target=long_task, daemon=True)
thread.start()
# 主程序退出时:
# → 守护线程自动终止
# → 适合后台任务,不影响程序退出
# daemon=False(非守护线程,默认)
thread = threading.Thread(target=long_task, daemon=False)
thread.start()
# 主程序退出时:
# → 等待非守护线程完成
# → 可能导致程序无法退出
你的项目为什么用 daemon=True?
# 场景:FastAPI服务重启
# 1. 用户点击"同步"按钮
# 2. 后台线程开始执行(预计1小时)
# 3. 管理员重启服务(5分钟后)
# daemon=True:
# → 线程立即终止,服务快速重启 ✅
# → 同步记录状态保持"进行中"(可以手动重新同步)
# daemon=False:
# → 等待线程执行完(1小时),服务无法重启 ❌
3️⃣ 独立数据库会话(⭐⭐⭐⭐⭐ 超级重要!)
def background_sync():
# ⭐ 创建独立的数据库会话
db_thread = SessionLocal()
try:
# 使用独立会话
log = db_thread.query(ResourceSyncLog).first()
db_thread.commit()
finally:
db_thread.close() # ⭐ 必须关闭
为什么不能共享主线程的 db 会话?
# ❌ 错误做法:共享数据库会话
@app.post("/api/huawei/sync")
async def sync_huawei_resources(db: Session = Depends(get_db)):
def background_sync():
# 危险!使用主线程的 db 会话
log = db.query(ResourceSyncLog).first() # ❌ 会话冲突
db.commit() # ❌ 可能在主线程已关闭
thread = threading.Thread(target=background_sync, daemon=True)
thread.start()
return {"success": True}
# HTTP请求结束,db会话被关闭
# 后台线程还在使用 db → 崩溃!
# ✅ 正确做法:创建独立会话
def background_sync():
db_thread = SessionLocal() # ✅ 独立会话
try:
log = db_thread.query(ResourceSyncLog).first()
db_thread.commit()
finally:
db_thread.close() # ✅ 安全关闭
4️⃣ 闭包捕获变量
sync_log_id = sync_log.id # ⭐ 在主线程获取ID
def background_sync():
# ⭐ 闭包:访问外部变量 sync_log_id
log = db_thread.query(ResourceSyncLog).filter(
ResourceSyncLog.id == sync_log_id # 使用外部变量
).first()
闭包知识点:
# 闭包:内部函数访问外部函数的变量
def outer_function():
message = "Hello" # 外部变量
def inner_function():
print(message) # ⭐ 闭包:访问外部变量
return inner_function
func = outer_function()
func() # 输出: Hello
🎯 3. ThreadPoolExecutor - 线程池(并发查询)
实现原理
def _batch_query_tags(self, resources: List[Dict], max_workers: int = 10):
"""批量查询资源标签(使用线程池并发查询,提高速度)"""
# ... 省略部分代码 ...
completed = 0
total = len(resources)
# 动态调整并发数:降低并发数以减少内存占用(生产环境优化)
actual_workers = min(current_workers, max(3, min(5, total // 200))) # 至少3个,最多5个(降低并发,减少内存)
logger.info(f"🏷️ 开始批量查询标签,并发数: {actual_workers}, 总资源数: {total}")
with ThreadPoolExecutor(max_workers=actual_workers) as executor:
# 提交所有任务
future_to_resource = {
executor.submit(query_tag_for_resource, res_info): res_info
for res_info in resources
}
# 处理完成的任务
for future in as_completed(future_to_resource):
completed += 1
# 每完成100个打印一次进度(减少日志输出,提高速度)
if completed % 100 == 0 or completed == total:
logger.info(f" 🏷️ 标签查询进度: {completed}/{total} ({completed*100//total}%)")
try:
future.result() # 获取结果(如果有异常会抛出)
except Exception as e:
# 静默处理错误,不打印每个失败(减少日志输出,提高速度)
pass
# 如果检测到429错误,在请求之间添加小延迟
if rate_limit_detected and completed % 10 == 0:
time.sleep(0.5) # 每10个请求后等待0.5秒
# 如果检测到限流,记录警告
if rate_limit_detected:
logger.debug("⚠️ 检测到API限流(429错误),已自动降低并发并添加重试机制")
知识点讲解
1️⃣ ThreadPoolExecutor 基础
from concurrent.futures import ThreadPoolExecutor, as_completed
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
future1 = executor.submit(func, arg1)
future2 = executor.submit(func, arg2)
# 等待结果
result1 = future1.result()
result2 = future2.result()
概念图:
ThreadPoolExecutor (max_workers=5)
┌──────────────────────────────────────────┐
│ 线程池 (5个工作线程) │
│ │
│ 🧵 Thread 1 → 任务1 (查询资源A的标签) │
│ 🧵 Thread 2 → 任务2 (查询资源B的标签) │
│ 🧵 Thread 3 → 任务3 (查询资源C的标签) │
│ 🧵 Thread 4 → 任务4 (查询资源D的标签) │
│ 🧵 Thread 5 → 任务5 (查询资源E的标签) │
│ │
│ 等待队列: [任务6, 任务7, 任务8, ...] │
└──────────────────────────────────────────┘
2️⃣ 为什么使用线程池而不是普通线程?
# ❌ 方式1:为每个任务创建线程(不推荐)
for resource in resources: # 1000个资源
thread = threading.Thread(target=query_tag, args=(resource,))
thread.start() # 创建1000个线程!
# 问题:
# - 创建/销毁线程开销大
# - 线程数过多,系统崩溃
# - 内存占用爆炸
# ✅ 方式2:线程池(你的项目)
with ThreadPoolExecutor(max_workers=5) as executor:
for resource in resources: # 1000个资源
executor.submit(query_tag, resource) # 只用5个线程
# 优点:
# - 线程复用,性能更好
# - 控制并发数,避免过载
# - 自动管理线程生命周期
3️⃣ executor.submit() vs map()
# 方式1:submit()(你的项目使用)
futures = []
for resource in resources:
future = executor.submit(query_tag, resource)
futures.append(future)
# 获取结果(按完成顺序)
for future in as_completed(futures):
result = future.result()
# 方式2:map()
results = executor.map(query_tag, resources)
# 获取结果(按提交顺序)
for result in results:
process(result)
对比:
| 特性 | submit() | map() |
|---|---|---|
| 返回值 | Future对象 | 迭代器 |
| 结果顺序 | 按完成顺序(as_completed) | 按提交顺序 |
| 灵活性 | 高(可以单独处理每个Future) | 低(批量处理) |
| 你的项目 | ✅ 使用 | ❌ 未使用 |
4️⃣ as_completed() - 按完成顺序处理
# 提交所有任务
future_to_resource = {
executor.submit(query_tag, res): res
for res in resources
}
# ⭐ 按完成顺序处理(谁先完成谁先处理)
for future in as_completed(future_to_resource):
result = future.result()
print(f"任务完成: {result}")
执行流程:
时间轴:
0秒 → 提交任务A, B, C, D, E
1秒 → 任务C完成 ✅ → 立即处理C
2秒 → 任务A完成 ✅ → 立即处理A
3秒 → 任务E完成 ✅ → 立即处理E
4秒 → 任务B完成 ✅ → 立即处理B
5秒 → 任务D完成 ✅ → 立即处理D
处理顺序: C → A → E → B → D (按完成顺序)
5️⃣ future.result() - 获取结果
future = executor.submit(query_tag, resource)
# 获取结果(阻塞,直到任务完成)
try:
result = future.result(timeout=30) # 最多等待30秒
print(f"成功: {result}")
except TimeoutError:
print("任务超时")
except Exception as e:
print(f"任务失败: {e}")
6️⃣ 动态调整并发数(你的项目的优化)
# 动态计算并发数
actual_workers = min(
current_workers, # 配置的最大并发数
max(3, min(5, total // 200)) # 根据资源数量动态调整
)
计算逻辑:
# 示例计算
resources = 1000个
# 第1步:total // 200 = 1000 // 200 = 5
# 第2步:min(5, 5) = 5
# 第3步:max(3, 5) = 5
# 第4步:min(current_workers, 5) = min(10, 5) = 5
# 结果:使用5个线程
# 如果只有100个资源:
# total // 200 = 100 // 200 = 0
# min(5, 0) = 0
# max(3, 0) = 3 ← 至少3个线程
# 如果有5000个资源:
# total // 200 = 5000 // 200 = 25
# min(5, 25) = 5 ← 最多5个线程
🎯 4. APScheduler BackgroundScheduler
已在前面详细讲解,这里简要回顾:
scheduler = BackgroundScheduler() # ⭐ 后台线程运行
scheduler.start() # 启动调度器(创建后台线程)
# 调度器在单独的线程中运行
# 不阻塞FastAPI主线程
🎯 5. 异步模式对比总结
完整对比表
| 异步模式 | 使用场景 | 并发级别 | 你的项目 | 复杂度 |
|---|---|---|---|---|
| async/await | I/O密集型(HTTP、数据库) | 高(成千上万) | ✅ API端点 | ⭐⭐⭐ |
| threading.Thread | 长任务后台执行 | 低(几个线程) | ✅ 华为云同步 | ⭐⭐ |
| ThreadPoolExecutor | 批量并发任务 | 中(3-10个线程) | ✅ 标签查询 | ⭐⭐⭐ |
| BackgroundScheduler | 定时任务 | 低(后台线程) | ✅ 定时同步 | ⭐⭐⭐⭐ |
执行流程图
┌──────────────────────────────────────────────────────────────┐
│ 完整执行流程 │
└──────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────┐
│ FastAPI 启动 │
│ (主线程) │
└──────────┬──────────┘
│
├─→ BackgroundScheduler 启动(后台线程)
│ └─→ 定时执行同步任务
│
▼
┌─────────────────────┐
│ HTTP 请求到达 │
│ GET /api/resources │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ async def endpoint │ ← FastAPI异步处理
│ 查询数据库 │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ 返回响应(快速) │
└─────────────────────┘
┌─────────────────────┐
│ HTTP 请求到达 │
│ POST /api/sync │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ 创建同步日志 │
│ sync_log_id = 123 │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ 启动后台线程 │ ← threading.Thread
│ Thread.start() │
└──────────┬──────────┘
│
├─→ 立即返回HTTP响应
│ {"sync_log_id": 123}
│
└─→ 后台线程执行
│
▼
┌──────────────────────┐
│ background_sync() │
│ 独立数据库会话 │
└────────┬─────────────┘
│
▼
┌──────────────────────┐
│ 查询云API资源 │
│ (1000个资源) │
└────────┬─────────────┘
│
▼
┌──────────────────────┐
│ ThreadPoolExecutor │ ← 线程池
│ 并发查询标签 │
│ (5个线程) │
└────────┬─────────────┘
│
▼
┌──────────────────────┐
│ 保存到数据库 │
│ 更新同步状态 │
└──────────────────────┘
🎯 6. 关键知识点总结
✅ 你的项目做得好的地方
-
独立数据库会话
db_thread = SessionLocal() # ✅ 避免会话冲突 -
守护线程
daemon=True # ✅ 不阻塞程序退出 -
线程池控制并发
ThreadPoolExecutor(max_workers=5) # ✅ 避免过载 -
动态调整并发数
actual_workers = min(..., max(3, min(5, total // 200))) # ✅ 智能优化 -
异步返回 + 轮询
return {"sync_log_id": 123} # ✅ 立即返回 # 前端轮询状态
⚠️ 可以改进的地方
-
使用 asyncio 替代 ThreadPoolExecutor(可选)
# 当前:线程池 with ThreadPoolExecutor() as executor: executor.submit(query_tag, resource) # 改进:asyncio(更高效) async with aiohttp.ClientSession() as session: tasks = [query_tag_async(session, res) for res in resources] await asyncio.gather(*tasks) -
使用 Celery 替代 threading.Thread(大型项目)
# 当前:本地线程 thread = threading.Thread(target=background_sync) thread.start() # 改进:Celery(分布式任务队列) @celery.task def background_sync(): ... background_sync.delay() # 在独立的worker进程中执行
📊 异步知识点清单
| 知识点 | 说明 | 你的项目 | 重要性 |
|---|---|---|---|
| async/await | Python异步语法 | ✅ API端点 | ⭐⭐⭐⭐⭐ |
| threading.Thread | 后台线程 | ✅ 长任务 | ⭐⭐⭐⭐⭐ |
| daemon=True | 守护线程 | ✅ 使用 | ⭐⭐⭐⭐ |
| ThreadPoolExecutor | 线程池 | ✅ 标签查询 | ⭐⭐⭐⭐⭐ |
| as_completed() | 按完成顺序处理 | ✅ 使用 | ⭐⭐⭐⭐ |
| 独立数据库会话 | 避免会话冲突 | ✅ 使用 | ⭐⭐⭐⭐⭐ |
| 闭包 | 访问外部变量 | ✅ sync_log_id | ⭐⭐⭐ |
| BackgroundScheduler | 后台调度器 | ✅ 定时任务 | ⭐⭐⭐⭐⭐ |
你的异步实现非常完整和专业!💯 特别是独立数据库会话和守护线程的使用非常规范。
还有什么想深入了解的吗?比如:
- 如何从 threading 迁移到 asyncio?
- 如何使用 Celery 实现分布式任务?
- 如何监控线程池的性能?
更多推荐



所有评论(0)