异步爬虫实战:Python + aiohttp + asyncio 高并发爬取百万级数据
本文从零开始,用异步HTTP请求:aiohttp + 连接池复用;并发控制:Semaphore限制同时请求数量;代理IP池:轮询代理,避免被封;自动重试:tenacity异步重试;Redis去重:百万级数据去重速度极快;异步数据存储:aiosqlite避免IO阻塞;日志监控:实时记录爬取状态。异步爬虫的核心优势是高并发、速度快,但要注意反爬、并发控制、去重、重试等问题,合法合规使用。希望这篇文章能
当数据量达到百万级时,传统的同步爬虫(requests + 多线程)会遇到严重的性能瓶颈:线程切换开销大、IO阻塞严重、速度慢。异步爬虫(aiohttp + asyncio)是解决这个问题的核心——它利用单线程+事件循环,避免了线程切换开销,IO操作时自动切换到其他任务,并发能力极强,速度是同步爬虫的5-10倍。
本文从零开始,用Python 3.10+ + aiohttp + aiosqlite + Redis(去重) + tenacity(重试)实现一个生产级高并发异步爬虫,涵盖:异步HTTP请求、并发控制、代理IP池、自动重试、Redis去重、异步数据存储、日志监控。
一、核心需求与技术选型
1. 核心需求(百万级数据必备)
| 需求 | 说明 |
|---|---|
| 高并发 | 支持50-100并发请求(根据网站反爬调整) |
| 去重 | 避免重复爬取URL(百万级用Redis) |
| 代理IP | 高并发容易被封,需要代理池 |
| 自动重试 | 请求失败(超时、5xx)自动重试3次 |
| 异步存储 | 避免IO阻塞,用异步数据库/文件 |
| 日志监控 | 实时监控爬取进度、失败率、速度 |
2. 技术选型(为什么选这些?)
| 技术 | 版本 | 用途 | 优势 |
|---|---|---|---|
| Python | 3.10+ | 开发语言 | 异步语法成熟(async/await) |
| aiohttp | 3.9+ | 异步HTTP客户端 | 性能极强,支持异步请求、连接池 |
| asyncio | 内置 | 异步框架 | Python原生异步事件循环 |
| aiosqlite | 0.19+ | 异步SQLite | 轻量、无需安装、异步IO |
| Redis | 7.0+ | 去重+缓存 | 百万级数据去重速度极快 |
| tenacity | 8.2+ | 异步重试 | 简单易用,支持异步函数 |
| fake_useragent | 1.4+ | 随机User-Agent | 反爬基础 |
| logging | 内置 | 日志监控 | 实时记录爬取状态 |
二、环境准备
1. 安装Python 3.10+
下载地址:https://www.python.org/downloads/
2. 安装依赖库
# 核心依赖
pip install aiohttp aiosqlite redis tenacity fake_useragent
# 可选:Redis(如果用本地去重则不需要)
# Windows下载:https://github.com/microsoftarchive/redis/releases
# Mac/Linux:brew install redis / apt-get install redis
3. 启动Redis(用于去重)
# Windows:双击redis-server.exe
# Mac/Linux:redis-server
三、项目结构
async-crawler
├── config.py # 配置文件(代理、并发数、重试次数等)
├── crawler.py # 爬虫核心(异步请求、重试、代理)
├── storage.py # 异步数据存储(aiosqlite)
├── deduplicator.py # 去重(Redis)
├── main.py # 主程序(启动爬虫)
└── requirements.txt # 依赖列表
四、核心代码实现
1. 配置文件(config.py)
import os
class Config:
# ==================== 爬虫配置 ====================
MAX_CONCURRENCY = 50 # 最大并发数(根据网站反爬调整,建议10-100)
RETRY_TIMES = 3 # 重试次数
TIMEOUT = 10 # 请求超时时间(秒)
DELAY_MIN = 0.5 # 随机延迟最小值(秒)
DELAY_MAX = 2 # 随机延迟最大值(秒)
# ==================== Redis配置(去重) ====================
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_KEY = "async_crawler:visited_urls" # 去重Key
# ==================== 数据库配置 ====================
DB_PATH = "async_crawler.db"
# ==================== 代理IP池(可选,没有则留空) ====================
# 格式:["http://ip:port", "http://ip:port"]
PROXY_POOL = [
# "http://127.0.0.1:7890", # 示例:本地代理
]
# ==================== 测试URL(用httpbin.org,避免法律风险) ====================
# 实际项目换成你要爬的网站(合法合规!)
TEST_URL_TEMPLATE = "https://httpbin.org/get?page={page}"
MAX_PAGE = 10000 # 测试1万页,实际项目换成百万级
2. 去重模块(deduplicator.py)
用Redis的Set去重,百万级数据速度极快:
import redis
from config import Config
class RedisDeduplicator:
def __init__(self):
self.redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=Config.REDIS_PORT,
db=Config.REDIS_DB,
decode_responses=True
)
self.key = Config.REDIS_KEY
async def is_visited(self, url):
"""检查URL是否已爬取"""
return self.redis_client.sismember(self.key, url)
async def mark_visited(self, url):
"""标记URL为已爬取"""
self.redis_client.sadd(self.key, url)
async def clear(self):
"""清空去重记录(测试用)"""
self.redis_client.delete(self.key)
3. 异步数据存储(storage.py)
用aiosqlite异步存储数据,避免IO阻塞:
import aiosqlite
from datetime import datetime
from config import Config
class AsyncSQLiteStorage:
def __init__(self):
self.db_path = Config.DB_PATH
async def init_db(self):
"""初始化数据库表"""
async with aiosqlite.connect(self.db_path) as conn:
await conn.execute('''
CREATE TABLE IF NOT EXISTS data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE NOT NULL,
content TEXT,
status_code INTEGER,
crawl_time TEXT
)
''')
await conn.commit()
async def save(self, url, content, status_code):
"""异步保存数据"""
crawl_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
async with aiosqlite.connect(self.db_path) as conn:
await conn.execute('''
INSERT OR REPLACE INTO data (url, content, status_code, crawl_time)
VALUES (?, ?, ?, ?)
''', (url, content, status_code, crawl_time))
await conn.commit()
except Exception as e:
print(f"保存失败:{url},错误:{e}")
4. 爬虫核心(crawler.py)
包含:异步请求、并发控制、代理IP、自动重试、随机User-Agent:
import asyncio
import random
import logging
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from fake_useragent import UserAgent
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from config import Config
from deduplicator import RedisDeduplicator
from storage import AsyncSQLiteStorage
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler('crawler.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class AsyncCrawler:
def __init__(self):
self.deduplicator = RedisDeduplicator()
self.storage = AsyncSQLiteStorage()
self.ua = UserAgent()
self.semaphore = asyncio.Semaphore(Config.MAX_CONCURRENCY) # 并发控制
self.proxy_index = 0 # 代理IP轮询索引
def get_random_proxy(self):
"""获取随机代理IP(轮询)"""
if not Config.PROXY_POOL:
return None
proxy = Config.PROXY_POOL[self.proxy_index % len(Config.PROXY_POOL)]
self.proxy_index += 1
return proxy
def get_random_headers(self):
"""获取随机请求头"""
return {
"User-Agent": self.ua.random,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive"
}
@retry(
stop=stop_after_attempt(Config.RETRY_TIMES),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((asyncio.TimeoutError, Exception)),
before_sleep=lambda retry_state: logger.warning(
f"请求失败,重试第 {retry_state.attempt_number} 次..."
)
)
async def fetch(self, session, url):
"""异步请求URL(带重试)"""
proxy = self.get_random_proxy()
headers = self.get_random_headers()
timeout = ClientTimeout(total=Config.TIMEOUT)
async with self.semaphore: # 并发控制
# 随机延迟(反爬)
await asyncio.sleep(random.uniform(Config.DELAY_MIN, Config.DELAY_MAX))
async with session.get(
url,
headers=headers,
proxy=proxy,
timeout=timeout,
ssl=False # 测试用,实际项目根据需要开启
) as response:
content = await response.text()
logger.info(f"请求成功:{url},状态码:{response.status}")
return {
"url": url,
"content": content,
"status_code": response.status
}
async def crawl_url(self, session, url):
"""爬取单个URL"""
# 1. 检查是否已爬取
if await self.deduplicator.is_visited(url):
logger.info(f"URL已爬取,跳过:{url}")
return
# 2. 爬取URL
try:
result = await self.fetch(session, url)
# 3. 保存数据
await self.storage.save(
url=result["url"],
content=result["content"],
status_code=result["status_code"]
)
# 4. 标记为已爬取
await self.deduplicator.mark_visited(url)
except Exception as e:
logger.error(f"爬取失败:{url},错误:{e}")
async def run(self, urls):
"""启动爬虫"""
# 初始化数据库
await self.storage.init_db()
# 创建TCP连接池(复用连接,提升性能)
connector = TCPConnector(limit=Config.MAX_CONCURRENCY, limit_per_host=10)
async with ClientSession(connector=connector) as session:
# 创建任务列表
tasks = [self.crawl_url(session, url) for url in urls]
# 并发执行任务
await asyncio.gather(*tasks)
logger.info("========== 爬虫完成 ==========")
5. 主程序(main.py)
生成测试URL,启动爬虫:
import asyncio
from config import Config
from crawler import AsyncCrawler
def generate_test_urls():
"""生成测试URL(用httpbin.org)"""
urls = []
for page in range(1, Config.MAX_PAGE + 1):
url = Config.TEST_URL_TEMPLATE.format(page=page)
urls.append(url)
return urls
async def main():
# 1. 生成URL列表
urls = generate_test_urls()
print(f"生成URL数量:{len(urls)}")
# 2. 初始化爬虫
crawler = AsyncCrawler()
# 3. 清空去重记录(测试用)
await crawler.deduplicator.clear()
# 4. 启动爬虫
await crawler.run(urls)
if __name__ == "__main__":
# 运行异步主程序
asyncio.run(main())
五、避坑指南(非常重要!)
1. 并发控制
- 并发数不能太高:一般10-100,根据网站反爬调整,太高会被封IP;
- 用Semaphore:限制同时请求的数量,避免给对方服务器造成压力。
2. 反爬措施
- 随机User-Agent:用fake_useragent生成随机User-Agent;
- 随机延迟:每次请求前随机延迟0.5-2秒;
- 代理IP池:高并发必须用代理,否则容易被封;
- 请求头完整:模拟真实浏览器的请求头(Accept、Accept-Language、Connection等)。
3. 异常处理与重试
- 用tenacity:简单易用的异步重试库;
- 重试次数不要太多:3次左右,太多会浪费时间;
- 重试间隔:用指数退避(2秒、4秒、8秒),避免立即重试。
4. 去重
- 百万级用Redis:本地集合内存不够,Redis的Set去重速度极快;
- 去重要及时:爬取成功后立即标记为已爬取,避免重复请求。
5. 数据存储
- 用异步数据库:aiosqlite、asyncpg(PostgreSQL)、motor(MongoDB),避免IO阻塞;
- 批量存储:如果数据量极大,可以批量存储,减少数据库连接次数。
6. 合法合规
- 遵守robots.txt:查看网站的robots.txt,不要爬取禁止的内容;
- 控制爬取频率:不要给对方服务器造成压力;
- 仅供学习研究:不要用于商业用途、恶意爬取、攻击网站;
- 保护用户隐私:不要爬取、存储、传播用户的个人信息。
六、性能优化建议
1. 连接池复用
- aiohttp的TCPConnector:复用TCP连接,减少连接建立开销;
- 设置合理的连接数:
limit=Config.MAX_CONCURRENCY,limit_per_host=10。
2. 异步IO全链路
- 所有IO操作都用异步:HTTP请求、数据库存储、Redis操作,避免任何同步IO阻塞;
- 不要用同步库:比如requests、sqlite3,会阻塞事件循环。
3. 分布式爬虫
- 百万级以上用分布式:用Celery + Redis Queue(RQ)或者Scrapy-Redis,多台机器同时爬取;
- 任务分发:Redis作为任务队列,分发URL到不同的爬虫节点。
4. 监控与告警
- 实时监控:用Prometheus + Grafana监控爬取进度、失败率、速度;
- 告警:失败率超过10%时发送邮件/钉钉告警。
七、测试运行
1. 启动Redis
redis-server
2. 运行爬虫
python main.py
3. 查看日志
- 控制台实时输出日志;
- 查看
crawler.log文件; - 查看
async_crawler.db数据库,验证数据是否保存成功。
八、总结
本文从零开始,用Python + aiohttp + asyncio + Redis + aiosqlite实现了一个生产级高并发异步爬虫,涵盖了:
- 异步HTTP请求:aiohttp + 连接池复用;
- 并发控制:Semaphore限制同时请求数量;
- 代理IP池:轮询代理,避免被封;
- 自动重试:tenacity异步重试;
- Redis去重:百万级数据去重速度极快;
- 异步数据存储:aiosqlite避免IO阻塞;
- 日志监控:实时记录爬取状态。
异步爬虫的核心优势是高并发、速度快,但要注意反爬、并发控制、去重、重试等问题,合法合规使用。希望这篇文章能帮你快速上手异步爬虫,有问题可以在评论区交流!
更多推荐


所有评论(0)