在大数据时代,爬虫不仅需要抓取数据,还需要处理大规模数据的性能问题。对于大数据量抓取,普通爬虫很容易出现 内存占用过高、IO 瓶颈、请求阻塞 等问题。

本文将从 内存优化、IO优化、并发抓取优化 三个维度,结合 Python 实战案例,讲解如何提升大数据量爬虫的性能与稳定性。


一、大数据量爬虫的常见问题

在抓取百万级或千万级数据时,常见问题如下:

问题 原因 影响
内存占用高 所有数据保存在内存中 容易 OOM 崩溃
IO 阻塞 文件或数据库操作同步 抓取速度受限
请求效率低 单线程/同步请求 抓取时间长
异常恢复困难 单点失败 数据丢失或重复抓取

因此,大数据量爬虫优化的核心目标是 降低内存占用、提升 IO 效率、提高并发抓取能力


二、内存优化

1. 流式处理

  • 不要一次性将所有数据存入内存
  • 采用生成器(generator)按需处理数据

示例:生成器抓取并写入 CSV

import csv
import requests
from bs4 import BeautifulSoup

def fetch_movies(pages=10):
    for page in range(0, pages*25, 25):
        url = f"https://movie.douban.com/top250?start={page}"
        headers = {"User-Agent": "Mozilla/5.0"}
        res = requests.get(url, headers=headers)
        soup = BeautifulSoup(res.text, "html.parser")
        for item in soup.find_all("div", class_="item"):
            title = item.find("span", class_="title").text
            rating = item.find("span", class_="rating_num").text
            quote_tag = item.find("span", class_="inq")
            quote = quote_tag.text if quote_tag else ""
            yield {"title": title, "rating": rating, "quote": quote}

with open("douban_top250_stream.csv", "w", newline="", encoding="utf-8-sig") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "rating", "quote"])
    writer.writeheader()
    for movie in fetch_movies(10):
        writer.writerow(movie)

亮点

  • 数据即时写入文件
  • 内存占用保持低位
  • 支持大规模抓取

2. 数据分页与批量处理

  • 对于数据库入库,避免一次性插入大量数据
  • 使用 批量插入分块处理

示例:批量插入 MySQL

import pymysql

def insert_batch(cursor, data_list):
    sql = "INSERT INTO movies(title, rating, quote) VALUES(%s, %s, %s)"
    cursor.executemany(sql, data_list)

conn = pymysql.connect(host="localhost", user="root", password="123456", database="douban")
cursor = conn.cursor()
batch = []
for movie in fetch_movies(100):
    batch.append((movie['title'], movie['rating'], movie['quote']))
    if len(batch) >= 50:  # 每50条批量写入
        insert_batch(cursor, batch)
        conn.commit()
        batch = []

if batch:
    insert_batch(cursor, batch)
    conn.commit()
cursor.close()
conn.close()

3. 压缩存储

  • 大量文本数据可使用 gzip、lz4 压缩存储
  • 减少内存占用与磁盘 IO

示例:gzip 写入 CSV

import gzip
import csv

with gzip.open("douban_top250.csv.gz", "wt", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "rating", "quote"])
    writer.writeheader()
    for movie in fetch_movies(100):
        writer.writerow(movie)

三、IO 优化

1. 异步写入

  • 文件、数据库写入使用异步操作
  • 避免同步 IO 阻塞爬虫请求

示例:使用 aiofiles 异步写入文件

import aiofiles
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

async def parse_and_write(url, f):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        soup = BeautifulSoup(html, "html.parser")
        for item in soup.find_all("div", class_="item"):
            title = item.find("span", class_="title").text
            rating = item.find("span", class_="rating_num").text
            quote_tag = item.find("span", class_="inq")
            quote = quote_tag.text if quote_tag else ""
            await f.write(f"{title},{rating},{quote}\n")

async def main():
    async with aiofiles.open("douban_async.csv", "w", encoding="utf-8") as f:
        tasks = [parse_and_write(f"https://movie.douban.com/top250?start={i*25}", f) for i in range(10)]
        await asyncio.gather(*tasks)

asyncio.run(main())

2. 批量数据库操作

  • 通过批量插入减少数据库连接开销
  • 异步数据库库可使用 aiomysql、motor(MongoDB)

3. 减少磁盘 IO

  • 数据可先缓存在内存(小批量)再写入磁盘
  • 使用 CSV/JSON/Parquet 等高效存储格式

四、并发优化

1. 异步爬虫

  • 使用 aiohttp + asynciohttpx + asyncio 实现并发请求
  • 适合大规模 URL 抓取

示例:aiohttp 并发抓取

import asyncio
import aiohttp

async def fetch(url, session):
    async with session.get(url) as resp:
        return await resp.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

urls = [f"https://movie.douban.com/top250?start={i*25}" for i in range(10)]
results = asyncio.run(main(urls))

2. 并发控制

  • 限制同时请求数量,防止被封 IP
  • 使用 Semaphore 控制并发
semaphore = asyncio.Semaphore(5)

async def fetch(url, session):
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.text()

3. 多进程/多线程

  • Python GIL 限制 CPU 计算型任务
  • 对 IO 密集型任务可用 多线程,对 CPU 密集型任务可用 多进程
  • 结合异步爬虫,可进一步提升抓取速度

示例:多进程 + 异步

from multiprocessing import Pool
import asyncio

def worker(urls):
    asyncio.run(main(urls))

if __name__ == "__main__":
    url_chunks = [urls[i:i+5] for i in range(0, len(urls), 5)]
    with Pool(processes=4) as pool:
        pool.map(worker, url_chunks)

4. 分布式爬虫

  • 使用 Scrapy + Redis 实现任务队列分布式抓取
  • 支持多节点并行抓取大规模数据
  • Redis 队列可保证去重与任务调度

五、抓取大数据量的实战技巧

  1. 任务拆分

    • URL 分块处理,避免单次抓取任务过大
    • 可结合 Redis / Celery / Kafka 做任务调度
  2. 延时策略

    • 防止被封 IP
    • asyncio.sleep() 或随机延时
  3. 代理池

    • 提供 IP 轮换,提高抓取稳定性
    • 配合异常重试机制
  4. 数据去重

    • Redis Set/ Bloom Filter 避免重复抓取
  5. 异常处理与重试

    • 网络超时、服务器错误、解析失败
    • 保证任务可靠性

六、综合示例:异步 + 批量写入 + 内存优化

import asyncio, aiohttp, aiofiles
from bs4 import BeautifulSoup

async def fetch(url, session):
    async with session.get(url) as resp:
        return await resp.text()

async def parse_and_write(url, f):
    async with aiohttp.ClientSession() as session:
        html = await fetch(url, session)
        soup = BeautifulSoup(html, "html.parser")
        for item in soup.find_all("div.item"):
            title = item.find("span.title").text
            rating = item.find("span.rating_num").text
            quote_tag = item.find("span.inq")
            quote = quote_tag.text if quote_tag else ""
            await f.write(f"{title},{rating},{quote}\n")

async def main():
    urls = [f"https://movie.douban.com/top250?start={i*25}" for i in range(100)]
    async with aiofiles.open("douban_large.csv", "w", encoding="utf-8") as f:
        tasks = [parse_and_write(url, f) for url in urls]
        await asyncio.gather(*tasks)

asyncio.run(main())

优化点

  • 异步抓取减少请求阻塞
  • 流式写入降低内存占用
  • 批量处理提高写入效率

七、总结

大数据量爬虫优化的核心策略:

  1. 内存优化

    • 流式处理、生成器
    • 批量写入
    • 压缩存储
  2. IO优化

    • 异步文件/数据库操作
    • 批量写入
    • 高效存储格式
  3. 并发优化

    • 异步爬虫 (aiohttp + asyncio)
    • 多线程/多进程
    • 分布式爬虫 (Scrapy + Redis)

结合以上策略,可以实现 百万级甚至千万级数据抓取,保证爬虫的稳定性与效率。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐