Python Async Programming in Practice: From asyncio Basics to a Concurrent Crawler
Introduction
When your Python program needs to handle a large number of I/O operations at once (network requests, file reads and writes, database queries), traditional synchronous code spends most of its time simply waiting. Asynchronous programming is the tool that solves this problem.
This article starts from the basic concepts of asyncio and works its way up to a real concurrent crawler project, so that you can genuinely master asynchronous programming in Python.
Synchronous vs. Asynchronous: An Intuitive Comparison
Start with a simple example. Suppose we need to request 3 web pages and each request takes 1 second: done synchronously, the requests run one after another and take about 3 seconds in total; done asynchronously, they overlap and take about 1 second.
Let's verify this difference with code:
import time
import asyncio
import aiohttp

# Synchronous version
def sync_fetch():
    import requests
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()
    for url in urls:
        requests.get(url)
    print(f"Sync time: {time.time() - start:.2f}s")

# Asynchronous version
async def async_fetch():
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        await asyncio.gather(*tasks)
    print(f"Async time: {time.time() - start:.2f}s")

# Run the comparison
sync_fetch()                 # Output: Sync time: ~3s
asyncio.run(async_fetch())   # Output: Async time: ~1s
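One caveat about the async version above: it only fires the requests and never reads the response bodies. In real code you would read each body inside the request's context manager; a minimal sketch of that variant (the fetch_text helper name is invented here for illustration):

# Sketch: fetch a URL and read its body so the connection is fully consumed
async def fetch_text(session, url):
    async with session.get(url) as response:
        return await response.text()

async def async_fetch_read():
    urls = ['https://httpbin.org/delay/1'] * 3
    async with aiohttp.ClientSession() as session:
        # Each coroutine both sends the request and reads the body
        return await asyncio.gather(*(fetch_text(session, url) for url in urls))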
Core asyncio Concepts
Coroutines
A coroutine is the basic unit of asynchronous programming and is defined with async def:
import asyncio

# Define a coroutine
async def say_hello(name, delay):
    await asyncio.sleep(delay)  # simulate an I/O operation
    print(f"Hello, {name}!")
    return f"Done: {name}"

# Run coroutines
async def main():
    # Option 1: await it directly
    result = await say_hello("Alice", 1)
    print(result)

    # Option 2: create tasks so they run concurrently
    task1 = asyncio.create_task(say_hello("Bob", 2))
    task2 = asyncio.create_task(say_hello("Charlie", 1))

    # Wait for all tasks to finish
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())
The Event Loop
The event loop is the heart of asyncio; it schedules and runs coroutines:
import asyncio

async def task_a():
    print("Task A: start")
    await asyncio.sleep(2)
    print("Task A: done")

async def task_b():
    print("Task B: start")
    await asyncio.sleep(1)
    print("Task B: done")

async def main():
    # Run both tasks concurrently
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())

# Output order:
# Task A: start
# Task B: start
# Task B: done  (after 1 second)
# Task A: done  (after 2 seconds)
Quick API Reference

| API | Purpose | Example |
|---|---|---|
| asyncio.run() | Run the main coroutine | asyncio.run(main()) |
| await | Wait for a coroutine to finish | await some_coroutine() |
| asyncio.create_task() | Create a concurrent task | task = asyncio.create_task(coro()) |
| asyncio.gather() | Run several coroutines concurrently | await asyncio.gather(coro1(), coro2()) |
| asyncio.wait() | Wait on a set of tasks | done, pending = await asyncio.wait(tasks) |
| asyncio.sleep() | Asynchronous sleep | await asyncio.sleep(1) |
| asyncio.wait_for() | Wait with a timeout | await asyncio.wait_for(coro(), timeout=5) |
| asyncio.Semaphore | Limit concurrency | sem = asyncio.Semaphore(10) |
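Most of these APIs appear in the examples later in this article, but asyncio.wait_for does not, so here is a minimal, self-contained sketch of using it to put a hard timeout on a slow coroutine (slow_io and the 2-second limit are invented for illustration):

import asyncio

async def slow_io():
    # Pretend this is a network call that takes 5 seconds
    await asyncio.sleep(5)
    return "finished"

async def main():
    try:
        # Give up if slow_io() does not finish within 2 seconds
        result = await asyncio.wait_for(slow_io(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("timed out, moving on")

asyncio.run(main())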
Hands-On: An Async Concurrent Crawler
Now let's build a complete async crawler with concurrency control, error handling, and progress reporting.
Basic version
import asyncio
import aiohttp
from typing import List, Dict
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch the content of a single URL."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            content = await response.text()
            return {
                'url': url,
                'status': response.status,
                'length': len(content),
                'success': True
            }
    except asyncio.TimeoutError:
        return {'url': url, 'error': 'Timeout', 'success': False}
    except Exception as e:
        return {'url': url, 'error': str(e), 'success': False}

async def crawl_urls(urls: List[str]) -> List[Dict]:
    """Crawl multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Quick test
urls = [
    'https://www.python.org',
    'https://docs.python.org',
    'https://pypi.org',
    'https://github.com',
    'https://stackoverflow.com'
]

start = time.time()
results = asyncio.run(crawl_urls(urls))
print(f"Crawled {len(urls)} URLs in {time.time() - start:.2f}s")

for r in results:
    if r['success']:
        print(f"✓ {r['url']} - {r['status']} - {r['length']} bytes")
    else:
        print(f"✗ {r['url']} - {r['error']}")
Intermediate version: limiting concurrency
When the URL list is large, you need to cap the number of concurrent requests to avoid getting your IP banned or exhausting system resources. The core of that idea is a semaphore, shown in the minimal sketch below; the full class that follows adds progress reporting and statistics on top of it.
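A minimal sketch of the semaphore pattern, reusing the fetch_url helper from the basic version above: every request must first acquire a slot from an asyncio.Semaphore, so at most max_concurrent requests are in flight at any moment.

import asyncio
import aiohttp

async def bounded_fetch(semaphore, session, url):
    # Only max_concurrent coroutines can be inside this block at once;
    # the rest wait here until a slot frees up.
    async with semaphore:
        return await fetch_url(session, url)  # fetch_url from the basic version above

async def crawl_bounded(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_fetch(semaphore, session, url) for url in urls]
        return await asyncio.gather(*tasks)

The AsyncCrawler class below wraps this same pattern and adds progress output and summary statistics: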
import asyncio
import aiohttp
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class CrawlResult:
    url: str
    status: Optional[int] = None
    content_length: int = 0
    error: Optional[str] = None
    elapsed: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None

class AsyncCrawler:
    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.results: List[CrawlResult] = []
        self.completed = 0
        self.total = 0

    async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> CrawlResult:
        """Fetch a single URL while respecting the concurrency limit."""
        async with self.semaphore:  # cap the number of in-flight requests
            start = datetime.now()
            try:
                async with session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                    headers={'User-Agent': 'Mozilla/5.0 (compatible; AsyncCrawler/1.0)'}
                ) as response:
                    content = await response.text()
                    elapsed = (datetime.now() - start).total_seconds()
                    return CrawlResult(
                        url=url,
                        status=response.status,
                        content_length=len(content),
                        elapsed=elapsed
                    )
            except asyncio.TimeoutError:
                return CrawlResult(url=url, error='Timeout')
            except aiohttp.ClientError as e:
                return CrawlResult(url=url, error=f'ClientError: {e}')
            except Exception as e:
                return CrawlResult(url=url, error=str(e))
            finally:
                # count every attempt (success or failure) toward progress
                self.completed += 1
                self._print_progress()

    def _print_progress(self):
        """Print a simple progress indicator."""
        percent = (self.completed / self.total) * 100
        print(f"\rProgress: {self.completed}/{self.total} ({percent:.1f}%)", end='', flush=True)

    async def crawl(self, urls: List[str]) -> List[CrawlResult]:
        """Run the crawl."""
        self.total = len(urls)
        self.completed = 0
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent, limit_per_host=5)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        print()  # newline after the progress indicator
        return self.results

    def print_summary(self):
        """Print summary statistics."""
        success = sum(1 for r in self.results if r.success)
        failed = len(self.results) - success
        total_bytes = sum(r.content_length for r in self.results)
        avg_time = sum(r.elapsed for r in self.results if r.success) / max(success, 1)
        print(f"\n{'='*50}")
        print("Crawl summary:")
        print(f"  Total:   {len(self.results)}")
        print(f"  Success: {success}")
        print(f"  Failed:  {failed}")
        print(f"  Data:    {total_bytes / 1024:.2f} KB")
        print(f"  Average response time: {avg_time:.2f}s")
        print(f"{'='*50}")

# Usage example
async def main():
    urls = [
        'https://www.python.org',
        'https://docs.python.org/3/',
        'https://pypi.org',
        'https://github.com',
        'https://stackoverflow.com',
        'https://www.google.com',
        'https://www.baidu.com',
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
    ]
    crawler = AsyncCrawler(max_concurrent=5, timeout=15)
    start = time.time()
    results = await crawler.crawl(urls)
    total_time = time.time() - start
    print(f"\nTotal time: {total_time:.2f}s")
    crawler.print_summary()
    # List the failed URLs
    failed = [r for r in results if not r.success]
    if failed:
        print("\nFailures:")
        for r in failed:
            print(f"  {r.url}: {r.error}")

if __name__ == '__main__':
    asyncio.run(main())
Production version: retries and proxies
import asyncio
import aiohttp
import random
import logging
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class CrawlConfig:
    """Crawler configuration."""
    max_concurrent: int = 10   # maximum number of concurrent requests
    timeout: int = 30          # request timeout in seconds
    max_retries: int = 3       # maximum number of attempts per URL
    retry_delay: float = 1.0   # base delay between retries in seconds
    user_agents: List[str] = field(default_factory=lambda: [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ])
    proxies: List[str] = field(default_factory=list)  # proxy list

class RetryableCrawler:
    def __init__(self, config: Optional[CrawlConfig] = None):
        self.config = config or CrawlConfig()
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}

    def _get_random_headers(self) -> Dict:
        return {
            'User-Agent': random.choice(self.config.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }

    def _get_random_proxy(self) -> Optional[str]:
        if self.config.proxies:
            return random.choice(self.config.proxies)
        return None

    async def fetch_with_retry(
        self,
        session: aiohttp.ClientSession,
        url: str,
        callback: Optional[Callable] = None
    ) -> Dict:
        """Fetch a URL, retrying on timeouts and client errors."""
        last_error = None
        for attempt in range(self.config.max_retries):
            async with self.semaphore:
                try:
                    proxy = self._get_random_proxy()
                    async with session.get(
                        url,
                        timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                        headers=self._get_random_headers(),
                        proxy=proxy
                    ) as response:
                        content = await response.text()
                        # Treat HTTP error status codes as retryable failures
                        if response.status >= 400:
                            raise aiohttp.ClientResponseError(
                                response.request_info,
                                response.history,
                                status=response.status
                            )
                        self.stats['success'] += 1
                        result = {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'length': len(content),
                            'success': True
                        }
                        # Run the optional callback
                        if callback:
                            await callback(result)
                        return result
                except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                    last_error = str(e)
                    self.stats['retried'] += 1
                    if attempt < self.config.max_retries - 1:
                        delay = self.config.retry_delay * (attempt + 1)
                        logger.warning(f"Retrying {url} (attempt {attempt + 1}), waiting {delay}s...")
                        await asyncio.sleep(delay)
                except Exception as e:
                    # Unexpected errors are not retried
                    last_error = str(e)
                    break
        self.stats['failed'] += 1
        return {'url': url, 'error': last_error, 'success': False}

    async def crawl(
        self,
        urls: List[str],
        callback: Optional[Callable] = None
    ) -> List[Dict]:
        """Run the crawl."""
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent)
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrent,
            limit_per_host=5,
            enable_cleanup_closed=True
        )
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_with_retry(session, url, callback) for url in urls]
            results = await asyncio.gather(*tasks)
        logger.info(f"Crawl finished: success={self.stats['success']}, "
                    f"failed={self.stats['failed']}, retries={self.stats['retried']}")
        return results

# Usage example
async def process_result(result: Dict):
    """Callback that processes each successful result."""
    if result['success']:
        # Parse the HTML, store the data, etc. here
        logger.info(f"Processing: {result['url']} ({result['length']} bytes)")

async def main():
    config = CrawlConfig(
        max_concurrent=5,
        timeout=15,
        max_retries=3,
        retry_delay=1.0
    )
    crawler = RetryableCrawler(config)
    urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/status/500',  # will fail
    ]
    results = await crawler.crawl(urls, callback=process_result)
    # Print the results
    for r in results:
        status = '✓' if r['success'] else '✗'
        info = f"{r.get('status', 'N/A')}" if r['success'] else r.get('error', 'Unknown')
        print(f"{status} {r['url']}: {info}")

if __name__ == '__main__':
    asyncio.run(main())
Common Async Pitfalls
1. Forgetting await
# ❌ Wrong: without await, the coroutine never actually runs
async def wrong():
    asyncio.sleep(1)  # does nothing (and triggers a "never awaited" warning)
    print("done")

# ✓ Correct
async def correct():
    await asyncio.sleep(1)
    print("done")
2. Calling blocking synchronous code inside a coroutine
import time

# ❌ Wrong: time.sleep blocks the entire event loop
async def wrong():
    time.sleep(1)  # blocks!

# ✓ Correct
async def correct():
    await asyncio.sleep(1)  # non-blocking

# If you must call a blocking function, push it into an executor
async def call_blocking():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 1)
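On Python 3.9 and later, asyncio.to_thread is a more convenient wrapper around run_in_executor for this common case; a minimal sketch, assuming Python 3.9+:

import asyncio
import time

async def call_blocking_to_thread():
    # Runs time.sleep(1) in a worker thread so the event loop stays responsive
    await asyncio.to_thread(time.sleep, 1)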
3. Task exceptions getting swallowed
# ❌ The exception may go unnoticed
async def may_fail():
    raise ValueError("something went wrong")

async def wrong():
    asyncio.create_task(may_fail())  # exception is never handled
    await asyncio.sleep(1)

# ✓ Correct: keep a reference to the task and handle its exception
async def correct():
    task = asyncio.create_task(may_fail())
    try:
        await task
    except ValueError as e:
        print(f"Caught: {e}")
4. gather vs. wait
async def task1():
    await asyncio.sleep(1)
    return "task1"

async def task2():
    await asyncio.sleep(0.5)
    raise ValueError("task2 failed")

async def task3():
    await asyncio.sleep(0.8)
    return "task3"

# gather: by default, the first exception propagates to the caller
async def use_gather():
    try:
        results = await asyncio.gather(task1(), task2(), task3())
    except ValueError:
        print("a task failed")

# gather + return_exceptions: collect every result, including exceptions
async def use_gather_safe():
    results = await asyncio.gather(task1(), task2(), task3(), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f"Exception: {r}")
        else:
            print(f"Result: {r}")

# wait: finer-grained control over when to return
async def use_wait():
    tasks = [
        asyncio.create_task(task1()),
        asyncio.create_task(task2()),
        asyncio.create_task(task3()),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done: {len(done)}, pending: {len(pending)}")
Performance Comparison
import asyncio
import aiohttp
import requests
import time
from concurrent.futures import ThreadPoolExecutor

URL = 'https://httpbin.org/delay/1'
COUNT = 10

def sync_requests():
    """Sequential requests."""
    start = time.time()
    for _ in range(COUNT):
        requests.get(URL)
    return time.time() - start

def threaded_requests():
    """Thread-pool requests."""
    start = time.time()
    with ThreadPoolExecutor(max_workers=COUNT) as executor:
        list(executor.map(requests.get, [URL] * COUNT))
    return time.time() - start

async def async_requests():
    """Async requests."""
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(URL) for _ in range(COUNT)]
        await asyncio.gather(*tasks)
    return time.time() - start

# Run the comparison
print(f"Sync: {sync_requests():.2f}s")
print(f"Threaded: {threaded_requests():.2f}s")
print(f"Async: {asyncio.run(async_requests()):.2f}s")
Expected output (roughly):
Sync: 10.xxs
Threaded: 1.xxs
Async: 1.xxs
Summary
Key takeaways for async programming:
- Where it shines: I/O-bound tasks (network requests, file operations, database queries)
- Where it doesn't: CPU-bound tasks (use multiprocessing instead)
- Key concepts: coroutines, the event loop, await, Task
- Common libraries: aiohttp (HTTP), aiomysql (MySQL), aioredis (Redis), aiofiles (files); see the aiofiles sketch below
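As a quick taste of the async file I/O mentioned in the last bullet, here is a minimal sketch that uses aiofiles to write crawl results to disk without blocking the event loop (the file name and the sample results are made up for illustration):

import asyncio
import aiofiles  # pip install aiofiles

async def save_results(results, path="results.txt"):
    # Open and write the file asynchronously so the event loop stays free
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        for r in results:
            await f.write(f"{r}\n")

asyncio.run(save_results(["https://www.python.org: 200", "https://pypi.org: 200"]))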
Async programming has a learning curve, but once you are comfortable with it, it is extremely effective for high-concurrency I/O workloads. Start with small examples and build up the complexity gradually.