Python 爬虫大数据量抓取优化实战:内存、IO、并发优化
大数据量爬虫性能优化指南 本文针对百万级以上数据抓取场景,从内存、IO和并发三个维度提出优化方案: 内存优化 采用生成器流式处理数据,避免内存堆积 数据分页批量处理,减少单次内存占用 使用gzip压缩存储降低磁盘空间 IO优化 异步写入文件/数据库避免阻塞 批量数据库操作减少连接开销 选择高效存储格式(CSV/Parquet) 并发优化 使用aiohttp实现异步请求 通过Semaphore控制并
·
在大数据时代,爬虫不仅需要抓取数据,还需要处理大规模数据的性能问题。对于大数据量抓取,普通爬虫很容易出现 内存占用过高、IO 瓶颈、请求阻塞 等问题。
本文将从 内存优化、IO优化、并发抓取优化 三个维度,结合 Python 实战案例,讲解如何提升大数据量爬虫的性能与稳定性。
一、大数据量爬虫的常见问题
在抓取百万级或千万级数据时,常见问题如下:
问题 | 原因 | 影响 |
---|---|---|
内存占用高 | 所有数据保存在内存中 | 容易 OOM 崩溃 |
IO 阻塞 | 文件或数据库操作同步 | 抓取速度受限 |
请求效率低 | 单线程/同步请求 | 抓取时间长 |
异常恢复困难 | 单点失败 | 数据丢失或重复抓取 |
因此,大数据量爬虫优化的核心目标是 降低内存占用、提升 IO 效率、提高并发抓取能力。
二、内存优化
1. 流式处理
- 不要一次性将所有数据存入内存
- 采用生成器(generator)按需处理数据
示例:生成器抓取并写入 CSV
import csv
import requests
from bs4 import BeautifulSoup
def fetch_movies(pages=10):
for page in range(0, pages*25, 25):
url = f"https://movie.douban.com/top250?start={page}"
headers = {"User-Agent": "Mozilla/5.0"}
res = requests.get(url, headers=headers)
soup = BeautifulSoup(res.text, "html.parser")
for item in soup.find_all("div", class_="item"):
title = item.find("span", class_="title").text
rating = item.find("span", class_="rating_num").text
quote_tag = item.find("span", class_="inq")
quote = quote_tag.text if quote_tag else ""
yield {"title": title, "rating": rating, "quote": quote}
with open("douban_top250_stream.csv", "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["title", "rating", "quote"])
writer.writeheader()
for movie in fetch_movies(10):
writer.writerow(movie)
亮点:
- 数据即时写入文件
- 内存占用保持低位
- 支持大规模抓取
2. 数据分页与批量处理
- 对于数据库入库,避免一次性插入大量数据
- 使用 批量插入 或 分块处理
示例:批量插入 MySQL
import pymysql
def insert_batch(cursor, data_list):
sql = "INSERT INTO movies(title, rating, quote) VALUES(%s, %s, %s)"
cursor.executemany(sql, data_list)
conn = pymysql.connect(host="localhost", user="root", password="123456", database="douban")
cursor = conn.cursor()
batch = []
for movie in fetch_movies(100):
batch.append((movie['title'], movie['rating'], movie['quote']))
if len(batch) >= 50: # 每50条批量写入
insert_batch(cursor, batch)
conn.commit()
batch = []
if batch:
insert_batch(cursor, batch)
conn.commit()
cursor.close()
conn.close()
3. 压缩存储
- 大量文本数据可使用 gzip、lz4 压缩存储
- 减少内存占用与磁盘 IO
示例:gzip 写入 CSV
import gzip
import csv
with gzip.open("douban_top250.csv.gz", "wt", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["title", "rating", "quote"])
writer.writeheader()
for movie in fetch_movies(100):
writer.writerow(movie)
三、IO 优化
1. 异步写入
- 文件、数据库写入使用异步操作
- 避免同步 IO 阻塞爬虫请求
示例:使用 aiofiles 异步写入文件
import aiofiles
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def parse_and_write(url, f):
async with aiohttp.ClientSession() as session:
html = await fetch(session, url)
soup = BeautifulSoup(html, "html.parser")
for item in soup.find_all("div", class_="item"):
title = item.find("span", class_="title").text
rating = item.find("span", class_="rating_num").text
quote_tag = item.find("span", class_="inq")
quote = quote_tag.text if quote_tag else ""
await f.write(f"{title},{rating},{quote}\n")
async def main():
async with aiofiles.open("douban_async.csv", "w", encoding="utf-8") as f:
tasks = [parse_and_write(f"https://movie.douban.com/top250?start={i*25}", f) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
2. 批量数据库操作
- 通过批量插入减少数据库连接开销
- 异步数据库库可使用 aiomysql、motor(MongoDB)
3. 减少磁盘 IO
- 数据可先缓存在内存(小批量)再写入磁盘
- 使用 CSV/JSON/Parquet 等高效存储格式
四、并发优化
1. 异步爬虫
- 使用 aiohttp + asyncio 或 httpx + asyncio 实现并发请求
- 适合大规模 URL 抓取
示例:aiohttp 并发抓取
import asyncio
import aiohttp
async def fetch(url, session):
async with session.get(url) as resp:
return await resp.text()
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(url, session) for url in urls]
results = await asyncio.gather(*tasks)
return results
urls = [f"https://movie.douban.com/top250?start={i*25}" for i in range(10)]
results = asyncio.run(main(urls))
2. 并发控制
- 限制同时请求数量,防止被封 IP
- 使用 Semaphore 控制并发
semaphore = asyncio.Semaphore(5)
async def fetch(url, session):
async with semaphore:
async with session.get(url) as resp:
return await resp.text()
3. 多进程/多线程
- Python GIL 限制 CPU 计算型任务
- 对 IO 密集型任务可用 多线程,对 CPU 密集型任务可用 多进程
- 结合异步爬虫,可进一步提升抓取速度
示例:多进程 + 异步
from multiprocessing import Pool
import asyncio
def worker(urls):
asyncio.run(main(urls))
if __name__ == "__main__":
url_chunks = [urls[i:i+5] for i in range(0, len(urls), 5)]
with Pool(processes=4) as pool:
pool.map(worker, url_chunks)
4. 分布式爬虫
- 使用 Scrapy + Redis 实现任务队列分布式抓取
- 支持多节点并行抓取大规模数据
- Redis 队列可保证去重与任务调度
五、抓取大数据量的实战技巧
-
任务拆分
- URL 分块处理,避免单次抓取任务过大
- 可结合 Redis / Celery / Kafka 做任务调度
-
延时策略
- 防止被封 IP
asyncio.sleep()
或随机延时
-
代理池
- 提供 IP 轮换,提高抓取稳定性
- 配合异常重试机制
-
数据去重
- Redis Set/ Bloom Filter 避免重复抓取
-
异常处理与重试
- 网络超时、服务器错误、解析失败
- 保证任务可靠性
六、综合示例:异步 + 批量写入 + 内存优化
import asyncio, aiohttp, aiofiles
from bs4 import BeautifulSoup
async def fetch(url, session):
async with session.get(url) as resp:
return await resp.text()
async def parse_and_write(url, f):
async with aiohttp.ClientSession() as session:
html = await fetch(url, session)
soup = BeautifulSoup(html, "html.parser")
for item in soup.find_all("div.item"):
title = item.find("span.title").text
rating = item.find("span.rating_num").text
quote_tag = item.find("span.inq")
quote = quote_tag.text if quote_tag else ""
await f.write(f"{title},{rating},{quote}\n")
async def main():
urls = [f"https://movie.douban.com/top250?start={i*25}" for i in range(100)]
async with aiofiles.open("douban_large.csv", "w", encoding="utf-8") as f:
tasks = [parse_and_write(url, f) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
优化点:
- 异步抓取减少请求阻塞
- 流式写入降低内存占用
- 批量处理提高写入效率
七、总结
大数据量爬虫优化的核心策略:
-
内存优化
- 流式处理、生成器
- 批量写入
- 压缩存储
-
IO优化
- 异步文件/数据库操作
- 批量写入
- 高效存储格式
-
并发优化
- 异步爬虫 (aiohttp + asyncio)
- 多线程/多进程
- 分布式爬虫 (Scrapy + Redis)
结合以上策略,可以实现 百万级甚至千万级数据抓取,保证爬虫的稳定性与效率。
更多推荐
所有评论(0)