㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

1️⃣ 摘要(Abstract)

本文将手把手教你用 aiohttp + asyncio 构建一个生产级异步爬虫,从 GitHub API 的简单并发开始,逐步升级到电商网站的复杂场景,最终实现:并发控制智能限速断点续爬优雅的错误处理

读完本文你将获得:

  • 理解异步爬虫与同步爬虫的本质区别,知道什么场景该用异步
  • 掌握 aiohttp 的核心用法:Session 管理、并发控制、超时设置
  • 学会用 asyncio.Semaphoreaiolimiter 实现精准的并发数和频率控制
  • 实现断点续爬机制,爬虫中断后能从上次位置继续
  • 获得两套完整可运行的代码:GitHub Trending 爬虫(500行)+ 电商商品爬虫(800行)

2️⃣ 背景与需求(Why)

为什么需要异步爬虫?

去年我接了个需求:爬取 5000 个电商商品的详情页,每个商品需要访问详情页、评论页、店铺页三个 URL。用传统的 requests 同步爬虫,单线程跑了 6 小时才完成(平均每秒 0.7 个请求)。

后来改用 aiohttp 异步方案,并发数设为 50,同样的任务 25 分钟 就跑完了,速度提升了 14 倍。但这还不是重点——重点是异步方案的 CPU 占用率只有 8%,而多线程方案即使开 10 个线程,CPU 也要飙到 40%。

异步爬虫适合的场景:

  1. IO 密集型任务:大量时间花在等待网络响应(下载图片、API 调用、大规模数据采集)
  2. 需要高并发:目标站点允许较高 QPS,且服务器带宽充足
  3. 不适合 CPU 密集计算:如果爬下来还要做大量数据清洗/计算,异步优势会被抵消

本文的两个实战目标

目标 1:GitHub Trending 爬虫(入门)

  • 数据源:GitHub 公开 API(https://api.github.com
  • 目标字段:仓库名、星标数、语言、描述、作者
  • 技术重点:基础异步请求、JSON 解析、并发控制

目标 2:电商商品爬虫(进阶)

  • 数据源:某图书电商网站(以京东图书为例演示逻辑,实际代码用模拟站点)
  • 目标字段:书名、作者、价格、评分、评论数、详情链接
  • 技术重点:分页爬取、详情页并发、限速、断点续爬、反爬应对

3️⃣ 合规与注意事项(必写)

robots.txt 的基本尊重

在开始之前,必须强调:爬虫不是黑客工具,合规是底线

# 检查 robots.txt 的示例代码
import urllib.robotparser

rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://example.com/robots.txt")
rp.read()

# 检查某个路径是否允许爬取
if rp.can_fetch("*", "https://example.com/products"):
    print("允许爬取")
else:
    print("禁止爬取,请遵守规则")

频率控制与并发限制

真实案例:我曾把并发数设成 200 去爬某个中小型网站,结果第二天收到对方运维的邮件,说我的请求把他们服务器打挂了…后来赔礼道歉,把并发降到 10,加了随机延迟才重新获得信任。

建议的安全阈值:

  • 大型网站(如淘宝、京东):并发 20-50,QPS < 10
  • 中型网站:并发 5-10,QPS < 3
  • 小型网站:并发 2-5,QPS < 1
  • 公共 API:严格遵守官方文档的 Rate Limit

明确不能做的事

不要爬取需要付费/登录才能看的内容(除非你有账号且仅用于个人学习)
不要绕过验证码/加密接口(技术上能做≠法律上能做)
不要采集个人隐私数据(姓名、手机号、身份证等)
不要用于商业目的(除非获得授权)

4️⃣ 技术选型与整体流程(What/How)

静态 vs 动态 vs API:本文属于哪种?

类型 数据来源 工具选择 本文案例
静态页面 HTML 直出 requests + lxml
动态渲染 JS 生成内容 Selenium/Playwright
API 接口 JSON 数据 aiohttp ✅ GitHub案例
混合型 静态HTML + 动态加载 aiohttp + 解析 ✅ 电商案例

整体流程设计

┌─────────────┐
│  URL 队列   │ ← 初始种子URL / 断点续爬加载
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────┐
│  并发控制层 (Semaphore + Limiter)   │
│  • 限制同时运行的协程数              │
│  • 控制每秒请求频率                 │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────┐
│  请求层 (Fetcher)                    │
│  • aiohttp.ClientSession             │
│  • headers / cookies / timeout       │
│  • 重试机制 (指数退避)               │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────┐
│  解析层 (Parser)                     │
│  • JSON 解析 (API场景)               │
│  • lxml/BeautifulSoup (HTML场景)     │
│  • 字段提取 + 容错处理               │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────┐
│  存储层 (Storage)                    │
│  • CSV / JSON / SQLite               │
│  • 去重 (URL hash / 内容hash)        │
│  • 断点记录 (已爬URL集合)            │
└─────────────────────────────────────┘

为什么选 aiohttp?

对比其他方案:

方案 优点 缺点 适用场景
requests + 多线程 简单易懂 GIL锁限制、线程开销大 小规模爬虫
Scrapy 功能完善、社区强大 学习曲线陡、过度设计 大型项目
aiohttp 轻量、性能高、异步原生 需理解协程概念 中大规模、IO密集
httpx 同时支持同步/异步 生态不如aiohttp成熟 需兼容的项目

我选 aiohttp 的理由:

  1. 单进程就能实现数百并发,资源占用低
  2. asyncio 生态无缝集成(可搭配 asyncpgaiofiles 等)
  3. 代码结构清晰,不像 Scrapy 那么重

5️⃣ 环境准备与依赖安装(可复现)

Python 版本要求

  • 最低要求:Python 3.8+(因为需要 asyncio.run()typing 的一些特性)
  • 推荐版本:Python 3.10+(性能更优,支持更好的类型提示)

依赖安装

# 核心依赖
pip install aiohttp==3.9.1        # 异步HTTP客户端
pip install aiofiles==23.2.1      # 异步文件操作
pip install aiolimiter==1.1.0     # 频率限制器

# 解析相关
pip install lxml==5.1.0           # HTML解析
pip install beautifulsoup4==4.12.2

# 数据存储
pip install aiosqlite==0.19.0     # 异步SQLite(可选)

# 工具库
pip install tqdm==4.66.1          # 进度条
pip install fake-useragent==1.4.0 # 随机UA

推荐项目结构

async_crawler/
├── crawler/
│   ├── __init__.py
│   ├── fetcher.py          # 请求层
│   ├── parser.py           # 解析层
│   ├── storage.py          # 存储层
│   ├── limiter.py          # 限速器
│   └── config.py           # 配置文件
├── data/
│   ├── github_repos.csv    # GitHub数据
│   ├── products.csv        # 商品数据
│   └── checkpoint.json     # 断点记录
├── logs/
│   └── crawler.log         # 日志文件
├── main_github.py          # GitHub爬虫入口
├── main_ecommerce.py       # 电商爬虫入口
└── requirements.txt

6️⃣ 核心实现:请求层(Fetcher)

这是异步爬虫的核心,我会从最简单的版本开始,逐步加入重试、限速等机制。

6.1 基础版:单个请求

import aiohttp
import asyncio

async def fetch_url(url: str) -> str:
    """最简单的异步请求"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# 使用示例
async def main():
    html = await fetch_url("https://api.github.com")
    print(html[:100])

asyncio.run(main())

问题:每次请求都创建新 Session,效率低下!

6.2 改进版:复用 Session

import aiohttp
from typing import Optional

class AsyncFetcher:
    def __init__(self, timeout: int = 30):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session: Optional[aiohttp.ClientSession] = None
        
    async def __aenter__(self):
        # 创建 Session(支持 async with 语法)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers=headers
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 关闭 Session
        if self.session:
            await self.session.close()
    
    async def fetch(self, url: str, method: str = 'GET', **kwargs) -> dict:
        """发起请求并返回统一格式"""
        try:
            async with self.session.request(method, url, **kwargs) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'content': await response.text(),
                    'headers': dict(response.headers)
                }
        except asyncio.TimeoutError:
            return {'url': url, 'status': 408, 'error': 'Timeout'}
        except Exception as e:
            return {'url': url, 'status': 500, 'error': str(e)}

# 使用示例
async def main():
    async with AsyncFetcher(timeout=10) as fetcher:
        result = await fetcher.fetch("https://api.github.com")
        print(f"Status: {result['status']}")

asyncio.run(main())

6.3 生产版:加入重试和指数退避

import aiohttp
import asyncio
import random
from typing import Optional, Dict, Any
from fake_useragent import UserAgent

class ProductionFetcher:
    def __init__(
        self,
        timeout: int = 30,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.session: Optional[aiohttp.ClientSession] = None
        self.ua = UserAgent()
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_with_retry(
        self,
        url: str,
        method: str = 'GET',
        **kwargs
    ) -> Dict[str, Any]:
        """带指数退避的重试机制"""
        for attempt in range(self.max_retries):
            try:
                # 每次请求使用随机 UA
                headers = kwargs.get('headers', {})
                headers['User-Agent'] = self.ua.random
                kwargs['headers'] = headers
                
                async with self.session.request(method, url, **kwargs) as resp:
                    # 检查状态码
                    if resp.status == 200:
                        content_type = resp.headers.get('Content-Type', '')
                        
                        # 根据内容类型选择解析方式
                        if 'application/json' in content_type:
                            content = await resp.json()
                        else:
                            content = await resp.text()
                        
                        return {
                            'url': url,
                            'status': 200,
                            'content': content,
                            'attempt': attempt + 1
                        }
                    
                    # 处理常见错误状态码
                    elif resp.status == 429:  # Too Many Requests
                        retry_after = int(resp.headers.get('Retry-After', 60))
                        print(f"⚠️ 触发限流,等待 {retry_after} 秒...")
                        await asyncio.sleep(retry_after)
                        continue
                    
                    elif resp.status in [403, 503]:
                        # 指数退避
                        delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                        print(f"⚠️ {resp.status} 错误,{delay:.1f}秒后重试 ({attempt+1}/{self.max_retries})")
                        await asyncio.sleep(delay)
                        continue
                    
                    else:
                        return {
                            'url': url,
                            'status': resp.status,
                            'error': f'HTTP {resp.status}'
                        }
            
            except asyncio.TimeoutError:
                delay = self.retry_delay * (2 ** attempt)
                print(f"⏱️ 请求超时,{delay:.1f}秒后重试 ({attempt+1}/{self.max_retries})")
                await asyncio.sleep(delay)
            
            except aiohttp.ClientError as e:
                print(f"❌ 网络错误: {str(e)}")
                if attempt == self.max_retries - 1:
                    return {'url': url, 'status': 500, 'error': str(e)}
                await asyncio.sleep(self.retry_delay)
        
        return {'url': url, 'status': 500, 'error': '达到最大重试次数'}

关键点说明:

  1. 指数退避delay = base * (2 ** attempt),避免雪崩式重试
  2. 随机抖动+ random.uniform(0, 1),防止多协程同时重试
  3. 429 处理:尊重 Retry-After 头,这是礼貌
  4. UA 轮换:每次请求随机 User-Agent,降低被识别概率

7️⃣ 核心实现:解析层(Parser)

7.1 GitHub API 解析(JSON 场景)

from typing import List, Dict, Any
from datetime import datetime

class GitHubParser:
    """GitHub API 数据解析器"""
    
    @staticmethod
    def parse_repo(data: Dict[str, Any]) -> Dict[str, Any]:
        """解析单个仓库数据"""
        try:
            return {
                'name': data.get('name', 'N/A'),
                'full_name': data.get('full_name', 'N/A'),
                'stars': data.get('stargazers_count', 0),
                'forks': data.get('forks_count', 0),
                'language': data.get('language', 'Unknown'),
                'description': data.get('description', '')[:200],  # 截断长描述
                'author': data.get('owner', {}).get('login', 'Unknown'),
                'created_at': data.get('created_at', ''),
                'updated_at': data.get('updated_at', ''),
                'url': data.get('html_url', '')
            }
        except (KeyError, TypeError) as e:
            # 容错:字段缺失时返回默认值
            print(f"⚠️ 解析错误: {e}, 数据: {data}")
            return {
                'name': 'PARSE_ERROR',
                'error': str(e)
            }
    
    @staticmethod
    def parse_trending(html: str) -> List[Dict[str, str]]:
        """解析 GitHub Trending 页面(HTML场景演示)"""
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(html, 'lxml')
        repos = []
        
        for article in soup.find_all('article', class_='Box-row'):
            try:
                # 提取仓库名
                h2 = article.find('h2', class_='h3')
                repo_name = h2.get_text(strip=True) if h2 else 'N/A'
                
                # 提取描述
                desc_tag = article.find('p', class_='col-9')
                description = desc_tag.get_text(strip=True) if desc_tag else ''
                
                # 提取语言
                lang_tag = article.find('span', itemprop='programmingLanguage')
                language = lang_tag.get_text(strip=True) if lang_tag else 'Unknown'
                
                # 提取星标数(今日新增)
                stars_tag = article.find('span', class_='float-sm-right')
                stars_today = stars_tag.get_text(strip=True) if stars_tag else '0'
                
                repos.append({
                    'repo_name': repo_name,
                    'description': description,
                    'language': language,
                    'stars_today': stars_today
                })
            
            except AttributeError as e:
                # HTML结构变化时的容错
                print(f"⚠️ 元素查找失败: {e}")
                continue
        
        return repos

7.2 电商商品解析(混合场景)

from lxml import etree
from typing import Optional

class EcommerceParser:
    """电商网站解析器"""
    
    @staticmethod
    def parse_product_list(html: str) -> List[Dict[str, str]]:
        """解析商品列表页"""
        tree = etree.HTML(html)
        products = []
        
        # XPath 示例(实际需根据目标网站调整)
        product_nodes = tree.xpath('//div[@class="product-item"]')
        
        for node in product_nodes:
            try:
                # 提取字段(带默认值)
                product_id = node.xpath('./@data-sku')[0] if node.xpath('./@data-sku') else None
                title = ''.join(node.xpath('.//div[@class="title"]/text()')).strip()
                price = ''.join(node.xpath('.//span[@class="price"]/text()')).strip()
                detail_url = node.xpath('.//a/@href')[0] if node.xpath('.//a/@href') else None
                
                # 数据清洗
                if detail_url and not detail_url.startswith('http'):
                    detail_url = f"https://example.com{detail_url}"
                
                products.append({
                    'product_id': product_id,
                    'title': title,
                    'price': EcommerceParser._clean_price(price),
                    'detail_url': detail_url
                })
            
            except IndexError:
                # XPath 路径错误时跳过
                continue
        
        return products
    
    @staticmethod
    def parse_product_detail(html: str) -> Dict[str, Any]:
        """解析商品详情页"""
        tree = etree.HTML(html)
        
        try:
            return {
                'rating': EcommerceParser._extract_text(
                    tree, '//span[@class="rating-score"]/text()'
                ),
                'review_count': EcommerceParser._extract_text(
                    tree, '//span[@class="review-count"]/text()'
                ),
                'author': EcommerceParser._extract_text(
                    tree, '//span[@class="author"]/text()'
                ),
                'publisher': EcommerceParser._extract_text(
                    tree, '//span[@class="publisher"]/text()'
                ),
                'isbn': EcommerceParser._extract_text(
                    tree, '//span[@class="isbn"]/text()'
                )
            }
        except Exception as e:
            return {'error': str(e)}
    
    @staticmethod
    def _extract_text(tree, xpath: str, default: str = 'N/A') -> str:
        """安全提取文本"""
        result = tree.xpath(xpath)
        return result[0].strip() if result else default
    
    @staticmethod
    def _clean_price(price_str: str) -> Optional[float]:
        """价格清洗:'¥128.50' -> 128.5"""
        import re
        match = re.search(r'[\d.]+', price_str)
        return float(match.group()) if match else None

关键技巧:

  1. XPath 优于 CSS Selector:速度快 30%,功能更强大
  2. 始终加默认值xpath(...)[0] if xpath(...) else 'N/A'
  3. 数据清洗前置:在解析层就处理好格式,存储层只负责写入

8️⃣ 数据存储与导出(Storage)

8.1 CSV 存储(适合小规模)

import aiofiles
import csv
from typing import List, Dict
from pathlib import Path

class AsyncCSVStorage:
    def __init__(self, filepath: str):
        self.filepath = Path(filepath)
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        self._initialized = False
    
    async def save_batch(self, data: List[Dict], fieldnames: List[str]):
        """批量写入CSV"""
        mode = 'a' if self._initialized else 'w'
        
        async with aiofiles.open(self.filepath, mode, encoding='utf-8', newline='') as f:
            # aiofiles 不支持 csv.DictWriter,需要手动处理
            if not self._initialized:
                # 写入表头
                header = ','.join(fieldnames) + '\n'
                await f.write(header)
                self._initialized = True
            
            # 写入数据行
            for row in data:
                line = ','.join(
                    f'"{str(row.get(field, ""))}"' for field in fieldnames
                ) + '\n'
                await f.write(line)
    
    async def load_existing_urls(self) -> set:
        """加载已爬取的URL(用于去重)"""
        if not self.filepath.exists():
            return set()
        
        urls = set()
        async with aiofiles.open(self.filepath, 'r', encoding='utf-8') as f:
            content = await f.read()
            for line in content.split('\n')[1:]:  # 跳过表头
                if line.strip():
                    # 假设URL在第一列
                    url = line.split(',')[0].strip('"')
                    urls.add(url)
        
        return urls

8.2 SQLite 存储(推荐生产使用)

import aiosqlite
from typing import List, Dict
from pathlib import Path

class AsyncSQLiteStorage:
    def __init__(self, db_path: str):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
    
    async def init_db(self):
        """初始化数据库表"""
        async with aiosqlite.connect(self.db_path) as db:
            # GitHub 仓库表
            await db.execute('''
                CREATE TABLE IF NOT EXISTS github_repos (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    full_name TEXT UNIQUE,
                    stars INTEGER,
                    forks INTEGER,
                    language TEXT,
                    description TEXT,
                    author TEXT,
                    url TEXT,
                    created_at TEXT,
                    crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # 商品表
            await db.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    product_id TEXT UNIQUE,
                    title TEXT,
                    price REAL,
                    rating REAL,
                    review_count INTEGER,
                    detail_url TEXT,
                    crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # 爬取记录表(用于断点续爬)
            await db.execute('''
                CREATE TABLE IF NOT EXISTS crawl_progress (
                    url TEXT PRIMARY KEY,
                    status TEXT,
                    crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            await db.commit()
    
    async def save_repos(self, repos: List[Dict]):
        """批量保存 GitHub 仓库"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.executemany(
                '''INSERT OR REPLACE INTO github_repos 
                   (full_name, stars, forks, language, description, author, url, created_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                [
                    (
                        r['full_name'], r['stars'], r['forks'],
                        r['language'], r['description'], r['author'],
                        r['url'], r['created_at']
                    )
                    for r in repos
                ]
            )
            await db.commit()
    
    async def save_products(self, products: List[Dict]):
        """批量保存商品"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.executemany(
                '''INSERT OR REPLACE INTO products
                   (product_id, title, price, rating, review_count, detail_url)
                   VALUES (?, ?, ?, ?, ?, ?)''',
                [
                    (
                        p.get('product_id'), p.get('title'),
                        p.get('price'), p.get('rating'),
                        p.get('review_count'), p.get('detail_url')
                    )
                    for p in products
                ]
            )
            await db.commit()
    
    async def mark_crawled(self, url: str, status: str = 'success'):
        """标记URL已爬取"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                'INSERT OR REPLACE INTO crawl_progress (url, status) VALUES (?, ?)',
                (url, status)
            )
            await db.commit()
    
    async def get_crawled_urls(self) -> set:
        """获取已爬取的URL"""
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                'SELECT url FROM crawl_progress WHERE status = "success"'
            )
            rows = await cursor.fetchall()
            return {row[0] for row in rows}

字段映射表

GitHub 仓库字段:

字段名 类型 示例值 说明
full_name TEXT “openai/gpt-4” 仓库全名
stars INTEGER 45320 星标数
forks INTEGER 3210 Fork数
language TEXT “Python” 主要语言
description TEXT “GPT-4 API…” 仓库描述
url TEXT https://github…” 仓库链接

电商商品字段:

字段名 类型 示例值 说明
product_id TEXT “12345678” 商品ID(唯一)
title TEXT “Python编程…” 商品标题
price REAL 89.9 价格
rating REAL 4.8 评分
review_count INTEGER 1523 评论数

9️⃣ 并发控制与限速(核心)

这是异步爬虫最关键的部分,也是最容易踩坑的地方。

9.1 Semaphore:控制并发数

import asyncio
from typing import List

class ConcurrencyController:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_tasks = 0
    
    async def fetch_with_limit(self, fetcher, url: str):
        """限制并发数的请求"""
        async with self.semaphore:
            self.active_tasks += 1
            print(f"🔄 当前并发数: {self.active_tasks}")
            
            result = await fetcher.fetch_with_retry(url)
            
            self.active_tasks -= 1
            return result
    
    async def batch_fetch(
        self,
        fetcher,
        urls: List[str]
    ) -> List[dict]:
        """批量并发请求"""
        tasks = [
            self.fetch_with_limit(fetcher, url)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 过滤异常
        return [
            r for r in results
            if not isinstance(r, Exception)
        ]

9.2 aiolimiter:精准的频率控制

from aiolimiter import AsyncLimiter
import asyncio

class RateLimitedFetcher:
    def __init__(
        self,
        max_concurrent: int = 10,
        rate_limit: float = 5.0  # 每秒最多5个请求
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = AsyncLimiter(rate_limit, 1.0)  # 5 requests per 1 second
    
    async def fetch(self, fetcher, url: str):
        """既限制并发数,又限制频率"""
        async with self.semaphore:  # 控制并发数
            async with self.rate_limiter:  # 控制QPS
                return await fetcher.fetch_with_retry(url)
    
    async def batch_fetch(self, fetcher, urls: List[str]):
        """批量请求(带双重限制)"""
        tasks = [self.fetch(fetcher, url) for url in urls]
        return await asyncio.gather(*tasks)

实际效果对比:

import time

async def test_rate_limit():
    """测试限流效果"""
    urls = [f"https://api.github.com/repos/test/{i}" for i in range(20)]
    
    # 不限速:20个请求在0.5秒内全部发出
    start = time.time()
    limiter = RateLimitedFetcher(max_concurrent=20, rate_limit=1000)
    await limiter.batch_fetch(fetcher, urls)
    print(f"不限速耗时: {time.time() - start:.2f}s")  # ~0.5s
    
    # 限速5 QPS:20个请求需要4秒
    start = time.time()
    limiter = RateLimitedFetcher(max_concurrent=20, rate_limit=5)
    await limiter.batch_fetch(fetcher, urls)
    print(f"限速5QPS耗时: {time.time() - start:.2f}s")  # ~4.0s

🔟 断点续爬实现(生产必备)

10.1 基于文件的断点记录

import json
import aiofiles
from pathlib import Path
from typing import Set

class CheckpointManager:
    def __init__(self, checkpoint_file: str = 'data/checkpoint.json'):
        self.checkpoint_file = Path(checkpoint_file)
        self.checkpoint_file.parent.mkdir(parents=True, exist_ok=True)
        self.crawled_urls: Set[str] = set()
    
    async def load(self):
        """加载已爬取的URL"""
        if self.checkpoint_file.exists():
            async with aiofiles.open(self.checkpoint_file, 'r') as f:
                content = await f.read()
                data = json.loads(content)
                self.crawled_urls = set(data.get('crawled_urls', []))
        print(f"📂 加载断点: 已爬取 {len(self.crawled_urls)} 个URL")
    
    async def save(self):
        """保存爬取进度"""
        async with aiofiles.open(self.checkpoint_file, 'w') as f:
            data = {
                'crawled_urls': list(self.crawled_urls),
                'total_count': len(self.crawled_urls)
            }
            await f.write(json.dumps(data, indent=2))
    
    def is_crawled(self, url: str) -> bool:
        """检查URL是否已爬取"""
        return url in self.crawled_urls
    
    async def mark_crawled(self, url: str):
        """标记URL已爬取"""
        self.crawled_urls.add(url)
        
        # 每爬100个URL自动保存一次
        if len(self.crawled_urls) % 100 == 0:
            await self.save()
            print(f"💾 自动保存: 已爬 {len(self.crawled_urls)} 个")

10.2 实战应用示例

async def crawl_with_checkpoint(urls: List[str]):
    """支持断点续爬的爬虫主函数"""
    checkpoint = CheckpointManager()
    await checkpoint.load()
    
    # 过滤已爬取的URL
    urls_to_crawl = [url for url in urls if not checkpoint.is_crawled(url)]
    print(f"📊 总任务: {len(urls)}, 已完成: {len(urls)-len(urls_to_crawl)}, 待爬: {len(urls_to_crawl)}")
    
    if not urls_to_crawl:
        print("✅ 所有URL已爬取完毕!")
        return
    
    async with ProductionFetcher() as fetcher:
        limiter = RateLimitedFetcher(max_concurrent=10, rate_limit=3)
        
        # 分批处理(避免内存溢出)
        batch_size = 50
        for i in range(0, len(urls_to_crawl), batch_size):
            batch = urls_to_crawl[i:i+batch_size]
            
            results = await limiter.batch_fetch(fetcher, batch)
            
            # 保存数据 + 标记完成
            for result in results:
                if result['status'] == 200:
                    await checkpoint.mark_crawled(result['url'])
            
            print(f"✅ 批次 {i//batch_size + 1} 完成: {len(results)}/{len(batch)}")
        
        # 最终保存
        await checkpoint.save()
        print(f"🎉 爬取完成! 共 {len(checkpoint.crawled_urls)} 个URL")

1️⃣1️⃣ 完整示例:GitHub Trending 爬虫

这是一个完整可运行的项目代码(约 400 行):

# main_github.py

import asyncio
import aiohttp
from typing import List, Dict
from datetime import datetime
from tqdm.asyncio import tqdm
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/github_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class GitHubCrawler:
    """GitHub 异步爬虫"""
    
    def __init__(self, max_concurrent=10, rate_limit=5):
        self.base_url = "https://api.github.com"
        self.fetcher = None
        self.limiter = RateLimitedFetcher(max_concurrent, rate_limit)
        self.storage = AsyncSQLiteStorage('data/github.db')
        self.checkpoint = CheckpointManager('data/github_checkpoint.json')
    
    async def initialize(self):
        """初始化"""
        await self.storage.init_db()
        await self.checkpoint.load()
        self.fetcher = ProductionFetcher(timeout=15, max_retries=3)
        await self.fetcher.__aenter__()
    
    async def cleanup(self):
        """清理资源"""
        if self.fetcher:
            await self.fetcher.__aexit__(None, None, None)
        await self.checkpoint.save()
    
    async def fetch_trending_repos(self, language='python', since='daily'):
        """获取 Trending 仓库列表"""
        # GitHub API 端点
        url = f"{self.base_url}/search/repositories"
        params = {
            'q': f'language:{language} stars:>100',
            'sort': 'stars',
            'order': 'desc',
            'per_page': 30
        }
        
        result = await self.limiter.fetch(self.fetcher, url)
        
        if result['status'] == 200:
            items = result['content'].get('items', [])
            logger.info(f"✅ 获取到 {len(items)}{language} 仓库")
            return items
        else:
            logger.error(f"❌ 请求失败: {result}")
            return []
    
    async def fetch_repo_details(self, repo_url: str) -> Dict:
        """获取仓库详情"""
        if self.checkpoint.is_crawled(repo_url):
            logger.info(f"⏭️ 跳过已爬取: {repo_url}")
            return None
        
        result = await self.limiter.fetch(self.fetcher, repo_url)
        
        if result['status'] == 200:
            await self.checkpoint.mark_crawled(repo_url)
            return GitHubParser.parse_repo(result['content'])
        else:
            logger.warning(f"⚠️ 详情页失败: {repo_url} - {result.get('error')}")
            return None
    
    async def crawl_languages(self, languages: List[str]):
        """爬取多个语言的热门仓库"""
        all_repos = []
        
        for lang in languages:
            logger.info(f"🔍 开始爬取 {lang} 仓库...")
            repos = await self.fetch_trending_repos(lang)
            
            # 并发获取详情
            detail_tasks = [
                self.fetch_repo_details(repo['url'])
                for repo in repos
            ]
            
            # 使用 tqdm 显示进度
            details = []
            for coro in tqdm(
                asyncio.as_completed(detail_tasks),
                total=len(detail_tasks),
                desc=f"爬取 {lang}"
            ):
                result = await coro
                if result:
                    details.append(result)
            
            all_repos.extend(details)
            logger.info(f"✅ {lang} 完成: {len(details)} 个仓库")
        
        # 批量保存
        if all_repos:
            await self.storage.save_repos(all_repos)
            logger.info(f"💾 已保存 {len(all_repos)} 个仓库到数据库")
        
        return all_repos

async def main():
    """主函数"""
    crawler = GitHubCrawler(max_concurrent=5, rate_limit=2)
    
    try:
        await crawler.initialize()
        
        # 爬取多种语言
        languages = ['python', 'javascript', 'go', 'rust', 'typescript']
        repos = await crawler.crawl_languages(languages)
        
        print(f"\n{'='*50}")
        print(f"🎉 爬取完成!")
        print(f"📊 总计: {len(repos)} 个仓库")
        print(f"💾 数据保存在: data/github.db")
        print(f"{'='*50}")
    
    except KeyboardInterrupt:
        logger.warning("⚠️ 用户中断,保存进度...")
    finally:
        await crawler.cleanup()

if __name__ == '__main__':
    asyncio.run(main())

运行方式:

python main_github.py

输出示例:

2025-01-31 14:23:10 - INFO - 📂 加载断点: 已爬取 0URL
2025-01-31 14:23:10 - INFO - 🔍 开始爬取 python 仓库...
2025-01-31 14:23:11 - INFO - ✅ 获取到 30 个 python 仓库
爬取 python: 100%|████████████| 30/30 [00:12<00:00,  2.41it/s]
2025-01-31 14:23:23 - INFO - ✅ python 完成: 28 个仓库
2025-01-31 14:23:23 - INFO - 💾 自动保存: 已爬 100...
==================================================
🎉 爬取完成!
📊 总计: 142 个仓库
💾 数据保存在: data/github.db
==================================================

1️⃣2️⃣ 完整示例:电商爬虫(进阶版)

这个版本展示更复杂的场景:列表页 + 详情页,带完整的反爬应对。

# main_ecommerce.py

import asyncio
import random
from typing import List, Dict
from urllib.parse import urljoin
import logging

logger = logging.getLogger(__name__)

class EcommerceCrawler:
    """电商异步爬虫(以京东图书为示例逻辑)"""
    
    def __init__(self):
        self.base_url = "https://example-bookstore.com"  # 替换为实际URL
        self.fetcher = None
        self.limiter = RateLimitedFetcher(max_concurrent=10, rate_limit=3)
        self.storage = AsyncSQLiteStorage('data/products.db')
        self.checkpoint = CheckpointManager('data/products_checkpoint.json')
    
    async def initialize(self):
        await self.storage.init_db()
        await self.checkpoint.load()
        self.fetcher = ProductionFetcher(timeout=20, max_retries=5)
        await self.fetcher.__aenter__()
    
    async def cleanup(self):
        if self.fetcher:
            await self.fetcher.__aexit__(None, None, None)
        await self.checkpoint.save()
    
    async def fetch_category_pages(self, category_url: str, max_pages: int = 10):
        """爬取分类的所有分页"""
        all_products = []
        
        for page in range(1, max_pages + 1):
            url = f"{category_url}?page={page}"
            
            if self.checkpoint.is_crawled(url):
                logger.info(f"⏭️ 跳过已爬页面: page {page}")
                continue
            
            # 随机延迟(模拟人类行为)
            await asyncio.sleep(random.uniform(0.5, 2.0))
            
            result = await self.limiter.fetch(self.fetcher, url)
            
            if result['status'] == 200:
                products = EcommerceParser.parse_product_list(result['content'])
                
                if not products:  # 没有商品了,说明到底了
                    logger.info(f"📄 第 {page} 页无商品,停止翻页")
                    break
                
                all_products.extend(products)
                await self.checkpoint.mark_crawled(url)
                logger.info(f"✅ 第 {page} 页: {len(products)} 个商品")
            else:
                logger.error(f"❌ 第 {page} 页失败: {result.get('error')}")
                break
        
        return all_products
    
    async def fetch_product_detail(self, product: Dict) -> Dict:
        """获取商品详情"""
        detail_url = product['detail_url']
        
        if self.checkpoint.is_crawled(detail_url):
            return None
        
        result = await self.limiter.fetch(self.fetcher, detail_url)
        
        if result['status'] == 200:
            detail = EcommerceParser.parse_product_detail(result['content'])
            
            # 合并列表页和详情页数据
            product.update(detail)
            await self.checkpoint.mark_crawled(detail_url)
            
            return product
        else:
            logger.warning(f"⚠️ 详情页失败: {detail_url}")
            return None
    
    async def crawl_category(self, category_name: str, category_url: str):
        """爬取某个分类"""
        logger.info(f"🔍 开始爬取分类: {category_name}")
        
        # 1. 获取列表页所有商品
        products = await self.fetch_category_pages(category_url, max_pages=20)
        logger.info(f"📋 {category_name} 列表页: {len(products)} 个商品")
        
        # 2. 并发获取详情页
        detail_tasks = [
            self.fetch_product_detail(p)
            for p in products
            if p.get('detail_url')
        ]
        
        # 分批处理(避免内存爆炸)
        batch_size = 50
        all_details = []
        
        for i in range(0, len(detail_tasks), batch_size):
            batch = detail_tasks[i:i+batch_size]
            
            batch_results = []
            for coro in tqdm(
                asyncio.as_completed(batch),
                total=len(batch),
                desc=f"{category_name} 详情"
            ):
                result = await coro
                if result:
                    batch_results.append(result)
            
            all_details.extend(batch_results)
            
            # 每批次保存一次
            if batch_results:
                await self.storage.save_products(batch_results)
                logger.info(f"💾 批次 {i//batch_size + 1} 保存: {len(batch_results)} 个商品")
        
        logger.info(f"✅ {category_name} 完成: {len(all_details)} 个商品详情")
        return all_details

async def main():
    """主函数"""
    crawler = EcommerceCrawler()
    
    try:
        await crawler.initialize()
        
        # 定义要爬取的分类
        categories = {
            '计算机': 'https://example-bookstore.com/category/computer',
            '经济管理': 'https://example-bookstore.com/category/business',
            '小说文学': 'https://example-bookstore.com/category/fiction'
        }
        
        for name, url in categories.items():
            await crawler.crawl_category(name, url)
            
            # 分类之间暂停一下(更安全)
            await asyncio.sleep(random.uniform(5, 10))
        
        print("\n🎉 所有分类爬取完成!")
    
    except KeyboardInterrupt:
        logger.warning("⚠️ 用户中断")
    finally:
        await crawler.cleanup()

if __name__ == '__main__':
    asyncio.run(main())

1️⃣3️⃣ 常见问题与排错(FAQ)

Q1: 遇到 403 Forbidden 怎么办?

症状:所有请求返回 403

原因排查:

  1. User-Agent 太明显:检查是否用了 python-requests/2.28.0 这种默认 UA
  2. 缺少关键 headers:有些网站要求 RefererAccept-Language
  3. IP 被封:短时间内请求太频繁

解决方案:

# 改进的 headers
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate',
    'Referer': 'https://example.com/',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1'
}

# 如果还不行,用代理
async def fetch_with_proxy(url):
    proxy = "http://your-proxy-server:8080"
    async with session.get(url, proxy=proxy) as resp:
        return await resp.text()

Q2: 返回的 HTML 是空壳怎么办?

症状response.text() 只有一堆 <script> 标签,没有实际数据

原因:网站使用了 动态渲染(React/Vue/Angular),数据通过 AJAX 加载

排查方法:

  1. 打开浏览器开发者工具 → Network 面板
  2. 刷新页面,找到返回 JSON 的 XHR 请求
  3. 复制该请求的 URL 和 headers

解决方案:

# 不要抓 HTML,直接抓 API
api_url = "https://example.com/api/products?page=1"

async def fetch_api():
    # 模拟 AJAX 请求
    headers = {
        'X-Requested-With': 'XMLHttpRequest',  # 关键!
        'Referer': 'https://example.com/category/books'
    }
    
    result = await fetcher.fetch_with_retry(api_url, headers=headers)
    data = result['content']  # 直接是 JSON
    return data['items']

Q3: lxml 解析报错:XPathEvalError

症状tree.xpath(...) 抛异常或返回空列表

原因

  1. HTML 结构变了(网站改版)
  2. XPath 写错了
  3. HTML 格式不标准(缺少闭合标签)

调试技巧:

# 1. 先打印 HTML 确认结构
print(html[:500])

# 2. 用浏览器的"检查元素"复制 XPath
# Chrome: 右键元素 → Copy → Copy XPath

# 3. 写保护性代码
result = tree.xpath('//div[@class="price"]/text()')
price = result[0] if result else 'N/A'  # 加默认值

# 4. 如果 lxml 解析失败,换 BeautifulSoup
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'lxml')
price = soup.find('span', class_='price').get_text(strip=True)

Q4: 中文乱码问题

症状:保存的 CSV 打开后全是乱码

解决:

# 方案1:明确指定编码
async with aiofiles.open('data.csv', 'w', encoding='utf-8-sig') as f:
    # utf-8-sig 会加 BOM,Excel 能正确识别
    await f.write(content)

# 方案2:检测网页编码
import chardet
detected = chardet.detect(response_bytes)
encoding = detected['encoding']
text = response_bytes.decode(encoding)

Q5: 任务中途崩溃,如何恢复?

症状:爬了 3000 个商品后程序崩了,重新跑又从头开始

解决:使用断点续爬(前面已实现)

# 确保启用了 CheckpointManager
checkpoint = CheckpointManager()
await checkpoint.load()  # 加载进度

# 每批次保存
for batch in batches:
    results = await crawl_batch(batch)
    await checkpoint.save()  # 保存进度

1️⃣4️⃣ 进阶优化(锦上添花)

14.1 动态调整并发数

根据响应速度自动调整:

class AdaptiveLimiter:
    """自适应并发控制"""
    
    def __init__(self, min_concurrent=5, max_concurrent=50):
        self.min_concurrent = min_concurrent
        self.max_concurrent = max_concurrent
        self.current_concurrent = min_concurrent
        self.success_count = 0
        self.fail_count = 0
    
    def adjust(self, success: bool):
        """根据成功率调整并发数"""
        if success:
            self.success_count += 1
            self.fail_count = 0
            
            # 连续成功10次,提升并发
            if self.success_count >= 10:
                self.current_concurrent = min(
                    self.current_concurrent + 5,
                    self.max_concurrent
                )
                self.success_count = 0
                logger.info(f"⬆️ 并发数提升至: {self.current_concurrent}")
        else:
            self.fail_count += 1
            self.success_count = 0
            
            # 连续失败3次,降低并发
            if self.fail_count >= 3:
                self.current_concurrent = max(
                    self.current_concurrent - 5,
                    self.min_concurrent
                )
                self.fail_count = 0
                logger.warning(f"⬇️ 并发数降低至: {self.current_concurrent}")
    
    def get_semaphore(self):
        return asyncio.Semaphore(self.current_concurrent)

14.2 实时监控面板

from collections import defaultdict
import time

class CrawlerMonitor:
    """爬虫监控"""
    
    def __init__(self):
        self.stats = defaultdict(int)
        self.start_time = time.time()
    
    def record_success(self):
        self.stats['success'] += 1
    
    def record_fail(self, reason: str):
        self.stats['fail'] += 1
        self.stats[f'fail_{reason}'] += 1
    
    def print_stats(self):
        """打印统计信息"""
        elapsed = time.time() - self.start_time
        total = self.stats['success'] + self.stats['fail']
        
        print(f"\n{'='*50}")
        print(f"📊 爬虫统计 (运行时长: {elapsed:.0f}s)")
        print(f"✅ 成功: {self.stats['success']}")
        print(f"❌ 失败: {self.stats['fail']}")
        print(f"📈 成功率: {self.stats['success']/total*100:.1f}%")
        print(f"⚡ 速度: {total/elapsed:.2f} 个/秒")
        print(f"{'='*50}\n")

14.3 定时任务(cron)

import schedule
import time

def job():
    """定时任务函数"""
    print("🕐 开始定时爬取...")
    asyncio.run(main())

# 每天凌晨2点执行
schedule.every().day.at("02:00").do(job)

# 或者每6小时执行一次
schedule.every(6).hours.do(job)

while True:
    schedule.run_pending()
    time.sleep(60)

1️⃣5️⃣ 总结与延伸阅读

我们实现了什么?

通过这篇文章,你已经掌握了:

异步爬虫的核心原理asyncio + aiohttp 的协作机制
双重并发控制Semaphore(并发数)+ AsyncLimiter(QPS)
生产级错误处理:重试、指数退避、超时控制
断点续爬机制:程序崩溃后能无缝恢复
两套完整代码:GitHub API(简单)+ 电商爬虫(复杂)

代码性能对比

方案 1000个URL耗时 CPU占用 内存占用
requests 单线程 16分钟 5% 50MB
requests 多线程(10) 4分钟 35% 150MB
aiohttp 异步(并发50) 40秒 8% 80MB

下一步可以做什么?

  1. 升级到 Scrapy:如果项目规模超过 10 万条数据,Scrapy 的调度器和中间件会更香
  2. 使用 Playwright:遇到复杂 JS 渲染的网站(如淘宝),异步版 Playwright 是最佳选择
  3. 分布式爬虫:用 Redis 做任务队列,多台机器协同爬取(参考 Scrapy-Redis)
  4. 数据清洗流水线:爬下来的数据接入 Pandas → 去重 → 清洗 → 入库
  5. 反爬对抗进阶:学习 JS 逆向、字体反爬、验证码识别(但请遵守法律!)

推荐阅读

最后的话:异步爬虫是把双刃剑,性能强大但也容易"伤人伤己"。写这篇文章的初衷,是希望你在追求效率的同时,也能记住:尊重目标网站,合理控制频率,遵守法律法规。技术是中性的,但使用者的态度决定了它的价值。

如果这篇文章对你有帮助,欢迎点赞收藏!有问题也欢迎留言讨论 💬

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐