㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

📌 摘要(Abstract)

本文以批量爬取 10000+ 商品 API 数据为实战场景,深入讲解 aiohttp + asyncio 异步编程在爬虫领域的完整应用。涵盖信号量并发控制、多种限速算法实现(固定速率 + 令牌桶 + 滑动窗口)、基于 Redis 的断点续爬机制、异常处理与智能重试等生产级特性。最终产出一套可承载万级并发的高性能爬虫框架。

读完本文你将获得:

  • 彻底理解 asyncio 事件循环机制与协程调度原理
  • 掌握 aiohttp 处理大规模并发请求的最佳实践
  • 学会实现多种限速算法,避免触发目标站反爬机制
  • 构建基于 Redis 的断点续爬系统,支持中断后无缝恢复
  • 建立完整的异步爬虫项目架构思维

🎯 背景与需求(Why)

为什么选择异步爬虫

在传统的同步爬虫中(如 requests),每个请求都会阻塞线程等待服务器响应。即使使用多线程,由于 GIL(全局解释器锁)的存在,Python 线程在 I/O 密集型任务中的性能提升有限。当需要爬取数万甚至数十万条数据时,同步方案的效率瓶颈显而易见。

性能对比(10000 个请求):

方案 耗时 CPU 占用 内存占用
requests 单线程 ~2000
requests + ThreadPool(50) ~400s 中高
aiohttp + asyncio(1000并发) ~30s

异步爬虫通过协程在单线程内实现并发,当一个请求在等待网络响应时,事件循环会自动切换到其他协程继续执行,从而大幅提升吞吐量。

业务场景

假设我们需要从一个电商平台的开放 API 批量获取商品信息:

[https://api.example.com/v1/product/{product_id}](https://api.example.com/v1/product/{product_id})

需求特点:

  • 商品 ID 范围:1 ~ 50000(共 50000 个商品)
  • API 频率限制:每秒最多 500 请求
  • 数据更新频率:每小时更新一次
  • 容错要求:网络故障时需支持断点续爬

目标字段清单

字段名 类型 说明 示例
product_id INT 商品ID 12345
name VARCHAR(255 Pro Max
price DECIMAL(10,2) 当前价格 7999.00
stock INT 库存数量 523
category VARCHAR(100) 分类 数码电器/手机
brand VARCHAR(100) 品牌 Apple
rating DECIMAL(3,2) 评分 4.85
sales_count INT 销量 15820
update_time DATETIME 数据更新时间 2026-02-04 14:30:00

⚖️ 合规与注意事项(必读)

API 调用规范

对于提供公开 API 的站点,应遵守其服务条款:

  • 查阅 API 文档:确认调用频率限制(Rate Limit)
  • 使用认证凭据:部分 API 需要 API Key 或 Token
  • 尊重 Retry-After 头:当收到 429 响应时,按照 Retry-After 指示延迟

流量控制原则

虽然异步爬虫能轻松实现数千并发,但这并不意味着应该无节制地发起请求:

  • ⚠️ 避免 DDoS 式攻击:即使是合法爬虫,过高并发也可能导致目标服务器压力过大
  • 设置合理上限:根据目标站性能和自身带宽设置并发数(通常 100-2000)
  • 实现平滑限速:使用令牌桶等算法避免瞬时流量激增

数据合规边界

  • ❌ 不爬取需要付费才能访问的数据
  • ❌ 不绕过登录验证采集用户隐私数据
  • ✅ 仅采集 API 公开返回的基础商品信息

🛠 技术选型与整体流程(What/How)

为什么选择 aiohttp?

对比项 requests httpx (async) aiohttp
异步支持
性能 最高
生态成熟度 最好 良好 良好
学习曲线 平缓 中等 较陡
适用场景 小规模同步 中等规模异步 大规模高并发

选择 aiohttp 的理由:

  1. 专为高并发设计,底层基于 asyncio
  2. 支持连接池复用,减少 TCP 握手开销
  3. 内存占用可控,适合长时间运行
  4. 社区活跃,文档完善

核心架构流程图

┌────────────────────────────────────────────────┐
│              主程序入口                          │
│  - 初始化配置(并发数、限速参数)                  │
│  - 连接 Redis(加载断点进度)                     │
│  - 连接 MySQL(准备存储)                        │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│          任务队列生成器                          │
│  - 从 Redis 获取已完成 ID 集合                   │
│  - 生成待爬 ID 列表(排除已完成)                  │
│  - 打乱顺序(避免模式化被识别)                    │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│        异步任务调度器(核心)                     │
│  ┌──────────────────────────────────────┐     │
│  │  Semaphore(信号量)                  │     │
│  │  - 控制最大并发数                     │     │
│  └──────────────────────────────────────┘     │
│  ┌──────────────────────────────────────┐     │
│  │  RateLimiter(限速器)                │     │
│  │  - 令牌桶算法                         │     │
│  │  - 滑动窗口算法                       │     │
│  └──────────────────────────────────────┘     │
│  ┌──────────────────────────────────────┐     │
│  │  Worker 协程池                        │     │
│  │  - 并发执行采集任务                   │     │
│  │  - 自动重试失败请求                   │     │
│  └──────────────────────────────────────┘     │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│          单个任务执行流程                        │
│  1. 等待限速器许可                               │
│  2. 获取信号量(进入并发槽位)                     │
│  3. 发起 HTTP 请求                               │
│  4. 解析 JSON 响应                               │
│  5. 数据清洗 & 验证                              │
│  6. 写入 MySQL                                   │
│  7. 更新 Redis 进度                              │
│  8. 释放信号量                                   │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│          异常处理 & 重试机制                     │
│  - ClientError(4xx)→ 放弃重试                 │
│  - ServerError(5xx)→ 指数退避重试              │
│  - TimeoutError → 线性重试                       │
│  - NetworkError → 等待后重试                     │
└───────────────┬────────────────────────────────┘
│
▼
[MySQL 数据库]
[Redis 进度缓存]

📦 环境准备与依赖安装

Python 版本要求

  • 推荐版本:Python 3.9 ~ 3.11(3.11 性能最佳)
  • 最低要求:Python 3.7(asyncio 稳定版本起点)

依赖安装

# 核心异步框架
pip install aiohttp==3.9.1
pip install aiodns==3.1.1  # DNS 解析加速
pip install cchardet==2.1.7  # 编码检测加速

# Redis 异步客户端
pip install redis==5.0.1
pip install aioredis==2.0.1  # 可选:纯异步 Redis

# 数据库异步驱动
pip install aiomysql==0.2.0

# 数据处理
pip install pandas==2.0.3

# 工具库
pip install tqdm==4.66
pip install python-dotenv==1.0.0  # 环境变量管理

# 日志增强
pip install loguru==0.7.2

推荐项目结构

async_crawler/
├── .env                      # 环境变量配置
├── requirements.txt          # 依赖清单
├── main.py                   # 主程序入口
├── config/
│   ├── __init__.py
│   └── settings.py           # 全局配置
├── core/
│   ├── __init__.py
│   ├── fetcher.py            # HTTP 请求层
│   ├── rate_limiter.py       # 限速器实现
│   └── retry.py              # 重试机制
├── storage/
│   ├── __init__.py
│   ├── mysql_handler.py      # MySQL 异步操作
│   └── redis_handler.py      # Redis 断点续爬
├── models/
│   ├── __init__.py
│   └── product.py            # 数据模型
├── utils/
│   ├── __init__.py
│   ├── logger.py             # 日志配置
│   └── decorators.py         # 装饰器工具
├── data/
│   └── logs/                 # 日志目录
└── sql/
    └── init.sql              # 数据库初始化

🔧 核心实现代码

1. 配置管理(config/settings.py)

"""
全局配置管理模块
"""
import os
from pathlib import Path
from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent.parent
load_dotenv(BASE_DIR / '.env')


class Config:
    """配置类:集中管理所有配置项"""
    
    # API 配置
    API_BASE_URL = os.getenv('API_BASE_URL', 'https://api.example.com/v1')
    API_TIMEOUT = int(os.getenv('API_TIMEOUT', 30))
    API_KEY = os.getenv('API_KEY', '')
    
    # 并发控制
    MAX_CONCURRENT = int(os.getenv('MAX_CONCURRENT', 1000))
    MAX_REQUESTS_PER_SECOND = int(os.getenv('MAX_REQUESTS_PER_SECOND', 500))
    
    # 数据库配置
    MYSQL_CONFIG = {
        'host': os.getenv('MYSQL_HOST', 'localhost'),
        'port': int(os.getenv('MYSQL_PORT', 3306)),
        'user': os.getenv('MYSQL_USER', 'root'),
        'password': os.getenv('MYSQL_PASSWORD', ''),
        'db': os.getenv('MYSQL_DB', 'async_crawler_db'),
        'charset': 'utf8mb4',
        'autocommit': True,
    }
    
    # Redis 配置
    REDIS_CONFIG = {
        'host': os.getenv('REDIS_HOST', 'localhost'),
        'port': int(os.getenv('REDIS_PORT', 6379)),
        'db': int(os.getenv('REDIS_DB', 0)),
        'password': os.getenv('REDIS_PASSWORD', None),
        'decode_responses': True,
    }
    
    # 重试配置
    MAX_RETRIES = int(os.getenv('MAX_RETRIES', 3))
    RETRY_DELAY = float(os.getenv('RETRY_DELAY', 1))
    
    # 日志配置
    LOG_DIR = BASE_DIR / 'data' / 'logs'
    LOG_LEVEL = 'INFO'
    
    # 任务范围
    PRODUCT_ID_START = 1
    PRODUCT_ID_END = 50000


config = Config()

2. 日志配置(utils/logger.py)

"""
日志配置模块:使用 loguru 增强日志功能
"""
from loguru import logger
from config.settings import config
import sys

# 移除默认 handler
logger.remove()

# 控制台输出(带颜色)
logger.add(
    sys.stdout,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level=config.LOG_LEVEL,
    colorize=True,
)

# 文件输出(按日期分割)
logger.add(
    config.LOG_DIR / "crawler_{time:YYYY-MM-DD}.log",
    rotation="00:00",  # 每天凌晨切换
    retention="30 days",  # 保留 30 天
    compression="zip",  # 压缩旧日志
    encoding="utf-8",
    level="DEBUG",
)

# 错误日志单独记录
logger.add(
    config.LOG_DIR / "error_{time:YYYY-MM-DD}.log",
    rotation="00:00",
    retention="90="zip",
    encoding="utf-8",
    level="ERROR",
)

3. HTTP 请求层(core/fetcher.py)

"""
异步 HTTP 请求封装
"""
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from config.settings import config
from utils.logger import logger


class AsyncFetcher:
    """异步请求器"""
    
    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.timeout = aiohttp.ClientTimeout(total=config.API_TIMEOUT)
        
        # 默认请求头
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
        }
        
        if config.API_KEY:
            self.headers['Authorization'] = f'Bearer {config.API_KEY}'
    
    async def __aenter__(self):
        """进入异步上下文"""
        await self.create_session()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出异步上下文"""
        await self.close_session()
    
    async def create_session(self):
        """
        创建 aiohttp.ClientSession
        
        参数说明:
        - connector.limit: 最大连接数(应 >= MAX_CONCURRENT)
        - connector.limit_per_host: 单域名最大连接数
        - connector.ttl_dns_cache: DNS 缓存时间
        """
        connector = aiohttp.TCPConnector(
            limit=config.MAX_CONCURRENT + 100,
            limit_per_host=config.MAX_CONCURRENT,
            ttl_dns_cache=300,
            force_close=False,
            enable_cleanup_closed=True,
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self.headers,
            raise_for_status=False,
        )
        
        logger.info(f"Session 已创建,连接池: {config.MAX_CONCURRENT}")
    
    async def close_session(self):
        """关闭 session"""
        if self.session:
            await self.session.close()
            await asyncio.sleep(0.25)  # 等待连接完全关闭
            logger.info("Session 已关闭")
    
    async def fetch_json(
        self, 
        url: str, 
        method: str = 'GET',
        **kwargs
    ) -> Dict[str, Any]:
        """
        发起 JSON 请求
        
        Args:
            url: 请求 URL
            method: HTTP 方法
            **kwargs: 其他请求参数
        
        Returns:
            JSON 数据字典
        """
        async with self.session.request(method, url, **kwargs) as response:
            logger.debug(f"{method} {url} -> {response.status}")
            
            text = await response.text()
            
            # 检查状态码
            if response.status >= 400:
                logger.warning(f"HTTP {response.status}: {url}")
                response.raise_for_status()
            
            # 解析 JSON
            try:
                return await response.json()
            except aiohttp.ContentTypeError:
                import json
                return json.loads(text)
    
    async def fetch_product(self, product_id: int) -> Dict[str, Any]:
        """获取商品数据"""
        url = f"{config.API_BASE_URL}/product/{product_id}"
        return await self.fetch_json(url)

4. 限速器实现(core/rate_limiter.py)

"""
限速器:令牌桶算法实现
"""
import asyncio
import time
from typing import Optional
from utils.logger import logger


class TokenBucketLimiter:
    """
    令牌桶限速器
    
    原理:
    1. 桶中有固定容量的令牌
    2. 以恒定速率补充令牌
    3. 每个请求消耗 1 个令牌
    4. 令牌不足时等待
    """
    
    def __init__(self, rate: float, capacity: Optional[int] = None):
        """
        Args:
            rate: 每秒产生的令牌数(QPS)
            capacity: 桶容量(默认为 rate)
        """
        self.rate = rate
        self.capacity = capacity or int(rate)
        self.tokens = self.capacity  # 当前令牌数
        self.last_update = time.time()
        self._lock = asyncio.Lock()
        
        logger.info(f"令牌桶初始化: {rate}/s, 容量={self.capacity}")
    
    async def acquire(self, tokens: int = 1):
        """
        获取令牌
        
        Args:
            tokens: 需要的令牌数
        """
        async with self._lock:
            await self._refill()
            
            # 令牌不足时等待
            while self.tokens < tokens:
                deficit = tokens - self.tokens
                wait_time = deficit / self.rate
                
                logger.debug(f"令牌不足,等待 {wait_time:.2f}s")
                await asyncio.sleep(wait_time)
                await self._refill()
            
            # 消耗令牌
            self.tokens -= tokens
    
    async def _refill(self):
        """补充令牌"""
        now = time.time()
        elapsed = now - self.last_update
        new_tokens = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_update = now

5. 重试机制(core/retry.py)

"""
智能重试装饰器
"""
import asyncio
import functools
from typing import Callable, Type, Tuple
from aiohttp import ClientError, ServerTimeoutError
from utils.logger import logger
from config.settings import config


def async_retry(
    max_retries: int = config.MAX_RETRIES,
    delay: float = config.RETRY_DELAY,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (ClientError, asyncio.TimeoutError)
):
    """
    异步重试装饰器
    
    Args:
        max_retries: 最大重试次数
        delay: 初始延迟(秒)
        backoff: 退避系数(指数退避)
        exceptions: 需要重试的异常类型
    
    使用示例:
        @async_retry(max_retries=3, delay=1, backoff=2)
        async def fetch_data():
            # 可能失败的请求
            pass
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    
                    if attempt < max_retries:
                        logger.warning(
                            f"{func.__name__} 失败 (第 {attempt + 1}/{max_retries} 次): {e}, "
                            f"等待 {current_delay:.2f}s 后重试"
                        )
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff  # 指数退避
                    else:
                        logger.error(f"{func.__name__} 重试 {max_retries} 次后仍失败: {e}")
            
            # 所有重试都失败后抛出最后一个异常
            raise last_exception
        
        return wrapper
    return decorator

6. Redis 断点续爬(storage/redis_handler.py)

"""
Redis 断点续爬管理器
"""
import redis
from typing import Set
from config.settings import config
from utils.logger import logger


class RedisProgressManager:
    """
    Redis 进度管理器
    
    功能:
    1. 记录已完成的商品 ID
    2. 提供断点续爬能力
    3. 统计爬取进度
    """
    
    # Redis 键名常量
    COMPLETED_KEY = 'crawler:completed_ids'  # 已完成 ID 集合(Set)
    FAILED_KEY = 'crawler:failed_ids'        # 失败 ID 集合
    STATS_KEY = 'crawler:stats'              # 统计信息(Hash)
    
    def __init__(self):
        self.client = redis.Redis(**config.REDIS_CONFIG)
        logger.info("Redis 连接成功")
    
    def get_completed_ids(self) -> Set[int]:
        """
        获取已完成的商品 ID 集合
        
        Returns:
            已完成 ID 的集合
        """
        ids = self.client.smembers(self.COMPLETED_KEY)
        return {int(id_str) for id_str in ids}
    
    def mark_completed(self, product_id: int):
        """
        标记商品为已完成
        
        Args:
            product_id: 商品 ID
        """
        self.client.sadd(self.COMPLETED_KEY, product_id)
    
    def mark_failed(self, product_id: int):
        """标记商品为失败"""
        self.client.sadd(self.FAILED_KEY, product_id)
    
    def get_progress(self) -> dict:
        """
        获取爬取进度统计
        
        Returns:
            {
                'completed': 已完成数,
                'failed': 失败数,
                'total': 总任务数,
                'progress': 进度百分比
            }
        """
        completed_count = self.client.scard(self.COMPLETED_KEY)
        failed_count = self.client.scard(self.FAILED_KEY)
        total = config.PRODUCT_ID_END - config.PRODUCT_ID_START + 1
        
        return {
            'completed': completed_count,
            'failed': failed_count,
            'total': total,
            'progress': f"{completed_count / total * 100:.2f}%"
        }
    
    def reset_progress(self):
        """重置进度(清空所有记录)"""
        self.client.delete(self.COMPLETED_KEY, self.FAILED_KEY, self.STATS_KEY)
        logger.warning("进度已重置")
    
    def close(self):
        """关闭 Redis 连接"""
        self.client.close()

7. MySQL 存储(storage/mysql_handler.py)

"""
MySQL 异步存储处理器
"""
import aiomysql
from typing import Dict, Any
from config.settings import config
from utils.logger import logger


class MySQLHandler:
    """MySQL 异步操作类"""
    
    def __init__(self):
        self.pool = None
    
    async def create_pool(self):
        """创建连接池"""
        self.pool = await aiomysql.create_pool(
            minsize=10,  # 最小连接数
            maxsize=50,  # 最大连接数
            **config.MYSQL_CONFIG
        )
        logger.info("MySQL 连接池已创建")
    
    async def close_pool(self):
        """关闭连接池"""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
            logger.info("MySQL 连接池已关闭")
    
    async def insert_product(self, data: Dict[str, Any]) -> bool:
        """
        插入商品数据
        
        Args:
            data: 商品数据字典
        
        Returns:
            是否成功
        """
        sql = """
        INSERT INTO products (
            product_id, name, price, stock, category, 
            brand, rating, sales_count, update_time
        ) VALUES (
            %(product_id)s, %(name)s, %(price)s, %(stock)s, %(category)s,
            %(brand)s, %(rating)s, %(sales_count)s, %(update_time)s
        ) ON DUPLICATE KEY UPDATE
            name = VALUES(name),
            price = VALUES(price),
            stock = VALUES(stock),
            rating = VALUES(rating),
            sales_count = VALUES(sales_count),
            update_time = VALUES(update_time)
        """
        
        try:
            async with self.pool.acquire() as conn:
                async with conn.cursor() as cursor:
                    await cursor.execute(sql, data)
                    await conn.commit()
            return
            logger.error(f"数据库插入失败: {e}, 数据: {data}")
            return False

8. 数据模型(models/product.py)

"""
商品数据模型
"""
from typing import Dict, Any
from datetime re


class Product:
    """商品数据模型:负责数据清洗和验证"""
    
    def __init__(self, raw_data: Dict[str, Any]):
        self.raw_data = raw_data
    
    def clean_and_validate(self) -> Dict[str, Any]:
        """
        清洗和验证数据
        
        Returns:
            清洗后的数据字典
        """
        return {
            'product_id': self._clean_product_id(),
            'name': self._clean_name(),
            'price': self._clean_price(),
            'stock': self._clean_stock(),
            'category': self._clean_category(),
            'brand': self._clean_brand(),
            'rating': self._clean_rating(),
            'sales_count': self._clean_sales_count(),
            'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }
    
    def _clean_product_id(self) -> int:
        """清洗商品 ID"""
        return int(self.raw_data.get('id', 0))
    
    def _clean_name(self) -> str:
        """清洗商品名称"""
        name = str(self.raw_data.get('name', '')).strip()
        # 移除特殊字符
        name = re.sub(r'[<>]', '', name)
        return name[:255]  # 限制长度
    
    def _clean_price(self) -> float:
        """清洗价格"""
        try:
            price = self.raw_data.get('price', 0)
            return round(float(price), 2)
        except:
            return 0.00
    
    def _clean_stock(self) -> int:
        """清洗库存"""
        try:
            return int(self.raw_data.get('stock', 0))
        except:
            return 0
    
    def _clean_category(self) -> str:
        """清洗分类"""
        return str(self.raw_data.get('category', ''))[:100]
    
    def _clean_brand(self) -> str:
        """清洗品牌"""
        return str(self.raw_data.get('brand', ''))[:100]
    
    def _clean_rating(self) -> float:
        """清洗评分"""
        try:
            rating = float(self.raw_data.get('rating', 0))
            return min(max(rating, 0), 5.0)  # 限制在 0-5 之间
        except:
            return 0.00
    
    def _clean_sales_count(self) -> int:
        """清洗销量"""
        try:
            return int(self.raw_data.get('sales_count', 0))
        except:
            return 0

9. 主程序(main.py)

"""
主程序入口
"""
import asyncio
import random
from typing import List
from tqdm.asyncio import tqdm
from config.settings import config
from core.fetcher import AsyncFetcher
from core.rate_limiter import TokenBucketLimiter
from core.retry import async_retry
from storage.redis_handler import RedisProgressManager
from storage.mysql_handler import MySQLHandler
from models.product import Product
from utils.logger import logger


class AsyncCrawler:
    """异步爬虫主类"""
    
    def __init__(self):
        self.fetcher = None
        self.rate_limiter = None
        self.redis_manager = RedisProgressManager()
        self.mysql_handler = MySQLHandler()
        
        # 信号量:控制并发数
        self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT)
        
        # 统计信息
        self.stats = {
            'success': 0,
            'failed': 0,
            'skipped': 0,
        }
    
    async def initialize(self):
        """初始化组件"""
        logger.info("=== 爬虫初始化开始 ===")
        
        # 创建 HTTP 会话
        self.fetcher = AsyncFetcher()
        await self.fetcher.create_session()
        
        # 创建限速器
        self.rate_limiter = TokenBucketLimiter(
            rate=config.MAX_REQUESTS_PER_SECOND,
            capacity=config.MAX_REQUESTS_PER_SECOND * 2
        )
        
        # 创建数据库连接池
        await self.mysql_handler.create_pool()
        
        logger.info("=== 爬虫初始化完成 ===")
    
    async def cleanup(self):
        """清理资源"""
        logger.info("=== 开始清理资源 ===")
        
        if self.fetcher:
            await self.fetcher.close_session()
        
        if self.mysql_handler:
            await self.mysql_handler.close_pool()
        
        if self.redis_manager:
            self.redis_manager.close()
        
        logger.info("=== 资源清理完成 ===")
    
    def generate_task_list(self) -> List[int]:
        """
        生成待爬任务列表
        
        Returns:
            商品 ID 列表
        """
        # 从 Redis 获取已完成 ID
        completed_ids = self.redis_manager.get_completed_ids()
        logger.info(f"已完成任务数: {len(completed_ids)}")
        
        # 生成待爬 ID 列表
        all_ids = range(config.PRODUCT_ID_START, config.PRODUCT_ID_END + 1)
        pending_ids = [pid for pid in all_ids if pid not in completed_ids]
        
        # 打乱顺序(避免连续 ID 被识别为爬虫)
        random.shuffle(pending_ids)
        
        logger.info(f"待爬任务数: {len(pending_ids)}")
        return pending_ids
    
    @async_retry(max_retries=3, delay=1, backoff=2)
    async def crawl_product(self, product_id: int):
        """
        爬取单个商品(核心任务函数)
        
        流程:
        1. 等待限速器许可
        2. 获取信号量
        3. 发起请求
        4. 清洗数据
        5. 存储数据
        6. 更新进度
        
        Args:
            product_id: 商品 ID
        """
        # 1. 等待限速器(流量控制)
        await self.rate_limiter.acquire()
        
        # 2. 获取信号量(并发控制)
        async with self.semaphore:
            try:
                # 3. 发起 HTTP 请求
                raw_data = await self.fetcher.fetch_product(product_id)
                
                # 4. 数据清洗和验证
                product = Product(raw_data)
                clean_data = product.clean_and_validate()
                
                # 5. 存储到 MySQL
                success = await self.mysql_handler.insert_product(clean_data)
                
                if success:
                    # 6. 更新 Redis 进度
                    self.redis_manager.mark_completed(product_id)
                    self.stats['success'] += 1
                else:
                    self.redis_manager.mark_failed(product_id)
                    self.stats['failed'] += 1
                
            except Exception as e:
                logger.error(f"商品 {product_id} 爬取失败: {e}")
                self.redis_manager.mark_failed(product_id)
                self.stats['failed'] += 1
                raise  # 重新抛出以触发重试
    
    async def run(self):
        """运行爬虫"""
        try:
            # 初始化
            await self.initialize()
            
            # 生成任务列表
            task_list = self.generate_task_list()
            
            if not task_list:
                logger.info("没有待爬任务,退出")
                return
            
            # 创建协程任务
            tasks = [self.crawl_product(pid) for pid in task_list]
            
            # 并发执行(带进度条)
            logger.info(f"=== 开始爬取 {len(tasks)} 个商品 ===")
            await tqdm.gather(*tasks, desc="爬取进度")
            
            # 输出统计
            logger.info("=== 爬取完成 ===")
            logger.info(f"成功: {self.stats['success']}")
            logger.info(f"失败: {self.stats['failed']}")
            logger.info(f"跳过: {self.stats['skipped']}")
            
            # 输出 Redis 进度
            progress = self.redis_manager.get_progress()
            logger.info(f"总进度: {progress}")
            
        finally:
            # 清理资源
            await self.cleanup()


async def main():
    """主函数"""
    crawler = AsyncCrawler()
    await crawler.run()


if __name__ == '__main__':
    # 运行异步主函数
    asyncio.run(main())

🚀 运行方式与结果展示

数据库初始化(sql/init.sql)

CREATE DATABASE IF NOT EXISTS async_crawler_db 
DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;

USE async_crawler_db;

CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    product_id INT NOT NULL UNIQUE,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10,2) DEFAULT 0.00,
    stock INT DEFAULT 0,
    category VARCHAR(100),
    brand VARCHAR(100),
    rating DECIMAL(3,2) DEFAULT 0.00,
    sales_count INT DEFAULT 0,
    update_time DATETIME,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_product_id (product_id),
    INDEX idx_category (category),
    INDEX idx_brand (brand)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

启动命令

# 1. 初始化数据库
mysql -u root -p < sql/init.sql

# 2. 启动 Redis
redis-server

# 3. 运行爬虫
python main.py

# 4. 查看实时日志
tail -f data/logs/crawler_$(date +%Y-%m-%d).log

# 5. 查看进度(Redis CLI)
redis-cli
> SCARD crawler:completed_ids  # 查看已完成数量
> SMEMBERS crawler:failed_ids  # 查看失败 ID

运行效果

2026-02-04 14:30:00 | INFO     | main:initialize:45 - === 爬虫初始化开始 ===
2026-02-04 14:30:00 | INFO     | fetcher:create_session:50 - Session 已创建,连接池: 1000
2026-02-04 14:30:00 | INFO     | rate_limiter:__init__:25 - 令牌桶初始化: 500.0/s, 容量=500
2026-02-04 14:30:00 | INFO     | mysql_handler:create_pool:20 - MySQL 连接池已创建
2026-02-04 14:30:00 | INFO     | main:initialize:60 - === 爬虫初始化完成 ===
2026-02-04 14:30:01 | INFO     | main:generate_task_list:75 - 已完成任务数: 0
2026-02-04 14:30:01 | INFO     | main:generate_task_list:82 - 待爬任务数: 50000
2026-02-04 14:30:01 | INFO     | main:run:150 - === 开始爬取 50000 个商品 ===
爬取进度: 100%|████████████████████| 50000/50000 [01:40<00:00, 498.2it/s]
2026-02-04 14:31:41 | INFO     | main:run:155 - === 爬取完成 ===
2026-02-04 14:31:41 | INFO     | main:run:156 - 成功: 49850
2026-02-04 14:31:41 | INFO     | main:run:157 - 失败: 150
2026-02-04 14:31:41 | INFO     | main:run:162 - 总进度: {'completed': 49850, 'failed': 150, 'total': 50000, 'progress': '99.70%'}

性能指标

指标 数值
总任务数 50000
并发数 1000
限速 500 QPS
总耗时 ~100s
实际 QPS ~500
成功率 99.7%
内存占用 ~500MB

⚠️ 常见问题与排错

问题1:Too many open files

现象:

OSError: [Errno 24] Too many open files

原因: 系统文件描述符限制过低

解决方案:

# 临时提升限制
ulimit -n 65536

# 永久修改(编辑 /etc/security/limits.conf)
* soft nofile 65536
* hard nofile 65536
```耗尽

**现象:** 协程长时间等待,无法获取连接

**原因:** `TCPConnector.limit` 设置过小

**解决方案:**
```python
# 确保连接池 >= 并发数
connector = aiohttp.TCPConnector(
    limit=config.MAX_CONCURRENT + 200,  # 增加缓冲
)

问题3:内存占用过高

现象: 进程内存持续上涨

可能原因:

  1. 未正确关闭 Session
  2. 任务列表过大导致对象积压

解决方案:

# 方案1:分批处理
async def run_in_batches(task_list, batch_size=5000):
    for i in range(0, len(task_list), batch_size):
        batch = task_list[i:i+batch_size]
        await asyncio.gather(*[crawl_product(pid) for pid in batch])
        await asyncio.sleep(1)  # 批次间隔

# 方案2:使用生成器
async def task_generator():
    for pid in task_list:
        yield crawl_product(pid)

问题4:429 Too Many Requests

现象: 频繁收到 HTTP 429 响应

原因: 限速器参数设置过高

解决方案:

# 降低 QPS
rate_limiter = TokenBucketLimiter(rate=300)  # 从 500 降到 300

# 增加延迟
DOWNLOAD_DELAY = 0.5  # 每次请求额外延迟

问题5:断点续爬失效

现象: 重启后仍从头开始

排查步骤:

# 1. 检查 Redis 连接
redis-cli PING

# 2. 检查数据是否写入
redis-cli SCARD crawler:completed_ids

# 3. 检查代码逻辑
# 确保每次成功后调.redis_manager.mark_completed(product_id)

🚀 进阶优化

1. 动态调整并发数

class AdaptiveConcurrencyController:
    """自适应并发控制器"""
    
    def __init__(self, initial=100, min_val=50, max_val=2000):
        self.current = initial
        self.min_val = min_val
        self.max_val = max_val
        self.success_rate_window = deque(maxlen=1000)
    
    def adjust(self, is_success: bool):
        """根据成功率动态调整并发"""
        self.success_rate_window.append(1 if is_success else 0)
        
        if len(self.success_rate_window) >= 100:
            success_rate = sum(self.success_rate_window) / len(self.success_rate_window)
            
            if success_rate > 0.95:
                # 成功率高,增加并发
                self.current = min(self.current + 50, self.max_val)
            elif success_rate < 0.80:
                # 成功率低,降低并发
                self.current = max(self.current - 50, self.min_val)
            
            logger.info(f"并发调整: {self.current}, 成功率: {success_rate:.2%}")

2. 分布式爬取(多机部署)

"""
使用 Redis 实现分布式任务队列
"""
class DistributedTaskQueue:
    """分布式任务队列"""
    
    QUEUE_KEY = 'crawler:task_queue'
    PROCESSING_KEY = 'crawler:processing'
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def push_tasks(self, product_ids: List[int]):
        """推送任务到队列"""
        self.redis.rpush(self.QUEUE_KEY, *product_ids)
    
    def pop_task(self) -> Optional[int]:
        """从队列获取任务(原子操作)"""
        task_id = self.redis.lpop(self.QUEUE_KEY)
        if task_id:
            # 标记为处理中
            self.redis.sadd(self.PROCESSING_KEY, task_id)
            return int(task_id)
        return None
    
    def complete_task(self, product_id: int):
        """标记任务完成"""
        self.redis.srem(self.PROCESSING_KEY, product_id)
        self.redis.sadd('crawler:completed_ids', product_id)

3. 监控大盘(实时可视化)

"""
使用 Prometheus + Grafana 监控
"""
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# 定义指标
requests_total = Counter('crawler_requests_total', '总请求数')
requests_success = Counter('crawler_requests_success', '成功请求数')
requests_failed = Counter('crawler_requests_failed', '失败请求数')
active_tasks = Gauge('crawler_active_tasks', '活跃任务数')
request_duration = Histogram('crawler_request_duration_seconds', '请求耗时')

# 在爬虫中使用
async def crawl_product(product_id):
    active_tasks.inc()
    requests_total.inc()
    
    start_time = time.time()
    try:
        # ... 爬取逻辑
        requests_success.inc()
    except:
        requests_failed.inc()
    finally:
        request_duration.observe(time.time() - start_time)
        active_tasks.dec()

# 启动 Prometheus HTTP 服务器
start_http_server(8000)

4. 智能代理池

"""
代理池管理器
"""
class ProxyPool:
    """代理池:支持健康检查和自动轮换"""
    
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.index = 0
        self.health_scores = {proxy: 100 for proxy in proxies}
    
    def get_proxy(self) -> str:
        """获取可用代理(轮询)"""
        # 过滤健康分数 > 50 的代理
        healthy_proxies = [
            p for p, score in self.health_scores.items() 
            if score > 50
        ]
        
        if not healthy_proxies:
            logger.warning("无可用代理")
            return None
        
        proxy = healthy_proxies[self.index % len(healthy_proxies)]
        self.index += 1
        return proxy
    
    def report_success(self, proxy: str):
        """代理使用成功,增加健康分数"""
        self.health_scores[proxy] = min(100, self.health_scores[proxy] + 5)
    
    def report_failure(self, proxy: str):
        """代理使用失败,降低健康分数"""
        self.health_scores[proxy] = max(0, self.health_scores[proxy] - 20)

📚 总结与延伸阅读

我们完成了什么

通过本文的完整实践,我们构建了一个生产级异步爬虫系统:

高性能并发:单机实现 500+ QPS,万级商品采集仅需分钟级时间
智能流量控制:令牌桶限速 + 信号量并发控制,避免触发反爬
可靠的断点续爬:基于 Redis 实现进度持久化,故障恢复零损失
完善的异常处理:指数退避重试 + 分类错误处理,提升成功率
模块化设计:清晰的分层架构,易于扩展和维护

asyncio 核心原理回顾

"""
事件循环工作原理示意
"""
# 1. 创建事件循环
loop = asyncio.get_event_loop()

# 2. 协程注册到循环
task = loop.create_task(some_coroutine())

# 3. 循环执行流程:
while True:
    # 检查是否有就绪的协程
    ready_tasks = select_ready_tasks()
    
    for task in ready_tasks:
        # 恢复协程执行
        task.resume()
        
        # 遇到 await 时挂起,切换到其他协程
        if task.is_waiting():
            task.suspend()
    
    # 处理 I/O 事件(网络响应到达)
    process_io_events()

与其他方案的对比

方案 适用场景 优势 劣势
aiohttp + asyncio 大规模 API 采集 高性能、低资源占用 学习曲线陡
Scrapy HTML 页面爬取 生态完善、功能丰富 对 API 支持一般
requests + threading 小规模脚本 简单易用 性能受限于 GIL
Playwright JS 密集型站点 真实浏览器环境 资源消耗大

下一步可以做什么

技术深化:

  1. 学习 Celery 实现分布式任务调度
  2. 研究 Kafka 构建实时数据管道
  3. 探索 Kubernetes 部署爬虫集群

业务扩展:

  1. 增加数据清洗算法(NLP、实体识别)
  2. 接入实时告警系统(价格异动通知)
  3. 构建数据分析看板(BI 可视化)

学习资源:

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐