[Problem Solved, Verified] Python Scraping: Complete Solutions for Anti-Bot Blocks, 403 Errors, CAPTCHAs, and IP Bans

Symptoms

When scraping the web with Python, you frequently run into the following problems:

# HTTP 403 Forbidden
requests.exceptions.HTTPError: 403 Client Error: Forbidden

# HTTP 429 Too Many Requests
requests.exceptions.HTTPError: 429 Client Error: Too Many Requests

# CAPTCHA
# The site returns a CAPTCHA page and the crawl cannot continue

# IP ban
# Connections time out or are refused

# User-Agent detection
# The site returns an empty page or an error page

These problems keep a crawler from doing its job. This article walks through a complete set of countermeasures.

Root Cause Analysis

1. Causes of HTTP 403

  • The User-Agent is recognized as a bot
  • The Referer is missing or wrong
  • Cookies are missing or expired
  • The request headers are incomplete

2. Causes of HTTP 429

  • Requests are sent too frequently
  • Too many requests come from the same IP
  • The crawler ignores the robots.txt rules

3. Causes of CAPTCHAs

  • An abnormal access pattern was detected
  • The IP's behavior looks suspicious
  • An anti-scraping rule was triggered

4. Causes of IP bans

  • A burst of requests in a short time
  • Low-quality proxy IPs
  • Violations of the site's terms of use
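
Before picking a countermeasure it helps to know which of these cases you are in. The snippet below is a minimal diagnostic sketch, not part of any library; the keyword list and the example URL are illustrative assumptions.

import requests

def diagnose(url: str) -> str:
    """Rough classification of why a request failed (illustrative sketch)."""
    try:
        response = requests.get(url, timeout=10)
    except requests.exceptions.ConnectionError:
        return "connection refused or reset - possibly an IP ban"
    except requests.exceptions.Timeout:
        return "timeout - possibly throttling or an IP ban"

    if response.status_code == 403:
        return "403 Forbidden - check headers, User-Agent, Referer, cookies"
    if response.status_code == 429:
        return "429 Too Many Requests - lower the request rate"
    # Heuristic: look for CAPTCHA markers in the page body (assumed keywords)
    if any(word in response.text.lower() for word in ('captcha', 'verify', '验证码')):
        return "CAPTCHA page - the anti-bot system was triggered"
    if not response.text.strip():
        return "empty page - likely User-Agent detection"
    return "request looks fine"

# Example (hypothetical URL)
print(diagnose('https://www.example.com'))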

Solutions

Solution 1: Complete the Request Headers

import requests
from fake_useragent import UserAgent

# Install fake_useragent
# pip install fake-useragent

def get_headers():
    """获取随机请求头"""
    ua = UserAgent()
    headers = {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0',
    }
    return headers

def fetch_with_headers(url):
    """使用完整请求头获取页面"""
    headers = get_headers()

    session = requests.Session()
    session.headers.update(headers)

    try:
        response = session.get(url, timeout=10)
        response.raise_for_status()
        return response.text
    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")
        return None
    except Exception as e:
        print(f"请求失败: {e}")
        return None

# Usage example
html = fetch_with_headers('https://www.example.com')

Solution 2: Cookie Management

import requests
import pickle
from pathlib import Path

class CookieManager:
    """Cookie manager that persists cookies across runs"""
    def __init__(self, cookie_file='cookies.pkl'):
        self.cookie_file = Path(cookie_file)
        self.session = requests.Session()
        self.load_cookies()

    def save_cookies(self):
        """Save cookies to disk"""
        with open(self.cookie_file, 'wb') as f:
            pickle.dump(self.session.cookies, f)

    def load_cookies(self):
        """Load cookies from disk"""
        if self.cookie_file.exists():
            with open(self.cookie_file, 'rb') as f:
                self.session.cookies.update(pickle.load(f))

    def get(self, url, **kwargs):
        """Send a GET request and persist any new cookies"""
        response = self.session.get(url, **kwargs)
        self.save_cookies()
        return response

    def post(self, url, **kwargs):
        """Send a POST request and persist any new cookies"""
        response = self.session.post(url, **kwargs)
        self.save_cookies()
        return response

# Usage example
cookie_manager = CookieManager()

# First visit: log in with a POST request (credentials are placeholders)
response = cookie_manager.post('https://www.example.com/login', data={
    'username': 'your_username',
    'password': 'your_password'
})

# Subsequent requests automatically carry the saved cookies
response = cookie_manager.get('https://www.example.com/protected')

Solution 3: Request Rate Control

import requests
import time
import random
from functools import wraps

def rate_limit(min_delay=1, max_delay=3):
    """Decorator that sleeps a random interval before each call (for standalone functions)"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = random.uniform(min_delay, max_delay)
            time.sleep(delay)
            return func(*args, **kwargs)
        return wrapper
    return decorator

class RateLimitedCrawler:
    """Crawler that throttles its own requests"""
    def __init__(self, min_delay=1, max_delay=3):
        self.min_delay = min_delay
        self.max_delay = max_delay

    def fetch(self, url):
        """Fetch a page after a random delay based on the instance settings"""
        time.sleep(random.uniform(self.min_delay, self.max_delay))
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Request failed: {e}")
            return None

    def fetch_multiple(self, urls):
        """Fetch a batch of pages sequentially"""
        results = []
        for url in urls:
            html = self.fetch(url)
            if html:
                results.append(html)
        return results

# Usage example
crawler = RateLimitedCrawler(min_delay=2, max_delay=5)
urls = [
    'https://www.example.com/page1',
    'https://www.example.com/page2',
    'https://www.example.com/page3',
]
results = crawler.fetch_multiple(urls)

Solution 4: Proxy IP Pool

import requests
import random
from typing import List, Optional

class ProxyPool:
    """Pool of proxy IPs with failure tracking"""
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.failed_proxies = set()
        self.session = requests.Session()

    def get_proxy(self) -> Optional[dict]:
        """Pick a random proxy that has not failed yet"""
        available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
        if not available_proxies:
            return None

        proxy = random.choice(available_proxies)
        return {
            'http': proxy,
            'https': proxy
        }

    def mark_failed(self, proxy: str):
        """Mark a proxy as failed so it is not reused"""
        self.failed_proxies.add(proxy)

    def fetch(self, url: str, max_retries: int = 3) -> Optional[str]:
        """Fetch a page through a proxy, retrying with other proxies on failure"""
        for attempt in range(max_retries):
            proxy_dict = self.get_proxy()
            if not proxy_dict:
                print("No proxies available")
                return None

            try:
                response = self.session.get(
                    url,
                    proxies=proxy_dict,
                    timeout=10,
                    headers={'User-Agent': 'Mozilla/5.0'}
                )
                response.raise_for_status()
                return response.text

            except Exception as e:
                proxy = proxy_dict['http']
                print(f"Proxy {proxy} failed: {e}")
                self.mark_failed(proxy)

        return None

# Usage example
proxies = [
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080',
    'http://proxy3.example.com:8080',
]

proxy_pool = ProxyPool(proxies)
html = proxy_pool.fetch('https://www.example.com')
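
Low-quality proxies are a major cause of bans, so it is worth validating the list before feeding it to ProxyPool. The helper below is a hedged sketch; it assumes httpbin.org/ip as a public echo endpoint and a 5-second timeout, both of which are illustrative choices.

import requests
from typing import List

def validate_proxies(proxies: List[str], test_url: str = 'https://httpbin.org/ip', timeout: int = 5) -> List[str]:
    """Return only the proxies that can successfully reach the test URL."""
    working = []
    for proxy in proxies:
        try:
            response = requests.get(
                test_url,
                proxies={'http': proxy, 'https': proxy},
                timeout=timeout,
            )
            if response.status_code == 200:
                working.append(proxy)
        except requests.exceptions.RequestException:
            pass  # drop proxies that time out or error
    return working

# Hypothetical usage: filter the list before building the pool
good_proxies = validate_proxies(proxies)
proxy_pool = ProxyPool(good_proxies)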

Solution 5: CAPTCHA Handling

import requests
import base64
from io import BytesIO
from typing import Optional
from PIL import Image
import pytesseract

# Install the dependencies
# pip install pytesseract pillow
# The Tesseract OCR engine must also be installed on the system

class CaptchaSolver:
    """CAPTCHA solver"""
    def __init__(self):
        self.session = requests.Session()

    def get_captcha_image(self, url: str) -> Optional[Image.Image]:
        """Download the CAPTCHA image"""
        try:
            response = self.session.get(url)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            return image
        except Exception as e:
            print(f"Failed to fetch the CAPTCHA: {e}")
            return None

    def solve_captcha(self, image: Image.Image) -> str:
        """Solve the CAPTCHA with local OCR"""
        try:
            # Preprocess the image
            image = image.convert('L')  # convert to grayscale
            image = image.resize((200, 80))  # normalize the size

            # Recognize the text with OCR
            captcha_text = pytesseract.image_to_string(image)
            return captcha_text.strip()
        except Exception as e:
            print(f"Failed to recognize the CAPTCHA: {e}")
            return ""

    def solve_with_api(self, image: Image.Image, api_url: str, api_key: str) -> str:
        """Solve the CAPTCHA through a third-party API"""
        try:
            buffered = BytesIO()
            image.save(buffered, format="PNG")
            img_str = base64.b64encode(buffered.getvalue()).decode()

            response = requests.post(
                api_url,
                json={'image': img_str},
                headers={'Authorization': f'Bearer {api_key}'}
            )
            response.raise_for_status()
            return response.json()['result']
        except Exception as e:
            print(f"API recognition failed: {e}")
            return ""

# Usage example
solver = CaptchaSolver()

# Option 1: local OCR
captcha_image = solver.get_captcha_image('https://www.example.com/captcha')
if captcha_image:
    captcha_text = solver.solve_captcha(captcha_image)
    print(f"CAPTCHA: {captcha_text}")

# Option 2: third-party API
# captcha_text = solver.solve_with_api(captcha_image, 'https://api.captcha.com/solve', 'your_api_key')
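
Plain grayscale plus resizing is often not enough for Tesseract. A common extra step is binarization; the sketch below is one possible preprocessing variant, and the threshold of 128 is an assumed value that usually needs tuning per CAPTCHA style.

from PIL import Image

def preprocess_for_ocr(image: Image.Image, threshold: int = 128) -> Image.Image:
    """Convert to grayscale and binarize to help the OCR engine."""
    gray = image.convert('L')
    # Map every pixel to pure black or pure white around the threshold
    binary = gray.point(lambda p: 255 if p > threshold else 0)
    return binary

# Hypothetical usage with the solver defined above
# captcha_text = pytesseract.image_to_string(preprocess_for_ocr(captcha_image))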

Common Scenarios

Scenario 1: Handling 403 Forbidden

import requests
from fake_useragent import UserAgent

def handle_403(url):
    """Work around a 403 error"""
    ua = UserAgent()

    headers = {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Referer': url,
        'Upgrade-Insecure-Requests': '1',
    }

    session = requests.Session()
    session.headers.update(headers)

    try:
        response = session.get(url, timeout=10)

        if response.status_code == 403:
            print("Got a 403, retrying with extra headers...")

            # Add more headers to the session itself (not just the local dict)
            session.headers.update({
                'DNT': '1',
                'Cache-Control': 'max-age=0',
            })

            response = session.get(url, timeout=10)

        response.raise_for_status()
        return response.text

    except requests.exceptions.HTTPError as e:
        print(f"HTTP error: {e}")
        return None
    except Exception as e:
        print(f"Request failed: {e}")
        return None

# Usage example
html = handle_403('https://www.example.com')

Scenario 2: Handling 429 Too Many Requests

import requests
import time
import random

class RateLimitedRequest:
    """Request wrapper with a per-minute rate limit"""
    def __init__(self, max_requests_per_minute=30):
        self.max_requests = max_requests_per_minute
        self.request_times = []

    def wait_if_needed(self):
        """Sleep if the rate limit would otherwise be exceeded"""
        current_time = time.time()

        # Drop request timestamps older than one minute
        self.request_times = [
            t for t in self.request_times
            if current_time - t < 60
        ]

        # If the per-minute limit is reached, wait until the oldest request expires
        if len(self.request_times) >= self.max_requests:
            oldest_time = min(self.request_times)
            wait_time = 60 - (current_time - oldest_time) + random.uniform(1, 3)
            print(f"Too many requests, waiting {wait_time:.2f} seconds")
            time.sleep(wait_time)

    def get(self, url, **kwargs):
        """Send a rate-limited GET request"""
        self.wait_if_needed()

        try:
            response = requests.get(url, timeout=10, **kwargs)
            response.raise_for_status()

            self.request_times.append(time.time())
            return response.text

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print("Got a 429, waiting before retrying...")
                retry_after = int(e.response.headers.get('Retry-After', 60))
                time.sleep(retry_after + random.uniform(1, 3))
                return self.get(url, **kwargs)
            else:
                raise
        except Exception as e:
            print(f"Request failed: {e}")
            return None

# Usage example
requester = RateLimitedRequest(max_requests_per_minute=20)
html = requester.get('https://www.example.com')

Scenario 3: Handling IP Bans

import requests
import time
from typing import List, Optional

class IPBanHandler:
    """Handler that rotates proxies when an IP is banned"""
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.current_proxy_index = 0
        self.session = requests.Session()

    def get_next_proxy(self) -> Optional[dict]:
        """Return the next proxy in the list"""
        if self.current_proxy_index >= len(self.proxies):
            print("All proxies have been used up")
            return None

        proxy = self.proxies[self.current_proxy_index]
        self.current_proxy_index += 1

        return {
            'http': proxy,
            'https': proxy
        }

    def fetch_with_proxy_rotation(self, url: str, max_retries: int = 3) -> Optional[str]:
        """Fetch a page, rotating to the next proxy on a ban"""
        for attempt in range(max_retries):
            proxy_dict = self.get_next_proxy()
            if not proxy_dict:
                return None

            try:
                response = self.session.get(
                    url,
                    proxies=proxy_dict,
                    timeout=10,
                    headers={'User-Agent': 'Mozilla/5.0'}
                )

                if response.status_code == 403:
                    print(f"Proxy {proxy_dict['http']} is banned, trying the next one...")
                    continue

                response.raise_for_status()
                return response.text

            except requests.exceptions.RequestException as e:
                print(f"Request failed: {e}")
                time.sleep(2)

        return None

# Usage example
proxies = [
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080',
    'http://proxy3.example.com:8080',
]

handler = IPBanHandler(proxies)
html = handler.fetch_with_proxy_rotation('https://www.example.com')

Scenario 4: Handling JavaScript-Rendered Pages

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

class JavaScriptCrawler:
    """Crawler for pages rendered by JavaScript"""
    def __init__(self, headless=True):
        options = Options()
        if headless:
            options.add_argument('--headless')

        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-gpu')
        options.add_argument('--window-size=1920,1080')

        # Set a browser-like User-Agent
        options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        self.driver = webdriver.Chrome(options=options)

    def fetch(self, url: str, wait_time: int = 10) -> str:
        """Load a page and return the rendered HTML"""
        try:
            self.driver.get(url)

            # Wait until the page body is present
            WebDriverWait(self.driver, wait_time).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )

            # Give JavaScript a little extra time to run
            time.sleep(2)

            return self.driver.page_source

        except Exception as e:
            print(f"Failed to fetch the page: {e}")
            return ""

    def close(self):
        """Shut down the browser"""
        self.driver.quit()

# Usage example
crawler = JavaScriptCrawler(headless=True)
html = crawler.fetch('https://www.example.com')
crawler.close()
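
Headless Chrome is itself easy to fingerprint (for example through navigator.webdriver). The options below are a hedged sketch of common mitigations using standard Selenium/Chromium switches; whether they are enough depends entirely on the target site.

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument('--headless')
# Hide the "controlled by automated software" automation switches
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
# Disable the Blink feature that exposes navigator.webdriver = true
options.add_argument('--disable-blink-features=AutomationControlled')

driver = webdriver.Chrome(options=options)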

Scenario 5: Handling Login Requirements

import requests
from typing import Optional
from bs4 import BeautifulSoup

class LoginCrawler:
    """Crawler that logs in before scraping"""
    def __init__(self):
        self.session = requests.Session()

    def get_csrf_token(self, login_url: str) -> Optional[str]:
        """Fetch the CSRF token from the login form"""
        try:
            response = self.session.get(login_url)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            csrf_token = soup.find('input', {'name': 'csrf_token'})

            if csrf_token:
                return csrf_token.get('value')
            return None

        except Exception as e:
            print(f"Failed to get the CSRF token: {e}")
            return None

    def login(self, login_url: str, username: str, password: str) -> bool:
        """Log in"""
        csrf_token = self.get_csrf_token(login_url)
        if not csrf_token:
            print("Could not obtain a CSRF token")
            return False

        login_data = {
            'username': username,
            'password': password,
            'csrf_token': csrf_token,
        }

        try:
            response = self.session.post(login_url, data=login_data)
            response.raise_for_status()

            # Check whether the login succeeded
            if 'logout' in response.text.lower():
                print("Login succeeded")
                return True
            else:
                print("Login failed")
                return False

        except Exception as e:
            print(f"Login failed: {e}")
            return False

    def fetch_protected_page(self, url: str) -> Optional[str]:
        """Fetch a page that requires login"""
        try:
            response = self.session.get(url)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Failed to fetch the page: {e}")
            return None

# Usage example
crawler = LoginCrawler()
if crawler.login('https://www.example.com/login', 'username', 'password'):
    html = crawler.fetch_protected_page('https://www.example.com/protected')

Advanced Techniques

1. Distributed Crawling

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional
import random
import time

class DistributedCrawler:
    """Crawler that spreads requests across proxies and threads"""
    def __init__(self, proxies: List[str], max_workers=5):
        self.proxies = proxies
        self.max_workers = max_workers

    def fetch_single(self, url: str) -> Optional[str]:
        """Fetch a single page through a random proxy"""
        proxy = random.choice(self.proxies)
        headers = {'User-Agent': 'Mozilla/5.0'}

        try:
            response = requests.get(
                url,
                proxies={'http': proxy, 'https': proxy},
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Failed to fetch {url}: {e}")
            return None

    def fetch_multiple(self, urls: List[str]) -> List[Optional[str]]:
        """Fetch a batch of pages concurrently"""
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {
                executor.submit(self.fetch_single, url): url
                for url in urls
            }

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    print(f"Error while processing {url}: {e}")
                    results.append(None)

        return results

# Usage example
proxies = ['http://proxy1.example.com:8080', 'http://proxy2.example.com:8080']
crawler = DistributedCrawler(proxies, max_workers=3)
urls = ['https://www.example.com/page1', 'https://www.example.com/page2']
results = crawler.fetch_multiple(urls)

2. Smart Retry Mechanism

import requests
import time
from functools import wraps
from typing import Callable

def smart_retry(
    max_retries: int = 3,
    backoff_factor: float = 2,
    initial_delay: float = 1
):
    """Retry decorator with exponential backoff"""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code in [429, 503]:
                        print(f"Got {e.response.status_code}, retrying in {delay:.2f} seconds...")
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        raise
                except requests.exceptions.RequestException as e:
                    if attempt < max_retries - 1:
                        print(f"Request failed, retrying in {delay:.2f} seconds...")
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        raise

        return wrapper
    return decorator

# Usage example
@smart_retry(max_retries=3, backoff_factor=2, initial_delay=1)
def fetch_with_retry(url: str) -> str:
    """带重试的获取"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.text

html = fetch_with_retry('https://www.example.com')

3. Bypassing Anti-Bot Detection

import requests
import random
import time
from typing import Optional
from fake_useragent import UserAgent

class AntiDetectionCrawler:
    """Crawler that tries to avoid bot detection"""
    def __init__(self):
        self.session = requests.Session()
        self.setup_session()

    def setup_session(self):
        """Configure browser-like default headers on the session"""
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

    def random_delay(self, min_delay=1, max_delay=3):
        """Sleep for a random interval"""
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

    def simulate_human_behavior(self):
        """Crude simulation of human browsing behavior"""
        # Random delay between requests
        self.random_delay()

        # Rotate the User-Agent
        ua = UserAgent()
        self.session.headers['User-Agent'] = ua.random

    def fetch(self, url: str) -> Optional[str]:
        """Fetch a page"""
        self.simulate_human_behavior()

        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Fetch failed: {e}")
            return None

# Usage example
crawler = AntiDetectionCrawler()
html = crawler.fetch('https://www.example.com')

Best Practices

1. Respect robots.txt

import urllib.robotparser
from urllib.parse import urlparse

class RobotsTxtChecker:
    """robots.txt checker"""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.rp = urllib.robotparser.RobotFileParser()
        self.rp.set_url(f"{base_url}/robots.txt")
        self.rp.read()

    def can_fetch(self, url: str, user_agent='*') -> bool:
        """Check whether a URL may be fetched"""
        return self.rp.can_fetch(user_agent, url)

    def crawl_delay(self, user_agent='*'):
        """Return the crawl delay in seconds, or None if robots.txt does not set one"""
        return self.rp.crawl_delay(user_agent)

# Usage example
checker = RobotsTxtChecker('https://www.example.com')
if checker.can_fetch('https://www.example.com/page'):
    print("Allowed to fetch")
else:
    print("Not allowed to fetch")

2. Error Handling and Logging

import logging
import requests

class CrawlerLogger:
    """Crawler logger"""
    def __init__(self, log_file='crawler.log'):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('Crawler')

    def log_request(self, url: str, status_code: int):
        """Log a request"""
        self.logger.info(f"Request: {url}, status code: {status_code}")

    def log_error(self, url: str, error: Exception):
        """Log an error"""
        self.logger.error(f"Error: {url}, exception: {error}")

    def log_success(self, url: str, data_size: int):
        """Log a success"""
        self.logger.info(f"Success: {url}, data size: {data_size}")

# Usage example
logger = CrawlerLogger()
try:
    response = requests.get('https://www.example.com')
    logger.log_request('https://www.example.com', response.status_code)
    logger.log_success('https://www.example.com', len(response.content))
except Exception as e:
    logger.log_error('https://www.example.com', e)

3. Data Storage

import json
import csv
from pathlib import Path

class DataStorage:
    """Data storage helper"""
    def __init__(self, output_dir='output'):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def save_json(self, data, filename: str):
        """Save data as JSON"""
        filepath = self.output_dir / f"{filename}.json"
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def save_csv(self, data, filename: str, fieldnames: list):
        """Save data as CSV"""
        filepath = self.output_dir / f"{filename}.csv"
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

# Usage example
storage = DataStorage()
data = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]
storage.save_json(data, 'users')
storage.save_csv(data, 'users', ['name', 'age'])

Summary

A complete anti-anti-scraping toolkit (a combined sketch follows the checklists below):

Fixing 403:

  • Complete the request headers
  • Use a randomized User-Agent
  • Add a Referer
  • Manage cookies

Fixing 429:

  • Control the request rate
  • Use smart retries with backoff
  • Respect robots.txt
  • Add random delays

Fixing CAPTCHAs:

  • Recognize them with OCR
  • Call a third-party solving API
  • Fall back to manual entry
  • Avoid triggering them in the first place

Fixing IP bans:

  • Use a proxy IP pool
  • Rotate proxies
  • Distribute the crawl
  • Lower the request rate

Best practices:

  • Follow the site's rules
  • Set sensible delays
  • Handle errors thoroughly
  • Keep detailed logs
  • Be considerate of the site's resources
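
As a wrap-up, here is a minimal sketch that combines several of the techniques above: a random User-Agent, a random delay between requests, and exponential-backoff retries. It is not a production implementation; the delay ranges and retry settings are illustrative assumptions.

import random
import time
import requests
from fake_useragent import UserAgent

def polite_fetch(url, max_retries=3, min_delay=1.0, max_delay=3.0):
    """Fetch a URL with a random User-Agent, a random delay, and backoff retries."""
    ua = UserAgent()
    delay = 1.0
    for attempt in range(max_retries):
        time.sleep(random.uniform(min_delay, max_delay))  # basic rate limiting
        try:
            response = requests.get(
                url,
                headers={'User-Agent': ua.random},  # randomized User-Agent
                timeout=10,
            )
            response.raise_for_status()
            return response.text
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code in (429, 503):
                time.sleep(delay)  # exponential backoff on throttling
                delay *= 2
            else:
                raise
        except requests.exceptions.RequestException:
            time.sleep(delay)
            delay *= 2
    return None

# Hypothetical usage
html = polite_fetch('https://www.example.com')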

With the solutions in this article you can handle most anti-scraping mechanisms and make your crawler more stable and reliable. Remember to use scraping responsibly and to comply with the law and each site's rules.