在异步爬虫(尤其是aiohttp)的实战中,错误和异常处理是保证爬虫稳定性的核心——网络波动、网站反爬、代码逻辑疏漏都可能导致各种问题。如果处理不当,轻则丢失部分数据,重则整个爬虫崩溃。

本文从实战角度,拆解aiohttp异步爬虫中8类高频错误的成因和解决方案,附完整代码示例,帮你实现“遇到错误不崩溃、可恢复错误能重试、关键错误有日志”的健壮爬虫。

一、先明确:异步爬虫的错误处理和同步有何不同?

异步爬虫的错误处理比同步(requests)更复杂,核心差异在于:

  • 错误传播方式:同步爬虫的错误在当前线程直接抛出,而异步任务(Task)的错误会被包装在Future对象中,若不主动处理,可能“静默失败”(爬虫看似在运行,实际数据没爬全);
  • 资源依赖:异步爬虫依赖事件循环、会话(Session)、数据库连接池等共享资源,错误可能导致资源泄露(比如未关闭的连接占用内存);
  • 并发影响:单个任务的错误若处理不当,可能阻塞整个事件循环(比如未捕获的异常导致所有任务挂起)。

二、8类高频错误及解决方案(附代码)

1. 网络连接错误(最常见)

错误类型aiohttp.ClientConnectionError(包含连接超时、DNS解析失败、连接被拒绝等)。
场景:目标网站暂时不可用、网络波动、本地网络断开。

解决方案

  • 增加重试机制(对临时网络问题有效);
  • 限制重试次数(避免无限重试浪费资源);
  • 记录失败URL,方便后续补爬。
import aiohttp
import asyncio
from aiohttp import ClientConnectionError, ClientTimeoutError

async def fetch_with_retry(session, url, max_retry=3):
    """带重试的异步请求函数"""
    for retry in range(max_retry):
        try:
            # 设置超时(避免请求一直挂着)
            timeout = aiohttp.ClientTimeout(total=10)  # 总超时10秒
            async with session.get(url, timeout=timeout) as response:
                print(f"请求成功:{url},状态码:{response.status}")
                return await response.text()
        except (ClientConnectionError, ClientTimeoutError) as e:
            # 网络错误/超时,重试
            if retry < max_retry - 1:
                wait_time = 2** retry  # 指数退避(1s, 2s, 4s...),减少服务器压力
                print(f"第{retry+1}次重试{url},错误:{e},等待{wait_time}秒...")
                await asyncio.sleep(wait_time)  # 必须用asyncio.sleep,否则阻塞事件循环
            else:
                print(f"重试{max_retry}次失败:{url},错误:{e}")
                # 记录失败URL到文件,方便后续补爬
                with open("failed_urls.txt", "a") as f:
                    f.write(url + "\n")
                return None
        except Exception as e:
            # 其他未知错误,不重试
            print(f"请求{url}发生未知错误:{e}")
            return None

2. HTTP状态码错误(反爬或资源不存在)

错误类型:非200状态码(如403拒绝访问、404页面不存在、503服务器过载)。
场景:IP被封(403)、目标URL无效(404)、网站服务器繁忙(503)。

解决方案

  • 按状态码分类处理(403换代理,404跳过,503重试);
  • 对403等反爬状态,结合代理池动态切换IP。
async def fetch_with_status_handle(session, url, proxy=None):
    """处理HTTP状态码的请求函数"""
    try:
        async with session.get(url, proxy=proxy, timeout=10) as response:
            if response.status == 200:
                return await response.text()
            elif response.status == 403:
                print(f"403被拒绝:{url},可能IP被封,尝试换代理...")
                # 若有代理,返回"需要换代理"的标记
                return "NEED_PROXY"
            elif response.status == 404:
                print(f"404页面不存在:{url},跳过")
                return None
            elif response.status in (503, 504):
                print(f"5xx服务器错误:{url},稍后重试")
                await asyncio.sleep(5)  # 服务器可能暂时过载,等5秒再试
                return await fetch_with_status_handle(session, url, proxy)  # 递归重试
            else:
                print(f"未知状态码{response.status}{url}")
                return None
    except Exception as e:
        print(f"请求错误:{url}{e}")
        return None

3. 解析错误(HTML结构变化或XPath错误)

错误类型lxml.etree.XPathErrorIndexError(列表索引越界)、AttributeError(属性不存在)。
场景:网站更新页面结构(XPath失效)、部分页面数据缺失(比如有的新闻没有作者字段)。

解决方案

  • 解析前先判断元素是否存在,避免直接索引;
  • 用try-except包裹解析逻辑,单个字段解析失败不影响整体;
  • 记录解析失败的页面,用于后续检查XPath是否需要更新。
from lxml import etree

async def parse_news(html, url):
    """容错的新闻解析函数"""
    if not html:
        return None
    try:
        tree = etree.HTML(html)
        news = {}
        # 标题:先判断是否存在,不存在用"无标题"
        title_list = tree.xpath("//h1[@class='title']/text()")
        news["title"] = title_list[0].strip() if title_list else "无标题"
        
        # 作者:可能不存在,用"未知作者"
        author_list = tree.xpath("//div[@class='author']/text()")
        news["author"] = author_list[0].strip() if author_list else "未知作者"
        
        # 正文:可能为空,用"无内容"
        content_list = tree.xpath("//div[@class='content']//text()")
        news["content"] = "".join(content_list).strip() if content_list else "无内容"
        
        news["url"] = url
        return news
    except Exception as e:
        print(f"解析{url}失败:{e}")
        # 保存失败的HTML到文件,方便调试XPath
        with open(f"parse_error_{hash(url)}.html", "w", encoding="utf-8") as f:
            f.write(html)
        return None

4. 异步任务取消或超时(事件循环管理问题)

错误类型asyncio.CancelledError(任务被取消)、TimeoutError(事件循环超时)。
场景:手动取消任务、爬虫被强制终止、任务运行时间超过事件循环限制。

解决方案

  • 在任务中捕获CancelledError,做资源清理(如关闭连接);
  • 合理设置事件循环超时,避免任务无限运行。
async def long_running_task(session, url):
    """可能被取消的长任务,需处理CancelledError"""
    try:
        print(f"开始处理:{url}")
        # 模拟耗时操作
        await asyncio.sleep(30)
        # 实际请求逻辑
        html = await session.get(url)
        return await html.text()
    except asyncio.CancelledError:
        print(f"任务{url}被取消,清理资源...")
        # 做必要的清理(如关闭临时文件、记录中断状态)
        return None
    except Exception as e:
        print(f"任务错误:{e}")
        return None

# 测试任务取消
async def main():
    session = aiohttp.ClientSession()
    task = asyncio.create_task(long_running_task(session, "https://example.com"))
    # 5秒后取消任务(模拟超时)
    await asyncio.sleep(5)
    task.cancel()
    try:
        await task  # 等待任务结束,若被取消会抛出CancelledError
    except asyncio.CancelledError:
        print("任务已被取消")
    await session.close()

asyncio.run(main())

5. 会话(Session)资源泄露(内存暴涨的元凶)

错误类型:无直接错误,但爬虫运行一段时间后内存暴涨、请求变慢。
场景:频繁创建ClientSession而不关闭(每个Session会占用TCP连接和内存);请求未正常关闭(如异常时未释放连接)。

解决方案

  • 全局复用一个ClientSession(而非每个请求创建新Session);
  • async with管理Session生命周期,确保自动关闭;
  • 限制Session的连接池大小,避免连接过多。
async def safe_crawl():
    # 全局只创建一个Session,用async with确保退出时关闭
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(
            limit=50,  # 限制连接池大小(默认是100,根据并发调整)
            limit_per_host=20  # 每个主机的最大连接数,避免对单一网站请求过密
        )
    ) as session:
        # 所有请求复用这个session
        urls = [f"https://example.com/page{i}" for i in range(100)]
        tasks = [fetch_with_retry(session, url) for url in urls]
        await asyncio.gather(*tasks)
    # 离开async with块后,session自动关闭,释放所有连接

6. 数据库异步写入错误(存储瓶颈)

错误类型aiomysql.OperationalError(连接失败)、asyncio.TimeoutError(写入超时)。
场景:数据库连接池耗尽、MySQL服务器宕机、写入语句错误(字段不匹配)。

解决方案

  • 数据库操作加超时控制;
  • 连接池参数合理配置(最小/最大连接数);
  • 写入失败时重试(避免数据丢失)。
import aiomysql

async def init_db_pool():
    """初始化数据库连接池,设置合理参数"""
    return await aiomysql.create_pool(
        host="localhost",
        user="root",
        password="123456",
        db="news_db",
        minsize=5,  # 最小连接数(启动时创建)
        maxsize=20,  # 最大连接数(避免连接过多拖垮数据库)
        connect_timeout=10,  # 连接超时10秒
        charset="utf8mb4"
    )

async def save_news(pool, news):
    """带重试的异步存储函数"""
    if not news:
        return
    max_retry = 3
    for retry in range(max_retry):
        try:
            async with pool.acquire() as conn:  # 从连接池获取连接
                async with conn.cursor() as cur:
                    sql = """
                    INSERT INTO news (url, title, author, content)
                    VALUES (%s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE title=%s, author=%s, content=%s
                    """
                    params = (
                        news["url"], news["title"], news["author"], news["content"],
                        news["title"], news["author"], news["content"]
                    )
                    await cur.execute(sql, params)
                    await conn.commit()
                    return True
        except aiomysql.OperationalError as e:
            # 连接错误(如连接池耗尽、数据库宕机)
            if retry < max_retry - 1:
                wait_time = 2 ** retry
                print(f"存储重试{retry+1}次:{news['url']},错误:{e},等待{wait_time}秒")
                await asyncio.sleep(wait_time)
            else:
                print(f"存储失败:{news['url']},错误:{e}")
                # 记录到失败文件,后续手动补存
                with open("save_failed.txt", "a") as f:
                    f.write(f"{news['url']}\t{news['title']}\n")
                return False
        except Exception as e:
            # SQL语法错误等,不重试(重试也会失败)
            print(f"存储错误(不重试):{news['url']}{e}")
            return False

7. 并发控制不当(被反爬或任务阻塞)

错误类型:无直接错误,但出现大量403、503,或任务堆积不执行。
场景Semaphore设置过大(并发过高被封);Semaphore设置过小(效率低,任务堆积)。

解决方案

  • 动态调整并发数(根据网站响应状态);
  • 结合请求成功率动态控制(成功率低则降低并发)。
async def dynamic_concurrent_crawl(urls):
    # 初始并发数
    initial_concurrent = 20
    semaphore = asyncio.Semaphore(initial_concurrent)
    success_count = 0
    fail_count = 0
    
    async with aiohttp.ClientSession() as session:
        async def fetch_with_dynamic_control(url):
            nonlocal success_count, fail_count
            async with semaphore:
                html = await fetch_with_retry(session, url)
                if html:
                    success_count += 1
                else:
                    fail_count += 1
                # 动态调整并发:成功率低于50%,降低并发
                total = success_count + fail_count
                if total > 100:  # 至少统计100个请求再调整
                    success_rate = success_count / total
                    if success_rate < 0.5 and semaphore._value > 5:
                        # 成功率低,并发减5(最低5)
                        semaphore._value = max(5, semaphore._value - 5)
                        print(f"成功率{success_rate:.2f},降低并发至{semaphore._value}")
                    elif success_rate > 0.8 and semaphore._value < 50:
                        # 成功率高,并发加5(最高50)
                        semaphore._value = min(50, semaphore._value + 5)
                        print(f"成功率{success_rate:.2f},提高并发至{semaphore._value}")
                return html
        
        tasks = [fetch_with_dynamic_control(url) for url in urls]
        await asyncio.gather(*tasks)

8. 编码错误(页面乱码导致解析失败)

错误类型UnicodeDecodeError(解码失败)。
场景:网站响应的编码不是UTF-8(如GBK、ISO-8859-1),response.text()默认用UTF-8解码导致乱码。

解决方案

  • 手动指定编码(从响应头或HTML meta标签中提取);
  • 先用response.read()获取字节,再尝试多种编码解码。
async def fetch_with_encoding(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            # 方法1:从响应头获取编码(可能不准)
            content_type = response.headers.get("Content-Type", "")
            encoding = response.charset if response.charset else "utf-8"
            if "gbk" in content_type.lower():
                encoding = "gbk"
            
            # 方法2:读取字节,尝试解码(解决编码错误)
            content = await response.read()
            try:
                html = content.decode(encoding)
            except UnicodeDecodeError:
                # 尝试其他常见编码
                for enc in ["gbk", "gb2312", "utf-8", "iso-8859-1"]:
                    try:
                        html = content.decode(enc)
                        print(f"用{enc}解码成功:{url}")
                        break
                    except:
                        continue
                else:
                    print(f"所有编码解码失败:{url}")
                    return None
            return html
    except Exception as e:
        print(f"编码处理错误:{url}{e}")
        return None

三、错误处理的3个最佳实践

  1. 日志分级,而非简单print
    logging模块替代print,区分DEBUG(调试信息)、INFO(正常运行)、WARNING(轻微错误)、ERROR(严重错误),方便后期分析问题:

    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        filename="crawler.log"  # 日志写入文件
    )
    # 使用:
    logging.info(f"成功爬取{url}")
    logging.error(f"解析失败{url}{e}", exc_info=True)  # exc_info=True记录堆栈信息
    
  2. 错误隔离,单个任务失败不影响整体
    asyncio.gatherreturn_exceptions=True参数,让失败的任务返回异常对象而非终止整个事件循环:

    async def main():
        tasks = [fetch_with_retry(session, url) for url in urls]
        # return_exceptions=True:任务抛出的异常会被当作结果返回,不中断其他任务
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # 处理异常结果
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                logging.error(f"任务{url}失败:{result}")
            else:
                # 处理正常结果
                pass
    
  3. 资源清理,确保程序优雅退出
    try...finallyasync with确保Session、数据库连接池等资源在异常时也能关闭:

    async def crawl_with_cleanup():
        session = None
        db_pool = None
        try:
            session = aiohttp.ClientSession()
            db_pool = await init_db_pool()
            # 爬取逻辑
            await fetch_and_save(session, db_pool, urls)
        except Exception as e:
            logging.error(f"爬虫整体错误:{e}")
        finally:
            # 确保资源关闭
            if session:
                await session.close()
            if db_pool:
                db_pool.close()
                await db_pool.wait_closed()
            logging.info("所有资源已清理,爬虫退出")
    

总结:异步爬虫错误处理的核心是“容错+可控”

处理aiohttp异步爬虫的错误,关键不是“消灭所有错误”(网络环境复杂,不可能完全避免),而是做到:

  • 容错:单个错误不崩溃,可恢复错误能重试;
  • 可控:错误有记录(方便排查),资源不泄露(保证长期运行);
  • 自适应:根据错误情况动态调整策略(如并发数、代理)。

新手入门时,建议先实现基础的重试机制和日志记录,再逐步完善分级处理和动态调整。遇到具体错误时,先通过日志定位错误类型(是网络、解析还是存储问题),再针对性解决——大部分错误都能通过“重试+容错+资源管理”这三板斧搞定。

如果遇到特殊错误(比如aiohttp的低层SSL错误),可以结合具体错误信息搜索解决方案,或在评论区交流~

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐