Serverless架构:Python+AI爬虫实现无服务器弹性采集
核心优势:Serverless架构实现爬虫的弹性扩缩容、按需付费、免运维,解决传统爬虫资源浪费、弹性不足的痛点;AI赋能:集成AI反爬识别、动态UA适配、敏感数据脱敏,提升爬虫成功率和合规性;低成本:按实际执行计费,小型爬虫月成本仅几十元,大幅降低运维和硬件成本;易扩展:通过消息队列触发可支持百万级URL并发爬取,无需手动扩容服务器;高可用:基于云厂商Serverless平台的多可用区部署,故障率
·
一、引言:传统爬虫架构的核心痛点
传统爬虫部署在物理机/虚拟机/容器中,存在三大核心问题:
- 资源浪费:爬虫负载具有明显的波峰波谷(如电商爬虫在促销期流量暴增,日常流量极低),固定部署的服务器在低负载时资源利用率不足10%;
- 弹性不足:突发高负载时(如需要爬取10万+页面),手动扩容服务器耗时且易出错,易导致爬虫任务积压;
- 运维成本高:需要维护服务器集群、监控系统、反爬策略适配,小型团队难以承担;
- 成本可控性差:服务器租赁按年/月付费,即使闲置也需支付费用。
Serverless(无服务器)架构的按需付费、自动弹性扩缩容、免运维特性,完美解决了传统爬虫的痛点。本文将分享一套Python+AI+Serverless的弹性爬虫方案,基于阿里云FC/腾讯云SCF/AWS Lambda实现“按需触发、弹性采集、AI智能反爬适配、成本可控”的无服务器爬虫体系。
二、方案设计:Serverless AI爬虫架构
2.1 核心目标
- 弹性采集:根据爬取任务量自动扩缩容,从1个并发到1000个并发无缝切换;
- 按需付费:仅为实际执行的爬虫代码计费(毫秒级粒度),闲置时零成本;
- 免运维:无需管理服务器、配置环境,聚焦爬虫业务逻辑;
- AI智能:集成AI反爬识别、动态UA/IP适配、敏感数据脱敏;
- 高可用:基于云厂商Serverless平台的多可用区部署,故障率接近零。
2.2 整体架构
2.3 技术选型
| 模块 | 技术选型 | 选型理由 |
|---|---|---|
| Serverless运行时 | 阿里云FC(Function Compute) | 国内生态完善,Python支持好,弹性扩缩容能力强 |
| 任务触发 | 阿里云EventBridge + MQTT | 支持定时触发、API触发、消息队列触发,适配不同爬虫场景 |
| AI反爬识别 | 通义千问API + OpenCV | 识别验证码、滑块验证、动态反爬规则,自动适配 |
| 弹性采集 | requests + aiohttp + 代理池 | 异步爬虫提升并发,代理池解决IP封禁问题 |
| 数据存储 | 阿里云OSS + 表格存储OTS | 低成本存储爬取结果,支持海量数据读写 |
| 监控告警 | 阿里云CloudMonitor + 日志服务SLS | 实时监控爬虫执行状态、成本、错误率,支持告警 |
| 依赖管理 | Serverless Devs + 层(Layer) | 统一管理Python依赖包,避免重复打包 |
三、环境搭建(阿里云FC为例)
3.1 前置准备
- 注册阿里云账号,开通Function Compute、OSS、日志服务SLS;
- 安装Serverless Devs工具(简化FC开发部署):
npm install -g @serverless-devs/s
s config add # 配置阿里云AccessKey
- 准备Python依赖包(如requests、aiohttp、beautifulsoup4):
mkdir layer && cd layer
pip install requests aiohttp beautifulsoup4 python-dotenv -t ./python
zip -r python.zip python/
3.2 项目结构
serverless-ai-crawler/
├── code/
│ ├── main.py # 爬虫核心代码
│ ├── ai_anti_crawl.py # AI反爬处理
│ ├── utils.py # 工具函数
│ └── .env # 配置文件
├── layer/
│ └── python.zip # 依赖包层
└── s.yaml # Serverless Devs部署配置
四、核心代码实现
4.1 配置文件(.env)
# 爬虫配置
CRAWL_TIMEOUT=15
MAX_CONCURRENT=100
PROXY_POOL_URL="https://proxy.example.com/get"
# AI反爬配置
LLM_API_KEY="你的通义千问API Key"
LLM_MODEL="qwen-plus"
CAPTCHA_API_URL="https://captcha.example.com/recognize"
# 存储配置
OSS_ENDPOINT="oss-cn-beijing.aliyuncs.com"
OSS_BUCKET="serverless-crawler-result"
OSS_ACCESS_KEY="你的OSS AccessKey"
OSS_SECRET_KEY="你的OSS SecretKey"
# 日志配置
LOG_LEVEL="INFO"
4.2 AI反爬处理模块(ai_anti_crawl.py)
核心解决验证码识别、动态反爬规则适配问题:
# -*- coding: utf-8 -*-
import os
import requests
import json
from dotenv import load_dotenv
from dashscope import Generation
load_dotenv()
class AIAntiCrawl:
"""AI反爬处理:验证码识别、反爬规则适配"""
def __init__(self):
self.llm_api_key = os.getenv("LLM_API_KEY")
self.llm_model = os.getenv("LLM_MODEL")
self.captcha_api_url = os.getenv("CAPTCHA_API_URL")
def recognize_captcha(self, captcha_image_bytes: bytes) -> str:
"""AI识别验证码"""
try:
# 调用验证码识别API(可替换为本地模型)
files = {"image": ("captcha.jpg", captcha_image_bytes)}
response = requests.post(self.captcha_api_url, files=files, timeout=10)
result = response.json()
if result["code"] == 200:
return result["data"]["text"]
else:
raise Exception(f"验证码识别失败:{result['msg']}")
except Exception as e:
print(f"验证码识别异常:{str(e)}")
return ""
def analyze_anti_crawl_rule(self, html_content: str) -> dict:
"""LLM分析反爬规则,生成适配策略"""
prompt = f"""
你是反爬规则分析专家,请分析以下网页内容中的反爬机制,并给出对应的爬虫适配策略:
1. 识别是否有验证码、滑块验证、JS加密、IP封禁、UA检测等反爬手段;
2. 针对每种反爬手段给出具体的Python爬虫适配代码片段;
3. 输出格式为JSON,包含anti_crawl_type(反爬类型)、strategy(适配策略)、code_snippet(代码片段)。
网页内容:
{html_content[:2000]}
"""
try:
response = Generation.call(
model=self.llm_model,
api_key=self.llm_api_key,
messages=[{"role": "user", "content": prompt}],
result_format="json",
temperature=0.1
)
return json.loads(response.output.choices[0].message.content)
except Exception as e:
print(f"反爬规则分析异常:{str(e)}")
return {
"anti_crawl_type": "unknown",
"strategy": "默认策略",
"code_snippet": "使用随机UA+代理IP"
}
def get_dynamic_headers(self) -> dict:
"""生成动态请求头(对抗UA检测)"""
# AI生成多样化UA(模拟不同浏览器/设备)
prompt = """生成10个不同的浏览器User-Agent,包含Chrome、Firefox、Safari,覆盖不同版本和操作系统,仅返回JSON数组"""
try:
response = Generation.call(
model=self.llm_model,
api_key=self.llm_api_key,
messages=[{"role": "user", "content": prompt}],
result_format="json",
temperature=0.5
)
uas = json.loads(response.output.choices[0].message.content)
import random
return {
"User-Agent": random.choice(uas),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Referer": "https://www.baidu.com",
"Connection": "keep-alive"
}
except Exception as e:
print(f"生成动态UA异常:{str(e)}")
# 兜底UA
return {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
}
4.3 Serverless爬虫核心代码(main.py)
适配阿里云FC运行时,实现弹性采集:
# -*- coding: utf-8 -*-
import os
import json
import time
import asyncio
import aiohttp
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import oss2
from ai_anti_crawl import AIAntiCrawl
from loguru import logger
# 加载配置
load_dotenv()
# 初始化AI反爬实例
ai_anti_crawl = AIAntiCrawl()
# 初始化OSS存储
auth = oss2.Auth(os.getenv("OSS_ACCESS_KEY"), os.getenv("OSS_SECRET_KEY"))
bucket = oss2.Bucket(auth, os.getenv("OSS_ENDPOINT"), os.getenv("OSS_BUCKET"))
# 配置日志
logger.add(
lambda msg: print(msg, end=""),
level=os.getenv("LOG_LEVEL"),
format="{time} {level} {message}"
)
async def get_proxy():
"""获取代理IP(对抗IP封禁)"""
try:
response = requests.get(os.getenv("PROXY_POOL_URL"), timeout=5)
result = response.json()
if result["code"] == 200:
return f"http://{result['data']['ip']}:{result['data']['port']}"
else:
return ""
except Exception as e:
logger.error(f"获取代理IP失败:{str(e)}")
return ""
async def crawl_single_url(session, url: str) -> dict:
"""单URL异步爬取"""
try:
# 1. 准备请求参数
headers = ai_anti_crawl.get_dynamic_headers()
proxy = await get_proxy()
proxies = {"http": proxy, "https": proxy} if proxy else None
# 2. 发送请求
async with session.get(
url,
headers=headers,
proxies=proxies,
timeout=int(os.getenv("CRAWL_TIMEOUT"))
) as response:
html = await response.text()
status_code = response.status
# 3. AI分析反爬规则(首次失败时触发)
if status_code != 200 or "captcha" in html.lower():
anti_crawl_rule = ai_anti_crawl.analyze_anti_crawl_rule(html)
logger.warning(f"检测到反爬规则:{anti_crawl_rule['anti_crawl_type']},适配策略:{anti_crawl_rule['strategy']}")
return {
"url": url,
"status": "anti_crawl",
"anti_crawl_type": anti_crawl_rule["anti_crawl_type"],
"strategy": anti_crawl_rule["strategy"],
"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
# 4. 解析内容
soup = BeautifulSoup(html, "lxml")
title = soup.title.string if soup.title else ""
content = soup.get_text(strip=True)[:1000] # 截断长文本
# 5. 敏感数据脱敏(AI驱动)
from compliance_crawler import DataDesensitizer
desensitizer = DataDesensitizer()
sensitive_entities = ai_anti_crawl.recognize_sensitive_data(content)
desensitized_content = desensitizer.desensitize(content, sensitive_entities)
logger.info(f"成功爬取:{url},标题:{title[:50]}")
return {
"url": url,
"status": "success",
"status_code": status_code,
"title": title,
"content": desensitized_content,
"sensitive_count": len(sensitive_entities),
"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"proxy_used": proxy is not None
}
except Exception as e:
logger.error(f"爬取失败:{url},错误:{str(e)}")
return {
"url": url,
"status": "failed",
"error": str(e),
"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
async def batch_crawl(urls: list) -> list:
"""批量异步爬取"""
# 限制并发数(避免触发反爬)
semaphore = asyncio.Semaphore(int(os.getenv("MAX_CONCURRENT")))
async def bounded_crawl(url):
async with semaphore:
async with aiohttp.ClientSession() as session:
return await crawl_single_url(session, url)
tasks = [bounded_crawl(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
def save_to_oss(results: list, task_id: str):
"""保存爬取结果到OSS"""
try:
# 生成文件名
file_name = f"crawl_result/{task_id}_{time.strftime('%Y%m%d%H%M%S')}.json"
# 写入OSS
bucket.put_object(file_name, json.dumps(results, ensure_ascii=False, indent=2))
logger.info(f"爬取结果已保存到OSS:{file_name}")
return file_name
except Exception as e:
logger.error(f"保存到OSS失败:{str(e)}")
return ""
def handler(event, context):
"""
Serverless入口函数(阿里云FC)
:param event: 触发事件(包含爬取任务)
:param context: 运行时上下文
:return: 爬取结果
"""
try:
# 解析触发事件
event_data = json.loads(event.decode("utf-8")) if isinstance(event, bytes) else event
urls = event_data.get("urls", [])
task_id = event_data.get("task_id", f"task_{int(time.time())}")
if not urls:
return {
"code": 400,
"message": "爬取URL列表为空",
"task_id": task_id
}
logger.info(f"开始执行爬取任务:{task_id},URL数量:{len(urls)}")
# 执行批量爬取
loop = asyncio.get_event_loop()
crawl_results = loop.run_until_complete(batch_crawl(urls))
# 保存结果到OSS
oss_file = save_to_oss(crawl_results, task_id)
# 统计结果
success_count = len([r for r in crawl_results if r["status"] == "success"])
failed_count = len([r for r in crawl_results if r["status"] == "failed"])
anti_crawl_count = len([r for r in crawl_results if r["status"] == "anti_crawl"])
logger.info(f"爬取任务完成:{task_id},成功:{success_count},失败:{failed_count},反爬:{anti_crawl_count}")
return {
"code": 200,
"message": "爬取完成",
"task_id": task_id,
"statistics": {
"total": len(urls),
"success": success_count,
"failed": failed_count,
"anti_crawl": anti_crawl_count
},
"oss_file": oss_file,
"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logger.error(f"爬取任务异常:{str(e)}")
return {
"code": 500,
"message": f"爬取异常:{str(e)}",
"task_id": f"task_{int(time.time())}"
}
# 本地测试入口
if __name__ == "__main__":
test_event = {
"urls": ["https://example.com", "https://example.org"],
"task_id": "test_task_001"
}
result = handler(json.dumps(test_event), None)
print(json.dumps(result, ensure_ascii=False, indent=2))
4.4 Serverless部署配置(s.yaml)
通过Serverless Devs一键部署到阿里云FC:
edition: 3.0.0
name: serverless-ai-crawler
access: aliyun-default
vars:
region: cn-beijing
functionName: ai-crawler-function
serviceName: ai-crawler-service
resources:
fc:
component: devsapp/fc
props:
region: ${vars.region}
service:
name: ${vars.serviceName}
description: Serverless AI爬虫服务
logConfig:
logstore: ai-crawler-log
project: fc-log-${vars.region}
function:
name: ${vars.functionName}
description: AI弹性爬虫函数
runtime: python3.9
codeUri: ./code
handler: main.handler
memorySize: 512
timeout: 300
environmentVariables:
PYTHONPATH: /code:/opt/python
layers:
- acs:fc:${vars.region}:xxxxxxxxxx:layers/python-deps/versions/1 # 替换为你的依赖层ARN
triggers:
- name: http-trigger
type: http
config:
authType: anonymous
methods: [GET, POST]
- name: timer-trigger
type: timer
config:
cronExpression: "0 0 1 * * *" # 每天凌晨1点执行
enable: true
payload: '{"urls": ["https://example.com/daily-data"]}'
4.5 部署命令
# 安装依赖
npm install @devsapp/fc
# 部署函数
s deploy --use-local
# 触发函数(HTTP触发)
curl -X POST https://xxxxxx.cn-beijing.fcapp.run \
-H "Content-Type: application/json" \
-d '{"urls":["https://example.com", "https://example.org"], "task_id":"demo_001"}'
# 查看日志
s logs -f
五、弹性扩缩容与成本优化
5.1 自动弹性配置
在阿里云FC控制台配置函数的弹性策略:
- 并发限制:设置单实例最大并发数(如100),实例数自动扩缩容(1-100);
- 预热配置:设置预热实例数(如5),减少冷启动耗时;
- 触发规则:基于QPS自动扩容,如QPS>100时扩容实例数;
- 降容规则:空闲时间>5分钟时自动缩容到预热实例数。
5.2 成本优化策略
- 按需计费:阿里云FC按执行时间(毫秒)+内存占用计费,100万次512MB函数执行约50元;
- 批量处理:将小任务合并为批量任务,减少函数调用次数;
- 内存优化:根据实际需求调整内存(如256MB/512MB),内存越小计费越低;
- 超时控制:合理设置超时时间(如300秒),避免函数长时间运行;
- 缓存复用:将代理IP、UA列表等缓存到函数实例,减少重复请求。
5.3 成本预估
| 场景 | 配置 | 月成本(元) |
|---|---|---|
| 小型爬虫(1000次/天) | 512MB,每次10秒 | ~10 |
| 中型爬虫(10万次/天) | 512MB,每次10秒 | ~100 |
| 大型爬虫(100万次/天) | 512MB,每次10秒 | ~1000 |
六、高可用与监控保障
6.1 监控指标配置
在阿里云CloudMonitor配置以下监控指标:
- 函数执行指标:调用次数、成功数、失败数、平均执行时间;
- 资源使用指标:内存使用率、CPU使用率、网络流量;
- 业务指标:爬取成功率、反爬触发率、敏感数据识别数;
- 成本指标:函数执行费用、OSS存储费用、API调用费用。
6.2 告警规则
- 爬取失败率>10%时触发短信/钉钉告警;
- 函数执行时间>200秒时触发告警;
- 日执行费用>预算阈值时触发告警;
- 反爬触发率>5%时触发告警。
6.3 容灾策略
- 多可用区部署:函数自动部署到多可用区,单个可用区故障不影响服务;
- 失败重试:对爬取失败的URL自动重试(最多3次);
- 降级策略:反爬触发时自动切换到备用代理池/爬取策略;
- 数据备份:OSS数据自动备份到冷存储,防止数据丢失。
七、总结
关键点回顾
- 核心优势:Serverless架构实现爬虫的弹性扩缩容、按需付费、免运维,解决传统爬虫资源浪费、弹性不足的痛点;
- AI赋能:集成AI反爬识别、动态UA适配、敏感数据脱敏,提升爬虫成功率和合规性;
- 低成本:按实际执行计费,小型爬虫月成本仅几十元,大幅降低运维和硬件成本;
- 易扩展:通过消息队列触发可支持百万级URL并发爬取,无需手动扩容服务器;
- 高可用:基于云厂商Serverless平台的多可用区部署,故障率接近零。
本文提出的方案适用于电商价格监控、舆情采集、行业数据爬取等场景,既满足了弹性采集的需求,又保证了合规性和成本可控性,是中小团队实现大规模爬虫的最优方案之一。
如果你觉得本文有帮助,欢迎点赞、收藏、关注!也欢迎在评论区交流Serverless爬虫的实践经验和优化方案~
更多推荐

所有评论(0)