Python 异步框架 (Async/Aiohttp) 调用淘宝 API:实现万级商品数据异步采集
代码中仅提取了商品的核心字段(如商品 ID、标题、售价等),若需采集更多字段(如商品属性、评价数、库存等),可参考淘宝 “taobao.item.search” 接口文档,在item_list的字典中添加对应字段(如"评价数": item.get("comment_count", ""))。运行代码后,控制台会输出每一页的采集状态(如 “页码 1 采集成功,获取 100 条商品数据”),采集完成后
在电商数据分析、竞品监控等业务场景中,需要高效采集大量商品数据。传统的同步采集方式受限于网络 IO 等待,采集效率极低,难以满足万级甚至十万级商品数据的采集需求。而 Python 的异步编程(基于 Asyncio)结合 Aiohttp 框架,能够通过并发处理网络请求大幅提升采集效率,完美解决大规模数据采集的性能瓶颈。本文将详细讲解如何使用 Async/Aiohttp 调用淘宝平台 API,实现万级商品数据的异步采集,并提供完整可运行的代码示例。
一、核心技术原理与优势
在开始实战前,我们需要先理解异步采集的核心优势,以及淘宝 API 调用的关键注意事项,为后续开发奠定基础。
1. 同步采集 vs 异步采集的效率差异
- 同步采集:每次请求必须等待前一个请求完成(包括网络连接、数据返回的 IO 等待)才能发起下一个请求。假设单个请求平均耗时 1 秒,采集 10000 条数据需要约 2.7 小时(10000 秒)。
- 异步采集:通过事件循环(Event Loop)管理请求,当一个请求处于 IO 等待时,立即切换到其他请求继续执行,无需等待。在合理的并发数下(如 100-200),采集 10000 条数据仅需 1-2 分钟,效率提升超 100 倍。
2. 淘宝 API 调用的关键前提
要调用淘宝 API,需先完成以下准备工作,否则会导致接口调用失败:
- 注册淘宝账号:访问注册开发者账号。
- 创建应用并获取密钥:获取Api Key和Api Secret(接口调用的身份凭证)。
- 申请 API 权限:本文使用 “taobao.item.search” 接口(商品搜索接口),需在应用中申请该接口的调用权限(个人开发者可申请 “测试权限”,支持少量数据采集;企业开发者可申请正式权限)。
- 获取 Access Token:部分接口需要 Access Token(用户授权凭证),若仅采集公开商品数据,可使用 “应用级 Access Token”(通过 App Key 和 App Secret 获取)。
二、环境搭建
1. 安装依赖库
本文需要用到以下库:
- asyncio:Python 内置的异步编程框架,用于管理事件循环和协程。
- aiohttp:异步 HTTP 客户端库,用于发起异步网络请求(替代同步的requests库)。
- pycryptodome:用于淘宝 API 签名(淘宝 API 要求请求参数需进行 HMAC-SHA256 签名)。
- pandas:用于数据处理和存储(将采集到的商品数据保存为 Excel 或 CSV 文件)。
通过 pip 安装依赖:
pip install aiohttp pycryptodome pandas
2. 淘宝 API 签名工具函数
淘宝 API 要求所有请求参数(包括公共参数和业务参数)需按 “参数名 ASCII 排序” 后,用 App Secret 进行 HMAC-SHA256 签名,生成sign参数(签名步骤参考平台文档)。
我们先实现签名工具函数,方便后续调用:
import hmac
import hashlib
import urllib.parse
def generate_taobao_sign(params: dict, app_secret: str) -> str:
"""
生成淘宝API请求的签名(HMAC-SHA256)
:param params: 请求参数(包含公共参数和业务参数)
:param app_secret: 应用的App Secret
:return: 签名字符串(小写)
"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为“key=value”格式的字符串(无分隔符)
canonical_str = ''.join([f"{k}{v}" for k, v in sorted_params])
# 3. 在字符串前后添加App Secret
sign_str = app_secret + canonical_str + app_secret
# 4. HMAC-SHA256签名并转为小写
sign = hmac.new(
app_secret.encode('utf-8'),
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest().lower()
return sign
三、异步采集核心代码实现
1. 核心思路
- 构造请求参数:包含淘宝 API 的公共参数(如app_key、method、timestamp等)和业务参数(如keyword、page_no、page_size等)。
- 异步请求协程:定义fetch_taobao_items协程函数,负责单页商品数据的异步请求、响应解析。
- 并发控制:使用asyncio.Semaphore控制并发数(避免并发过高导致 IP 被封禁或接口限流)。
- 数据收集与存储:通过列表收集所有页的商品数据,最终用pandas保存为 Excel 文件。
2. 完整代码实现
import asyncio
import aiohttp
import time
import random
from datetime import datetime
import pandas as pd
from typing import List, Dict
# -------------------------- 配置参数(需根据自身情况修改) --------------------------
APP_KEY = "你的App Key" # 替换为你的App Key
APP_SECRET = "你的App Secret" # 替换为你的App Secret
ACCESS_TOKEN = "你的Access Token"# 替换为你的Access Token(若接口需要)
KEYWORD = "手机" # 搜索关键词(可替换为其他商品类别,如“连衣裙”“笔记本电脑”)
PAGE_SIZE = 100 # 每页商品数量(淘宝API最大支持100条/页)
TOTAL_PAGES = 100 # 总采集页数(100页 × 100条/页 = 10000条数据)
MAX_CONCURRENCY = 50 # 最大并发数(建议50-200,过高易被限流)
API_URL = "https://eco.taobao.com/router/rest" # 淘宝API网关地址
# -----------------------------------------------------------------------------------
async def fetch_taobao_items(
session: aiohttp.ClientSession,
semaphore: asyncio.Semaphore,
page_no: int
) -> List[Dict]:
"""
异步采集单页商品数据
:param session: aiohttp客户端会话(复用连接,提升效率)
:param semaphore: 信号量(控制并发数)
:param page_no: 当前页码
:return: 单页商品数据列表(若失败返回空列表)
"""
async with semaphore: # 控制并发
try:
# 1. 构造公共参数(淘宝API必填)
public_params = {
"app_key": APP_KEY,
"method": "taobao.item.search", # 商品搜索接口
"format": "json", # 响应格式
"v": "2.0", # API版本
"sign_method": "hmac-sha256", # 签名方式
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 时间戳
"access_token": ACCESS_TOKEN # 若接口需要,否则可删除
}
# 2. 构造业务参数(商品搜索条件)
business_params = {
"q": KEYWORD, # 搜索关键词
"page_no": page_no, # 当前页码
"page_size": PAGE_SIZE # 每页数量
# 可选参数:添加筛选条件(如价格区间、销量排序等)
# "start_price": 1000, # 最低价格
# "end_price": 5000, # 最高价格
# "sort": "sales_desc" # 按销量降序(默认按综合排序)
}
# 3. 合并参数并生成签名
all_params = {**public_params, **business_params}
all_params["sign"] = generate_taobao_sign(all_params, APP_SECRET)
# 4. 发起异步GET请求(添加随机延迟,避免被限流)
await asyncio.sleep(random.uniform(0.1, 0.5)) # 0.1-0.5秒随机延迟
async with session.get(API_URL, params=all_params) as response:
if response.status != 200:
print(f"页码{page_no}请求失败,HTTP状态码:{response.status}")
return []
# 5. 解析响应数据
result = await response.json()
# 检查API返回是否成功(淘宝API错误码参考:https://open.taobao.com/doc.htm?docId=101370&docType=1)
if "error_response" in result:
error_msg = result["error_response"]["msg"]
print(f"页码{page_no}API调用失败:{error_msg}")
return []
# 6. 提取商品核心字段(根据需求调整字段)
items = result["item_search_response"]["items"]["item"]
item_list = []
for item in items:
item_list.append({
"商品ID": item.get("num_iid", ""),
"商品标题": item.get("title", "").replace("<span class='H'>", "").replace("</span>", ""), # 去除标题中的高亮标签
"售价": item.get("price", ""),
"销量": item.get("sales", ""),
"卖家昵称": item.get("nick", ""),
"店铺名称": item.get("shop_name", ""),
"商品链接": item.get("detail_url", ""),
"图片链接": item.get("pic_url", "")
})
print(f"页码{page_no}采集成功,获取{len(item_list)}条商品数据")
return item_list
except Exception as e:
print(f"页码{page_no}采集异常:{str(e)}")
return []
async def main():
"""
主函数:创建异步会话、调度协程、收集数据并存储
"""
start_time = time.time()
all_items = [] # 存储所有商品数据
# 1. 创建aiohttp客户端会话(复用TCP连接,提升效率)
timeout = aiohttp.ClientTimeout(total=30) # 超时时间30秒
async with aiohttp.ClientSession(timeout=timeout) as session:
# 2. 创建信号量(控制并发数)
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
# 3. 生成所有页码的协程任务
tasks = [
fetch_taobao_items(session, semaphore, page_no)
for page_no in range(1, TOTAL_PAGES + 1)
]
# 4. 并发执行所有任务并收集结果
results = await asyncio.gather(*tasks)
# 5. 合并所有页的商品数据
for page_items in results:
if page_items:
all_items.extend(page_items)
# 6. 数据存储(保存为Excel文件)
if all_items:
df = pd.DataFrame(all_items)
output_file = f"淘宝商品数据_{KEYWORD}_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
df.to_excel(output_file, index=False, engine="openpyxl")
print(f"\n采集完成!共获取{len(all_items)}条商品数据,已保存至:{output_file}")
else:
print("\n采集失败,未获取到任何商品数据")
# 7. 输出采集耗时
end_time = time.time()
print(f"总耗时:{round(end_time - start_time, 2)}秒")
if __name__ == "__main__":
# 运行异步主函数(Python 3.7+支持asyncio.run())
asyncio.run(main())
四、代码说明与注意事项
1. 关键参数配置
- APP_KEY/APP_SECRET/ACCESS_TOKEN:必须替换为你在淘宝平台申请的真实凭证,否则接口调用会提示 “身份验证失败”。
- KEYWORD:搜索关键词,可根据需求修改(如 “运动鞋”“家电” 等)。
- PAGE_SIZE:淘宝 API 限制最大为 100 条 / 页,无需修改( smaller 页码会增加请求次数,降低效率)。
- TOTAL_PAGES:总采集页数,100 页对应 10000 条数据(若需采集更多数据,需申请更大的接口调用配额)。
- MAX_CONCURRENCY:并发数建议设置为 50-200(过低效率低,过高易触发淘宝 API 的限流机制,导致 IP 被临时封禁)。
2. 淘宝 API 限流与反爬注意事项
淘宝平台对 API 调用有严格的限流策略(如个人开发者测试权限可能限制 “100 次 / 小时”),若需大规模采集,需注意:
- 控制并发数:通过MAX_CONCURRENCY参数限制并发,避免短时间内发起大量请求。
- 添加随机延迟:代码中已通过random.uniform(0.1, 0.5)添加随机延迟,模拟人工操作,降低被限流概率。
- 使用代理 IP:若采集量极大(如超 10 万条),建议使用代理 IP 池(如aiohttp-socks库),避免单个 IP 被封禁。
- 查看接口配额:在淘宝平台 “应用管理” 中查看接口调用配额,避免超出配额导致调用失败。
3. 数据字段扩展
代码中仅提取了商品的核心字段(如商品 ID、标题、售价等),若需采集更多字段(如商品属性、评价数、库存等),可参考淘宝 “taobao.item.search” 接口文档,在item_list的字典中添加对应字段(如"评价数": item.get("comment_count", ""))。
五、效果验证与性能优化
1. 采集效果验证
运行代码后,控制台会输出每一页的采集状态(如 “页码 1 采集成功,获取 100 条商品数据”),采集完成后会生成 Excel 文件,打开文件可查看结构化的商品数据(包含商品 ID、标题、售价等字段)。
以 “手机” 为关键词,采集 100 页(10000 条数据)的耗时约为 30-60 秒(取决于网络速度和并发数),远快于同步采集的 2.7 小时。
2. 性能优化方向
- 连接复用:代码中使用aiohttp.ClientSession复用 TCP 连接,避免每次请求都建立新连接,提升效率。
- 批量存储:若采集数据量超 10 万条,建议使用数据库(如 MySQL、MongoDB)替代 Excel,通过批量插入(如pandas.to_sql)提升存储效率。
- 任务拆分:若需采集多个关键词(如 “手机”“电脑”“服装”),可将不同关键词的采集任务拆分为独立协程,进一步提升并发效率。
- 异常重试:在fetch_taobao_items函数中添加异常重试机制(如使用tenacity库),对临时失败的请求进行重试,提高采集成功率。
六、总结
本文通过 Asyncio+Aiohttp 实现了淘宝 API 的异步调用,成功解决了万级商品数据采集的效率问题。核心亮点包括:
- 异步并发:通过事件循环和信号量,实现高效的并发请求,大幅降低采集耗时。
- 签名合规:严格按照淘宝 API 要求实现 HMAC-SHA256 签名,确保接口调用合法。
- 可扩展性:代码结构清晰,支持字段扩展、代理 IP 集成、数据库存储等二次开发需求。
若你需要进一步扩展功能(如添加商品详情页采集、实时数据监控等),可基于本文代码进行迭代。同时,需注意遵守淘宝平台的使用规范,避免过度采集导致账号或 IP 被封禁。
更多推荐
所有评论(0)