一个虫子的养成日记
·
Python异步爬虫实战:使用aiohttp爬取电影网站
一、项目概述
放假期间简单接触了一下爬虫,发现还挺有意思的。于是尝试使用 Python 的异步编程框架(asyncio + aiohttp)实现了一个电影网站数据抓取的小项目(具体网址就不公开了,懂的都懂 😄)。本文记录了我第一次编写异步爬虫的实践过程。
- 异步爬虫的基本原理和实现
- 如何处理请求异常和重试机制
- 如何使用XPath解析HTML页面
- 如何控制并发请求量
- 日志记录的配置和使用
二、环境准备
2.1 安装依赖库
```bash
pip install aiohttp
pip install aiohttp
pip install fake-useragent
pip install lxml
'''
2.2 导入需要的模块
'''python
import json
import time
import aiohttp
import asyncio
import random
from fake_useragent import UserAgent
import logging
from lxml import etree
三 核心代码解释
3.1 日志配置
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
知识点:
- logging模块用于记录程序运行状态
- basicConfig设置日志级别和格式
- 通过getLogger获取logger实例
3.2 异步请求函数
async def get_request(url, session, retry:int, time_sleep:tuple, headers, semaphore):
'''
异步发送HTTP请求,支持重试机制
:param semaphore: 信号量,控制并发数
:param headers: 请求头
:param time_sleep: 休眠时间范围
:param url: 请求的url
:param session: session链接
:param retry: 重试次数
:return: 请求的响应
'''
for attempt in range(retry):
try:
async with semaphore:
logger.info(f'正在请求{url}')
async with session.get(url=url, headers=headers) as response:
status = response.status
# 2xx 成功响应
if 200 <= status < 300:
await asyncio.sleep(random.uniform(*time_sleep))
return await response.text()
# 4xx 客户端错误,不重试
elif 400 <= status < 500:
logger.warning(f"客户端返回{status},错误地址{url},不进行重试")
return None
# 5xx 服务器错误,重试
elif 500 <= status < 600:
logger.warning(f"服务器返回状态{status}-{url}正在重新请求,当前请求{attempt + 1}")
wait = random.uniform(*time_sleep) + attempt * 0.125
await asyncio.sleep(wait)
continue
except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as e:
logger.error(f"网络错误: {e} - {url},第{attempt + 1}次重试")
if attempt < retry - 1:
await asyncio.sleep(random.uniform(*time_sleep) + attempt * 0.125)
continue
except Exception as e:
logger.error(f"未知错误{e}-{url}")
return None
return None
关键知识点:
- async/await:python异步编程的而核心语法;
- 信号量(Semaphore):控制并发请求数量
- 状态码处理:区分客户端错误和服务器错误
- 指数退避重试:重试时增加等待时间
3.3 批量请求 异步调度 函数为以改为面相对象做准备
async def batch_fetch(urls, session, retry, sleep_range, headers, semaphore, ua):
tasks = []
for url in urls:
current_headers = headers.copy()
current_headers['User-Agent'] = ua.random
task = asyncio.create_task(
get_request(
url=url,
session=session,
retry=retry,
time_sleep=sleep_range,
headers=current_headers,
semaphore=semaphore
)
)
tasks.append(task)
return await asyncio.gather(*tasks)
知识点:
- asyncio.create_task():创建异步任务
- asyncio.gather():并发执行多个任务
- UserAgent.random:随机生成User-Agent,避免被识别为爬虫
3.4 XPath解析函数
def def_next_urls(response_text):
'''提取列表页中的详情页URL'''
if not response_text:
logger.error("def_next_urls 获得的 response 为空")
return []
html = etree.HTML(response_text)
if html is None:
logger.error("HTML解析失败")
return []
next_xpath_url = html.xpath('//a[@class="name"]/@href')
return next_xpath_url
def dict_data(response_text)->dict:
'''提取详情页中的电影信息'''
if not response_text:
return {'返回结果': None, 'RESPONST': 'data函数中传入的response为空'}
html = etree.HTML(response_text)
if html is None:
logger.error("HTML解析失败")
return {}
try:
# 提取电影名称
name = html.xpath('//h2[@class="m-b-sm"]/text()')
# 提取电影分类
categories = html.xpath('//*[contains(@class, "categories")]//button[contains(@class, "el-button")]//span/text()')
# 提取基本信息(地区、时长)
info_spans = html.xpath("//div[@class='m-v-sm info']/span/text()")
# 提取评分
score = html.xpath('//p[@class="score m-t-md m-b-n-sm"]/text()')
# 提取剧情简介
drama = html.xpath('//*[contains(@class, "drama")]/p[1]/text()')
detail_dict = {
'name': name[0] if name else None,
'categories': categories if categories else None,
'categories_str': ','.join([cat.strip() for cat in categories] if categories else []),
'region': info_spans[0].strip() if len(info_spans) > 0 else '未知',
'duration': info_spans[2].strip() if len(info_spans) > 2 else '未知',
'score': score[0].strip() if score else '0.0',
'drama': drama[0].strip() if drama else '',
}
logger.info(f'成功解析电影: {detail_dict["name"]}')
return detail_dict
except Exception as e:
logger.warning(f'解析电影信息时出错: {e}')
return {'解析电影时出错': e}
xpath的知识点:
- //:从任意位置选取节点
- @:选取属性,如@href、@class
- [contains(@class, “xxx”)]:模糊匹配class属性
- /text():获取节点文本内容
3.5 主函数main()
async def main():
url = '这里的网站进行了隐藏'
index_rul = '/page/'
ua = UserAgent()
timeout = aiohttp.ClientTimeout(total=3)
retry = 3
time_sleep = (0.5, 1.5)
semaphore = asyncio.Semaphore(3)
connector = aiohttp.TCPConnector(limit_per_host=5)
headers = {
'User-Agent': None,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
}
async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
############# 1. 爬取列表页 ########################
index_urls = [f"{url}/page/{i}" for i in range(1, 11)]
responses = await batch_fetch(
urls=index_urls,
session=session,
retry=retry,
sleep_range=time_sleep,
headers=headers,
semaphore=semaphore,
ua=ua
)
# 提取所有详情页URL
all_next_url = []
for response in responses:
if response:
next_urls = def_next_urls(response)
if next_urls:
all_next_url.extend(next_urls)
else:
logger.error("列表页解析失败")
############# 2. 爬取详情页 ########################
detail_url = [url + next_url for next_url in all_next_url]
detail_responses = await batch_fetch(
urls=detail_url,
session=session,
retry=retry,
sleep_range=time_sleep,
headers=headers,
semaphore=semaphore,
ua=ua
)
# 解析所有详情页
all_data = []
for response in detail_responses:
if response:
data = dict_data(response_text=response)
if data:
all_data.append(data)
else:
logger.error("详情页请求失败")
############# 3. 保存数据 ########################
with open('ssr1.json', 'w', encoding='utf-8') as f:
json.dump(all_data, f, ensure_ascii=False, indent=2)
logger.info(f'数据保存成功,共 {len(all_data)} 条')
return '网站数据抓取完成,请查看保存的json文件'
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
end = time.time()
logger.info(f'请求用时{end - start}')
四 关键配置说明
4.1 并发控制参数
| 参数 | 说明 | 建议值 |
|---|---|---|
| semaphore | 信号量,控制最大并发数 | 3~5 |
| limit_per_host | 同一主机的最大连接数 | 5 |
| time_sleep | 请求间隔时间范围 | (0.5~1.5) |
| retry | 失败重试次数 | 3 |
4.2 请求头设置
headers = {
'User-Agent': None, # 动态生成
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
}
五 运行结果
运行程序后,会生成ssr1.json文件,包含爬取的电影信息:
[
{
"name": "电影名称",
"categories": ["剧情", "动作"],
"categories_str": "剧情,动作",
"region": "美国",
"duration": "120分钟",
"score": "9.5",
"drama": "电影剧情简介..."
},
]
六 常见问题及解决
6.1连接超时
- 增加ClientTimeout的值
- 减少并发数
6.2被封IP
- 增加请求间隔
- 使用代理IP
- 随机User-Agent
6.3 解析失败
- 检查XPath表达式
- 查看网页结构是否变化
- 打印HTML内容调试
七 总结
1. 异步编程:asyncio + aiohttp的使用
2. 异常处理:完善的错误处理和重试机制
3. 数据解析:XPath的灵活运用
4. 并发控制:信号量和连接池的配置
5.日志记录:便于调试和监控
5. 为什么要用 semaphore 而不是只用 TCPConnector 的原因是链接数 不等于 并发任务数,一个TCP链接,一个是任务并发
6. 设定了 retry机制
7. gather 的风险 后期要注意,**await asyncio.gather(*tasks)** 这样的用法 一个任务异常整个程序就会雪崩,最好在参数中添加 ** return_exceptions=True **
8.
八 扩展练习
- 添加代理IP池
- 实现增量爬取
- 添加数据去重功能
- 使用数据库存储代替JSON文件
- 添加可视化进度条
九 设计思想总结
9.1 为什么需要三层解构
| 层 | 作用 |
|---|---|
| 请求层 | 保证稳定性 |
| 调度层 | 提高吞吐 |
| 解析曾 | 解析业务 |
9.2 为什么要做状态码分级
不是为了优雅而是:
| 状态 | 策略 |
|---|---|
| 2xx | 成功 |
| 4xx | 不重试 |
| 5xx | 重试(临时错误) |
这是资源友好的爬虫
9.3 为什么添加sleep
我想是为了礼貌,但是现实是防止请求节奏过快,被揍~~~~~~~~~~~!
好了,这就是我第一个异步爬虫啦,后续继续更新 ~
张志刚
Happy coding!
2026-02-14
更多推荐
所有评论(0)