做爬虫的朋友都懂这种痛:刚爬没几条就被封IP,JS加密绕半天没头绪,滑块验证卡到怀疑人生,好不容易搞定单线程,百万数据爬到天荒地老…… 我前阵子爬某电商平台300万商品数据时,硬生生踩遍了所有反爬坑,从IP被封到JS逆向卡壳,最后靠“反爬破解+分布式架构”组合拳,实现了IP零封禁、日均爬取50万条数据。

这篇文章不是纸上谈兵的反爬理论,而是我实战总结的“封神秘籍”:把10大常见反爬机制拆解得明明白白,每个机制都带“原理+破解思路+可直接运行的Python代码”,最后再教你搭建分布式爬虫架构,让百万数据爬取效率翻倍,全程不封IP。

一、先搞懂:反爬的本质是“识别非人类行为”

在拆解反爬机制前,先明确一个核心逻辑:所有反爬都是网站在区分“真人”和“爬虫”。真人的行为是“随机、有间隔、符合浏览器规范”的,而爬虫的行为是“高频、无规律、不符合HTTP协议”的。

所以我们的破解思路只有一个:让爬虫“伪装”成真人,同时用技术手段规避网站的限制。下面10大反爬机制,全是围绕这个核心展开。

二、硬刚10大反爬机制:从入门到精通(附实战代码)

反爬1:IP封禁(最常见,也是最容易解决的)

原理:网站检测到同一IP短时间内请求频率过高,直接拉黑该IP。
我踩过的坑:刚开始用单机固定IP爬,10分钟就被封,换了普通代理又被识别为“代理IP”,照样封。
破解方案:高匿代理池+动态切换+IP存活检测

  • 代理池选“高匿代理”(普通透明代理会暴露真实IP);
  • 用Redis存储代理,定时检测代理存活状态,剔除无效代理;
  • 每爬1-3条数据切换一次IP,模拟真人不同网络环境。

实战代码(代理池核心逻辑)

import requests
import redis
import threading
import time

# 连接Redis(存储代理)
r = redis.Redis(host='localhost', port=6379, db=0)

# 代理检测:判断代理是否有效
def check_proxy(proxy):
    try:
        proxies = {
            'http': f'http://{proxy}',
            'https': f'https://{proxy}'
        }
        # 访问百度检测(选稳定的检测地址)
        response = requests.get('https://www.baidu.com', proxies=proxies, timeout=5)
        return response.status_code == 200
    except:
        return False

# 定时抓取代理(这里用免费代理演示,生产环境用付费高匿代理)
def fetch_proxies():
    while True:
        try:
            # 免费代理API(实际用付费代理平台接口,比如阿布云、快代理)
            response = requests.get('https://api.xiaoxiangdaili.com/ip/get?appKey=xxx&count=20')
            proxies = response.json()['data']
            for proxy in proxies:
                if check_proxy(proxy):
                    r.sadd('valid_proxies', proxy)
                    print(f'新增有效代理:{proxy}')
        except Exception as e:
            print(f'抓取代理失败:{e}')
        time.sleep(300)  # 5分钟更新一次代理

# 随机获取有效代理
def get_random_proxy():
    proxies = r.smembers('valid_proxies')
    if not proxies:
        raise Exception('无有效代理可用!')
    return str(proxies.pop())

# 启动代理抓取线程
proxy_thread = threading.Thread(target=fetch_proxies)
proxy_thread.daemon = True
proxy_thread.start()

# 爬虫请求示例
def crawl_with_proxy(url):
    proxy = get_random_proxy()
    proxies = {
        'http': f'http://{proxy}',
        'https': f'https://{proxy}'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=10)
        print(f'请求成功:{url},代理:{proxy}')
        return response
    except Exception as e:
        print(f'请求失败,更换代理:{e}')
        r.srem('valid_proxies', proxy)  # 移除无效代理
        return crawl_with_proxy(url)  # 递归更换代理

关键注意:免费代理稳定性差,生产环境一定要用付费高匿代理(比如阿布云、隧道代理),隧道代理会自动切换IP,不用自己维护代理池,更省心。

反爬2:User-Agent检测(入门级反爬)

原理:网站通过请求头的User-Agent判断是否为浏览器请求,爬虫默认的User-Agent(比如python-requests/2.25.1)会被直接识别。
我踩过的坑:刚开始没改User-Agent,爬某博客网站直接返回403,还以为是IP被封,查了半天发现是User-Agent的问题。
破解方案:构建User-Agent池,每次请求随机切换。

实战代码

import requests
import random

# User-Agent池(包含Chrome、Firefox、Safari,覆盖PC和移动端)
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/119.0',
    'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Mobile Safari/537.36'
]

def crawl_with_random_ua(url):
    headers = {
        'User-Agent': random.choice(USER_AGENTS),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.baidu.com/'  # 模拟从百度跳转,更真实
    }
    response = requests.get(url, headers=headers, timeout=10)
    return response

进阶技巧:除了User-Agent,还可以加上Accept、Referer、Cache-Control等请求头,让请求更像真人浏览器。

反爬3:Cookie验证(中等难度,常见于需要登录的网站)

原理:网站通过Cookie判断用户是否登录、是否为真实用户,爬虫如果没有Cookie或Cookie失效,会被限制访问。
我踩过的坑:爬某电商平台时,用账号密码登录后获取Cookie,但爬了1小时后Cookie失效,需要重新登录,很麻烦。
破解方案:Cookie池+自动登录+定时续期

  • 用多个账号登录,获取多个Cookie存入Redis;
  • 定时检测Cookie有效性,失效后自动重新登录获取新Cookie;
  • 每次请求随机选择一个Cookie。

实战代码(Cookie池核心逻辑)

import requests
import redis
import time
from selenium import webdriver
from selenium.webdriver.common.by import By

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=1)

# 自动登录获取Cookie(以某网站为例,需根据实际网站修改)
def get_cookie_by_login(username, password):
    driver = webdriver.Chrome()
    driver.get('https://www.target.com/login')
    # 输入账号密码(需根据网站HTML结构修改选择器)
    driver.find_element(By.ID, 'username').send_keys(username)
    driver.find_element(By.ID, 'password').send_keys(password)
    driver.find_element(By.ID, 'login-btn').click()
    time.sleep(3)  # 等待登录完成
    # 获取Cookie并转换为字符串
    cookies = driver.get_cookies()
    cookie_str = '; '.join([f'{c["name"]}={c["value"]}' for c in cookies])
    driver.quit()
    return cookie_str

# 检测Cookie有效性
def check_cookie(cookie_str):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Cookie': cookie_str
    }
    try:
        response = requests.get('https://www.target.com/user/info', headers=headers, timeout=10)
        return response.json()['code'] == 200  # 假设返回200表示Cookie有效
    except:
        return False

# 初始化Cookie池(多个账号)
def init_cookie_pool():
    accounts = [
        ('user1', 'pass1'),
        ('user2', 'pass2'),
        ('user3', 'pass3')
    ]
    for username, password in accounts:
        cookie_str = get_cookie_by_login(username, password)
        if check_cookie(cookie_str):
            r.sadd('valid_cookies', cookie_str)
            print(f'新增有效Cookie:{cookie_str[:30]}...')

# 定时更新Cookie池
def update_cookie_pool():
    while True:
        cookies = r.smembers('valid_cookies')
        for cookie in cookies:
            if not check_cookie(str(cookie)):
                r.srem('valid_cookies', cookie)
                print(f'Cookie失效:{str(cookie)[:30]}...')
        # 补充新Cookie
        init_cookie_pool()
        time.sleep(3600)  # 1小时更新一次

# 启动Cookie池更新线程
cookie_thread = threading.Thread(target=update_cookie_pool)
cookie_thread.daemon = True
cookie_thread.start()

# 随机获取Cookie
def get_random_cookie():
    cookies = r.smembers('valid_cookies')
    if not cookies:
        raise Exception('无有效Cookie可用!')
    return str(cookies.pop())

反爬4:JS加密(进阶难度,常见于API接口)

原理:网站将关键参数(如token、sign)通过JS加密后再发送请求,爬虫如果不能破解加密逻辑,就无法构造有效请求。
我踩过的坑:爬某短视频平台API时,发现每次请求都需要一个动态变化的sign参数,直接复制浏览器的sign只能用一次,后来花了3小时逆向JS代码才破解。
破解方案:JS逆向分析+execjs执行加密逻辑

  • 用Chrome开发者工具(Sources面板)找到加密JS文件;
  • 分析加密函数(通常是MD5、SHA1、AES或自定义加密);
  • 用Python的execjs库执行JS加密函数,生成有效参数。

实战代码(JS加密破解示例)
假设某网站的sign参数是通过“时间戳+密钥+MD5加密”生成的,JS代码如下:

// 网站加密JS代码
function generateSign(timestamp) {
    var secret = "abc123xyz"; // 密钥,需从JS中提取
    return md5(timestamp + secret);
}

Python破解代码:

import requests
import time
import execjs
import hashlib

# 方法1:直接用Python实现加密逻辑(推荐,效率高)
def generate_sign_python():
    timestamp = str(int(time.time() * 1000))
    secret = "abc123xyz"  # 从JS中提取的密钥
    sign = hashlib.md5((timestamp + secret).encode()).hexdigest()
    return timestamp, sign

# 方法2:用execjs执行JS代码(适用于复杂加密)
def generate_sign_js():
    # 读取加密JS代码
    with open('encrypt.js', 'r', encoding='utf-8') as f:
        js_code = f.read()
    # 创建JS执行环境
    ctx = execjs.compile(js_code)
    timestamp = str(int(time.time() * 1000))
    sign = ctx.call('generateSign', timestamp)
    return timestamp, sign

# 构造请求
def crawl_with_js_encrypt():
    url = 'https://www.target.com/api/data'
    timestamp, sign = generate_sign_python()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    params = {
        'timestamp': timestamp,
        'sign': sign,
        'page': 1,
        'size': 20
    }
    response = requests.get(url, headers=headers, params=params, timeout=10)
    print(f'响应结果:{response.json()}')
    return response

逆向技巧:如果JS代码被混淆(比如变量名变成a、b、c),可以用Chrome的“Pretty Print”功能格式化代码,再通过断点调试跟踪参数传递过程。

反爬5:滑块验证(中等难度,常见于登录、注册、高频请求)

原理:网站通过滑块验证区分真人(能准确滑动滑块到缺口位置)和爬虫(无法模拟真实滑动轨迹)。
我踩过的坑:刚开始用Selenium直接拖动滑块,每次都验证失败,后来发现网站会检测滑动轨迹(速度、加速度),匀速滑动会被识别为爬虫。
破解方案:ddddocr识别缺口位置+模拟真实滑动轨迹

实战代码

from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
import ddddocr
import time

# 初始化OCR识别器(用于识别滑块缺口位置)
ocr = ddddocr.DdddOcr(det=False, ocr=False)

def slide_verification(driver, slider, background):
    # 保存滑块和背景图片
    slider.screenshot('slider.png')
    background.screenshot('background.png')
    
    # 识别缺口位置(返回缺口的x坐标)
    with open('slider.png', 'rb') as f1, open('background.png', 'rb') as f2:
        res = ocr.slide_match(f1.read(), f2.read(), simple_target=True)
    gap_x = res['target'][0] - 20  # 调整偏移量(根据实际情况修改)
    
    # 模拟真实滑动轨迹:先加速后减速
    action = ActionChains(driver)
    action.click_and_hold(slider).perform()
    time.sleep(0.2)
    
    # 分三段滑动
    action.move_by_offset(gap_x * 0.3, 0).perform()
    time.sleep(random.uniform(0.1, 0.2))
    action.move_by_offset(gap_x * 0.5, 0).perform()
    time.sleep(random.uniform(0.1, 0.2))
    action.move_by_offset(gap_x * 0.2, 0).perform()
    time.sleep(random.uniform(0.1, 0.2))
    
    action.release().perform()
    time.sleep(2)

# 示例:爬取需要滑块验证的网站
def crawl_with_slide_verification():
    driver = webdriver.Chrome()
    driver.get('https://www.target.com/login')
    time.sleep(3)
    
    # 找到滑块和背景图片元素(需根据实际网站修改选择器)
    slider = driver.find_element(By.CLASS_NAME, 'slider-btn')
    background = driver.find_element(By.CLASS_NAME, 'slider-background')
    
    # 执行滑块验证
    slide_verification(driver, slider, background)
    
    # 后续操作(登录、爬取数据)
    # ...

if __name__ == '__main__':
    crawl_with_slide_verification()

关键注意:滑动轨迹一定要模拟真人,避免匀速滑动,同时缺口位置识别可能有误差,需要根据实际情况调整偏移量。

反爬6:指纹识别(高阶难度,反爬天花板)

原理:网站通过浏览器指纹(Canvas指纹、WebGL指纹、User-Agent+系统信息组合)识别同一设备,即使更换IP和Cookie,也能识别出是爬虫。
我踩过的坑:爬某金融平台时,换了代理和Cookie还是被封,后来发现是Canvas指纹被识别,同一个设备的指纹是固定的。
破解方案:修改浏览器指纹+使用指纹池

  • 用Selenium的ChromeOptions修改Canvas、WebGL指纹;
  • 使用多个浏览器实例,每个实例对应不同的指纹。

实战代码(修改Chrome指纹)

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_fingerprint_browser():
    chrome_options = Options()
    # 禁用自动化检测
    chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
    chrome_options.add_experimental_option('useAutomationExtension', False)
    
    # 修改Canvas指纹(通过注入JS)
    chrome_options.add_argument('--disable-blink-features=AutomationControlled')
    
    # 修改WebGL指纹
    chrome_options.add_argument('--disable-webgl')
    
    # 修改时区(模拟不同地区)
    chrome_options.add_argument('--lang=zh-CN')
    chrome_options.add_experimental_option('prefs', {
        'intl.accept_languages': 'zh-CN,zh'
    })
    
    # 创建浏览器实例
    driver = webdriver.Chrome(options=chrome_options)
    # 注入JS修改Canvas指纹
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            // 重写Canvas的toDataURL方法,修改指纹
            Object.defineProperty(HTMLCanvasElement.prototype, 'toDataURL', {
                value: function() {
                    return '';
                }
            });
        '''
    })
    return driver

# 使用修改指纹后的浏览器爬取
def crawl_with_fingerprint():
    driver = create_fingerprint_browser()
    driver.get('https://www.target.com')
    # 后续操作
    # ...

进阶方案:使用FingerprintSwitcher等工具,自动生成不同的浏览器指纹,避免被识别。

反爬7:API签名验证(中等难度,常见于APP接口)

原理:APP接口的请求参数会通过“参数排序+密钥+MD5/SHA1加密”生成签名,服务器收到请求后会验证签名,签名不正确则拒绝响应。
破解方案:抓包分析签名规则+Python实现签名逻辑

  • 用Fiddler或Charles抓包APP请求,获取请求参数和签名;
  • 分析签名生成规则(通常是将参数按字母排序后拼接,再加上密钥加密);
  • 用Python实现相同的签名逻辑。

实战代码

import requests
import hashlib
import urllib.parse

# 生成API签名
def generate_api_sign(params, secret):
    # 1. 参数按字母升序排序
    sorted_params = sorted(params.items(), key=lambda x: x[0])
    # 2. 拼接参数(key=value&key=value)
    param_str = urllib.parse.urlencode(sorted_params)
    # 3. 拼接密钥并加密
    sign_str = param_str + secret
    sign = hashlib.sha1(sign_str.encode()).hexdigest()
    return sign

# 爬取APP接口
def crawl_app_api():
    url = 'https://api.target.com/v1/data'
    secret = 'app_secret_123'  # 从APP逆向或抓包获取
    params = {
        'page': 1,
        'size': 20,
        'timestamp': str(int(time.time())),
        'device_id': '123456789'
    }
    # 生成签名
    params['sign'] = generate_api_sign(params, secret)
    headers = {
        'User-Agent': 'TargetApp/1.0.0 (Android; 13)',
        'Content-Type': 'application/json'
    }
    response = requests.get(url, headers=headers, params=params, timeout=10)
    print(f'APP接口响应:{response.json()}')
    return response

反爬8:动态渲染(中等难度,常见于Vue/React网站)

原理:网站通过JS动态加载数据(比如异步请求API、渲染DOM),直接爬取HTML源码获取不到数据。
我踩过的坑:刚开始用requests爬某Vue网站,返回的HTML只有空壳,没有实际数据,后来发现数据是通过AJAX请求获取的。
破解方案:两种选择,按需使用

  • 直接爬取AJAX接口(效率高,推荐):用Chrome开发者工具(Network面板)找到数据接口,直接请求接口;
  • 使用Playwright/Selenium模拟浏览器渲染(效率低,但适用所有动态页面)。

实战代码(Playwright模拟渲染)

from playwright.sync_api import sync_playwright
import time

def crawl_dynamic_page():
    with sync_playwright() as p:
        # 启动浏览器(无头模式,不显示界面)
        browser = p.chrome.launch(headless=True)
        page = browser.new_page()
        # 访问动态页面
        page.goto('https://www.target.com/dynamic-page')
        # 等待数据加载完成(根据实际情况调整等待时间或条件)
        page.wait_for_selector('.data-item', timeout=10000)
        time.sleep(2)
        # 获取渲染后的HTML
        html = page.content()
        # 提取数据(用BeautifulSoup)
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, 'html.parser')
        items = soup.find_all('.data-item')
        for item in items:
            print(f'数据:{item.get_text()}')
        browser.close()

效率优化:如果能找到AJAX接口,优先直接请求接口;如果接口有反爬(如JS加密),再用Playwright模拟浏览器。

反爬9:请求频率限制(入门级反爬)

原理:网站限制同一IP/用户单位时间内的请求次数,超过次数则暂时封禁或返回429 Too Many Requests。
破解方案:控制请求频率+随机延时

  • 每次请求后添加随机延时(1-3秒),模拟真人浏览节奏;
  • 结合IP池,分散请求压力。

实战代码

import requests
import time
import random

def crawl_with_delay(urls):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    for url in urls:
        try:
            response = requests.get(url, headers=headers, timeout=10)
            print(f'爬取成功:{url}')
            # 随机延时1-3秒
            time.sleep(random.uniform(1, 3))
        except Exception as e:
            print(f'爬取失败:{url},错误:{e}')
            time.sleep(5)  # 失败后延时更久

反爬10:Referer验证(入门级反爬)

原理:网站通过Referer请求头判断请求来源,只允许从自身网站或信任的网站跳转过来的请求。
破解方案:设置正确的Referer头,模拟真实跳转来源。

实战代码

import requests

def crawl_with_referer(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Referer': 'https://www.target.com/list'  # 模拟从列表页跳转
    }
    response = requests.get(url, headers=headers, timeout=10)
    return response

三、分布式爬虫搭建:百万数据爬取效率翻倍

解决了反爬问题,接下来要解决“海量数据爬取慢”的问题——单机爬虫爬百万数据可能需要几天,分布式爬虫能将任务拆分到多个节点,效率翻倍。

分布式爬虫核心架构

分布式爬虫架构
├─ 任务调度中心(Redis):存储待爬取的URL队列、已爬取的URL(去重)
├─ 爬虫节点(多台服务器/多进程):从Redis获取任务,爬取数据后存储到数据库
├─ 数据存储(MongoDB+Redis):MongoDB存结构化数据,Redis做缓存和去重
└─ 代理池(Redis):所有爬虫节点共享代理池,避免重复维护

实战:用Scrapy-Redis搭建分布式爬虫

Scrapy-Redis是基于Scrapy和Redis的分布式爬虫框架,能快速实现任务分发和数据共享。

1. 环境搭建
pip install scrapy scrapy-redis redis pymongo
2. 爬虫项目配置(settings.py)
# 启用Scrapy-Redis调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 启用Scrapy-Redis去重
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Redis连接配置
REDIS_URL = "redis://localhost:6379/2"
# 任务队列持久化(重启后不丢失任务)
SCHEDULER_PERSIST = True
# 数据存储配置(MongoDB)
MONGO_URI = "mongodb://localhost:27017"
MONGO_DB = "spider_data"
# 下载延迟(控制请求频率)
DOWNLOAD_DELAY = 1
# 并发数(根据服务器性能调整)
CONCURRENT_REQUESTS = 32
# 代理池配置(自定义下载中间件)
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'spider_project.middlewares.RandomUserAgentMiddleware': 543,
    'spider_project.middlewares.RandomProxyMiddleware': 544,
}
3. 自定义下载中间件(middlewares.py)
import random
import redis
from scrapy import signals

# User-Agent中间件
class RandomUserAgentMiddleware:
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.USER_AGENTS)

# 代理中间件
class RandomProxyMiddleware:
    def __init__(self, redis_url):
        self.r = redis.Redis.from_url(redis_url)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(redis_url=crawler.settings.get('REDIS_URL'))

    def process_request(self, request, spider):
        proxies = self.r.smembers('valid_proxies')
        if proxies:
            proxy = random.choice(list(proxies))
            request.meta['proxy'] = f'http://{proxy}'

    def process_response(self, request, response, spider):
        # 代理失效时更换代理
        if response.status in [403, 404, 500]:
            proxy = request.meta.get('proxy').replace('http://', '')
            self.r.srem('valid_proxies', proxy)
            # 重新请求
            return request.replace(dont_filter=True)
        return response
4. 爬虫代码(spiders/target_spider.py)
import scrapy
from scrapy_redis.spiders import RedisSpider
from pymongo import MongoClient
from spider_project.settings import MONGO_URI, MONGO_DB

class TargetSpider(RedisSpider):
    name = 'target_spider'
    redis_key = 'target:start_urls'  # Redis中的任务队列键名

    def __init__(self):
        self.client = MongoClient(MONGO_URI)
        self.db = self.client[MONGO_DB]
        self.collection = self.db['products']

    def parse(self, response):
        # 提取商品数据(根据实际网站HTML结构修改)
        products = response.xpath('//div[@class="product-item"]')
        for product in products:
            item = {
                'title': product.xpath('.//h3/text()').get().strip(),
                'price': product.xpath('.//span[@class="price"]/text()').get().strip(),
                'url': response.urljoin(product.xpath('.//a/@href').get()),
                'create_time': scrapy.Field(serializer=str)
            }
            # 存入MongoDB
            self.collection.insert_one(item)
            yield item

        # 提取下一页URL,加入任务队列
        next_page = response.xpath('//a[@class="next-page"]/@href').get()
        if next_page:
            yield scrapy.Request(url=response.urljoin(next_page), callback=self.parse)
5. 启动分布式爬虫
  1. 启动Redis服务器;
  2. 在多台服务器上启动爬虫节点:
scrapy runspider spiders/target_spider.py
  1. 向Redis中添加起始URL:
redis-cli lpush target:start_urls https://www.target.com/products

这样,所有爬虫节点会从Redis中获取任务,爬取后的数据存入MongoDB,实现分布式爬取,百万数据爬取效率能提升5-10倍。

四、百万数据爬取避坑指南(实战经验总结)

  1. IP池一定要用高匿代理:普通代理容易被识别,隧道代理是最优选择,不用自己维护代理池;
  2. 请求频率不能太高:即使有代理池,也要控制请求频率,避免触发网站的反爬阈值;
  3. 数据去重很重要:用Redis的布隆过滤器或集合去重,避免重复爬取同一URL;
  4. 异常重试要合理:爬取失败后不要立即重试,间隔5-10秒再重试,避免加重服务器负担;
  5. 日志监控不能少:用logging模块记录爬取状态、错误信息,方便排查问题;
  6. 遵守robots.txt协议:虽然很多网站的robots.txt没强制限制,但遵守协议能减少被封的风险;
  7. 注意法律风险:只爬取公开数据,不爬取隐私数据、付费数据,避免触犯法律。

五、总结:爬虫封神的核心是“模拟真人+高效架构”

爬取百万数据不封IP,本质上是两件事:一是让爬虫“伪装”成真人,通过代理池、Cookie池、随机UA、真实滑动轨迹等手段,规避网站的反爬检测;二是用分布式架构拆分任务,提升爬取效率。

这篇文章的10大反爬破解方法和分布式架构搭建,都是我实战中验证过的有效方案,代码可以直接复用。如果你们在爬取过程中遇到“JS加密复杂”“滑块验证过不了”“分布式任务重复”等问题,评论区留问题,我会结合具体场景帮你分析。

觉得有用的话,点赞收藏,下次爬取海量数据时直接翻出来用,少走90%的弯路!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐