Python爬虫实战:aiohttp + asyncio 异步爬虫实战 - 高并发 API 采集与流量控制完全指南!
我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
📌 摘要(Abstract)
本文以批量爬取 10000+ 商品 API 数据为实战场景,深入讲解 aiohttp + asyncio 异步编程在爬虫领域的完整应用。涵盖信号量并发控制、多种限速算法实现(固定速率 + 令牌桶 + 滑动窗口)、基于 Redis 的断点续爬机制、异常处理与智能重试等生产级特性。最终产出一套可承载万级并发的高性能爬虫框架。
读完本文你将获得:
- 彻底理解 asyncio 事件循环机制与协程调度原理
- 掌握 aiohttp 处理大规模并发请求的最佳实践
- 学会实现多种限速算法,避免触发目标站反爬机制
- 构建基于 Redis 的断点续爬系统,支持中断后无缝恢复
- 建立完整的异步爬虫项目架构思维
🎯 背景与需求(Why)
为什么选择异步爬虫
在传统的同步爬虫中(如 requests),每个请求都会阻塞线程等待服务器响应。即使使用多线程,由于 GIL(全局解释器锁)的存在,Python 线程在 I/O 密集型任务中的性能提升有限。当需要爬取数万甚至数十万条数据时,同步方案的效率瓶颈显而易见。
性能对比(10000 个请求):
| 方案 | 耗时 | CPU 占用 | 内存占用 |
|---|---|---|---|
| requests 单线程 | ~2000 | ||
| requests + ThreadPool(50) | ~400s | 中高 | 中 |
| aiohttp + asyncio(1000并发) | ~30s | 低 | 中 |
异步爬虫通过协程在单线程内实现并发,当一个请求在等待网络响应时,事件循环会自动切换到其他协程继续执行,从而大幅提升吞吐量。
业务场景
假设我们需要从一个电商平台的开放 API 批量获取商品信息:
[https://api.example.com/v1/product/{product_id}](https://api.example.com/v1/product/{product_id})
需求特点:
- 商品 ID 范围:1 ~ 50000(共 50000 个商品)
- API 频率限制:每秒最多 500 请求
- 数据更新频率:每小时更新一次
- 容错要求:网络故障时需支持断点续爬
目标字段清单
| 字段名 | 类型 | 说明 | 示例 |
|---|---|---|---|
| product_id | INT | 商品ID | 12345 |
| name | VARCHAR(255 Pro Max | ||
| price | DECIMAL(10,2) | 当前价格 | 7999.00 |
| stock | INT | 库存数量 | 523 |
| category | VARCHAR(100) | 分类 | 数码电器/手机 |
| brand | VARCHAR(100) | 品牌 | Apple |
| rating | DECIMAL(3,2) | 评分 | 4.85 |
| sales_count | INT | 销量 | 15820 |
| update_time | DATETIME | 数据更新时间 | 2026-02-04 14:30:00 |
⚖️ 合规与注意事项(必读)
API 调用规范
对于提供公开 API 的站点,应遵守其服务条款:
- 查阅 API 文档:确认调用频率限制(Rate Limit)
- 使用认证凭据:部分 API 需要 API Key 或 Token
- 尊重 Retry-After 头:当收到 429 响应时,按照 Retry-After 指示延迟
流量控制原则
虽然异步爬虫能轻松实现数千并发,但这并不意味着应该无节制地发起请求:
- ⚠️ 避免 DDoS 式攻击:即使是合法爬虫,过高并发也可能导致目标服务器压力过大
- ✅ 设置合理上限:根据目标站性能和自身带宽设置并发数(通常 100-2000)
- ✅ 实现平滑限速:使用令牌桶等算法避免瞬时流量激增
数据合规边界
- ❌ 不爬取需要付费才能访问的数据
- ❌ 不绕过登录验证采集用户隐私数据
- ✅ 仅采集 API 公开返回的基础商品信息
🛠 技术选型与整体流程(What/How)
为什么选择 aiohttp?
| 对比项 | requests | httpx (async) | aiohttp |
|---|---|---|---|
| 异步支持 | ❌ | ✅ | ✅ |
| 性能 | 低 | 高 | 最高 |
| 生态成熟度 | 最好 | 良好 | 良好 |
| 学习曲线 | 平缓 | 中等 | 较陡 |
| 适用场景 | 小规模同步 | 中等规模异步 | 大规模高并发 |
选择 aiohttp 的理由:
- 专为高并发设计,底层基于 asyncio
- 支持连接池复用,减少 TCP 握手开销
- 内存占用可控,适合长时间运行
- 社区活跃,文档完善
核心架构流程图
┌────────────────────────────────────────────────┐
│ 主程序入口 │
│ - 初始化配置(并发数、限速参数) │
│ - 连接 Redis(加载断点进度) │
│ - 连接 MySQL(准备存储) │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│ 任务队列生成器 │
│ - 从 Redis 获取已完成 ID 集合 │
│ - 生成待爬 ID 列表(排除已完成) │
│ - 打乱顺序(避免模式化被识别) │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│ 异步任务调度器(核心) │
│ ┌──────────────────────────────────────┐ │
│ │ Semaphore(信号量) │ │
│ │ - 控制最大并发数 │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ RateLimiter(限速器) │ │
│ │ - 令牌桶算法 │ │
│ │ - 滑动窗口算法 │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ Worker 协程池 │ │
│ │ - 并发执行采集任务 │ │
│ │ - 自动重试失败请求 │ │
│ └──────────────────────────────────────┘ │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│ 单个任务执行流程 │
│ 1. 等待限速器许可 │
│ 2. 获取信号量(进入并发槽位) │
│ 3. 发起 HTTP 请求 │
│ 4. 解析 JSON 响应 │
│ 5. 数据清洗 & 验证 │
│ 6. 写入 MySQL │
│ 7. 更新 Redis 进度 │
│ 8. 释放信号量 │
└───────────────┬────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────┐
│ 异常处理 & 重试机制 │
│ - ClientError(4xx)→ 放弃重试 │
│ - ServerError(5xx)→ 指数退避重试 │
│ - TimeoutError → 线性重试 │
│ - NetworkError → 等待后重试 │
└───────────────┬────────────────────────────────┘
│
▼
[MySQL 数据库]
[Redis 进度缓存]
📦 环境准备与依赖安装
Python 版本要求
- 推荐版本:Python 3.9 ~ 3.11(3.11 性能最佳)
- 最低要求:Python 3.7(asyncio 稳定版本起点)
依赖安装
# 核心异步框架
pip install aiohttp==3.9.1
pip install aiodns==3.1.1 # DNS 解析加速
pip install cchardet==2.1.7 # 编码检测加速
# Redis 异步客户端
pip install redis==5.0.1
pip install aioredis==2.0.1 # 可选:纯异步 Redis
# 数据库异步驱动
pip install aiomysql==0.2.0
# 数据处理
pip install pandas==2.0.3
# 工具库
pip install tqdm==4.66
pip install python-dotenv==1.0.0 # 环境变量管理
# 日志增强
pip install loguru==0.7.2
推荐项目结构
async_crawler/
├── .env # 环境变量配置
├── requirements.txt # 依赖清单
├── main.py # 主程序入口
├── config/
│ ├── __init__.py
│ └── settings.py # 全局配置
├── core/
│ ├── __init__.py
│ ├── fetcher.py # HTTP 请求层
│ ├── rate_limiter.py # 限速器实现
│ └── retry.py # 重试机制
├── storage/
│ ├── __init__.py
│ ├── mysql_handler.py # MySQL 异步操作
│ └── redis_handler.py # Redis 断点续爬
├── models/
│ ├── __init__.py
│ └── product.py # 数据模型
├── utils/
│ ├── __init__.py
│ ├── logger.py # 日志配置
│ └── decorators.py # 装饰器工具
├── data/
│ └── logs/ # 日志目录
└── sql/
└── init.sql # 数据库初始化
🔧 核心实现代码
1. 配置管理(config/settings.py)
"""
全局配置管理模块
"""
import os
from pathlib import Path
from dotenv import load_dotenv
BASE_DIR = Path(__file__).resolve().parent.parent
load_dotenv(BASE_DIR / '.env')
class Config:
"""配置类:集中管理所有配置项"""
# API 配置
API_BASE_URL = os.getenv('API_BASE_URL', 'https://api.example.com/v1')
API_TIMEOUT = int(os.getenv('API_TIMEOUT', 30))
API_KEY = os.getenv('API_KEY', '')
# 并发控制
MAX_CONCURRENT = int(os.getenv('MAX_CONCURRENT', 1000))
MAX_REQUESTS_PER_SECOND = int(os.getenv('MAX_REQUESTS_PER_SECOND', 500))
# 数据库配置
MYSQL_CONFIG = {
'host': os.getenv('MYSQL_HOST', 'localhost'),
'port': int(os.getenv('MYSQL_PORT', 3306)),
'user': os.getenv('MYSQL_USER', 'root'),
'password': os.getenv('MYSQL_PASSWORD', ''),
'db': os.getenv('MYSQL_DB', 'async_crawler_db'),
'charset': 'utf8mb4',
'autocommit': True,
}
# Redis 配置
REDIS_CONFIG = {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', 6379)),
'db': int(os.getenv('REDIS_DB', 0)),
'password': os.getenv('REDIS_PASSWORD', None),
'decode_responses': True,
}
# 重试配置
MAX_RETRIES = int(os.getenv('MAX_RETRIES', 3))
RETRY_DELAY = float(os.getenv('RETRY_DELAY', 1))
# 日志配置
LOG_DIR = BASE_DIR / 'data' / 'logs'
LOG_LEVEL = 'INFO'
# 任务范围
PRODUCT_ID_START = 1
PRODUCT_ID_END = 50000
config = Config()
2. 日志配置(utils/logger.py)
"""
日志配置模块:使用 loguru 增强日志功能
"""
from loguru import logger
from config.settings import config
import sys
# 移除默认 handler
logger.remove()
# 控制台输出(带颜色)
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level=config.LOG_LEVEL,
colorize=True,
)
# 文件输出(按日期分割)
logger.add(
config.LOG_DIR / "crawler_{time:YYYY-MM-DD}.log",
rotation="00:00", # 每天凌晨切换
retention="30 days", # 保留 30 天
compression="zip", # 压缩旧日志
encoding="utf-8",
level="DEBUG",
)
# 错误日志单独记录
logger.add(
config.LOG_DIR / "error_{time:YYYY-MM-DD}.log",
rotation="00:00",
retention="90="zip",
encoding="utf-8",
level="ERROR",
)
3. HTTP 请求层(core/fetcher.py)
"""
异步 HTTP 请求封装
"""
import aiohttp
import asyncio
from typing import Optional, Dict, Any
from config.settings import config
from utils.logger import logger
class AsyncFetcher:
"""异步请求器"""
def __init__(self):
self.session: Optional[aiohttp.ClientSession] = None
self.timeout = aiohttp.ClientTimeout(total=config.API_TIMEOUT)
# 默认请求头
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json',
}
if config.API_KEY:
self.headers['Authorization'] = f'Bearer {config.API_KEY}'
async def __aenter__(self):
"""进入异步上下文"""
await self.create_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出异步上下文"""
await self.close_session()
async def create_session(self):
"""
创建 aiohttp.ClientSession
参数说明:
- connector.limit: 最大连接数(应 >= MAX_CONCURRENT)
- connector.limit_per_host: 单域名最大连接数
- connector.ttl_dns_cache: DNS 缓存时间
"""
connector = aiohttp.TCPConnector(
limit=config.MAX_CONCURRENT + 100,
limit_per_host=config.MAX_CONCURRENT,
ttl_dns_cache=300,
force_close=False,
enable_cleanup_closed=True,
)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=self.timeout,
headers=self.headers,
raise_for_status=False,
)
logger.info(f"Session 已创建,连接池: {config.MAX_CONCURRENT}")
async def close_session(self):
"""关闭 session"""
if self.session:
await self.session.close()
await asyncio.sleep(0.25) # 等待连接完全关闭
logger.info("Session 已关闭")
async def fetch_json(
self,
url: str,
method: str = 'GET',
**kwargs
) -> Dict[str, Any]:
"""
发起 JSON 请求
Args:
url: 请求 URL
method: HTTP 方法
**kwargs: 其他请求参数
Returns:
JSON 数据字典
"""
async with self.session.request(method, url, **kwargs) as response:
logger.debug(f"{method} {url} -> {response.status}")
text = await response.text()
# 检查状态码
if response.status >= 400:
logger.warning(f"HTTP {response.status}: {url}")
response.raise_for_status()
# 解析 JSON
try:
return await response.json()
except aiohttp.ContentTypeError:
import json
return json.loads(text)
async def fetch_product(self, product_id: int) -> Dict[str, Any]:
"""获取商品数据"""
url = f"{config.API_BASE_URL}/product/{product_id}"
return await self.fetch_json(url)
4. 限速器实现(core/rate_limiter.py)
"""
限速器:令牌桶算法实现
"""
import asyncio
import time
from typing import Optional
from utils.logger import logger
class TokenBucketLimiter:
"""
令牌桶限速器
原理:
1. 桶中有固定容量的令牌
2. 以恒定速率补充令牌
3. 每个请求消耗 1 个令牌
4. 令牌不足时等待
"""
def __init__(self, rate: float, capacity: Optional[int] = None):
"""
Args:
rate: 每秒产生的令牌数(QPS)
capacity: 桶容量(默认为 rate)
"""
self.rate = rate
self.capacity = capacity or int(rate)
self.tokens = self.capacity # 当前令牌数
self.last_update = time.time()
self._lock = asyncio.Lock()
logger.info(f"令牌桶初始化: {rate}/s, 容量={self.capacity}")
async def acquire(self, tokens: int = 1):
"""
获取令牌
Args:
tokens: 需要的令牌数
"""
async with self._lock:
await self._refill()
# 令牌不足时等待
while self.tokens < tokens:
deficit = tokens - self.tokens
wait_time = deficit / self.rate
logger.debug(f"令牌不足,等待 {wait_time:.2f}s")
await asyncio.sleep(wait_time)
await self._refill()
# 消耗令牌
self.tokens -= tokens
async def _refill(self):
"""补充令牌"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
5. 重试机制(core/retry.py)
"""
智能重试装饰器
"""
import asyncio
import functools
from typing import Callable, Type, Tuple
from aiohttp import ClientError, ServerTimeoutError
from utils.logger import logger
from config.settings import config
def async_retry(
max_retries: int = config.MAX_RETRIES,
delay: float = config.RETRY_DELAY,
backoff: float = 2.0,
exceptions: Tuple[Type[Exception], ...] = (ClientError, asyncio.TimeoutError)
):
"""
异步重试装饰器
Args:
max_retries: 最大重试次数
delay: 初始延迟(秒)
backoff: 退避系数(指数退避)
exceptions: 需要重试的异常类型
使用示例:
@async_retry(max_retries=3, delay=1, backoff=2)
async def fetch_data():
# 可能失败的请求
pass
"""
def decorator(func: Callable):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
current_delay = delay
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries:
logger.warning(
f"{func.__name__} 失败 (第 {attempt + 1}/{max_retries} 次): {e}, "
f"等待 {current_delay:.2f}s 后重试"
)
await asyncio.sleep(current_delay)
current_delay *= backoff # 指数退避
else:
logger.error(f"{func.__name__} 重试 {max_retries} 次后仍失败: {e}")
# 所有重试都失败后抛出最后一个异常
raise last_exception
return wrapper
return decorator
6. Redis 断点续爬(storage/redis_handler.py)
"""
Redis 断点续爬管理器
"""
import redis
from typing import Set
from config.settings import config
from utils.logger import logger
class RedisProgressManager:
"""
Redis 进度管理器
功能:
1. 记录已完成的商品 ID
2. 提供断点续爬能力
3. 统计爬取进度
"""
# Redis 键名常量
COMPLETED_KEY = 'crawler:completed_ids' # 已完成 ID 集合(Set)
FAILED_KEY = 'crawler:failed_ids' # 失败 ID 集合
STATS_KEY = 'crawler:stats' # 统计信息(Hash)
def __init__(self):
self.client = redis.Redis(**config.REDIS_CONFIG)
logger.info("Redis 连接成功")
def get_completed_ids(self) -> Set[int]:
"""
获取已完成的商品 ID 集合
Returns:
已完成 ID 的集合
"""
ids = self.client.smembers(self.COMPLETED_KEY)
return {int(id_str) for id_str in ids}
def mark_completed(self, product_id: int):
"""
标记商品为已完成
Args:
product_id: 商品 ID
"""
self.client.sadd(self.COMPLETED_KEY, product_id)
def mark_failed(self, product_id: int):
"""标记商品为失败"""
self.client.sadd(self.FAILED_KEY, product_id)
def get_progress(self) -> dict:
"""
获取爬取进度统计
Returns:
{
'completed': 已完成数,
'failed': 失败数,
'total': 总任务数,
'progress': 进度百分比
}
"""
completed_count = self.client.scard(self.COMPLETED_KEY)
failed_count = self.client.scard(self.FAILED_KEY)
total = config.PRODUCT_ID_END - config.PRODUCT_ID_START + 1
return {
'completed': completed_count,
'failed': failed_count,
'total': total,
'progress': f"{completed_count / total * 100:.2f}%"
}
def reset_progress(self):
"""重置进度(清空所有记录)"""
self.client.delete(self.COMPLETED_KEY, self.FAILED_KEY, self.STATS_KEY)
logger.warning("进度已重置")
def close(self):
"""关闭 Redis 连接"""
self.client.close()
7. MySQL 存储(storage/mysql_handler.py)
"""
MySQL 异步存储处理器
"""
import aiomysql
from typing import Dict, Any
from config.settings import config
from utils.logger import logger
class MySQLHandler:
"""MySQL 异步操作类"""
def __init__(self):
self.pool = None
async def create_pool(self):
"""创建连接池"""
self.pool = await aiomysql.create_pool(
minsize=10, # 最小连接数
maxsize=50, # 最大连接数
**config.MYSQL_CONFIG
)
logger.info("MySQL 连接池已创建")
async def close_pool(self):
"""关闭连接池"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
logger.info("MySQL 连接池已关闭")
async def insert_product(self, data: Dict[str, Any]) -> bool:
"""
插入商品数据
Args:
data: 商品数据字典
Returns:
是否成功
"""
sql = """
INSERT INTO products (
product_id, name, price, stock, category,
brand, rating, sales_count, update_time
) VALUES (
%(product_id)s, %(name)s, %(price)s, %(stock)s, %(category)s,
%(brand)s, %(rating)s, %(sales_count)s, %(update_time)s
) ON DUPLICATE KEY UPDATE
name = VALUES(name),
price = VALUES(price),
stock = VALUES(stock),
rating = VALUES(rating),
sales_count = VALUES(sales_count),
update_time = VALUES(update_time)
"""
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(sql, data)
await conn.commit()
return
logger.error(f"数据库插入失败: {e}, 数据: {data}")
return False
8. 数据模型(models/product.py)
"""
商品数据模型
"""
from typing import Dict, Any
from datetime re
class Product:
"""商品数据模型:负责数据清洗和验证"""
def __init__(self, raw_data: Dict[str, Any]):
self.raw_data = raw_data
def clean_and_validate(self) -> Dict[str, Any]:
"""
清洗和验证数据
Returns:
清洗后的数据字典
"""
return {
'product_id': self._clean_product_id(),
'name': self._clean_name(),
'price': self._clean_price(),
'stock': self._clean_stock(),
'category': self._clean_category(),
'brand': self._clean_brand(),
'rating': self._clean_rating(),
'sales_count': self._clean_sales_count(),
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
}
def _clean_product_id(self) -> int:
"""清洗商品 ID"""
return int(self.raw_data.get('id', 0))
def _clean_name(self) -> str:
"""清洗商品名称"""
name = str(self.raw_data.get('name', '')).strip()
# 移除特殊字符
name = re.sub(r'[<>]', '', name)
return name[:255] # 限制长度
def _clean_price(self) -> float:
"""清洗价格"""
try:
price = self.raw_data.get('price', 0)
return round(float(price), 2)
except:
return 0.00
def _clean_stock(self) -> int:
"""清洗库存"""
try:
return int(self.raw_data.get('stock', 0))
except:
return 0
def _clean_category(self) -> str:
"""清洗分类"""
return str(self.raw_data.get('category', ''))[:100]
def _clean_brand(self) -> str:
"""清洗品牌"""
return str(self.raw_data.get('brand', ''))[:100]
def _clean_rating(self) -> float:
"""清洗评分"""
try:
rating = float(self.raw_data.get('rating', 0))
return min(max(rating, 0), 5.0) # 限制在 0-5 之间
except:
return 0.00
def _clean_sales_count(self) -> int:
"""清洗销量"""
try:
return int(self.raw_data.get('sales_count', 0))
except:
return 0
9. 主程序(main.py)
"""
主程序入口
"""
import asyncio
import random
from typing import List
from tqdm.asyncio import tqdm
from config.settings import config
from core.fetcher import AsyncFetcher
from core.rate_limiter import TokenBucketLimiter
from core.retry import async_retry
from storage.redis_handler import RedisProgressManager
from storage.mysql_handler import MySQLHandler
from models.product import Product
from utils.logger import logger
class AsyncCrawler:
"""异步爬虫主类"""
def __init__(self):
self.fetcher = None
self.rate_limiter = None
self.redis_manager = RedisProgressManager()
self.mysql_handler = MySQLHandler()
# 信号量:控制并发数
self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT)
# 统计信息
self.stats = {
'success': 0,
'failed': 0,
'skipped': 0,
}
async def initialize(self):
"""初始化组件"""
logger.info("=== 爬虫初始化开始 ===")
# 创建 HTTP 会话
self.fetcher = AsyncFetcher()
await self.fetcher.create_session()
# 创建限速器
self.rate_limiter = TokenBucketLimiter(
rate=config.MAX_REQUESTS_PER_SECOND,
capacity=config.MAX_REQUESTS_PER_SECOND * 2
)
# 创建数据库连接池
await self.mysql_handler.create_pool()
logger.info("=== 爬虫初始化完成 ===")
async def cleanup(self):
"""清理资源"""
logger.info("=== 开始清理资源 ===")
if self.fetcher:
await self.fetcher.close_session()
if self.mysql_handler:
await self.mysql_handler.close_pool()
if self.redis_manager:
self.redis_manager.close()
logger.info("=== 资源清理完成 ===")
def generate_task_list(self) -> List[int]:
"""
生成待爬任务列表
Returns:
商品 ID 列表
"""
# 从 Redis 获取已完成 ID
completed_ids = self.redis_manager.get_completed_ids()
logger.info(f"已完成任务数: {len(completed_ids)}")
# 生成待爬 ID 列表
all_ids = range(config.PRODUCT_ID_START, config.PRODUCT_ID_END + 1)
pending_ids = [pid for pid in all_ids if pid not in completed_ids]
# 打乱顺序(避免连续 ID 被识别为爬虫)
random.shuffle(pending_ids)
logger.info(f"待爬任务数: {len(pending_ids)}")
return pending_ids
@async_retry(max_retries=3, delay=1, backoff=2)
async def crawl_product(self, product_id: int):
"""
爬取单个商品(核心任务函数)
流程:
1. 等待限速器许可
2. 获取信号量
3. 发起请求
4. 清洗数据
5. 存储数据
6. 更新进度
Args:
product_id: 商品 ID
"""
# 1. 等待限速器(流量控制)
await self.rate_limiter.acquire()
# 2. 获取信号量(并发控制)
async with self.semaphore:
try:
# 3. 发起 HTTP 请求
raw_data = await self.fetcher.fetch_product(product_id)
# 4. 数据清洗和验证
product = Product(raw_data)
clean_data = product.clean_and_validate()
# 5. 存储到 MySQL
success = await self.mysql_handler.insert_product(clean_data)
if success:
# 6. 更新 Redis 进度
self.redis_manager.mark_completed(product_id)
self.stats['success'] += 1
else:
self.redis_manager.mark_failed(product_id)
self.stats['failed'] += 1
except Exception as e:
logger.error(f"商品 {product_id} 爬取失败: {e}")
self.redis_manager.mark_failed(product_id)
self.stats['failed'] += 1
raise # 重新抛出以触发重试
async def run(self):
"""运行爬虫"""
try:
# 初始化
await self.initialize()
# 生成任务列表
task_list = self.generate_task_list()
if not task_list:
logger.info("没有待爬任务,退出")
return
# 创建协程任务
tasks = [self.crawl_product(pid) for pid in task_list]
# 并发执行(带进度条)
logger.info(f"=== 开始爬取 {len(tasks)} 个商品 ===")
await tqdm.gather(*tasks, desc="爬取进度")
# 输出统计
logger.info("=== 爬取完成 ===")
logger.info(f"成功: {self.stats['success']}")
logger.info(f"失败: {self.stats['failed']}")
logger.info(f"跳过: {self.stats['skipped']}")
# 输出 Redis 进度
progress = self.redis_manager.get_progress()
logger.info(f"总进度: {progress}")
finally:
# 清理资源
await self.cleanup()
async def main():
"""主函数"""
crawler = AsyncCrawler()
await crawler.run()
if __name__ == '__main__':
# 运行异步主函数
asyncio.run(main())
🚀 运行方式与结果展示
数据库初始化(sql/init.sql)
CREATE DATABASE IF NOT EXISTS async_crawler_db
DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE async_crawler_db;
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
product_id INT NOT NULL UNIQUE,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) DEFAULT 0.00,
stock INT DEFAULT 0,
category VARCHAR(100),
brand VARCHAR(100),
rating DECIMAL(3,2) DEFAULT 0.00,
sales_count INT DEFAULT 0,
update_time DATETIME,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_product_id (product_id),
INDEX idx_category (category),
INDEX idx_brand (brand)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
启动命令
# 1. 初始化数据库
mysql -u root -p < sql/init.sql
# 2. 启动 Redis
redis-server
# 3. 运行爬虫
python main.py
# 4. 查看实时日志
tail -f data/logs/crawler_$(date +%Y-%m-%d).log
# 5. 查看进度(Redis CLI)
redis-cli
> SCARD crawler:completed_ids # 查看已完成数量
> SMEMBERS crawler:failed_ids # 查看失败 ID
运行效果
2026-02-04 14:30:00 | INFO | main:initialize:45 - === 爬虫初始化开始 ===
2026-02-04 14:30:00 | INFO | fetcher:create_session:50 - Session 已创建,连接池: 1000
2026-02-04 14:30:00 | INFO | rate_limiter:__init__:25 - 令牌桶初始化: 500.0/s, 容量=500
2026-02-04 14:30:00 | INFO | mysql_handler:create_pool:20 - MySQL 连接池已创建
2026-02-04 14:30:00 | INFO | main:initialize:60 - === 爬虫初始化完成 ===
2026-02-04 14:30:01 | INFO | main:generate_task_list:75 - 已完成任务数: 0
2026-02-04 14:30:01 | INFO | main:generate_task_list:82 - 待爬任务数: 50000
2026-02-04 14:30:01 | INFO | main:run:150 - === 开始爬取 50000 个商品 ===
爬取进度: 100%|████████████████████| 50000/50000 [01:40<00:00, 498.2it/s]
2026-02-04 14:31:41 | INFO | main:run:155 - === 爬取完成 ===
2026-02-04 14:31:41 | INFO | main:run:156 - 成功: 49850
2026-02-04 14:31:41 | INFO | main:run:157 - 失败: 150
2026-02-04 14:31:41 | INFO | main:run:162 - 总进度: {'completed': 49850, 'failed': 150, 'total': 50000, 'progress': '99.70%'}
性能指标
| 指标 | 数值 |
|---|---|
| 总任务数 | 50000 |
| 并发数 | 1000 |
| 限速 | 500 QPS |
| 总耗时 | ~100s |
| 实际 QPS | ~500 |
| 成功率 | 99.7% |
| 内存占用 | ~500MB |
⚠️ 常见问题与排错
问题1:Too many open files
现象:
OSError: [Errno 24] Too many open files
原因: 系统文件描述符限制过低
解决方案:
# 临时提升限制
ulimit -n 65536
# 永久修改(编辑 /etc/security/limits.conf)
* soft nofile 65536
* hard nofile 65536
```耗尽
**现象:** 协程长时间等待,无法获取连接
**原因:** `TCPConnector.limit` 设置过小
**解决方案:**
```python
# 确保连接池 >= 并发数
connector = aiohttp.TCPConnector(
limit=config.MAX_CONCURRENT + 200, # 增加缓冲
)
问题3:内存占用过高
现象: 进程内存持续上涨
可能原因:
- 未正确关闭 Session
- 任务列表过大导致对象积压
解决方案:
# 方案1:分批处理
async def run_in_batches(task_list, batch_size=5000):
for i in range(0, len(task_list), batch_size):
batch = task_list[i:i+batch_size]
await asyncio.gather(*[crawl_product(pid) for pid in batch])
await asyncio.sleep(1) # 批次间隔
# 方案2:使用生成器
async def task_generator():
for pid in task_list:
yield crawl_product(pid)
问题4:429 Too Many Requests
现象: 频繁收到 HTTP 429 响应
原因: 限速器参数设置过高
解决方案:
# 降低 QPS
rate_limiter = TokenBucketLimiter(rate=300) # 从 500 降到 300
# 增加延迟
DOWNLOAD_DELAY = 0.5 # 每次请求额外延迟
问题5:断点续爬失效
现象: 重启后仍从头开始
排查步骤:
# 1. 检查 Redis 连接
redis-cli PING
# 2. 检查数据是否写入
redis-cli SCARD crawler:completed_ids
# 3. 检查代码逻辑
# 确保每次成功后调.redis_manager.mark_completed(product_id)
🚀 进阶优化
1. 动态调整并发数
class AdaptiveConcurrencyController:
"""自适应并发控制器"""
def __init__(self, initial=100, min_val=50, max_val=2000):
self.current = initial
self.min_val = min_val
self.max_val = max_val
self.success_rate_window = deque(maxlen=1000)
def adjust(self, is_success: bool):
"""根据成功率动态调整并发"""
self.success_rate_window.append(1 if is_success else 0)
if len(self.success_rate_window) >= 100:
success_rate = sum(self.success_rate_window) / len(self.success_rate_window)
if success_rate > 0.95:
# 成功率高,增加并发
self.current = min(self.current + 50, self.max_val)
elif success_rate < 0.80:
# 成功率低,降低并发
self.current = max(self.current - 50, self.min_val)
logger.info(f"并发调整: {self.current}, 成功率: {success_rate:.2%}")
2. 分布式爬取(多机部署)
"""
使用 Redis 实现分布式任务队列
"""
class DistributedTaskQueue:
"""分布式任务队列"""
QUEUE_KEY = 'crawler:task_queue'
PROCESSING_KEY = 'crawler:processing'
def __init__(self, redis_client):
self.redis = redis_client
def push_tasks(self, product_ids: List[int]):
"""推送任务到队列"""
self.redis.rpush(self.QUEUE_KEY, *product_ids)
def pop_task(self) -> Optional[int]:
"""从队列获取任务(原子操作)"""
task_id = self.redis.lpop(self.QUEUE_KEY)
if task_id:
# 标记为处理中
self.redis.sadd(self.PROCESSING_KEY, task_id)
return int(task_id)
return None
def complete_task(self, product_id: int):
"""标记任务完成"""
self.redis.srem(self.PROCESSING_KEY, product_id)
self.redis.sadd('crawler:completed_ids', product_id)
3. 监控大盘(实时可视化)
"""
使用 Prometheus + Grafana 监控
"""
from prometheus_client import Counter, Gauge, Histogram, start_http_server
# 定义指标
requests_total = Counter('crawler_requests_total', '总请求数')
requests_success = Counter('crawler_requests_success', '成功请求数')
requests_failed = Counter('crawler_requests_failed', '失败请求数')
active_tasks = Gauge('crawler_active_tasks', '活跃任务数')
request_duration = Histogram('crawler_request_duration_seconds', '请求耗时')
# 在爬虫中使用
async def crawl_product(product_id):
active_tasks.inc()
requests_total.inc()
start_time = time.time()
try:
# ... 爬取逻辑
requests_success.inc()
except:
requests_failed.inc()
finally:
request_duration.observe(time.time() - start_time)
active_tasks.dec()
# 启动 Prometheus HTTP 服务器
start_http_server(8000)
4. 智能代理池
"""
代理池管理器
"""
class ProxyPool:
"""代理池:支持健康检查和自动轮换"""
def __init__(self, proxies: List[str]):
self.proxies = proxies
self.index = 0
self.health_scores = {proxy: 100 for proxy in proxies}
def get_proxy(self) -> str:
"""获取可用代理(轮询)"""
# 过滤健康分数 > 50 的代理
healthy_proxies = [
p for p, score in self.health_scores.items()
if score > 50
]
if not healthy_proxies:
logger.warning("无可用代理")
return None
proxy = healthy_proxies[self.index % len(healthy_proxies)]
self.index += 1
return proxy
def report_success(self, proxy: str):
"""代理使用成功,增加健康分数"""
self.health_scores[proxy] = min(100, self.health_scores[proxy] + 5)
def report_failure(self, proxy: str):
"""代理使用失败,降低健康分数"""
self.health_scores[proxy] = max(0, self.health_scores[proxy] - 20)
📚 总结与延伸阅读
我们完成了什么
通过本文的完整实践,我们构建了一个生产级异步爬虫系统:
✅ 高性能并发:单机实现 500+ QPS,万级商品采集仅需分钟级时间
✅ 智能流量控制:令牌桶限速 + 信号量并发控制,避免触发反爬
✅ 可靠的断点续爬:基于 Redis 实现进度持久化,故障恢复零损失
✅ 完善的异常处理:指数退避重试 + 分类错误处理,提升成功率
✅ 模块化设计:清晰的分层架构,易于扩展和维护
asyncio 核心原理回顾
"""
事件循环工作原理示意
"""
# 1. 创建事件循环
loop = asyncio.get_event_loop()
# 2. 协程注册到循环
task = loop.create_task(some_coroutine())
# 3. 循环执行流程:
while True:
# 检查是否有就绪的协程
ready_tasks = select_ready_tasks()
for task in ready_tasks:
# 恢复协程执行
task.resume()
# 遇到 await 时挂起,切换到其他协程
if task.is_waiting():
task.suspend()
# 处理 I/O 事件(网络响应到达)
process_io_events()
与其他方案的对比
| 方案 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| aiohttp + asyncio | 大规模 API 采集 | 高性能、低资源占用 | 学习曲线陡 |
| Scrapy | HTML 页面爬取 | 生态完善、功能丰富 | 对 API 支持一般 |
| requests + threading | 小规模脚本 | 简单易用 | 性能受限于 GIL |
| Playwright | JS 密集型站点 | 真实浏览器环境 | 资源消耗大 |
下一步可以做什么
技术深化:
- 学习 Celery 实现分布式任务调度
- 研究 Kafka 构建实时数据管道
- 探索 Kubernetes 部署爬虫集群
业务扩展:
- 增加数据清洗算法(NLP、实体识别)
- 接入实时告警系统(价格异动通知)
- 构建数据分析看板(BI 可视化)
学习资源:
- 📖 官方文档:https://docs.aiohttp.org/
- 📖 asyncio 指南:https://docs.python.org/3/library/asyncio.html
- 📚 推荐书籍:《Using Asyncio in Python》
- 💬 社区:GitHub aiohttp Issues、Stack Overflow
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐


所有评论(0)