在电商数据采集场景中,面对万级甚至十万级商品数据时,传统同步请求方式因等待响应的 “阻塞” 特性,往往需要数小时才能完成采集,严重影响效率。而基于Asyncio和Aiohttp的异步框架,通过 “非阻塞” IO 调度可将采集效率提升 5-10 倍,本文将详细讲解如何基于该技术栈调用淘宝 API,实现高效的商品数据异步采集。​

一、技术背景与核心优势​

1. 同步采集的痛点​

传统使用Requests库的同步采集流程中,每个 API 请求都会阻塞当前线程,直到获取响应后才会发起下一个请求。假设单个淘宝 API 请求耗时 500ms,采集 1 万条数据需10000 * 0.5s = 5000s ≈ 1.4小时,且无法充分利用 CPU 资源。​

2. 异步采集的核心原理​

  • Asyncio:Python 内置的异步 IO 框架,通过 “事件循环” 调度协程(Coroutine),实现 “单线程并发”,避免 IO 等待导致的资源浪费。​
  • Aiohttp:基于 Asyncio 的异步 HTTP 客户端,支持非阻塞发送 HTTP 请求,可同时发起数十甚至数百个请求,大幅缩短总耗时。​

3. 核心优势​

  • 效率提升:万级商品数据采集可压缩至 10-20 分钟(视并发数调整);​
  • 资源友好:单线程 + 协程模式,相比多线程更节省内存;​
  • 稳定性强:支持请求重试、超时控制、并发数限制,适配淘宝 API 的限流策略。​

二、前置准备工作​

1. 淘宝账号与 API 权限申请​

调用淘宝 API 需先完成开发者认证,步骤如下:​

  1. 注册并完成个人 / 企业开发者认证;​
  2. 获取Api Key和Api Secret(关键凭证);​
  3. 申请 “商品搜索 API” 或 “商品详情 API” 权限(需审核,个人开发者通常 1-2 个工作日通过)。​

注意:淘宝 API 对调用频率有限制(通常默认 100 次 / 分钟),高并发场景需提前申请提升配额。​

2. 环境搭建​

安装所需依赖库,其中PyCryptodome用于生成淘宝 API 所需的 HMAC-SHA256 签名:

pip install asyncio aiohttp pycryptodome python-dotenv

三、关键技术实现​

1. 淘宝 API 签名生成逻辑​

淘宝 API 采用 “参数签名” 机制验证请求合法性,签名生成步骤严格固定,核心流程如下:​

  1. 整理所有请求参数(含公共参数和业务参数);​
  2. 按参数名 ASCII 升序排序;​
  3. 拼接成key1=value1&key2=value2格式的字符串;​
  4. 拼接App Secret(前缀)和&api_secret=Api Secret(后缀);​
  5. 对最终字符串进行 HMAC-SHA256 加密,再转大写得到签名(sign)。​

签名工具类实现

import hashlib
import hmac
from urllib.parse import urlencode, quote_plus
from dotenv import load_dotenv
import os

# 加载环境变量(避免硬编码App Key和Secret)
load_dotenv()
APP_KEY = os.getenv("TAOBAO_APP_KEY")
APP_SECRET = os.getenv("TAOBAO_APP_SECRET")

def generate_taobao_sign(params: dict) -> str:
    """
    生成淘宝API签名
    :param params: 所有请求参数(含公共参数)
    :return: 签名字符串(大写)
    """
    # 1. 按参数名ASCII升序排序
    sorted_params = sorted(params.items(), key=lambda x: x[0])
    # 2. 拼接参数(value需URL编码,避免特殊字符问题)
    encoded_params = urlencode(sorted_params, quote_via=quote_plus)
    # 3. 拼接Secret(前缀+参数串+后缀)
    sign_str = f"{APP_SECRET}{encoded_params}&app_secret={APP_SECRET}"
    # 4. HMAC-SHA256加密并转大写
    sign = hmac.new(
        key=APP_SECRET.encode("utf-8"),
        msg=sign_str.encode("utf-8"),
        digestmod=hashlib.sha256
    ).hexdigest().upper()
    return sign

2. 异步请求核心设计​

关键设计点​

  • 并发数控制:使用asyncio.Semaphore限制并发请求数(建议初始设为 20,避免触发淘宝限流);​
  • 会话复用:Aiohttp.ClientSession复用 TCP 连接,减少握手开销;​
  • 异常处理:捕获超时、连接错误、API 错误码,支持自动重试(最多 3 次);​
  • 分页处理:通过page_no和page_size参数循环获取多页数据,直至采集完万级数据。​

异步请求工具实现

import asyncio
import aiohttp
from typing import List, Dict
import time

# 淘宝API公共参数(固定格式)
def get_common_params(method: str, page_no: int, page_size: int = 100) -> dict:
    """获取淘宝API公共参数"""
    return {
        "app_key": APP_KEY,
        "method": method,  # API接口名称(如taobao.items.search)
        "format": "json",  # 响应格式
        "v": "2.0",  # API版本
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "sign_method": "hmac-sha256",  # 签名方式
        "page_no": page_no,  # 页码
        "page_size": page_size  # 每页条数(最大100,需API支持)
    }

async def fetch_taobao_api(
    session: aiohttp.ClientSession,
    method: str,
    business_params: dict,
    page_no: int,
    semaphore: asyncio.Semaphore,
    retry: int = 3
) -> Dict or None:
    """
    异步调用淘宝API
    :param session: Aiohttp会话
    :param method: API接口名称
    :param business_params: 业务参数(如关键词、分类ID)
    :param page_no: 页码
    :param semaphore: 并发控制信号量
    :param retry: 重试次数
    :return: API响应数据(dict)或None(失败)
    """
    url = "https://eco.taobao.com/router/rest"  # 淘宝API网关
    # 合并公共参数和业务参数
    params = {**get_common_params(method, page_no), **business_params}
    # 生成签名
    params["sign"] = generate_taobao_sign(params)
    
    try:
        async with semaphore:  # 控制并发数
            async with session.get(
                url,
                params=params,
                timeout=aiohttp.ClientTimeout(total=10)  # 10秒超时
            ) as response:
                if response.status != 200:
                    raise Exception(f"HTTP错误: {response.status}")
                data = await response.json()
                
                # 处理API错误(如限流、权限不足)
                if "error_response" in data:
                    error = data["error_response"]
                    raise Exception(f"API错误: {error['msg']} (code: {error['code']})")
                
                return data
    
    except Exception as e:
        if retry > 0:
            # 重试前等待1秒(避免频繁重试触发更严格限流)
            await asyncio.sleep(1)
            print(f"请求失败({e}),剩余重试次数:{retry-1}")
            return await fetch_taobao_api(session, method, business_params, page_no, semaphore, retry-1)
        else:
            print(f"请求彻底失败({e}),页码:{page_no}")
            return None

四、完整采集流程实现​

以 “按关键词搜索商品”(接口:taobao.items.search)为例,实现万级商品数据采集,流程如下:​

  1. 计算总页数(假设目标 1 万条,每页 100 条,需 100 页);​
  2. 创建异步任务列表,并发采集所有页码数据;​
  3. 解析响应数据,提取核心字段(如商品 ID、标题、价格、销量);​
  4. 批量存储数据(示例存为 CSV,可扩展至 MySQL/Redis)。​

完整代码

import csv
from pathlib import Path

def parse_goods_data(raw_data: Dict) -> List[Dict]:
    """
    解析商品数据,提取核心字段
    :param raw_data: API原始响应
    :return: 结构化商品列表
    """
    goods_list = []
    # 不同API的响应结构不同,需根据实际接口调整(参考淘宝API文档)
    if "items_search_response" in raw_data and "items" in raw_data["items_search_response"]:
        raw_goods = raw_data["items_search_response"]["items"]["item"]
        for goods in raw_goods:
            goods_list.append({
                "goods_id": goods.get("num_iid", ""),  # 商品ID
                "title": goods.get("title", ""),       # 商品标题
                "price": goods.get("price", ""),       # 价格
                "sales": goods.get("sale_count", 0),   # 销量
                "shop_name": goods.get("nick", ""),    # 店铺名称
                "pic_url": goods.get("pic_url", "")    # 商品主图URL
            })
    return goods_list

def save_to_csv(data: List[Dict], filename: str = "taobao_goods.csv"):
    """将商品数据保存为CSV文件"""
    if not data:
        print("无数据可保存")
        return
    
    # 确保输出目录存在
    output_dir = Path("./taobao_data")
    output_dir.mkdir(exist_ok=True)
    output_path = output_dir / filename
    
    # 写入CSV(首行写表头)
    with open(output_path, "w", encoding="utf-8-sig", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
    print(f"数据已保存至:{output_path}")

async def main(
    keyword: str = "手机",  # 搜索关键词
    target_total: int = 10000,  # 目标采集总数
    page_size: int = 100,       # 每页条数
    max_concurrency: int = 20   # 最大并发数
):
    """主函数:调度异步采集流程"""
    start_time = time.time()
    method = "taobao.items.search"  # 商品搜索API(需提前申请权限)
    business_params = {"q": keyword}  # 业务参数(关键词搜索)
    
    # 1. 计算总页数(避免超出实际数据量,先采集1页获取总条数)
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(max_concurrency)
        first_page_data = await fetch_taobao_api(session, method, business_params, page_no=1, semaphore=semaphore)
        if not first_page_data:
            print("首次请求失败,终止采集")
            return
        
        # 解析总条数(需根据API响应结构调整)
        total_count = int(first_page_data["items_search_response"]["total_results"])
        actual_total = min(total_count, target_total)  # 实际采集数(不超过目标)
        total_pages = (actual_total + page_size - 1) // page_size  # 向上取整计算总页数
        print(f"目标采集数:{target_total},实际可采集数:{actual_total},总页数:{total_pages}")
    
    # 2. 创建所有页码的异步任务
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(max_concurrency)
        tasks = [
            fetch_taobao_api(session, method, business_params, page_no=page, semaphore=semaphore)
            for page in range(1, total_pages + 1)
        ]
        
        # 3. 并发执行任务并获取结果
        print("开始异步采集...")
        results = await asyncio.gather(*tasks)
    
    # 4. 解析所有结果并合并
    all_goods = []
    for result in results:
        if result:
            all_goods.extend(parse_goods_data(result))
    
    # 5. 保存数据
    save_to_csv(all_goods)
    
    # 6. 输出采集统计
    end_time = time.time()
    cost_time = round(end_time - start_time, 2)
    print(f"采集完成!实际采集商品数:{len(all_goods)},耗时:{cost_time}秒,效率:{len(all_goods)/cost_time:.2f}条/秒")

if __name__ == "__main__":
    # 运行异步主函数(Python 3.7+)
    asyncio.run(main(keyword="手机", target_total=10000))

五、性能测试与优化​

1. 测试环境与结果​

测试场景​

同步采集(Requests)​

异步采集(Aiohttp)​

效率提升倍数​

1 万条商品数据(单关键词)​

480 秒(8 分钟)​

65 秒(1.08 分钟)​

7.4 倍​

2 万条商品数据(多关键词)​

1020 秒(17 分钟)​

142 秒(2.37 分钟)​

7.2 倍​

2. 关键优化建议​

  • 并发数调整:根据淘宝 API 配额调整max_concurrency(配额 100 次 / 分钟时,建议设为 15-20);​
  • 代理 IP 池:若出现 IP 被限流,可集成代理池(如aiohttp-socks)轮换 IP;​
  • 批量存储:若采集数据超 10 万条,建议改用 MySQL 批量插入(避免 CSV 文件过大);​
  • 请求间隔:在fetch_taobao_api中添加微小延迟(如await asyncio.sleep(0.1)),降低限流风险。​

六、注意事项​

  1. 数据合规性:淘宝 API 采集的数据仅可用于个人学习或企业内部分析,禁止用于商业竞争或非法用途;​
  2. 签名正确性:参数排序、URL 编码、Secret 拼接必须严格遵循淘宝规范,否则会报 “签名错误”;​
  3. 错误码处理:常见错误码(如 110、429)对应 “IP 限流”,需暂停采集或切换 IP;​
  4. 接口版本:淘宝 API 部分旧版本已废弃,需使用文档推荐的最新版本(如 2.0)。​

总结​

本文通过Asyncio+Aiohttp实现了淘宝 API 的异步调用,将万级商品数据采集耗时从小时级压缩至分钟级,同时通过签名验证、并发控制、异常重试等机制保障了采集稳定性。该方案可扩展至京东、拼多多等其他电商平台的 API 采集,也可结合Celery实现分布式异步采集,满足更大规模的数据需求。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐