当数据量达到百万级时,传统的同步爬虫(requests + 多线程)会遇到严重的性能瓶颈:线程切换开销大、IO阻塞严重、速度慢。异步爬虫(aiohttp + asyncio)是解决这个问题的核心——它利用单线程+事件循环,避免了线程切换开销,IO操作时自动切换到其他任务,并发能力极强,速度是同步爬虫的5-10倍。

本文从零开始,用Python 3.10+ + aiohttp + aiosqlite + Redis(去重) + tenacity(重试)实现一个生产级高并发异步爬虫,涵盖:异步HTTP请求、并发控制、代理IP池、自动重试、Redis去重、异步数据存储、日志监控


一、核心需求与技术选型

1. 核心需求(百万级数据必备)

需求 说明
高并发 支持50-100并发请求(根据网站反爬调整)
去重 避免重复爬取URL(百万级用Redis)
代理IP 高并发容易被封,需要代理池
自动重试 请求失败(超时、5xx)自动重试3次
异步存储 避免IO阻塞,用异步数据库/文件
日志监控 实时监控爬取进度、失败率、速度

2. 技术选型(为什么选这些?)

技术 版本 用途 优势
Python 3.10+ 开发语言 异步语法成熟(async/await
aiohttp 3.9+ 异步HTTP客户端 性能极强,支持异步请求、连接池
asyncio 内置 异步框架 Python原生异步事件循环
aiosqlite 0.19+ 异步SQLite 轻量、无需安装、异步IO
Redis 7.0+ 去重+缓存 百万级数据去重速度极快
tenacity 8.2+ 异步重试 简单易用,支持异步函数
fake_useragent 1.4+ 随机User-Agent 反爬基础
logging 内置 日志监控 实时记录爬取状态

二、环境准备

1. 安装Python 3.10+

下载地址:https://www.python.org/downloads/

2. 安装依赖库

# 核心依赖
pip install aiohttp aiosqlite redis tenacity fake_useragent

# 可选:Redis(如果用本地去重则不需要)
# Windows下载:https://github.com/microsoftarchive/redis/releases
# Mac/Linux:brew install redis / apt-get install redis

3. 启动Redis(用于去重)

# Windows:双击redis-server.exe
# Mac/Linux:redis-server

三、项目结构

async-crawler
├── config.py          # 配置文件(代理、并发数、重试次数等)
├── crawler.py         # 爬虫核心(异步请求、重试、代理)
├── storage.py         # 异步数据存储(aiosqlite)
├── deduplicator.py    # 去重(Redis)
├── main.py            # 主程序(启动爬虫)
└── requirements.txt   # 依赖列表

四、核心代码实现

1. 配置文件(config.py)

import os

class Config:
    # ==================== 爬虫配置 ====================
    MAX_CONCURRENCY = 50  # 最大并发数(根据网站反爬调整,建议10-100)
    RETRY_TIMES = 3       # 重试次数
    TIMEOUT = 10           # 请求超时时间(秒)
    DELAY_MIN = 0.5        # 随机延迟最小值(秒)
    DELAY_MAX = 2          # 随机延迟最大值(秒)

    # ==================== Redis配置(去重) ====================
    REDIS_HOST = "localhost"
    REDIS_PORT = 6379
    REDIS_DB = 0
    REDIS_KEY = "async_crawler:visited_urls"  # 去重Key

    # ==================== 数据库配置 ====================
    DB_PATH = "async_crawler.db"

    # ==================== 代理IP池(可选,没有则留空) ====================
    # 格式:["http://ip:port", "http://ip:port"]
    PROXY_POOL = [
        # "http://127.0.0.1:7890",  # 示例:本地代理
    ]

    # ==================== 测试URL(用httpbin.org,避免法律风险) ====================
    # 实际项目换成你要爬的网站(合法合规!)
    TEST_URL_TEMPLATE = "https://httpbin.org/get?page={page}"
    MAX_PAGE = 10000  # 测试1万页,实际项目换成百万级

2. 去重模块(deduplicator.py)

用Redis的Set去重,百万级数据速度极快:

import redis
from config import Config

class RedisDeduplicator:
    def __init__(self):
        self.redis_client = redis.Redis(
            host=Config.REDIS_HOST,
            port=Config.REDIS_PORT,
            db=Config.REDIS_DB,
            decode_responses=True
        )
        self.key = Config.REDIS_KEY

    async def is_visited(self, url):
        """检查URL是否已爬取"""
        return self.redis_client.sismember(self.key, url)

    async def mark_visited(self, url):
        """标记URL为已爬取"""
        self.redis_client.sadd(self.key, url)

    async def clear(self):
        """清空去重记录(测试用)"""
        self.redis_client.delete(self.key)

3. 异步数据存储(storage.py)

用aiosqlite异步存储数据,避免IO阻塞:

import aiosqlite
from datetime import datetime
from config import Config

class AsyncSQLiteStorage:
    def __init__(self):
        self.db_path = Config.DB_PATH

    async def init_db(self):
        """初始化数据库表"""
        async with aiosqlite.connect(self.db_path) as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT UNIQUE NOT NULL,
                    content TEXT,
                    status_code INTEGER,
                    crawl_time TEXT
                )
            ''')
            await conn.commit()

    async def save(self, url, content, status_code):
        """异步保存数据"""
        crawl_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            async with aiosqlite.connect(self.db_path) as conn:
                await conn.execute('''
                    INSERT OR REPLACE INTO data (url, content, status_code, crawl_time)
                    VALUES (?, ?, ?, ?)
                ''', (url, content, status_code, crawl_time))
                await conn.commit()
        except Exception as e:
            print(f"保存失败:{url},错误:{e}")

4. 爬虫核心(crawler.py)

包含:异步请求、并发控制、代理IP、自动重试、随机User-Agent:

import asyncio
import random
import logging
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from fake_useragent import UserAgent
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from config import Config
from deduplicator import RedisDeduplicator
from storage import AsyncSQLiteStorage

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('crawler.log'), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

class AsyncCrawler:
    def __init__(self):
        self.deduplicator = RedisDeduplicator()
        self.storage = AsyncSQLiteStorage()
        self.ua = UserAgent()
        self.semaphore = asyncio.Semaphore(Config.MAX_CONCURRENCY)  # 并发控制
        self.proxy_index = 0  # 代理IP轮询索引

    def get_random_proxy(self):
        """获取随机代理IP(轮询)"""
        if not Config.PROXY_POOL:
            return None
        proxy = Config.PROXY_POOL[self.proxy_index % len(Config.PROXY_POOL)]
        self.proxy_index += 1
        return proxy

    def get_random_headers(self):
        """获取随机请求头"""
        return {
            "User-Agent": self.ua.random,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Connection": "keep-alive"
        }

    @retry(
        stop=stop_after_attempt(Config.RETRY_TIMES),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((asyncio.TimeoutError, Exception)),
        before_sleep=lambda retry_state: logger.warning(
            f"请求失败,重试第 {retry_state.attempt_number} 次..."
        )
    )
    async def fetch(self, session, url):
        """异步请求URL(带重试)"""
        proxy = self.get_random_proxy()
        headers = self.get_random_headers()
        timeout = ClientTimeout(total=Config.TIMEOUT)

        async with self.semaphore:  # 并发控制
            # 随机延迟(反爬)
            await asyncio.sleep(random.uniform(Config.DELAY_MIN, Config.DELAY_MAX))
            
            async with session.get(
                url,
                headers=headers,
                proxy=proxy,
                timeout=timeout,
                ssl=False  # 测试用,实际项目根据需要开启
            ) as response:
                content = await response.text()
                logger.info(f"请求成功:{url},状态码:{response.status}")
                return {
                    "url": url,
                    "content": content,
                    "status_code": response.status
                }

    async def crawl_url(self, session, url):
        """爬取单个URL"""
        # 1. 检查是否已爬取
        if await self.deduplicator.is_visited(url):
            logger.info(f"URL已爬取,跳过:{url}")
            return
        # 2. 爬取URL
        try:
            result = await self.fetch(session, url)
            # 3. 保存数据
            await self.storage.save(
                url=result["url"],
                content=result["content"],
                status_code=result["status_code"]
            )
            # 4. 标记为已爬取
            await self.deduplicator.mark_visited(url)
        except Exception as e:
            logger.error(f"爬取失败:{url},错误:{e}")

    async def run(self, urls):
        """启动爬虫"""
        # 初始化数据库
        await self.storage.init_db()
        # 创建TCP连接池(复用连接,提升性能)
        connector = TCPConnector(limit=Config.MAX_CONCURRENCY, limit_per_host=10)
        async with ClientSession(connector=connector) as session:
            # 创建任务列表
            tasks = [self.crawl_url(session, url) for url in urls]
            # 并发执行任务
            await asyncio.gather(*tasks)
        logger.info("========== 爬虫完成 ==========")

5. 主程序(main.py)

生成测试URL,启动爬虫:

import asyncio
from config import Config
from crawler import AsyncCrawler

def generate_test_urls():
    """生成测试URL(用httpbin.org)"""
    urls = []
    for page in range(1, Config.MAX_PAGE + 1):
        url = Config.TEST_URL_TEMPLATE.format(page=page)
        urls.append(url)
    return urls

async def main():
    # 1. 生成URL列表
    urls = generate_test_urls()
    print(f"生成URL数量:{len(urls)}")
    # 2. 初始化爬虫
    crawler = AsyncCrawler()
    # 3. 清空去重记录(测试用)
    await crawler.deduplicator.clear()
    # 4. 启动爬虫
    await crawler.run(urls)

if __name__ == "__main__":
    # 运行异步主程序
    asyncio.run(main())

五、避坑指南(非常重要!)

1. 并发控制

  • 并发数不能太高:一般10-100,根据网站反爬调整,太高会被封IP;
  • 用Semaphore:限制同时请求的数量,避免给对方服务器造成压力。

2. 反爬措施

  • 随机User-Agent:用fake_useragent生成随机User-Agent;
  • 随机延迟:每次请求前随机延迟0.5-2秒;
  • 代理IP池:高并发必须用代理,否则容易被封;
  • 请求头完整:模拟真实浏览器的请求头(Accept、Accept-Language、Connection等)。

3. 异常处理与重试

  • 用tenacity:简单易用的异步重试库;
  • 重试次数不要太多:3次左右,太多会浪费时间;
  • 重试间隔:用指数退避(2秒、4秒、8秒),避免立即重试。

4. 去重

  • 百万级用Redis:本地集合内存不够,Redis的Set去重速度极快;
  • 去重要及时:爬取成功后立即标记为已爬取,避免重复请求。

5. 数据存储

  • 用异步数据库:aiosqlite、asyncpg(PostgreSQL)、motor(MongoDB),避免IO阻塞;
  • 批量存储:如果数据量极大,可以批量存储,减少数据库连接次数。

6. 合法合规

  • 遵守robots.txt:查看网站的robots.txt,不要爬取禁止的内容;
  • 控制爬取频率:不要给对方服务器造成压力;
  • 仅供学习研究:不要用于商业用途、恶意爬取、攻击网站;
  • 保护用户隐私:不要爬取、存储、传播用户的个人信息。

六、性能优化建议

1. 连接池复用

  • aiohttp的TCPConnector:复用TCP连接,减少连接建立开销;
  • 设置合理的连接数limit=Config.MAX_CONCURRENCYlimit_per_host=10

2. 异步IO全链路

  • 所有IO操作都用异步:HTTP请求、数据库存储、Redis操作,避免任何同步IO阻塞;
  • 不要用同步库:比如requests、sqlite3,会阻塞事件循环。

3. 分布式爬虫

  • 百万级以上用分布式:用Celery + Redis Queue(RQ)或者Scrapy-Redis,多台机器同时爬取;
  • 任务分发:Redis作为任务队列,分发URL到不同的爬虫节点。

4. 监控与告警

  • 实时监控:用Prometheus + Grafana监控爬取进度、失败率、速度;
  • 告警:失败率超过10%时发送邮件/钉钉告警。

七、测试运行

1. 启动Redis

redis-server

2. 运行爬虫

python main.py

3. 查看日志

  • 控制台实时输出日志;
  • 查看crawler.log文件;
  • 查看async_crawler.db数据库,验证数据是否保存成功。

八、总结

本文从零开始,用Python + aiohttp + asyncio + Redis + aiosqlite实现了一个生产级高并发异步爬虫,涵盖了:

  • 异步HTTP请求:aiohttp + 连接池复用;
  • 并发控制:Semaphore限制同时请求数量;
  • 代理IP池:轮询代理,避免被封;
  • 自动重试:tenacity异步重试;
  • Redis去重:百万级数据去重速度极快;
  • 异步数据存储:aiosqlite避免IO阻塞;
  • 日志监控:实时记录爬取状态。

异步爬虫的核心优势是高并发、速度快,但要注意反爬、并发控制、去重、重试等问题,合法合规使用。希望这篇文章能帮你快速上手异步爬虫,有问题可以在评论区交流!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐