Introduction

When your Python program has to handle a large number of I/O operations at once (network requests, file reads and writes, database queries), traditional synchronous code spends most of its time simply waiting. Asynchronous programming is the tool for exactly this problem.

This article starts from the basic concepts of asyncio and works up to a real concurrent crawler project, helping you genuinely master asynchronous programming in Python.

Synchronous vs. Asynchronous: An Intuitive Comparison

Let's start with a simple example. Suppose we need to request 3 web pages, and each request takes 1 second:

[Sequence diagrams: in the synchronous approach the program sends each request and waits for its 1-second response in turn (about 3 seconds total); in the asynchronous approach it sends all three requests up front and the responses come back concurrently (about 1 second total).]

Let's verify this difference with code:

import time
import asyncio
import aiohttp

# Synchronous version
def sync_fetch():
    import requests
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()
    for url in urls:
        requests.get(url)
    print(f"Sync elapsed: {time.time() - start:.2f}s")

# Asynchronous version
async def async_fetch():
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            # Read the body so the connection is released cleanly
            async with session.get(url) as resp:
                await resp.read()
        await asyncio.gather(*(fetch(url) for url in urls))
    print(f"Async elapsed: {time.time() - start:.2f}s")

# Run the comparison
sync_fetch()                 # Output: Sync elapsed: 3.xxs
asyncio.run(async_fetch())   # Output: Async elapsed: 1.xxs

Core asyncio Concepts

Coroutines

Coroutines are the basic building block of asynchronous programming and are defined with async def:

import asyncio

# Define a coroutine
async def say_hello(name, delay):
    await asyncio.sleep(delay)  # simulate an I/O operation
    print(f"Hello, {name}!")
    return f"Done: {name}"

# Run coroutines
async def main():
    # Approach 1: await it directly
    result = await say_hello("Alice", 1)
    print(result)
    
    # Approach 2: create tasks so they run concurrently
    task1 = asyncio.create_task(say_hello("Bob", 2))
    task2 = asyncio.create_task(say_hello("Charlie", 1))
    
    # Wait for both tasks to finish
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

The Event Loop

The event loop is the heart of asyncio; it schedules and runs coroutines:

[Flowchart: the event loop starts; while there are pending tasks, it picks a ready task and runs it until it hits an await; the task is then suspended and the loop switches to another task; if any I/O is still in flight, the loop waits for I/O events, and when the I/O completes the corresponding task becomes ready again; once no tasks remain, the event loop exits.]

import asyncio

async def task_a():
    print("Task A: 开始")
    await asyncio.sleep(2)
    print("Task A: 完成")

async def task_b():
    print("Task B: 开始")
    await asyncio.sleep(1)
    print("Task B: 完成")

async def main():
    # 并发执行两个任务
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())

# 输出顺序:
# Task A: 开始
# Task B: 开始
# Task B: 完成(1秒后)
# Task A: 完成(2秒后)

Common API Quick Reference

API                      Purpose                               Example
asyncio.run()            Run the main coroutine                asyncio.run(main())
await                    Wait for a coroutine to finish        await some_coroutine()
asyncio.create_task()    Schedule a coroutine as a Task        task = asyncio.create_task(coro())
asyncio.gather()         Run multiple coroutines concurrently  await asyncio.gather(coro1(), coro2())
asyncio.wait()           Wait on a set of tasks                done, pending = await asyncio.wait(tasks)
asyncio.sleep()          Asynchronous sleep                    await asyncio.sleep(1)
asyncio.wait_for()       Wait with a timeout                   await asyncio.wait_for(coro(), timeout=5)
asyncio.Semaphore        Limit concurrency                     sem = asyncio.Semaphore(10)
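
As a quick illustration (not from the original article), here is a minimal sketch that combines a few of these APIs: a Semaphore to cap concurrency, wait_for to bound each call, and gather to run everything:

import asyncio

sem = asyncio.Semaphore(10)  # allow at most 10 coroutines in the critical section

async def limited_work(i: int) -> str:
    async with sem:                                             # acquire a concurrency slot
        await asyncio.wait_for(asyncio.sleep(0.1), timeout=5)   # stand-in for a bounded I/O call
        return f"job {i} done"

async def main():
    results = await asyncio.gather(*(limited_work(i) for i in range(30)))
    print(len(results), "jobs finished")

asyncio.run(main())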

Hands-On: An Asynchronous Concurrent Crawler

Below we build a complete asynchronous crawler with concurrency control, error handling, and progress reporting.

Project Structure

[Architecture diagram: a URL queue feeds a concurrency controller, which dispatches URLs to Worker 1 through Worker N; the workers hand their output to a result collector, which writes to data storage.]
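
The versions below implement this shape with asyncio.gather plus a Semaphore rather than an explicit queue. For reference, a rough, self-contained sketch of the queue/worker layout in the diagram might look like this (fetch_url here is a stand-in placeholder, not the real fetch function defined later):

import asyncio

async def fetch_url(url: str) -> dict:
    # Placeholder fetch; a real implementation would use aiohttp (see the basic version below)
    await asyncio.sleep(0.1)
    return {'url': url, 'success': True}

async def worker(queue: asyncio.Queue, results: list):
    while True:
        url = await queue.get()
        try:
            results.append(await fetch_url(url))
        finally:
            queue.task_done()

async def crawl_with_queue(urls, num_workers=5):
    queue, results = asyncio.Queue(), []
    for url in urls:
        queue.put_nowait(url)
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    await queue.join()      # block until every queued URL has been processed
    for w in workers:
        w.cancel()          # workers loop forever, so cancel them once the queue is drained
    await asyncio.gather(*workers, return_exceptions=True)
    return results

print(len(asyncio.run(crawl_with_queue([f'https://example.com/{i}' for i in range(20)]))))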

Basic Version

import asyncio
import aiohttp
from typing import List, Dict
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """获取单个URL的内容"""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            content = await response.text()
            return {
                'url': url,
                'status': response.status,
                'length': len(content),
                'success': True
            }
    except asyncio.TimeoutError:
        return {'url': url, 'error': 'Timeout', 'success': False}
    except Exception as e:
        return {'url': url, 'error': str(e), 'success': False}

async def crawl_urls(urls: List[str]) -> List[Dict]:
    """并发爬取多个URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Quick test
urls = [
    'https://www.python.org',
    'https://docs.python.org',
    'https://pypi.org',
    'https://github.com',
    'https://stackoverflow.com'
]

start = time.time()
results = asyncio.run(crawl_urls(urls))
print(f"爬取 {len(urls)} 个URL,耗时: {time.time() - start:.2f}秒")

for r in results:
    if r['success']:
        print(f"✓ {r['url']} - {r['status']} - {r['length']} bytes")
    else:
        print(f"✗ {r['url']} - {r['error']}")

Advanced Version: With a Concurrency Limit

When the number of URLs is large, you need to cap the number of concurrent requests, both to avoid getting your IP banned and to avoid exhausting system resources:

import asyncio
import aiohttp
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class CrawlResult:
    url: str
    status: Optional[int] = None
    content_length: int = 0
    error: Optional[str] = None
    elapsed: float = 0.0
    
    @property
    def success(self) -> bool:
        return self.error is None

class AsyncCrawler:
    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.results: List[CrawlResult] = []
        self.completed = 0
        self.total = 0
    
    async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> CrawlResult:
        """带并发限制的单URL获取"""
        async with self.semaphore:  # 限制并发数
            start = datetime.now()
            try:
                async with session.get(
                    url, 
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                    headers={'User-Agent': 'Mozilla/5.0 (compatible; AsyncCrawler/1.0)'}
                ) as response:
                    content = await response.text()
                    elapsed = (datetime.now() - start).total_seconds()
                    
                    self.completed += 1
                    self._print_progress()
                    
                    return CrawlResult(
                        url=url,
                        status=response.status,
                        content_length=len(content),
                        elapsed=elapsed
                    )
            except asyncio.TimeoutError:
                return CrawlResult(url=url, error='Timeout')
            except aiohttp.ClientError as e:
                return CrawlResult(url=url, error=f'ClientError: {e}')
            except Exception as e:
                return CrawlResult(url=url, error=str(e))
    
    def _print_progress(self):
        """打印进度"""
        percent = (self.completed / self.total) * 100
        print(f"\r进度: {self.completed}/{self.total} ({percent:.1f}%)", end='', flush=True)
    
    async def crawl(self, urls: List[str]) -> List[CrawlResult]:
        """执行爬取"""
        self.total = len(urls)
        self.completed = 0
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        
        connector = aiohttp.TCPConnector(limit=self.max_concurrent, limit_per_host=5)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        
        print()  # newline after the progress line
        return self.results
    
    def print_summary(self):
        """打印统计摘要"""
        success = sum(1 for r in self.results if r.success)
        failed = len(self.results) - success
        total_bytes = sum(r.content_length for r in self.results)
        avg_time = sum(r.elapsed for r in self.results if r.success) / max(success, 1)
        
        print(f"\n{'='*50}")
        print(f"爬取完成统计:")
        print(f"  总数: {len(self.results)}")
        print(f"  成功: {success}")
        print(f"  失败: {failed}")
        print(f"  总数据量: {total_bytes / 1024:.2f} KB")
        print(f"  平均响应时间: {avg_time:.2f}秒")
        print(f"{'='*50}")

# Usage example
async def main():
    urls = [
        'https://www.python.org',
        'https://docs.python.org/3/',
        'https://pypi.org',
        'https://github.com',
        'https://stackoverflow.com',
        'https://www.google.com',
        'https://www.baidu.com',
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
    ]
    
    crawler = AsyncCrawler(max_concurrent=5, timeout=15)
    
    start = time.time()
    results = await crawler.crawl(urls)
    total_time = time.time() - start
    
    print(f"\n总耗时: {total_time:.2f}秒")
    crawler.print_summary()
    
    # Print the URLs that failed
    failed = [r for r in results if not r.success]
    if failed:
        print("\n失败列表:")
        for r in failed:
            print(f"  {r.url}: {r.error}")

if __name__ == '__main__':
    asyncio.run(main())

Production-Grade Version: With Retries and Proxies

import asyncio
import aiohttp
import random
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class CrawlConfig:
    """爬虫配置"""
    max_concurrent: int = 10          # 最大并发数
    timeout: int = 30                  # 超时时间(秒)
    max_retries: int = 3               # 最大重试次数
    retry_delay: float = 1.0           # 重试延迟(秒)
    user_agents: List[str] = field(default_factory=lambda: [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ])
    proxies: List[str] = field(default_factory=list)  # list of proxy URLs

class RetryableCrawler:
    def __init__(self, config: CrawlConfig = None):
        self.config = config or CrawlConfig()
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}
    
    def _get_random_headers(self) -> Dict:
        return {
            'User-Agent': random.choice(self.config.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }
    
    def _get_random_proxy(self) -> Optional[str]:
        if self.config.proxies:
            return random.choice(self.config.proxies)
        return None
    
    async def fetch_with_retry(
        self, 
        session: aiohttp.ClientSession, 
        url: str,
        callback: Optional[Callable] = None
    ) -> Dict:
        """带重试机制的请求"""
        last_error = None
        
        for attempt in range(self.config.max_retries):
            async with self.semaphore:
                try:
                    proxy = self._get_random_proxy()
                    async with session.get(
                        url,
                        timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                        headers=self._get_random_headers(),
                        proxy=proxy
                    ) as response:
                        content = await response.text()
                        
                        # Treat HTTP error status codes as failures so they can be retried
                        if response.status >= 400:
                            raise aiohttp.ClientResponseError(
                                response.request_info,
                                response.history,
                                status=response.status
                            )
                        
                        self.stats['success'] += 1
                        
                        result = {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'length': len(content),
                            'success': True
                        }
                        
                        # Invoke the optional callback
                        if callback:
                            await callback(result)
                        
                        return result
                        
                except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                    last_error = str(e)
                    self.stats['retried'] += 1
                    
                    if attempt < self.config.max_retries - 1:
                        delay = self.config.retry_delay * (attempt + 1)
                        logger.warning(f"重试 {url} (第{attempt + 1}次), 等待{delay}秒...")
                        await asyncio.sleep(delay)
                    
                except Exception as e:
                    last_error = str(e)
                    break
        
        self.stats['failed'] += 1
        return {'url': url, 'error': last_error, 'success': False}
    
    async def crawl(
        self, 
        urls: List[str],
        callback: Optional[Callable] = None
    ) -> List[Dict]:
        """执行爬取"""
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent)
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}
        
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrent,
            limit_per_host=5,
            enable_cleanup_closed=True
        )
        
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_with_retry(session, url, callback) for url in urls]
            results = await asyncio.gather(*tasks)
        
        logger.info(f"爬取完成: 成功={self.stats['success']}, "
                   f"失败={self.stats['failed']}, 重试={self.stats['retried']}")
        
        return results

# Usage example
async def process_result(result: Dict):
    """Callback that processes each crawl result"""
    if result['success']:
        # Parse the HTML, store the data, etc. here
        logger.info(f"Processing: {result['url']} ({result['length']} bytes)")

async def main():
    config = CrawlConfig(
        max_concurrent=5,
        timeout=15,
        max_retries=3,
        retry_delay=1.0
    )
    
    crawler = RetryableCrawler(config)
    
    urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/status/500',  # will fail (HTTP 500)
    ]
    
    results = await crawler.crawl(urls, callback=process_result)
    
    # Print results
    for r in results:
        status = '✓' if r['success'] else '✗'
        info = f"{r.get('status', 'N/A')}" if r['success'] else r.get('error', 'Unknown')
        print(f"{status} {r['url']}: {info}")

if __name__ == '__main__':
    asyncio.run(main())

Common Pitfalls in Async Programming

1. Forgetting await

# ❌ Wrong: without await, the coroutine is never executed
async def wrong():
    asyncio.sleep(1)  # does nothing (Python warns: "coroutine was never awaited")
    print("done")

# ✓ Correct
async def correct():
    await asyncio.sleep(1)
    print("done")

2. Calling synchronous blocking code inside a coroutine

import time

# ❌ Wrong: time.sleep blocks the entire event loop
async def wrong():
    time.sleep(1)  # blocks!

# ✓ Correct
async def correct():
    await asyncio.sleep(1)  # non-blocking

# If you must call a blocking function, hand it off to an executor thread
async def call_blocking():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 1)
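
On Python 3.9 and later, asyncio.to_thread wraps the same executor mechanism in a simpler call:

import asyncio
import time

async def call_blocking_to_thread():
    # Runs time.sleep in a worker thread so the event loop stays responsive (Python 3.9+)
    await asyncio.to_thread(time.sleep, 1)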

3. Swallowed task exceptions

# ❌ The exception may go unnoticed
async def may_fail():
    raise ValueError("something went wrong")

async def wrong():
    asyncio.create_task(may_fail())  # the exception is never handled
    await asyncio.sleep(1)

# ✓ Correct: make sure task exceptions are handled
async def correct():
    task = asyncio.create_task(may_fail())
    try:
        await task
    except ValueError as e:
        print(f"Caught exception: {e}")

4. gather vs. wait

async def task1():
    await asyncio.sleep(1)
    return "task1"

async def task2():
    await asyncio.sleep(0.5)
    raise ValueError("task2 failed")

async def task3():
    await asyncio.sleep(0.8)
    return "task3"

# gather: by default the first exception propagates to the awaiting caller
async def use_gather():
    try:
        results = await asyncio.gather(task1(), task2(), task3())
    except ValueError:
        print("有任务失败")

# gather + return_exceptions: collect every result, including exceptions
async def use_gather_safe():
    results = await asyncio.gather(task1(), task2(), task3(), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f"异常: {r}")
        else:
            print(f"结果: {r}")

# wait: finer-grained control
async def use_wait():
    tasks = [
        asyncio.create_task(task1()),
        asyncio.create_task(task2()),
        asyncio.create_task(task3()),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"完成: {len(done)}, 待处理: {len(pending)}")

Performance Comparison

import asyncio
import aiohttp
import requests
import time
from concurrent.futures import ThreadPoolExecutor

URL = 'https://httpbin.org/delay/1'
COUNT = 10

def sync_requests():
    """同步请求"""
    start = time.time()
    for _ in range(COUNT):
        requests.get(URL)
    return time.time() - start

def threaded_requests():
    """多线程请求"""
    start = time.time()
    with ThreadPoolExecutor(max_workers=COUNT) as executor:
        list(executor.map(requests.get, [URL] * COUNT))
    return time.time() - start

async def async_requests():
    """Asynchronous requests"""
    start = time.time()
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            # Read the body so the connection is released back to the pool
            async with session.get(url) as resp:
                await resp.read()
        await asyncio.gather(*(fetch(URL) for _ in range(COUNT)))
    return time.time() - start

# Run the comparison
print(f"Sync:     {sync_requests():.2f}s")
print(f"Threaded: {threaded_requests():.2f}s")
print(f"Async:    {asyncio.run(async_requests()):.2f}s")

Expected output:

Sync:     10.xxs
Threaded: 1.xxs
Async:    1.xxs
[Bar chart: elapsed time for 10 requests (seconds): synchronous around 10 s, threaded and asynchronous both around 1 s.]

Summary

The key points of asynchronous programming:

  1. Good fit: I/O-bound work (network requests, file operations, database queries)
  2. Poor fit: CPU-bound work (use multiprocessing instead)
  3. Key concepts: coroutines, the event loop, await, Task
  4. Common libraries: aiohttp (HTTP), aiomysql (MySQL), aioredis (Redis), aiofiles (files); a short aiofiles sketch follows this list
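
As a taste of the ecosystem beyond aiohttp, here is a minimal aiofiles sketch, assuming the aiofiles package is installed (the file path is just an example):

import asyncio
import aiofiles

async def read_file(path: str) -> str:
    # aiofiles runs the blocking file I/O in a thread pool behind the scenes
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        return await f.read()

print(len(asyncio.run(read_file('example.txt'))))  # 'example.txt' is a placeholder path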

Choosing by Task Type

[Decision flowchart: CPU-bound work goes to multiprocessing; I/O-bound work that does not need concurrency can stay as plain synchronous code; I/O-bound work that does need concurrency uses asyncio, with a Semaphore to throttle when the concurrency is high, or a plain gather otherwise.]
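
For the CPU-bound branch, a rough sketch of combining asyncio with a process pool via run_in_executor (heavy_compute is an illustrative placeholder, not code from the article):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_compute(n: int) -> int:
    # CPU-bound placeholder: sum of squares
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload CPU-bound work to worker processes so the event loop is not blocked
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, heavy_compute, 2_000_000) for _ in range(4))
        )
    print(results)

if __name__ == '__main__':
    asyncio.run(main())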

Asynchronous programming has a learning curve, but once you are comfortable with it, it is extremely effective for high-concurrency I/O workloads. Start with simple examples and work your way up.

