一、引言:传统爬虫架构的核心痛点

传统爬虫部署在物理机/虚拟机/容器中,存在三大核心问题:

  1. 资源浪费:爬虫负载具有明显的波峰波谷(如电商爬虫在促销期流量暴增,日常流量极低),固定部署的服务器在低负载时资源利用率不足10%;
  2. 弹性不足:突发高负载时(如需要爬取10万+页面),手动扩容服务器耗时且易出错,易导致爬虫任务积压;
  3. 运维成本高:需要维护服务器集群、监控系统、反爬策略适配,小型团队难以承担;
  4. 成本可控性差:服务器租赁按年/月付费,即使闲置也需支付费用。

Serverless(无服务器)架构的按需付费、自动弹性扩缩容、免运维特性,完美解决了传统爬虫的痛点。本文将分享一套Python+AI+Serverless的弹性爬虫方案,基于阿里云FC/腾讯云SCF/AWS Lambda实现“按需触发、弹性采集、AI智能反爬适配、成本可控”的无服务器爬虫体系。

二、方案设计:Serverless AI爬虫架构

2.1 核心目标

  1. 弹性采集:根据爬取任务量自动扩缩容,从1个并发到1000个并发无缝切换;
  2. 按需付费:仅为实际执行的爬虫代码计费(毫秒级粒度),闲置时零成本;
  3. 免运维:无需管理服务器、配置环境,聚焦爬虫业务逻辑;
  4. AI智能:集成AI反爬识别、动态UA/IP适配、敏感数据脱敏;
  5. 高可用:基于云厂商Serverless平台的多可用区部署,故障率接近零。

2.2 整体架构

任务触发层
API/定时/消息队列

Serverless调度层
FC/SCF/Lambda

AI能力层
反爬识别/UA适配/脱敏

弹性采集层
多并发爬虫执行

数据存储层
OSS/S3/云数据库

结果回调层
WebHook/消息通知

监控告警层
执行日志/成本监控

AI策略层
动态反爬策略更新

2.3 技术选型

模块 技术选型 选型理由
Serverless运行时 阿里云FC(Function Compute) 国内生态完善,Python支持好,弹性扩缩容能力强
任务触发 阿里云EventBridge + MQTT 支持定时触发、API触发、消息队列触发,适配不同爬虫场景
AI反爬识别 通义千问API + OpenCV 识别验证码、滑块验证、动态反爬规则,自动适配
弹性采集 requests + aiohttp + 代理池 异步爬虫提升并发,代理池解决IP封禁问题
数据存储 阿里云OSS + 表格存储OTS 低成本存储爬取结果,支持海量数据读写
监控告警 阿里云CloudMonitor + 日志服务SLS 实时监控爬虫执行状态、成本、错误率,支持告警
依赖管理 Serverless Devs + 层(Layer) 统一管理Python依赖包,避免重复打包

三、环境搭建(阿里云FC为例)

3.1 前置准备

  1. 注册阿里云账号,开通Function Compute、OSS、日志服务SLS;
  2. 安装Serverless Devs工具(简化FC开发部署):
npm install -g @serverless-devs/s
s config add  # 配置阿里云AccessKey
  1. 准备Python依赖包(如requests、aiohttp、beautifulsoup4):
mkdir layer && cd layer
pip install requests aiohttp beautifulsoup4 python-dotenv -t ./python
zip -r python.zip python/

3.2 项目结构

serverless-ai-crawler/
├── code/
│   ├── main.py          # 爬虫核心代码
│   ├── ai_anti_crawl.py # AI反爬处理
│   ├── utils.py         # 工具函数
│   └── .env             # 配置文件
├── layer/
│   └── python.zip       # 依赖包层
└── s.yaml               # Serverless Devs部署配置

四、核心代码实现

4.1 配置文件(.env)

# 爬虫配置
CRAWL_TIMEOUT=15
MAX_CONCURRENT=100
PROXY_POOL_URL="https://proxy.example.com/get"

# AI反爬配置
LLM_API_KEY="你的通义千问API Key"
LLM_MODEL="qwen-plus"
CAPTCHA_API_URL="https://captcha.example.com/recognize"

# 存储配置
OSS_ENDPOINT="oss-cn-beijing.aliyuncs.com"
OSS_BUCKET="serverless-crawler-result"
OSS_ACCESS_KEY="你的OSS AccessKey"
OSS_SECRET_KEY="你的OSS SecretKey"

# 日志配置
LOG_LEVEL="INFO"

4.2 AI反爬处理模块(ai_anti_crawl.py)

核心解决验证码识别、动态反爬规则适配问题:

# -*- coding: utf-8 -*-
import os
import requests
import json
from dotenv import load_dotenv
from dashscope import Generation

load_dotenv()

class AIAntiCrawl:
    """AI反爬处理:验证码识别、反爬规则适配"""
    
    def __init__(self):
        self.llm_api_key = os.getenv("LLM_API_KEY")
        self.llm_model = os.getenv("LLM_MODEL")
        self.captcha_api_url = os.getenv("CAPTCHA_API_URL")
    
    def recognize_captcha(self, captcha_image_bytes: bytes) -> str:
        """AI识别验证码"""
        try:
            # 调用验证码识别API(可替换为本地模型)
            files = {"image": ("captcha.jpg", captcha_image_bytes)}
            response = requests.post(self.captcha_api_url, files=files, timeout=10)
            result = response.json()
            if result["code"] == 200:
                return result["data"]["text"]
            else:
                raise Exception(f"验证码识别失败:{result['msg']}")
        except Exception as e:
            print(f"验证码识别异常:{str(e)}")
            return ""
    
    def analyze_anti_crawl_rule(self, html_content: str) -> dict:
        """LLM分析反爬规则,生成适配策略"""
        prompt = f"""
        你是反爬规则分析专家,请分析以下网页内容中的反爬机制,并给出对应的爬虫适配策略:
        1. 识别是否有验证码、滑块验证、JS加密、IP封禁、UA检测等反爬手段;
        2. 针对每种反爬手段给出具体的Python爬虫适配代码片段;
        3. 输出格式为JSON,包含anti_crawl_type(反爬类型)、strategy(适配策略)、code_snippet(代码片段)。
        
        网页内容:
        {html_content[:2000]}
        """
        
        try:
            response = Generation.call(
                model=self.llm_model,
                api_key=self.llm_api_key,
                messages=[{"role": "user", "content": prompt}],
                result_format="json",
                temperature=0.1
            )
            return json.loads(response.output.choices[0].message.content)
        except Exception as e:
            print(f"反爬规则分析异常:{str(e)}")
            return {
                "anti_crawl_type": "unknown",
                "strategy": "默认策略",
                "code_snippet": "使用随机UA+代理IP"
            }
    
    def get_dynamic_headers(self) -> dict:
        """生成动态请求头(对抗UA检测)"""
        # AI生成多样化UA(模拟不同浏览器/设备)
        prompt = """生成10个不同的浏览器User-Agent,包含Chrome、Firefox、Safari,覆盖不同版本和操作系统,仅返回JSON数组"""
        
        try:
            response = Generation.call(
                model=self.llm_model,
                api_key=self.llm_api_key,
                messages=[{"role": "user", "content": prompt}],
                result_format="json",
                temperature=0.5
            )
            uas = json.loads(response.output.choices[0].message.content)
            import random
            return {
                "User-Agent": random.choice(uas),
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Referer": "https://www.baidu.com",
                "Connection": "keep-alive"
            }
        except Exception as e:
            print(f"生成动态UA异常:{str(e)}")
            # 兜底UA
            return {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
            }

4.3 Serverless爬虫核心代码(main.py)

适配阿里云FC运行时,实现弹性采集:

# -*- coding: utf-8 -*-
import os
import json
import time
import asyncio
import aiohttp
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import oss2
from ai_anti_crawl import AIAntiCrawl
from loguru import logger

# 加载配置
load_dotenv()

# 初始化AI反爬实例
ai_anti_crawl = AIAntiCrawl()

# 初始化OSS存储
auth = oss2.Auth(os.getenv("OSS_ACCESS_KEY"), os.getenv("OSS_SECRET_KEY"))
bucket = oss2.Bucket(auth, os.getenv("OSS_ENDPOINT"), os.getenv("OSS_BUCKET"))

# 配置日志
logger.add(
    lambda msg: print(msg, end=""),
    level=os.getenv("LOG_LEVEL"),
    format="{time} {level} {message}"
)

async def get_proxy():
    """获取代理IP(对抗IP封禁)"""
    try:
        response = requests.get(os.getenv("PROXY_POOL_URL"), timeout=5)
        result = response.json()
        if result["code"] == 200:
            return f"http://{result['data']['ip']}:{result['data']['port']}"
        else:
            return ""
    except Exception as e:
        logger.error(f"获取代理IP失败:{str(e)}")
        return ""

async def crawl_single_url(session, url: str) -> dict:
    """单URL异步爬取"""
    try:
        # 1. 准备请求参数
        headers = ai_anti_crawl.get_dynamic_headers()
        proxy = await get_proxy()
        proxies = {"http": proxy, "https": proxy} if proxy else None
        
        # 2. 发送请求
        async with session.get(
            url,
            headers=headers,
            proxies=proxies,
            timeout=int(os.getenv("CRAWL_TIMEOUT"))
        ) as response:
            html = await response.text()
            status_code = response.status
            
            # 3. AI分析反爬规则(首次失败时触发)
            if status_code != 200 or "captcha" in html.lower():
                anti_crawl_rule = ai_anti_crawl.analyze_anti_crawl_rule(html)
                logger.warning(f"检测到反爬规则:{anti_crawl_rule['anti_crawl_type']},适配策略:{anti_crawl_rule['strategy']}")
                return {
                    "url": url,
                    "status": "anti_crawl",
                    "anti_crawl_type": anti_crawl_rule["anti_crawl_type"],
                    "strategy": anti_crawl_rule["strategy"],
                    "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
                }
            
            # 4. 解析内容
            soup = BeautifulSoup(html, "lxml")
            title = soup.title.string if soup.title else ""
            content = soup.get_text(strip=True)[:1000]  # 截断长文本
            
            # 5. 敏感数据脱敏(AI驱动)
            from compliance_crawler import DataDesensitizer
            desensitizer = DataDesensitizer()
            sensitive_entities = ai_anti_crawl.recognize_sensitive_data(content)
            desensitized_content = desensitizer.desensitize(content, sensitive_entities)
            
            logger.info(f"成功爬取:{url},标题:{title[:50]}")
            return {
                "url": url,
                "status": "success",
                "status_code": status_code,
                "title": title,
                "content": desensitized_content,
                "sensitive_count": len(sensitive_entities),
                "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "proxy_used": proxy is not None
            }
    except Exception as e:
        logger.error(f"爬取失败:{url},错误:{str(e)}")
        return {
            "url": url,
            "status": "failed",
            "error": str(e),
            "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
        }

async def batch_crawl(urls: list) -> list:
    """批量异步爬取"""
    # 限制并发数(避免触发反爬)
    semaphore = asyncio.Semaphore(int(os.getenv("MAX_CONCURRENT")))
    
    async def bounded_crawl(url):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                return await crawl_single_url(session, url)
    
    tasks = [bounded_crawl(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

def save_to_oss(results: list, task_id: str):
    """保存爬取结果到OSS"""
    try:
        # 生成文件名
        file_name = f"crawl_result/{task_id}_{time.strftime('%Y%m%d%H%M%S')}.json"
        # 写入OSS
        bucket.put_object(file_name, json.dumps(results, ensure_ascii=False, indent=2))
        logger.info(f"爬取结果已保存到OSS:{file_name}")
        return file_name
    except Exception as e:
        logger.error(f"保存到OSS失败:{str(e)}")
        return ""

def handler(event, context):
    """
    Serverless入口函数(阿里云FC)
    :param event: 触发事件(包含爬取任务)
    :param context: 运行时上下文
    :return: 爬取结果
    """
    try:
        # 解析触发事件
        event_data = json.loads(event.decode("utf-8")) if isinstance(event, bytes) else event
        urls = event_data.get("urls", [])
        task_id = event_data.get("task_id", f"task_{int(time.time())}")
        
        if not urls:
            return {
                "code": 400,
                "message": "爬取URL列表为空",
                "task_id": task_id
            }
        
        logger.info(f"开始执行爬取任务:{task_id},URL数量:{len(urls)}")
        
        # 执行批量爬取
        loop = asyncio.get_event_loop()
        crawl_results = loop.run_until_complete(batch_crawl(urls))
        
        # 保存结果到OSS
        oss_file = save_to_oss(crawl_results, task_id)
        
        # 统计结果
        success_count = len([r for r in crawl_results if r["status"] == "success"])
        failed_count = len([r for r in crawl_results if r["status"] == "failed"])
        anti_crawl_count = len([r for r in crawl_results if r["status"] == "anti_crawl"])
        
        logger.info(f"爬取任务完成:{task_id},成功:{success_count},失败:{failed_count},反爬:{anti_crawl_count}")
        
        return {
            "code": 200,
            "message": "爬取完成",
            "task_id": task_id,
            "statistics": {
                "total": len(urls),
                "success": success_count,
                "failed": failed_count,
                "anti_crawl": anti_crawl_count
            },
            "oss_file": oss_file,
            "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logger.error(f"爬取任务异常:{str(e)}")
        return {
            "code": 500,
            "message": f"爬取异常:{str(e)}",
            "task_id": f"task_{int(time.time())}"
        }

# 本地测试入口
if __name__ == "__main__":
    test_event = {
        "urls": ["https://example.com", "https://example.org"],
        "task_id": "test_task_001"
    }
    result = handler(json.dumps(test_event), None)
    print(json.dumps(result, ensure_ascii=False, indent=2))

4.4 Serverless部署配置(s.yaml)

通过Serverless Devs一键部署到阿里云FC:

edition: 3.0.0
name: serverless-ai-crawler
access: aliyun-default

vars:
  region: cn-beijing
  functionName: ai-crawler-function
  serviceName: ai-crawler-service

resources:
  fc:
    component: devsapp/fc
    props:
      region: ${vars.region}
      service:
        name: ${vars.serviceName}
        description: Serverless AI爬虫服务
        logConfig:
          logstore: ai-crawler-log
          project: fc-log-${vars.region}
      function:
        name: ${vars.functionName}
        description: AI弹性爬虫函数
        runtime: python3.9
        codeUri: ./code
        handler: main.handler
        memorySize: 512
        timeout: 300
        environmentVariables:
          PYTHONPATH: /code:/opt/python
        layers:
          - acs:fc:${vars.region}:xxxxxxxxxx:layers/python-deps/versions/1 # 替换为你的依赖层ARN
        triggers:
          - name: http-trigger
            type: http
            config:
              authType: anonymous
              methods: [GET, POST]
          - name: timer-trigger
            type: timer
            config:
              cronExpression: "0 0 1 * * *" # 每天凌晨1点执行
              enable: true
              payload: '{"urls": ["https://example.com/daily-data"]}'

4.5 部署命令

# 安装依赖
npm install @devsapp/fc

# 部署函数
s deploy --use-local

# 触发函数(HTTP触发)
curl -X POST https://xxxxxx.cn-beijing.fcapp.run \
  -H "Content-Type: application/json" \
  -d '{"urls":["https://example.com", "https://example.org"], "task_id":"demo_001"}'

# 查看日志
s logs -f

五、弹性扩缩容与成本优化

5.1 自动弹性配置

在阿里云FC控制台配置函数的弹性策略:

  1. 并发限制:设置单实例最大并发数(如100),实例数自动扩缩容(1-100);
  2. 预热配置:设置预热实例数(如5),减少冷启动耗时;
  3. 触发规则:基于QPS自动扩容,如QPS>100时扩容实例数;
  4. 降容规则:空闲时间>5分钟时自动缩容到预热实例数。

5.2 成本优化策略

  1. 按需计费:阿里云FC按执行时间(毫秒)+内存占用计费,100万次512MB函数执行约50元;
  2. 批量处理:将小任务合并为批量任务,减少函数调用次数;
  3. 内存优化:根据实际需求调整内存(如256MB/512MB),内存越小计费越低;
  4. 超时控制:合理设置超时时间(如300秒),避免函数长时间运行;
  5. 缓存复用:将代理IP、UA列表等缓存到函数实例,减少重复请求。

5.3 成本预估

场景 配置 月成本(元)
小型爬虫(1000次/天) 512MB,每次10秒 ~10
中型爬虫(10万次/天) 512MB,每次10秒 ~100
大型爬虫(100万次/天) 512MB,每次10秒 ~1000

六、高可用与监控保障

6.1 监控指标配置

在阿里云CloudMonitor配置以下监控指标:

  1. 函数执行指标:调用次数、成功数、失败数、平均执行时间;
  2. 资源使用指标:内存使用率、CPU使用率、网络流量;
  3. 业务指标:爬取成功率、反爬触发率、敏感数据识别数;
  4. 成本指标:函数执行费用、OSS存储费用、API调用费用。

6.2 告警规则

  1. 爬取失败率>10%时触发短信/钉钉告警;
  2. 函数执行时间>200秒时触发告警;
  3. 日执行费用>预算阈值时触发告警;
  4. 反爬触发率>5%时触发告警。

6.3 容灾策略

  1. 多可用区部署:函数自动部署到多可用区,单个可用区故障不影响服务;
  2. 失败重试:对爬取失败的URL自动重试(最多3次);
  3. 降级策略:反爬触发时自动切换到备用代理池/爬取策略;
  4. 数据备份:OSS数据自动备份到冷存储,防止数据丢失。

七、总结

关键点回顾

  1. 核心优势:Serverless架构实现爬虫的弹性扩缩容、按需付费、免运维,解决传统爬虫资源浪费、弹性不足的痛点;
  2. AI赋能:集成AI反爬识别、动态UA适配、敏感数据脱敏,提升爬虫成功率和合规性;
  3. 低成本:按实际执行计费,小型爬虫月成本仅几十元,大幅降低运维和硬件成本;
  4. 易扩展:通过消息队列触发可支持百万级URL并发爬取,无需手动扩容服务器;
  5. 高可用:基于云厂商Serverless平台的多可用区部署,故障率接近零。

本文提出的方案适用于电商价格监控、舆情采集、行业数据爬取等场景,既满足了弹性采集的需求,又保证了合规性和成本可控性,是中小团队实现大规模爬虫的最优方案之一。

如果你觉得本文有帮助,欢迎点赞、收藏、关注!也欢迎在评论区交流Serverless爬虫的实践经验和优化方案~

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐