如何处理异步爬虫+aiohttp实战中可能出现的错误和异常?
容错:单个错误不崩溃,可恢复错误能重试;可控:错误有记录(方便排查),资源不泄露(保证长期运行);自适应:根据错误情况动态调整策略(如并发数、代理)。新手入门时,建议先实现基础的重试机制和日志记录,再逐步完善分级处理和动态调整。遇到具体错误时,先通过日志定位错误类型(是网络、解析还是存储问题),再针对性解决——大部分错误都能通过“重试+容错+资源管理”这三板斧搞定。如果遇到特殊错误(比如aioht
在异步爬虫(尤其是aiohttp)的实战中,错误和异常处理是保证爬虫稳定性的核心——网络波动、网站反爬、代码逻辑疏漏都可能导致各种问题。如果处理不当,轻则丢失部分数据,重则整个爬虫崩溃。
本文从实战角度,拆解aiohttp异步爬虫中8类高频错误的成因和解决方案,附完整代码示例,帮你实现“遇到错误不崩溃、可恢复错误能重试、关键错误有日志”的健壮爬虫。
一、先明确:异步爬虫的错误处理和同步有何不同?
异步爬虫的错误处理比同步(requests)更复杂,核心差异在于:
- 错误传播方式:同步爬虫的错误在当前线程直接抛出,而异步任务(Task)的错误会被包装在
Future对象中,若不主动处理,可能“静默失败”(爬虫看似在运行,实际数据没爬全); - 资源依赖:异步爬虫依赖事件循环、会话(Session)、数据库连接池等共享资源,错误可能导致资源泄露(比如未关闭的连接占用内存);
- 并发影响:单个任务的错误若处理不当,可能阻塞整个事件循环(比如未捕获的异常导致所有任务挂起)。
二、8类高频错误及解决方案(附代码)
1. 网络连接错误(最常见)
错误类型:aiohttp.ClientConnectionError(包含连接超时、DNS解析失败、连接被拒绝等)。
场景:目标网站暂时不可用、网络波动、本地网络断开。
解决方案:
- 增加重试机制(对临时网络问题有效);
- 限制重试次数(避免无限重试浪费资源);
- 记录失败URL,方便后续补爬。
import aiohttp
import asyncio
from aiohttp import ClientConnectionError, ClientTimeoutError
async def fetch_with_retry(session, url, max_retry=3):
"""带重试的异步请求函数"""
for retry in range(max_retry):
try:
# 设置超时(避免请求一直挂着)
timeout = aiohttp.ClientTimeout(total=10) # 总超时10秒
async with session.get(url, timeout=timeout) as response:
print(f"请求成功:{url},状态码:{response.status}")
return await response.text()
except (ClientConnectionError, ClientTimeoutError) as e:
# 网络错误/超时,重试
if retry < max_retry - 1:
wait_time = 2** retry # 指数退避(1s, 2s, 4s...),减少服务器压力
print(f"第{retry+1}次重试{url},错误:{e},等待{wait_time}秒...")
await asyncio.sleep(wait_time) # 必须用asyncio.sleep,否则阻塞事件循环
else:
print(f"重试{max_retry}次失败:{url},错误:{e}")
# 记录失败URL到文件,方便后续补爬
with open("failed_urls.txt", "a") as f:
f.write(url + "\n")
return None
except Exception as e:
# 其他未知错误,不重试
print(f"请求{url}发生未知错误:{e}")
return None
2. HTTP状态码错误(反爬或资源不存在)
错误类型:非200状态码(如403拒绝访问、404页面不存在、503服务器过载)。
场景:IP被封(403)、目标URL无效(404)、网站服务器繁忙(503)。
解决方案:
- 按状态码分类处理(403换代理,404跳过,503重试);
- 对403等反爬状态,结合代理池动态切换IP。
async def fetch_with_status_handle(session, url, proxy=None):
"""处理HTTP状态码的请求函数"""
try:
async with session.get(url, proxy=proxy, timeout=10) as response:
if response.status == 200:
return await response.text()
elif response.status == 403:
print(f"403被拒绝:{url},可能IP被封,尝试换代理...")
# 若有代理,返回"需要换代理"的标记
return "NEED_PROXY"
elif response.status == 404:
print(f"404页面不存在:{url},跳过")
return None
elif response.status in (503, 504):
print(f"5xx服务器错误:{url},稍后重试")
await asyncio.sleep(5) # 服务器可能暂时过载,等5秒再试
return await fetch_with_status_handle(session, url, proxy) # 递归重试
else:
print(f"未知状态码{response.status}:{url}")
return None
except Exception as e:
print(f"请求错误:{url},{e}")
return None
3. 解析错误(HTML结构变化或XPath错误)
错误类型:lxml.etree.XPathError、IndexError(列表索引越界)、AttributeError(属性不存在)。
场景:网站更新页面结构(XPath失效)、部分页面数据缺失(比如有的新闻没有作者字段)。
解决方案:
- 解析前先判断元素是否存在,避免直接索引;
- 用try-except包裹解析逻辑,单个字段解析失败不影响整体;
- 记录解析失败的页面,用于后续检查XPath是否需要更新。
from lxml import etree
async def parse_news(html, url):
"""容错的新闻解析函数"""
if not html:
return None
try:
tree = etree.HTML(html)
news = {}
# 标题:先判断是否存在,不存在用"无标题"
title_list = tree.xpath("//h1[@class='title']/text()")
news["title"] = title_list[0].strip() if title_list else "无标题"
# 作者:可能不存在,用"未知作者"
author_list = tree.xpath("//div[@class='author']/text()")
news["author"] = author_list[0].strip() if author_list else "未知作者"
# 正文:可能为空,用"无内容"
content_list = tree.xpath("//div[@class='content']//text()")
news["content"] = "".join(content_list).strip() if content_list else "无内容"
news["url"] = url
return news
except Exception as e:
print(f"解析{url}失败:{e}")
# 保存失败的HTML到文件,方便调试XPath
with open(f"parse_error_{hash(url)}.html", "w", encoding="utf-8") as f:
f.write(html)
return None
4. 异步任务取消或超时(事件循环管理问题)
错误类型:asyncio.CancelledError(任务被取消)、TimeoutError(事件循环超时)。
场景:手动取消任务、爬虫被强制终止、任务运行时间超过事件循环限制。
解决方案:
- 在任务中捕获
CancelledError,做资源清理(如关闭连接); - 合理设置事件循环超时,避免任务无限运行。
async def long_running_task(session, url):
"""可能被取消的长任务,需处理CancelledError"""
try:
print(f"开始处理:{url}")
# 模拟耗时操作
await asyncio.sleep(30)
# 实际请求逻辑
html = await session.get(url)
return await html.text()
except asyncio.CancelledError:
print(f"任务{url}被取消,清理资源...")
# 做必要的清理(如关闭临时文件、记录中断状态)
return None
except Exception as e:
print(f"任务错误:{e}")
return None
# 测试任务取消
async def main():
session = aiohttp.ClientSession()
task = asyncio.create_task(long_running_task(session, "https://example.com"))
# 5秒后取消任务(模拟超时)
await asyncio.sleep(5)
task.cancel()
try:
await task # 等待任务结束,若被取消会抛出CancelledError
except asyncio.CancelledError:
print("任务已被取消")
await session.close()
asyncio.run(main())
5. 会话(Session)资源泄露(内存暴涨的元凶)
错误类型:无直接错误,但爬虫运行一段时间后内存暴涨、请求变慢。
场景:频繁创建ClientSession而不关闭(每个Session会占用TCP连接和内存);请求未正常关闭(如异常时未释放连接)。
解决方案:
- 全局复用一个
ClientSession(而非每个请求创建新Session); - 用
async with管理Session生命周期,确保自动关闭; - 限制Session的连接池大小,避免连接过多。
async def safe_crawl():
# 全局只创建一个Session,用async with确保退出时关闭
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(
limit=50, # 限制连接池大小(默认是100,根据并发调整)
limit_per_host=20 # 每个主机的最大连接数,避免对单一网站请求过密
)
) as session:
# 所有请求复用这个session
urls = [f"https://example.com/page{i}" for i in range(100)]
tasks = [fetch_with_retry(session, url) for url in urls]
await asyncio.gather(*tasks)
# 离开async with块后,session自动关闭,释放所有连接
6. 数据库异步写入错误(存储瓶颈)
错误类型:aiomysql.OperationalError(连接失败)、asyncio.TimeoutError(写入超时)。
场景:数据库连接池耗尽、MySQL服务器宕机、写入语句错误(字段不匹配)。
解决方案:
- 数据库操作加超时控制;
- 连接池参数合理配置(最小/最大连接数);
- 写入失败时重试(避免数据丢失)。
import aiomysql
async def init_db_pool():
"""初始化数据库连接池,设置合理参数"""
return await aiomysql.create_pool(
host="localhost",
user="root",
password="123456",
db="news_db",
minsize=5, # 最小连接数(启动时创建)
maxsize=20, # 最大连接数(避免连接过多拖垮数据库)
connect_timeout=10, # 连接超时10秒
charset="utf8mb4"
)
async def save_news(pool, news):
"""带重试的异步存储函数"""
if not news:
return
max_retry = 3
for retry in range(max_retry):
try:
async with pool.acquire() as conn: # 从连接池获取连接
async with conn.cursor() as cur:
sql = """
INSERT INTO news (url, title, author, content)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE title=%s, author=%s, content=%s
"""
params = (
news["url"], news["title"], news["author"], news["content"],
news["title"], news["author"], news["content"]
)
await cur.execute(sql, params)
await conn.commit()
return True
except aiomysql.OperationalError as e:
# 连接错误(如连接池耗尽、数据库宕机)
if retry < max_retry - 1:
wait_time = 2 ** retry
print(f"存储重试{retry+1}次:{news['url']},错误:{e},等待{wait_time}秒")
await asyncio.sleep(wait_time)
else:
print(f"存储失败:{news['url']},错误:{e}")
# 记录到失败文件,后续手动补存
with open("save_failed.txt", "a") as f:
f.write(f"{news['url']}\t{news['title']}\n")
return False
except Exception as e:
# SQL语法错误等,不重试(重试也会失败)
print(f"存储错误(不重试):{news['url']},{e}")
return False
7. 并发控制不当(被反爬或任务阻塞)
错误类型:无直接错误,但出现大量403、503,或任务堆积不执行。
场景:Semaphore设置过大(并发过高被封);Semaphore设置过小(效率低,任务堆积)。
解决方案:
- 动态调整并发数(根据网站响应状态);
- 结合请求成功率动态控制(成功率低则降低并发)。
async def dynamic_concurrent_crawl(urls):
# 初始并发数
initial_concurrent = 20
semaphore = asyncio.Semaphore(initial_concurrent)
success_count = 0
fail_count = 0
async with aiohttp.ClientSession() as session:
async def fetch_with_dynamic_control(url):
nonlocal success_count, fail_count
async with semaphore:
html = await fetch_with_retry(session, url)
if html:
success_count += 1
else:
fail_count += 1
# 动态调整并发:成功率低于50%,降低并发
total = success_count + fail_count
if total > 100: # 至少统计100个请求再调整
success_rate = success_count / total
if success_rate < 0.5 and semaphore._value > 5:
# 成功率低,并发减5(最低5)
semaphore._value = max(5, semaphore._value - 5)
print(f"成功率{success_rate:.2f},降低并发至{semaphore._value}")
elif success_rate > 0.8 and semaphore._value < 50:
# 成功率高,并发加5(最高50)
semaphore._value = min(50, semaphore._value + 5)
print(f"成功率{success_rate:.2f},提高并发至{semaphore._value}")
return html
tasks = [fetch_with_dynamic_control(url) for url in urls]
await asyncio.gather(*tasks)
8. 编码错误(页面乱码导致解析失败)
错误类型:UnicodeDecodeError(解码失败)。
场景:网站响应的编码不是UTF-8(如GBK、ISO-8859-1),response.text()默认用UTF-8解码导致乱码。
解决方案:
- 手动指定编码(从响应头或HTML meta标签中提取);
- 先用
response.read()获取字节,再尝试多种编码解码。
async def fetch_with_encoding(session, url):
try:
async with session.get(url, timeout=10) as response:
# 方法1:从响应头获取编码(可能不准)
content_type = response.headers.get("Content-Type", "")
encoding = response.charset if response.charset else "utf-8"
if "gbk" in content_type.lower():
encoding = "gbk"
# 方法2:读取字节,尝试解码(解决编码错误)
content = await response.read()
try:
html = content.decode(encoding)
except UnicodeDecodeError:
# 尝试其他常见编码
for enc in ["gbk", "gb2312", "utf-8", "iso-8859-1"]:
try:
html = content.decode(enc)
print(f"用{enc}解码成功:{url}")
break
except:
continue
else:
print(f"所有编码解码失败:{url}")
return None
return html
except Exception as e:
print(f"编码处理错误:{url},{e}")
return None
三、错误处理的3个最佳实践
-
日志分级,而非简单print
用logging模块替代print,区分DEBUG(调试信息)、INFO(正常运行)、WARNING(轻微错误)、ERROR(严重错误),方便后期分析问题:import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", filename="crawler.log" # 日志写入文件 ) # 使用: logging.info(f"成功爬取{url}") logging.error(f"解析失败{url}:{e}", exc_info=True) # exc_info=True记录堆栈信息 -
错误隔离,单个任务失败不影响整体
用asyncio.gather的return_exceptions=True参数,让失败的任务返回异常对象而非终止整个事件循环:async def main(): tasks = [fetch_with_retry(session, url) for url in urls] # return_exceptions=True:任务抛出的异常会被当作结果返回,不中断其他任务 results = await asyncio.gather(*tasks, return_exceptions=True) # 处理异常结果 for url, result in zip(urls, results): if isinstance(result, Exception): logging.error(f"任务{url}失败:{result}") else: # 处理正常结果 pass -
资源清理,确保程序优雅退出
用try...finally或async with确保Session、数据库连接池等资源在异常时也能关闭:async def crawl_with_cleanup(): session = None db_pool = None try: session = aiohttp.ClientSession() db_pool = await init_db_pool() # 爬取逻辑 await fetch_and_save(session, db_pool, urls) except Exception as e: logging.error(f"爬虫整体错误:{e}") finally: # 确保资源关闭 if session: await session.close() if db_pool: db_pool.close() await db_pool.wait_closed() logging.info("所有资源已清理,爬虫退出")
总结:异步爬虫错误处理的核心是“容错+可控”
处理aiohttp异步爬虫的错误,关键不是“消灭所有错误”(网络环境复杂,不可能完全避免),而是做到:
- 容错:单个错误不崩溃,可恢复错误能重试;
- 可控:错误有记录(方便排查),资源不泄露(保证长期运行);
- 自适应:根据错误情况动态调整策略(如并发数、代理)。
新手入门时,建议先实现基础的重试机制和日志记录,再逐步完善分级处理和动态调整。遇到具体错误时,先通过日志定位错误类型(是网络、解析还是存储问题),再针对性解决——大部分错误都能通过“重试+容错+资源管理”这三板斧搞定。
如果遇到特殊错误(比如aiohttp的低层SSL错误),可以结合具体错误信息搜索解决方案,或在评论区交流~
更多推荐



所有评论(0)