Python 异步框架 (Async/Aiohttp) 调用淘宝 API:实现万级商品数据异步采集
在电商数据采集场景中,面对万级甚至十万级商品数据时,传统同步请求方式因等待响应的 “阻塞” 特性,往往需要数小时才能完成采集,严重影响效率。而基于Asyncio和Aiohttp的异步框架,通过 “非阻塞” IO 调度可将采集效率提升 5-10 倍,本文将详细讲解如何基于该技术栈调用淘宝 API,实现高效的商品数据异步采集。本文通过Asyncio+Aiohttp实现了淘宝 API 的异步调用,将万级
在电商数据采集场景中,面对万级甚至十万级商品数据时,传统同步请求方式因等待响应的 “阻塞” 特性,往往需要数小时才能完成采集,严重影响效率。而基于Asyncio和Aiohttp的异步框架,通过 “非阻塞” IO 调度可将采集效率提升 5-10 倍,本文将详细讲解如何基于该技术栈调用淘宝 API,实现高效的商品数据异步采集。
一、技术背景与核心优势
1. 同步采集的痛点
传统使用Requests库的同步采集流程中,每个 API 请求都会阻塞当前线程,直到获取响应后才会发起下一个请求。假设单个淘宝 API 请求耗时 500ms,采集 1 万条数据需10000 * 0.5s = 5000s ≈ 1.4小时,且无法充分利用 CPU 资源。
2. 异步采集的核心原理
- Asyncio:Python 内置的异步 IO 框架,通过 “事件循环” 调度协程(Coroutine),实现 “单线程并发”,避免 IO 等待导致的资源浪费。
- Aiohttp:基于 Asyncio 的异步 HTTP 客户端,支持非阻塞发送 HTTP 请求,可同时发起数十甚至数百个请求,大幅缩短总耗时。
3. 核心优势
- 效率提升:万级商品数据采集可压缩至 10-20 分钟(视并发数调整);
- 资源友好:单线程 + 协程模式,相比多线程更节省内存;
- 稳定性强:支持请求重试、超时控制、并发数限制,适配淘宝 API 的限流策略。
二、前置准备工作
1. 淘宝账号与 API 权限申请
调用淘宝 API 需先完成开发者认证,步骤如下:
- 注册并完成个人 / 企业开发者认证;
- 获取Api Key和Api Secret(关键凭证);
- 申请 “商品搜索 API” 或 “商品详情 API” 权限(需审核,个人开发者通常 1-2 个工作日通过)。
注意:淘宝 API 对调用频率有限制(通常默认 100 次 / 分钟),高并发场景需提前申请提升配额。
2. 环境搭建
安装所需依赖库,其中PyCryptodome用于生成淘宝 API 所需的 HMAC-SHA256 签名:
pip install asyncio aiohttp pycryptodome python-dotenv
三、关键技术实现
1. 淘宝 API 签名生成逻辑
淘宝 API 采用 “参数签名” 机制验证请求合法性,签名生成步骤严格固定,核心流程如下:
- 整理所有请求参数(含公共参数和业务参数);
- 按参数名 ASCII 升序排序;
- 拼接成key1=value1&key2=value2格式的字符串;
- 拼接App Secret(前缀)和&api_secret=Api Secret(后缀);
- 对最终字符串进行 HMAC-SHA256 加密,再转大写得到签名(sign)。
签名工具类实现
import hashlib
import hmac
from urllib.parse import urlencode, quote_plus
from dotenv import load_dotenv
import os
# 加载环境变量(避免硬编码App Key和Secret)
load_dotenv()
APP_KEY = os.getenv("TAOBAO_APP_KEY")
APP_SECRET = os.getenv("TAOBAO_APP_SECRET")
def generate_taobao_sign(params: dict) -> str:
"""
生成淘宝API签名
:param params: 所有请求参数(含公共参数)
:return: 签名字符串(大写)
"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接参数(value需URL编码,避免特殊字符问题)
encoded_params = urlencode(sorted_params, quote_via=quote_plus)
# 3. 拼接Secret(前缀+参数串+后缀)
sign_str = f"{APP_SECRET}{encoded_params}&app_secret={APP_SECRET}"
# 4. HMAC-SHA256加密并转大写
sign = hmac.new(
key=APP_SECRET.encode("utf-8"),
msg=sign_str.encode("utf-8"),
digestmod=hashlib.sha256
).hexdigest().upper()
return sign
2. 异步请求核心设计
关键设计点
- 并发数控制:使用asyncio.Semaphore限制并发请求数(建议初始设为 20,避免触发淘宝限流);
- 会话复用:Aiohttp.ClientSession复用 TCP 连接,减少握手开销;
- 异常处理:捕获超时、连接错误、API 错误码,支持自动重试(最多 3 次);
- 分页处理:通过page_no和page_size参数循环获取多页数据,直至采集完万级数据。
异步请求工具实现
import asyncio
import aiohttp
from typing import List, Dict
import time
# 淘宝API公共参数(固定格式)
def get_common_params(method: str, page_no: int, page_size: int = 100) -> dict:
"""获取淘宝API公共参数"""
return {
"app_key": APP_KEY,
"method": method, # API接口名称(如taobao.items.search)
"format": "json", # 响应格式
"v": "2.0", # API版本
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"sign_method": "hmac-sha256", # 签名方式
"page_no": page_no, # 页码
"page_size": page_size # 每页条数(最大100,需API支持)
}
async def fetch_taobao_api(
session: aiohttp.ClientSession,
method: str,
business_params: dict,
page_no: int,
semaphore: asyncio.Semaphore,
retry: int = 3
) -> Dict or None:
"""
异步调用淘宝API
:param session: Aiohttp会话
:param method: API接口名称
:param business_params: 业务参数(如关键词、分类ID)
:param page_no: 页码
:param semaphore: 并发控制信号量
:param retry: 重试次数
:return: API响应数据(dict)或None(失败)
"""
url = "https://eco.taobao.com/router/rest" # 淘宝API网关
# 合并公共参数和业务参数
params = {**get_common_params(method, page_no), **business_params}
# 生成签名
params["sign"] = generate_taobao_sign(params)
try:
async with semaphore: # 控制并发数
async with session.get(
url,
params=params,
timeout=aiohttp.ClientTimeout(total=10) # 10秒超时
) as response:
if response.status != 200:
raise Exception(f"HTTP错误: {response.status}")
data = await response.json()
# 处理API错误(如限流、权限不足)
if "error_response" in data:
error = data["error_response"]
raise Exception(f"API错误: {error['msg']} (code: {error['code']})")
return data
except Exception as e:
if retry > 0:
# 重试前等待1秒(避免频繁重试触发更严格限流)
await asyncio.sleep(1)
print(f"请求失败({e}),剩余重试次数:{retry-1}")
return await fetch_taobao_api(session, method, business_params, page_no, semaphore, retry-1)
else:
print(f"请求彻底失败({e}),页码:{page_no}")
return None
四、完整采集流程实现
以 “按关键词搜索商品”(接口:taobao.items.search)为例,实现万级商品数据采集,流程如下:
- 计算总页数(假设目标 1 万条,每页 100 条,需 100 页);
- 创建异步任务列表,并发采集所有页码数据;
- 解析响应数据,提取核心字段(如商品 ID、标题、价格、销量);
- 批量存储数据(示例存为 CSV,可扩展至 MySQL/Redis)。
完整代码
import csv
from pathlib import Path
def parse_goods_data(raw_data: Dict) -> List[Dict]:
"""
解析商品数据,提取核心字段
:param raw_data: API原始响应
:return: 结构化商品列表
"""
goods_list = []
# 不同API的响应结构不同,需根据实际接口调整(参考淘宝API文档)
if "items_search_response" in raw_data and "items" in raw_data["items_search_response"]:
raw_goods = raw_data["items_search_response"]["items"]["item"]
for goods in raw_goods:
goods_list.append({
"goods_id": goods.get("num_iid", ""), # 商品ID
"title": goods.get("title", ""), # 商品标题
"price": goods.get("price", ""), # 价格
"sales": goods.get("sale_count", 0), # 销量
"shop_name": goods.get("nick", ""), # 店铺名称
"pic_url": goods.get("pic_url", "") # 商品主图URL
})
return goods_list
def save_to_csv(data: List[Dict], filename: str = "taobao_goods.csv"):
"""将商品数据保存为CSV文件"""
if not data:
print("无数据可保存")
return
# 确保输出目录存在
output_dir = Path("./taobao_data")
output_dir.mkdir(exist_ok=True)
output_path = output_dir / filename
# 写入CSV(首行写表头)
with open(output_path, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
print(f"数据已保存至:{output_path}")
async def main(
keyword: str = "手机", # 搜索关键词
target_total: int = 10000, # 目标采集总数
page_size: int = 100, # 每页条数
max_concurrency: int = 20 # 最大并发数
):
"""主函数:调度异步采集流程"""
start_time = time.time()
method = "taobao.items.search" # 商品搜索API(需提前申请权限)
business_params = {"q": keyword} # 业务参数(关键词搜索)
# 1. 计算总页数(避免超出实际数据量,先采集1页获取总条数)
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(max_concurrency)
first_page_data = await fetch_taobao_api(session, method, business_params, page_no=1, semaphore=semaphore)
if not first_page_data:
print("首次请求失败,终止采集")
return
# 解析总条数(需根据API响应结构调整)
total_count = int(first_page_data["items_search_response"]["total_results"])
actual_total = min(total_count, target_total) # 实际采集数(不超过目标)
total_pages = (actual_total + page_size - 1) // page_size # 向上取整计算总页数
print(f"目标采集数:{target_total},实际可采集数:{actual_total},总页数:{total_pages}")
# 2. 创建所有页码的异步任务
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(max_concurrency)
tasks = [
fetch_taobao_api(session, method, business_params, page_no=page, semaphore=semaphore)
for page in range(1, total_pages + 1)
]
# 3. 并发执行任务并获取结果
print("开始异步采集...")
results = await asyncio.gather(*tasks)
# 4. 解析所有结果并合并
all_goods = []
for result in results:
if result:
all_goods.extend(parse_goods_data(result))
# 5. 保存数据
save_to_csv(all_goods)
# 6. 输出采集统计
end_time = time.time()
cost_time = round(end_time - start_time, 2)
print(f"采集完成!实际采集商品数:{len(all_goods)},耗时:{cost_time}秒,效率:{len(all_goods)/cost_time:.2f}条/秒")
if __name__ == "__main__":
# 运行异步主函数(Python 3.7+)
asyncio.run(main(keyword="手机", target_total=10000))
五、性能测试与优化
1. 测试环境与结果
测试场景 |
同步采集(Requests) |
异步采集(Aiohttp) |
效率提升倍数 |
1 万条商品数据(单关键词) |
480 秒(8 分钟) |
65 秒(1.08 分钟) |
7.4 倍 |
2 万条商品数据(多关键词) |
1020 秒(17 分钟) |
142 秒(2.37 分钟) |
7.2 倍 |
2. 关键优化建议
- 并发数调整:根据淘宝 API 配额调整max_concurrency(配额 100 次 / 分钟时,建议设为 15-20);
- 代理 IP 池:若出现 IP 被限流,可集成代理池(如aiohttp-socks)轮换 IP;
- 批量存储:若采集数据超 10 万条,建议改用 MySQL 批量插入(避免 CSV 文件过大);
- 请求间隔:在fetch_taobao_api中添加微小延迟(如await asyncio.sleep(0.1)),降低限流风险。
六、注意事项
- 数据合规性:淘宝 API 采集的数据仅可用于个人学习或企业内部分析,禁止用于商业竞争或非法用途;
- 签名正确性:参数排序、URL 编码、Secret 拼接必须严格遵循淘宝规范,否则会报 “签名错误”;
- 错误码处理:常见错误码(如 110、429)对应 “IP 限流”,需暂停采集或切换 IP;
- 接口版本:淘宝 API 部分旧版本已废弃,需使用文档推荐的最新版本(如 2.0)。
总结
本文通过Asyncio+Aiohttp实现了淘宝 API 的异步调用,将万级商品数据采集耗时从小时级压缩至分钟级,同时通过签名验证、并发控制、异常重试等机制保障了采集稳定性。该方案可扩展至京东、拼多多等其他电商平台的 API 采集,也可结合Celery实现分布式异步采集,满足更大规模的数据需求。
更多推荐
所有评论(0)