Python异步爬虫实战:使用aiohttp爬取电影网站

一、项目概述

放假期间简单接触了一下爬虫,发现还挺有意思的。于是尝试使用 Python 的异步编程框架(asyncio + aiohttp)实现了一个电影网站数据抓取的小项目(具体网址就不公开了,懂的都懂 😄)。本文记录了我第一次编写异步爬虫的实践过程。

  • 异步爬虫的基本原理和实现
  • 如何处理请求异常和重试机制
  • 如何使用XPath解析HTML页面
  • 如何控制并发请求量
  • 日志记录的配置和使用

二、环境准备

2.1 安装依赖库

```bash
pip install aiohttp
pip install aiohttp
pip install fake-useragent
pip install lxml
'''

2.2 导入需要的模块

'''python

import json
import time
import aiohttp
import asyncio
import random
from fake_useragent import UserAgent
import logging
from lxml import etree

三 核心代码解释

3.1 日志配置

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
                    
logger = logging.getLogger(__name__)

知识点:

  • logging模块用于记录程序运行状态
  • basicConfig设置日志级别和格式
  • 通过getLogger获取logger实例

3.2 异步请求函数

async def get_request(url, session, retry:int, time_sleep:tuple, headers, semaphore):
   '''
   异步发送HTTP请求,支持重试机制
   :param semaphore: 信号量,控制并发数
   :param headers: 请求头
   :param time_sleep: 休眠时间范围
   :param url: 请求的url
   :param session: session链接
   :param retry: 重试次数
   :return: 请求的响应
   '''
   for attempt in range(retry):
       try:
           async with semaphore:
               logger.info(f'正在请求{url}')
               async with session.get(url=url, headers=headers) as response:
                   status = response.status
                   
                   # 2xx 成功响应
                   if 200 <= status < 300:
                       await asyncio.sleep(random.uniform(*time_sleep))
                       return await response.text()
                   
                   # 4xx 客户端错误,不重试
                   elif 400 <= status < 500:
                       logger.warning(f"客户端返回{status},错误地址{url},不进行重试")
                       return None
                   
                   # 5xx 服务器错误,重试
                   elif 500 <= status < 600:
                       logger.warning(f"服务器返回状态{status}-{url}正在重新请求,当前请求{attempt + 1}")
                       wait = random.uniform(*time_sleep) + attempt * 0.125
                       await asyncio.sleep(wait)
                       continue
                       
       except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as e:
           logger.error(f"网络错误: {e} - {url},第{attempt + 1}次重试")
           if attempt < retry - 1:
               await asyncio.sleep(random.uniform(*time_sleep) + attempt * 0.125)
               continue
               
       except Exception as e:
           logger.error(f"未知错误{e}-{url}")
           return None
           
   return None

关键知识点:

  • async/await:python异步编程的而核心语法;
  • 信号量(Semaphore):控制并发请求数量
  • 状态码处理:区分客户端错误和服务器错误
  • 指数退避重试:重试时增加等待时间

3.3 批量请求 异步调度 函数为以改为面相对象做准备

async def batch_fetch(urls, session, retry, sleep_range, headers, semaphore, ua):
    tasks = []
    for url in urls:
        current_headers = headers.copy()
        current_headers['User-Agent'] = ua.random
        
        task = asyncio.create_task(
            get_request(
                url=url,
                session=session,
                retry=retry,
                time_sleep=sleep_range,
                headers=current_headers,
                semaphore=semaphore
            )
        )
        tasks.append(task)
    return await asyncio.gather(*tasks)

知识点:

  • asyncio.create_task():创建异步任务
  • asyncio.gather():并发执行多个任务
  • UserAgent.random:随机生成User-Agent,避免被识别为爬虫

3.4 XPath解析函数

def def_next_urls(response_text):
    '''提取列表页中的详情页URL'''
    if not response_text:
        logger.error("def_next_urls 获得的 response 为空")
        return []
    
    html = etree.HTML(response_text)
    if html is None:
        logger.error("HTML解析失败")
        return []
    
    next_xpath_url = html.xpath('//a[@class="name"]/@href')
    return next_xpath_url

def dict_data(response_text)->dict:
    '''提取详情页中的电影信息'''
    if not response_text:
        return {'返回结果': None, 'RESPONST': 'data函数中传入的response为空'}
    
    html = etree.HTML(response_text)
    if html is None:
        logger.error("HTML解析失败")
        return {}
    
    try:
        # 提取电影名称
        name = html.xpath('//h2[@class="m-b-sm"]/text()')
        
        # 提取电影分类
        categories = html.xpath('//*[contains(@class, "categories")]//button[contains(@class, "el-button")]//span/text()')
        
        # 提取基本信息(地区、时长)
        info_spans = html.xpath("//div[@class='m-v-sm info']/span/text()")
        
        # 提取评分
        score = html.xpath('//p[@class="score m-t-md m-b-n-sm"]/text()')
        
        # 提取剧情简介
        drama = html.xpath('//*[contains(@class, "drama")]/p[1]/text()')
        
        detail_dict = {
            'name': name[0] if name else None,
            'categories': categories if categories else None,
            'categories_str': ','.join([cat.strip() for cat in categories] if categories else []),
            'region': info_spans[0].strip() if len(info_spans) > 0 else '未知',
            'duration': info_spans[2].strip() if len(info_spans) > 2 else '未知',
            'score': score[0].strip() if score else '0.0',
            'drama': drama[0].strip() if drama else '',
        }
        logger.info(f'成功解析电影: {detail_dict["name"]}')
        
        return detail_dict
        
    except Exception as e:
        logger.warning(f'解析电影信息时出错: {e}')
        return {'解析电影时出错': e}

xpath的知识点:

  • //:从任意位置选取节点
  • @:选取属性,如@href、@class
  • [contains(@class, “xxx”)]:模糊匹配class属性
  • /text():获取节点文本内容

3.5 主函数main()

async def main():
    url = '这里的网站进行了隐藏'
    index_rul = '/page/'
    ua = UserAgent()
    timeout = aiohttp.ClientTimeout(total=3)
    retry = 3
    time_sleep = (0.5, 1.5)
    semaphore = asyncio.Semaphore(3)
    connector = aiohttp.TCPConnector(limit_per_host=5)
    
    headers = {
        'User-Agent': None,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
    }
    
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        ############# 1. 爬取列表页 ########################
        index_urls = [f"{url}/page/{i}" for i in range(1, 11)]
        responses = await batch_fetch(
            urls=index_urls,
            session=session,
            retry=retry,
            sleep_range=time_sleep,
            headers=headers,
            semaphore=semaphore,
            ua=ua
        )
        
        # 提取所有详情页URL
        all_next_url = []
        for response in responses:
            if response:
                next_urls = def_next_urls(response)
                if next_urls:
                    all_next_url.extend(next_urls)
            else:
                logger.error("列表页解析失败")
        
        ############# 2. 爬取详情页 ########################
        detail_url = [url + next_url for next_url in all_next_url]
        detail_responses = await batch_fetch(
            urls=detail_url,
            session=session,
            retry=retry,
            sleep_range=time_sleep,
            headers=headers,
            semaphore=semaphore,
            ua=ua
        )
        
        # 解析所有详情页
        all_data = []
        for response in detail_responses:
            if response:
                data = dict_data(response_text=response)
                if data:
                    all_data.append(data)
            else:
                logger.error("详情页请求失败")
        
        ############# 3. 保存数据 ########################
        with open('ssr1.json', 'w', encoding='utf-8') as f:
            json.dump(all_data, f, ensure_ascii=False, indent=2)
            logger.info(f'数据保存成功,共 {len(all_data)} 条')
    
    return '网站数据抓取完成,请查看保存的json文件'


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    logger.info(f'请求用时{end - start}')

四 关键配置说明

4.1 并发控制参数

参数 说明 建议值
semaphore 信号量,控制最大并发数 3~5
limit_per_host 同一主机的最大连接数 5
time_sleep 请求间隔时间范围 (0.5~1.5)
retry 失败重试次数 3

4.2 请求头设置

headers = {
    'User-Agent': None,  # 动态生成
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
}

五 运行结果

运行程序后,会生成ssr1.json文件,包含爬取的电影信息:

[
  {
    "name": "电影名称",
    "categories": ["剧情", "动作"],
    "categories_str": "剧情,动作",
    "region": "美国",
    "duration": "120分钟",
    "score": "9.5",
    "drama": "电影剧情简介..."
  },
]

六 常见问题及解决

6.1连接超时

  1. 增加ClientTimeout的值
  2. 减少并发数

6.2被封IP

  1. 增加请求间隔
  2. 使用代理IP
  3. 随机User-Agent

6.3 解析失败

  1. 检查XPath表达式
  2. 查看网页结构是否变化
  3. 打印HTML内容调试

七 总结

1. 异步编程:asyncio + aiohttp的使用
2. 异常处理:完善的错误处理和重试机制
3. 数据解析:XPath的灵活运用
4. 并发控制:信号量和连接池的配置
5.日志记录:便于调试和监控
5. 为什么要用 semaphore 而不是只用 TCPConnector 的原因是链接数 不等于 并发任务数,一个TCP链接,一个是任务并发
6. 设定了 retry机制
7. gather 的风险 后期要注意,**await asyncio.gather(*tasks)**  这样的用法 一个任务异常整个程序就会雪崩,最好在参数中添加 **  return_exceptions=True **
8. 

八 扩展练习

  • 添加代理IP池
  • 实现增量爬取
  • 添加数据去重功能
  • 使用数据库存储代替JSON文件
  • 添加可视化进度条

九 设计思想总结

9.1 为什么需要三层解构

作用
请求层 保证稳定性
调度层 提高吞吐
解析曾 解析业务

9.2 为什么要做状态码分级

不是为了优雅而是:

状态 策略
2xx 成功
4xx 不重试
5xx 重试(临时错误)

这是资源友好的爬虫

9.3 为什么添加sleep

我想是为了礼貌,但是现实是防止请求节奏过快,被揍~~~~~~~~~~~!

好了,这就是我第一个异步爬虫啦,后续继续更新 ~

张志刚
Happy coding!
2026-02-14

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐