📚 项目中的异步实现详解

🎯 项目中的异步/并发模式概览

你的项目使用了 3种不同的异步/并发模式

┌─────────────────────────────────────────────────────────────┐
│               项目中的异步/并发架构                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1️⃣  FastAPI async/await    ← ASGI异步I/O                  │
│      └─ 处理HTTP请求/响应                                     │
│                                                              │
│  2️⃣  threading.Thread       ← 后台线程                      │
│      └─ 长任务异步执行(华为云同步)                          │
│                                                              │
│  3️⃣  ThreadPoolExecutor     ← 线程池                        │
│      └─ 并发查询云API(标签批量查询)                         │
│                                                              │
│  4️⃣  APScheduler            ← 定时任务调度                   │
│      └─ BackgroundScheduler(后台线程运行)                  │
│                                                              │
└─────────────────────────────────────────────────────────────┘

🎯 1. FastAPI 的 async/await(异步 I/O)

什么是 async/await?

# ❌ 同步函数(阻塞)
def get_data():
    data = db.query(...)  # 等待数据库
    return data

# ✅ 异步函数(非阻塞)
async def get_data():
    data = await db.query(...)  # 释放CPU给其他请求
    return data

你的项目中的使用

@app.get("/api/huawei/resources", response_model=ResourceListResponse)
async def get_huawei_resources(
    db: Session = Depends(get_db),
    expire_days: int = Query(default=30, description="到期天数范围"),
    resource_type: Optional[str] = Query(default=None, description="资源类型"),
    region: Optional[str] = Query(default=None, description="区域"),
    status: Optional[str] = Query(default=None, description="状态筛选"),
    search: Optional[str] = Query(default=None, description="搜索关键词"),
    auto_renew: Optional[bool] = Query(default=None, description="自动续费筛选,True=已开启,False=未开启"),
    page: int = Query(default=1, ge=1, description="页码"),
    page_size: int = Query(default=20, ge=1, le=10000, description="每页数量(导出时可设置为10000)"),
    sort_by: str = Query(default="expire_time", description="排序字段"),
    sort_order: str = Query(default="asc", description="排序方向")
):

知识点讲解

1️⃣ async def - 定义异步函数
# 你的项目
@app.get("/api/huawei/resources")
async def get_huawei_resources(...):  # ⭐ async 关键字
    # 函数体

特点

  • async def 定义的函数是协程函数
  • 调用时返回一个协程对象,而不是直接执行
  • FastAPI 会自动管理协程的执行
2️⃣ 同步代码在 async 函数中(你的项目)
@app.get("/api/huawei/resources")
async def get_huawei_resources(db: Session = Depends(get_db)):
    # ⚠️ 注意:这里没有使用 await,因为 SQLAlchemy 的查询是同步的
    query = db.query(ExpiringResource)  # 同步数据库查询
    resources = query.all()  # 阻塞式查询
    return resources

重要概念

  • 你的项目中大部分异步函数内部实际上是同步代码
  • 这是因为 SQLAlchemy(你用的ORM)是同步库
  • FastAPI 会在线程池中执行这些同步代码,避免阻塞事件循环
3️⃣ async vs def 的区别
# 方式1:异步函数(你的项目使用)
@app.get("/api/resources")
async def get_resources(db: Session = Depends(get_db)):
    data = db.query(Resource).all()  # 同步查询
    return data

# 方式2:同步函数
@app.get("/api/resources")
def get_resources(db: Session = Depends(get_db)):
    data = db.query(Resource).all()  # 同步查询
    return data

对比

特性 async def def
FastAPI处理方式 在线程池中运行同步代码 直接在主线程运行
并发能力 高(可以处理多个请求) 低(阻塞主线程)
适用场景 I/O密集型(数据库、API) CPU密集型(计算)
你的项目 ✅ 使用 ❌ 未使用

FastAPI 的智能处理

async def endpoint():
    # FastAPI 检测到内部是同步代码
    db.query(...)  # 同步查询
    
    # FastAPI 自动在 ThreadPoolExecutor 中运行
    # 相当于:
    # await run_in_threadpool(lambda: db.query(...))

🎯 2. threading.Thread - 后台线程(长任务异步)

这是你的项目中最核心的异步实现!

实现原理

@app.post("/api/huawei/sync", response_model=SyncResponse)
async def sync_huawei_resources(
    db: Session = Depends(get_db),
    expire_days: int = Query(default=30, description="到期天数范围")
):
    """同步华为云到期资源(异步模式:立即返回,后台执行)"""
    import threading
    
    # 创建同步日志
    sync_log = ResourceSyncLog(
        cloud_provider="huawei",
        sync_type=SyncTypeEnum.MANUAL.value,  # "手动"
        status=SyncStatusEnum.IN_PROGRESS.value,  # "进行中"
        start_time=datetime.now()
    )
    db.add(sync_log)
    db.commit()
    db.refresh(sync_log)
    sync_log_id = sync_log.id
    
    def background_sync():
        """后台同步任务"""
        db_thread = SessionLocal()
        try:
            logger.info("🚀 [华为云] 开始后台同步到期资源...")
            
            # 获取同步日志
            log = db_thread.query(ResourceSyncLog).filter(ResourceSyncLog.id == sync_log_id).first()
            if not log:
                logger.error(f"❌ [华为云] 同步日志不存在: ID={sync_log_id}")
                return
            logger.info(f"✅ [华为云] 成功获取同步日志: ID={log.id}, 当前状态={log.status}")
            
            # 获取BSS客户端
            bss_client = get_huawei_bss_client()
            
            # 获取到期资源
            logger.info("📡 正在从华为云BSS API获取资源列表...")
            resources = bss_client.fetch_expiring_resources(expire_days=expire_days)
            logger.info(f"✅ 成功获取 {len(resources)} 个资源,开始保存到数据库...")
            
            # 统计信息
            stats = bss_client.get_resource_statistics(resources)
            
            # 更新数据库 - 先将所有华为云资源标记为非活跃
            logger.info("🔄 更新现有资源状态...")
            db_thread.query(ExpiringResource).filter(
                ExpiringResource.cloud_provider == "huawei"
            ).update({"is_active": False})
            
            # 保存新资源
            logger.info(f"💾 开始保存资源到数据库 (共 {len(resources)} 个)...")
            
            for resource_data in resources:
                existing = db_thread.query(ExpiringResource).filter(
                    ExpiringResource.cloud_provider == "huawei",
                    ExpiringResource.resource_id == resource_data["resource_id"]
                ).first()
                
                if existing:
                    # 更新现有资源
                    for key, value in resource_data.items():
                        if key != "cloud_provider" and key != "project_id" and hasattr(existing, key):
                            setattr(existing, key, value)
                    existing.is_active = True
                    existing.last_synced_at = datetime.now()
                else:
                    # 新增资源
                    new_resource = ExpiringResource(
                        cloud_provider="huawei",
                        region=resource_data.get("region", ""),
                        region_name=resource_data.get("region_name", ""),
                        resource_id=resource_data.get("resource_id", ""),
                        resource_name=resource_data.get("resource_name", ""),
                        resource_type=resource_data.get("resource_type", ""),
                        resource_type_name=resource_data.get("resource_type_name", ""),
                        resource_spec=resource_data.get("resource_spec", ""),
                        enterprise_project_id=resource_data.get("enterprise_project_id", ""),
                        enterprise_project_name=resource_data.get("enterprise_project_name", ""),
                        tag_product_line=resource_data.get("tag_product_line"),
                        tag_env=resource_data.get("tag_env"),
                        tag_name=resource_data.get("tag_name"),
                        create_time=resource_data.get("create_time"),
                        expire_time=resource_data.get("expire_time"),
                        status=resource_data.get("status", ""),
                        payment_type=resource_data.get("payment_type", ""),
                        auto_renew=resource_data.get("auto_renew", False),
                        is_active=True,
                        last_synced_at=datetime.now()
                    )
                    db_thread.add(new_resource)
            
            logger.info(f"  ✅ 资源保存完成")
            
            # 更新同步日志为成功
            logger.info(f"🔄 [华为云] 准备更新同步日志状态... (log.id={log.id})")
            # ⭐ 直接使用枚举的字符串值
            log.status = SyncStatusEnum.SUCCESS.value  # "成功"
            log.end_time = datetime.now()
            log.total_resources = len(resources)
            log.expired = stats.get("expired", 0)
            log.expiring_7_days = stats.get("expiring_7_days", 0)
            log.expiring_15_days = stats.get("expiring_15_days", 0)
            log.expiring_30_days = stats.get("expiring_30_days", 0)
            logger.info(f"   状态已设置为: {log.status} (字符串), total={log.total_resources}")
            
            logger.info(f"🔄 [华为云] 开始提交事务...")
            try:
                db_thread.commit()
                logger.info(f"✅ [华为云] 事务提交成功!状态已更新为: {log.status}")
            except Exception as commit_error:
                logger.error(f"❌ [华为云] 事务提交失败: {commit_error}")
                import traceback
                logger.error(traceback.format_exc())
                db_thread.rollback()
                raise
            
            logger.info(f"✅ [华为云] 资源同步完成!共 {len(resources)} 个资源")
            logger.info(f"   📊 统计: 已过期 {stats.get('expired', 0)} 个, 7天内到期 {stats.get('expiring_7_days', 0)} 个, 15天内到期 {stats.get('expiring_15_days', 0)} 个, 30天内到期 {stats.get('expiring_30_days', 0)} 个")
            
        except Exception as e:
            logger.error(f"[华为云] 后台同步失败: {e}")
            import traceback
            logger.error(traceback.format_exc())
            
            # 更新同步日志为失败
            try:
                log = db_thread.query(ResourceSyncLog).filter(ResourceSyncLog.id == sync_log_id).first()
                if log:
                    log.status = SyncStatusEnum.FAILED.value  # "失败"
                    log.end_time = datetime.now()
                    log.error_message = str(e)[:500]  # 限制错误信息长度
                    db_thread.commit()
            except Exception as commit_error:
                logger.error(f"更新同步日志失败: {commit_error}")
        finally:
            db_thread.close()
    
    # 启动后台线程
    thread = threading.Thread(target=background_sync, daemon=True)
    thread.start()
    
    # 立即返回,告诉前端同步已启动
    logger.info(f"✅ 华为云同步任务已启动(后台执行中),同步日志ID: {sync_log_id}")
    return SyncResponse(
        success=True,
        message=f"华为云同步任务已启动,正在后台执行中。请等待自动刷新。",
        total_resources=0,  # 还不知道总数
        sync_time=datetime.now(),
        statistics=None,
        sync_log_id=sync_log_id  # ⭐ 返回同步日志ID
    )

知识点讲解

1️⃣ 为什么需要后台线程?
# ❌ 问题:如果不使用后台线程(同步执行)
@app.post("/api/huawei/sync")
async def sync_huawei_resources():
    # 同步执行,需要1小时
    resources = bss_client.fetch_expiring_resources()  # 阻塞1小时
    db.bulk_insert(resources)
    return {"success": True}
    
# 结果:
# - HTTP请求超时(Nginx默认60秒)
# - 用户等待1小时没有响应
# - 前端显示"同步失败"

# ✅ 解决:使用后台线程
@app.post("/api/huawei/sync")
async def sync_huawei_resources():
    # 1. 立即返回(< 1秒)
    thread = threading.Thread(target=background_sync, daemon=True)
    thread.start()
    return {"success": True, "sync_log_id": 123}
    
    # 2. 后台线程继续执行(1小时)
    # 3. 前端通过轮询查询状态
2️⃣ threading.Thread 核心参数
thread = threading.Thread(
    target=background_sync,    # ⭐ 要执行的函数
    daemon=True                # ⭐ 守护线程
)
thread.start()

daemon=True 详解

# daemon=True(守护线程)
thread = threading.Thread(target=long_task, daemon=True)
thread.start()

# 主程序退出时:
# → 守护线程自动终止
# → 适合后台任务,不影响程序退出

# daemon=False(非守护线程,默认)
thread = threading.Thread(target=long_task, daemon=False)
thread.start()

# 主程序退出时:
# → 等待非守护线程完成
# → 可能导致程序无法退出

你的项目为什么用 daemon=True?

# 场景:FastAPI服务重启
# 1. 用户点击"同步"按钮
# 2. 后台线程开始执行(预计1小时)
# 3. 管理员重启服务(5分钟后)

# daemon=True:
#   → 线程立即终止,服务快速重启 ✅
#   → 同步记录状态保持"进行中"(可以手动重新同步)

# daemon=False:
#   → 等待线程执行完(1小时),服务无法重启 ❌
3️⃣ 独立数据库会话(⭐⭐⭐⭐⭐ 超级重要!)
def background_sync():
    # ⭐ 创建独立的数据库会话
    db_thread = SessionLocal()
    
    try:
        # 使用独立会话
        log = db_thread.query(ResourceSyncLog).first()
        db_thread.commit()
    finally:
        db_thread.close()  # ⭐ 必须关闭

为什么不能共享主线程的 db 会话?

# ❌ 错误做法:共享数据库会话
@app.post("/api/huawei/sync")
async def sync_huawei_resources(db: Session = Depends(get_db)):
    def background_sync():
        # 危险!使用主线程的 db 会话
        log = db.query(ResourceSyncLog).first()  # ❌ 会话冲突
        db.commit()  # ❌ 可能在主线程已关闭
    
    thread = threading.Thread(target=background_sync, daemon=True)
    thread.start()
    return {"success": True}
    # HTTP请求结束,db会话被关闭
    # 后台线程还在使用 db → 崩溃!

# ✅ 正确做法:创建独立会话
def background_sync():
    db_thread = SessionLocal()  # ✅ 独立会话
    try:
        log = db_thread.query(ResourceSyncLog).first()
        db_thread.commit()
    finally:
        db_thread.close()  # ✅ 安全关闭
4️⃣ 闭包捕获变量
sync_log_id = sync_log.id  # ⭐ 在主线程获取ID

def background_sync():
    # ⭐ 闭包:访问外部变量 sync_log_id
    log = db_thread.query(ResourceSyncLog).filter(
        ResourceSyncLog.id == sync_log_id  # 使用外部变量
    ).first()

闭包知识点

# 闭包:内部函数访问外部函数的变量

def outer_function():
    message = "Hello"  # 外部变量
    
    def inner_function():
        print(message)  # ⭐ 闭包:访问外部变量
    
    return inner_function

func = outer_function()
func()  # 输出: Hello

🎯 3. ThreadPoolExecutor - 线程池(并发查询)

实现原理

def _batch_query_tags(self, resources: List[Dict], max_workers: int = 10):
    
    """批量查询资源标签(使用线程池并发查询,提高速度)"""
    
    # ... 省略部分代码 ...
    
    completed = 0
    total = len(resources)
    # 动态调整并发数:降低并发数以减少内存占用(生产环境优化)
    actual_workers = min(current_workers, max(3, min(5, total // 200)))  # 至少3个,最多5个(降低并发,减少内存)
    logger.info(f"🏷️  开始批量查询标签,并发数: {actual_workers}, 总资源数: {total}")
    
    with ThreadPoolExecutor(max_workers=actual_workers) as executor:
        # 提交所有任务
        future_to_resource = {
            executor.submit(query_tag_for_resource, res_info): res_info 
            for res_info in resources
        }
        
        # 处理完成的任务
        for future in as_completed(future_to_resource):
            completed += 1
            # 每完成100个打印一次进度(减少日志输出,提高速度)
            if completed % 100 == 0 or completed == total:
                logger.info(f"  🏷️  标签查询进度: {completed}/{total} ({completed*100//total}%)")
            try:
                future.result()  # 获取结果(如果有异常会抛出)
            except Exception as e:
                # 静默处理错误,不打印每个失败(减少日志输出,提高速度)
                pass
            
            # 如果检测到429错误,在请求之间添加小延迟
            if rate_limit_detected and completed % 10 == 0:
                time.sleep(0.5)  # 每10个请求后等待0.5秒
    
    # 如果检测到限流,记录警告
    if rate_limit_detected:
        logger.debug("⚠️  检测到API限流(429错误),已自动降低并发并添加重试机制")

知识点讲解

1️⃣ ThreadPoolExecutor 基础
from concurrent.futures import ThreadPoolExecutor, as_completed

# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务
    future1 = executor.submit(func, arg1)
    future2 = executor.submit(func, arg2)
    
    # 等待结果
    result1 = future1.result()
    result2 = future2.result()

概念图

ThreadPoolExecutor (max_workers=5)
┌──────────────────────────────────────────┐
│  线程池 (5个工作线程)                      │
│                                           │
│  🧵 Thread 1 → 任务1 (查询资源A的标签)     │
│  🧵 Thread 2 → 任务2 (查询资源B的标签)     │
│  🧵 Thread 3 → 任务3 (查询资源C的标签)     │
│  🧵 Thread 4 → 任务4 (查询资源D的标签)     │
│  🧵 Thread 5 → 任务5 (查询资源E的标签)     │
│                                           │
│  等待队列: [任务6, 任务7, 任务8, ...]      │
└──────────────────────────────────────────┘
2️⃣ 为什么使用线程池而不是普通线程?
# ❌ 方式1:为每个任务创建线程(不推荐)
for resource in resources:  # 1000个资源
    thread = threading.Thread(target=query_tag, args=(resource,))
    thread.start()  # 创建1000个线程!

# 问题:
# - 创建/销毁线程开销大
# - 线程数过多,系统崩溃
# - 内存占用爆炸

# ✅ 方式2:线程池(你的项目)
with ThreadPoolExecutor(max_workers=5) as executor:
    for resource in resources:  # 1000个资源
        executor.submit(query_tag, resource)  # 只用5个线程

# 优点:
# - 线程复用,性能更好
# - 控制并发数,避免过载
# - 自动管理线程生命周期
3️⃣ executor.submit() vs map()
# 方式1:submit()(你的项目使用)
futures = []
for resource in resources:
    future = executor.submit(query_tag, resource)
    futures.append(future)

# 获取结果(按完成顺序)
for future in as_completed(futures):
    result = future.result()

# 方式2:map()
results = executor.map(query_tag, resources)

# 获取结果(按提交顺序)
for result in results:
    process(result)

对比

特性 submit() map()
返回值 Future对象 迭代器
结果顺序 按完成顺序(as_completed) 按提交顺序
灵活性 高(可以单独处理每个Future) 低(批量处理)
你的项目 ✅ 使用 ❌ 未使用
4️⃣ as_completed() - 按完成顺序处理
# 提交所有任务
future_to_resource = {
    executor.submit(query_tag, res): res 
    for res in resources
}

# ⭐ 按完成顺序处理(谁先完成谁先处理)
for future in as_completed(future_to_resource):
    result = future.result()
    print(f"任务完成: {result}")

执行流程

时间轴:
0秒  → 提交任务A, B, C, D, E
1秒  → 任务C完成 ✅ → 立即处理C
2秒  → 任务A完成 ✅ → 立即处理A
3秒  → 任务E完成 ✅ → 立即处理E
4秒  → 任务B完成 ✅ → 立即处理B
5秒  → 任务D完成 ✅ → 立即处理D

处理顺序: C → A → E → B → D (按完成顺序)
5️⃣ future.result() - 获取结果
future = executor.submit(query_tag, resource)

# 获取结果(阻塞,直到任务完成)
try:
    result = future.result(timeout=30)  # 最多等待30秒
    print(f"成功: {result}")
except TimeoutError:
    print("任务超时")
except Exception as e:
    print(f"任务失败: {e}")
6️⃣ 动态调整并发数(你的项目的优化)
# 动态计算并发数
actual_workers = min(
    current_workers,           # 配置的最大并发数
    max(3, min(5, total // 200))  # 根据资源数量动态调整
)

计算逻辑

# 示例计算
resources = 1000# 第1步:total // 200 = 1000 // 200 = 5
# 第2步:min(5, 5) = 5
# 第3步:max(3, 5) = 5
# 第4步:min(current_workers, 5) = min(10, 5) = 5

# 结果:使用5个线程

# 如果只有100个资源:
# total // 200 = 100 // 200 = 0
# min(5, 0) = 0
# max(3, 0) = 3  ← 至少3个线程

# 如果有5000个资源:
# total // 200 = 5000 // 200 = 25
# min(5, 25) = 5  ← 最多5个线程

🎯 4. APScheduler BackgroundScheduler

已在前面详细讲解,这里简要回顾:

scheduler = BackgroundScheduler()  # ⭐ 后台线程运行
scheduler.start()  # 启动调度器(创建后台线程)

# 调度器在单独的线程中运行
# 不阻塞FastAPI主线程

🎯 5. 异步模式对比总结

完整对比表

异步模式 使用场景 并发级别 你的项目 复杂度
async/await I/O密集型(HTTP、数据库) 高(成千上万) ✅ API端点 ⭐⭐⭐
threading.Thread 长任务后台执行 低(几个线程) ✅ 华为云同步 ⭐⭐
ThreadPoolExecutor 批量并发任务 中(3-10个线程) ✅ 标签查询 ⭐⭐⭐
BackgroundScheduler 定时任务 低(后台线程) ✅ 定时同步 ⭐⭐⭐⭐

执行流程图

┌──────────────────────────────────────────────────────────────┐
│                    完整执行流程                               │
└──────────────────────────────────────────────────────────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  FastAPI 启动       │
              │  (主线程)           │
              └──────────┬──────────┘
                         │
                         ├─→ BackgroundScheduler 启动(后台线程)
                         │     └─→ 定时执行同步任务
                         │
                         ▼
              ┌─────────────────────┐
              │  HTTP 请求到达      │
              │  GET /api/resources │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  async def endpoint │ ← FastAPI异步处理
              │  查询数据库           │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  返回响应(快速)    │
              └─────────────────────┘


              ┌─────────────────────┐
              │  HTTP 请求到达      │
              │  POST /api/sync     │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  创建同步日志        │
              │  sync_log_id = 123  │
              └──────────┬──────────┘
                         │
                         ▼
              ┌─────────────────────┐
              │  启动后台线程        │ ← threading.Thread
              │  Thread.start()     │
              └──────────┬──────────┘
                         │
                         ├─→ 立即返回HTTP响应
                         │   {"sync_log_id": 123}
                         │
                         └─→ 后台线程执行
                               │
                               ▼
                    ┌──────────────────────┐
                    │  background_sync()   │
                    │  独立数据库会话       │
                    └────────┬─────────────┘
                             │
                             ▼
                    ┌──────────────────────┐
                    │  查询云API资源       │
                    │  (1000个资源)        │
                    └────────┬─────────────┘
                             │
                             ▼
                    ┌──────────────────────┐
                    │  ThreadPoolExecutor  │ ← 线程池
                    │  并发查询标签         │
                    │  (5个线程)           │
                    └────────┬─────────────┘
                             │
                             ▼
                    ┌──────────────────────┐
                    │  保存到数据库         │
                    │  更新同步状态         │
                    └──────────────────────┘

🎯 6. 关键知识点总结

✅ 你的项目做得好的地方

  1. 独立数据库会话

    db_thread = SessionLocal()  # ✅ 避免会话冲突
    
  2. 守护线程

    daemon=True  # ✅ 不阻塞程序退出
    
  3. 线程池控制并发

    ThreadPoolExecutor(max_workers=5)  # ✅ 避免过载
    
  4. 动态调整并发数

    actual_workers = min(..., max(3, min(5, total // 200)))  # ✅ 智能优化
    
  5. 异步返回 + 轮询

    return {"sync_log_id": 123}  # ✅ 立即返回
    # 前端轮询状态
    

⚠️ 可以改进的地方

  1. 使用 asyncio 替代 ThreadPoolExecutor(可选)

    # 当前:线程池
    with ThreadPoolExecutor() as executor:
        executor.submit(query_tag, resource)
    
    # 改进:asyncio(更高效)
    async with aiohttp.ClientSession() as session:
        tasks = [query_tag_async(session, res) for res in resources]
        await asyncio.gather(*tasks)
    
  2. 使用 Celery 替代 threading.Thread(大型项目)

    # 当前:本地线程
    thread = threading.Thread(target=background_sync)
    thread.start()
    
    # 改进:Celery(分布式任务队列)
    @celery.task
    def background_sync():
        ...
    
    background_sync.delay()  # 在独立的worker进程中执行
    

📊 异步知识点清单

知识点 说明 你的项目 重要性
async/await Python异步语法 ✅ API端点 ⭐⭐⭐⭐⭐
threading.Thread 后台线程 ✅ 长任务 ⭐⭐⭐⭐⭐
daemon=True 守护线程 ✅ 使用 ⭐⭐⭐⭐
ThreadPoolExecutor 线程池 ✅ 标签查询 ⭐⭐⭐⭐⭐
as_completed() 按完成顺序处理 ✅ 使用 ⭐⭐⭐⭐
独立数据库会话 避免会话冲突 ✅ 使用 ⭐⭐⭐⭐⭐
闭包 访问外部变量 ✅ sync_log_id ⭐⭐⭐
BackgroundScheduler 后台调度器 ✅ 定时任务 ⭐⭐⭐⭐⭐

你的异步实现非常完整和专业!💯 特别是独立数据库会话和守护线程的使用非常规范。

还有什么想深入了解的吗?比如:

  • 如何从 threading 迁移到 asyncio?
  • 如何使用 Celery 实现分布式任务?
  • 如何监控线程池的性能?
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐