【问题解决】已验证:Python 爬虫被反爬、403、验证码、IP 封禁完整解决方案
摘要:本文针对Python爬虫开发中常见的403禁止访问、429请求过多、验证码和IP封禁等问题,提供完整的解决方案。通过分析问题原因,提出四种有效应对措施:1)使用fake-useragent生成随机请求头完善HTTP头信息;2)采用Cookie管理器持久化会话状态;3)实现请求频率控制装饰器模拟人类操作;4)构建代理IP池实现IP轮换。文中提供了可直接使用的代码示例。
问题现象
在使用Python进行网络爬虫时,经常遇到以下问题:
# HTTP 403 Forbidden
requests.exceptions.HTTPError: 403 Client Error: Forbidden
# HTTP 429 Too Many Requests
requests.exceptions.HTTPError: 429 Client Error: Too Many Requests
# 验证码
# 网站返回验证码页面,无法继续爬取
# IP被封禁
# 连接超时或拒绝连接
# User-Agent检测
# 返回空页面或错误页面
这些问题导致爬虫无法正常工作。本文将提供完整的反反爬解决方案。
问题原因分析
1. HTTP 403原因
- User-Agent被识别为爬虫
- Referer缺失或错误
- Cookie缺失或过期
- 请求头不完整
2. HTTP 429原因
- 请求频率过高
- 同一IP请求过多
- 未遵守robots.txt规则
3. 验证码原因
- 检测到异常访问模式
- IP行为可疑
- 触发了反爬机制
4. IP封禁原因
- 短时间内大量请求
- 使用了代理IP但质量差
- 违反了网站使用条款
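在动手修复之前,可以先用一小段诊断代码粗略区分上面几类问题,再决定采用哪种方案。下面是一个最小示意(仅凭状态码和页面中是否出现"验证码"/"captcha"等关键词来判断,关键词和判断逻辑属于示例假设,实际站点需自行调整):
import requests
def diagnose(url: str) -> str:
    """粗略判断反爬类型,返回 'ok'、'forbidden'、'rate_limited'、'captcha' 或 'blocked'"""
    try:
        response = requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        # 连接超时或被拒绝,通常意味着IP可能已被封禁
        return 'blocked'
    if response.status_code == 403:
        return 'forbidden'
    if response.status_code == 429:
        return 'rate_limited'
    # 关键词仅为示意,不同网站的验证码页面提示不同
    if '验证码' in response.text or 'captcha' in response.text.lower():
        return 'captcha'
    return 'ok'
# 使用示例
print(diagnose('https://www.example.com'))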
解决方案
方案一:完善请求头
import requests
from fake_useragent import UserAgent
# 安装fake_useragent
# pip install fake-useragent
def get_headers():
"""获取随机请求头"""
ua = UserAgent()
headers = {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0',
}
return headers
def fetch_with_headers(url):
"""使用完整请求头获取页面"""
headers = get_headers()
session = requests.Session()
session.headers.update(headers)
try:
response = session.get(url, timeout=10)
response.raise_for_status()
return response.text
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
return None
except Exception as e:
print(f"请求失败: {e}")
return None
# 使用示例
html = fetch_with_headers('https://www.example.com')
方案二:Cookie管理
import requests
import pickle
from pathlib import Path
class CookieManager:
"""Cookie管理器"""
def __init__(self, cookie_file='cookies.pkl'):
self.cookie_file = Path(cookie_file)
self.session = requests.Session()
self.load_cookies()
def save_cookies(self):
"""保存Cookie"""
with open(self.cookie_file, 'wb') as f:
pickle.dump(self.session.cookies, f)
def load_cookies(self):
"""加载Cookie"""
if self.cookie_file.exists():
with open(self.cookie_file, 'rb') as f:
self.session.cookies.update(pickle.load(f))
def get(self, url, **kwargs):
"""发送GET请求"""
response = self.session.get(url, **kwargs)
self.save_cookies()
return response
def post(self, url, **kwargs):
"""发送POST请求"""
response = self.session.post(url, **kwargs)
self.save_cookies()
return response
# 使用示例
cookie_manager = CookieManager()
# 第一次访问需要登录时,用POST提交表单
response = cookie_manager.post('https://www.example.com/login', data={
'username': 'your_username',
'password': 'your_password'
})
# 后续访问会自动携带Cookie
response = cookie_manager.get('https://www.example.com/protected')
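补充一点:pickle保存的是二进制文件,不便于人工查看。如果希望以可读的JSON形式持久化Cookie,可以借助requests.utils提供的转换函数,下面是一个最小示意(文件名 cookies.json 为示例假设;转成字典会丢失domain/path等属性,只适合简单场景):
import json
import requests
from pathlib import Path
from requests.utils import dict_from_cookiejar, cookiejar_from_dict
def save_cookies_json(session: requests.Session, path='cookies.json'):
    """把会话中的Cookie以JSON形式保存"""
    data = dict_from_cookiejar(session.cookies)
    Path(path).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
def load_cookies_json(session: requests.Session, path='cookies.json'):
    """从JSON文件恢复Cookie到会话"""
    p = Path(path)
    if p.exists():
        session.cookies = cookiejar_from_dict(json.loads(p.read_text(encoding='utf-8')))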
方案三:请求频率控制
import requests
import time
import random
from functools import wraps
def rate_limit(min_delay=1, max_delay=3):
"""请求频率限制装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
delay = random.uniform(min_delay, max_delay)
time.sleep(delay)
return func(*args, **kwargs)
return wrapper
return decorator
class RateLimitedCrawler:
"""频率限制的爬虫"""
def __init__(self, min_delay=1, max_delay=3):
self.min_delay = min_delay
self.max_delay = max_delay
@rate_limit(min_delay=1, max_delay=3)
def fetch(self, url):
"""获取页面"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.text
except Exception as e:
print(f"请求失败: {e}")
return None
def fetch_multiple(self, urls):
"""批量获取页面"""
results = []
for url in urls:
html = self.fetch(url)
if html:
results.append(html)
return results
# 使用示例
crawler = RateLimitedCrawler(min_delay=2, max_delay=5)
urls = [
'https://www.example.com/page1',
'https://www.example.com/page2',
'https://www.example.com/page3',
]
results = crawler.fetch_multiple(urls)
方案四:代理IP池
import requests
import random
from typing import List, Optional
class ProxyPool:
"""代理IP池"""
def __init__(self, proxies: List[str]):
self.proxies = proxies
self.failed_proxies = set()
self.session = requests.Session()
def get_proxy(self) -> Optional[dict]:
"""获取随机代理"""
available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
if not available_proxies:
return None
proxy = random.choice(available_proxies)
return {
'http': proxy,
'https': proxy
}
def mark_failed(self, proxy: str):
"""标记失败的代理"""
self.failed_proxies.add(proxy)
def fetch(self, url: str, max_retries: int = 3) -> Optional[str]:
"""使用代理获取页面"""
for attempt in range(max_retries):
proxy_dict = self.get_proxy()
if not proxy_dict:
print("没有可用的代理")
return None
try:
response = self.session.get(
url,
proxies=proxy_dict,
timeout=10,
headers={'User-Agent': 'Mozilla/5.0'}
)
response.raise_for_status()
return response.text
except Exception as e:
proxy = proxy_dict['http']
print(f"代理 {proxy} 失败: {e}")
self.mark_failed(proxy)
return None
# 使用示例
proxies = [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
'http://proxy3.example.com:8080',
]
proxy_pool = ProxyPool(proxies)
html = proxy_pool.fetch('https://www.example.com')
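代理池的效果很大程度上取决于代理本身的质量,入池前最好先做一次连通性检测。下面是一个最小示意(测试地址 https://httpbin.org/ip 为示例假设,可替换为任意稳定可访问的URL):
import requests
from typing import List
def check_proxies(proxies: List[str], test_url='https://httpbin.org/ip', timeout=5) -> List[str]:
    """逐个测试代理,返回能正常访问测试地址的代理列表"""
    usable = []
    for proxy in proxies:
        try:
            response = requests.get(test_url, proxies={'http': proxy, 'https': proxy}, timeout=timeout)
            if response.status_code == 200:
                usable.append(proxy)
        except requests.exceptions.RequestException:
            # 连接失败或超时的代理直接丢弃
            pass
    return usable
# 使用示例:先筛一遍,再交给ProxyPool
# proxy_pool = ProxyPool(check_proxies(proxies))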
方案五:验证码处理
import requests
import base64
from io import BytesIO
from typing import Optional
from PIL import Image
import pytesseract
# 安装依赖
# pip install pytesseract pillow
# 需要安装Tesseract OCR引擎
class CaptchaSolver:
"""验证码解决器"""
def __init__(self):
self.session = requests.Session()
def get_captcha_image(self, url: str) -> Optional[Image.Image]:
"""获取验证码图片"""
try:
response = self.session.get(url)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
return image
except Exception as e:
print(f"获取验证码失败: {e}")
return None
def solve_captcha(self, image: Image.Image) -> str:
"""解决验证码"""
try:
# 预处理图片
image = image.convert('L') # 转为灰度
image = image.resize((200, 80)) # 调整大小
# 使用OCR识别
captcha_text = pytesseract.image_to_string(image)
return captcha_text.strip()
except Exception as e:
print(f"识别验证码失败: {e}")
return ""
def solve_with_api(self, image: Image.Image, api_url: str, api_key: str) -> str:
"""使用API解决验证码"""
try:
buffered = BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
response = requests.post(
api_url,
json={'image': img_str},
headers={'Authorization': f'Bearer {api_key}'}
)
response.raise_for_status()
return response.json()['result']
except Exception as e:
print(f"API识别失败: {e}")
return ""
# 使用示例
solver = CaptchaSolver()
# 方法1:使用OCR
captcha_image = solver.get_captcha_image('https://www.example.com/captcha')
if captcha_image:
captcha_text = solver.solve_captcha(captcha_image)
print(f"验证码: {captcha_text}")
# 方法2:使用第三方API
# captcha_text = solver.solve_with_api(captcha_image, 'https://api.captcha.com/solve', 'your_api_key')
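简单的灰度加缩放对噪点较多的验证码往往不够,通常还需要二值化等预处理来提高OCR识别率。下面是一个最小示意(阈值140只是经验值,--psm 7表示按单行文本识别,是否有效取决于验证码本身的复杂程度):
from PIL import Image
import pytesseract
def preprocess_and_ocr(image: Image.Image, threshold: int = 140) -> str:
    """灰度 -> 二值化 -> OCR,返回识别结果"""
    gray = image.convert('L')
    # 高于阈值的像素置白、低于阈值的置黑,突出字符轮廓
    binary = gray.point(lambda x: 255 if x > threshold else 0)
    return pytesseract.image_to_string(binary, config='--psm 7').strip()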
常见场景解决
场景1:处理403 Forbidden
import requests
from fake_useragent import UserAgent
def handle_403(url):
"""处理403错误"""
ua = UserAgent()
headers = {
'User-Agent': ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Referer': url,
'Upgrade-Insecure-Requests': '1',
}
session = requests.Session()
session.headers.update(headers)
try:
response = session.get(url, timeout=10)
if response.status_code == 403:
print("遇到403错误,尝试解决...")
# 尝试添加更多请求头
headers.update({
'DNT': '1',
'Cache-Control': 'max-age=0',
})
response = session.get(url, timeout=10)
response.raise_for_status()
return response.text
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
return None
except Exception as e:
print(f"请求失败: {e}")
return None
# 使用示例
html = handle_403('https://www.example.com')
场景2:处理429 Too Many Requests
import requests
import time
import random
class RateLimitedRequest:
"""频率限制请求"""
def __init__(self, max_requests_per_minute=30):
self.max_requests = max_requests_per_minute
self.request_times = []
def wait_if_needed(self):
"""如果需要则等待"""
current_time = time.time()
# 移除超过1分钟的请求记录
self.request_times = [
t for t in self.request_times
if current_time - t < 60
]
# 如果请求次数超过限制,等待
if len(self.request_times) >= self.max_requests:
oldest_time = min(self.request_times)
wait_time = 60 - (current_time - oldest_time) + random.uniform(1, 3)
print(f"请求过于频繁,等待 {wait_time:.2f} 秒")
time.sleep(wait_time)
def get(self, url, **kwargs):
"""发送GET请求"""
self.wait_if_needed()
try:
response = requests.get(url, timeout=10, **kwargs)
response.raise_for_status()
self.request_times.append(time.time())
return response.text
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
print("遇到429错误,等待后重试...")
retry_after = int(e.response.headers.get('Retry-After', 60))
time.sleep(retry_after + random.uniform(1, 3))
return self.get(url, **kwargs)
else:
raise
except Exception as e:
print(f"请求失败: {e}")
return None
# 使用示例
requester = RateLimitedRequest(max_requests_per_minute=20)
html = requester.get('https://www.example.com')
场景3:处理IP封禁
import requests
import time
from typing import List, Optional
class IPBanHandler:
"""IP封禁处理器"""
def __init__(self, proxies: List[str]):
self.proxies = proxies
self.current_proxy_index = 0
self.session = requests.Session()
def get_next_proxy(self) -> Optional[dict]:
"""获取下一个代理"""
if self.current_proxy_index >= len(self.proxies):
print("所有代理都已用完")
return None
proxy = self.proxies[self.current_proxy_index]
self.current_proxy_index += 1
return {
'http': proxy,
'https': proxy
}
def fetch_with_proxy_rotation(self, url: str, max_retries: int = 3) -> Optional[str]:
"""使用代理轮换获取页面"""
for attempt in range(max_retries):
proxy_dict = self.get_next_proxy()
if not proxy_dict:
return None
try:
response = self.session.get(
url,
proxies=proxy_dict,
timeout=10,
headers={'User-Agent': 'Mozilla/5.0'}
)
if response.status_code == 403:
print(f"代理 {proxy_dict['http']} 被封禁,尝试下一个...")
continue
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
time.sleep(2)
return None
# 使用示例
proxies = [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
'http://proxy3.example.com:8080',
]
handler = IPBanHandler(proxies)
html = handler.fetch_with_proxy_rotation('https://www.example.com')
场景4:处理JavaScript渲染
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class JavaScriptCrawler:
"""JavaScript渲染爬虫"""
def __init__(self, headless=True):
options = Options()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1080')
# 添加User-Agent
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
self.driver = webdriver.Chrome(options=options)
def fetch(self, url: str, wait_time: int = 10) -> str:
"""获取页面"""
try:
self.driver.get(url)
# 等待页面加载完成
WebDriverWait(self.driver, wait_time).until(
EC.presence_of_element_located((By.TAG_NAME, 'body'))
)
# 额外等待JavaScript执行
time.sleep(2)
return self.driver.page_source
except Exception as e:
print(f"获取页面失败: {e}")
return ""
def close(self):
"""关闭浏览器"""
self.driver.quit()
# 使用示例
crawler = JavaScriptCrawler(headless=True)
html = crawler.fetch('https://www.example.com')
crawler.close()
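需要注意,部分网站会通过navigator.webdriver等特征识别Selenium,仅修改User-Agent可能不够。下面给出一个常见补充思路的最小示意(假设使用Chrome;这些参数的效果因浏览器版本和网站检测手段而异,并不保证通用):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
def create_stealth_driver(headless=True):
    """创建一个尽量减少自动化特征的Chrome驱动(示意)"""
    options = Options()
    if headless:
        options.add_argument('--headless')
    # 去掉"Chrome正受到自动测试软件的控制"提示
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(options=options)
    # 在页面脚本执行前覆盖 navigator.webdriver
    driver.execute_cdp_cmd(
        'Page.addScriptToEvaluateOnNewDocument',
        {'source': "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"}
    )
    return driver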
场景5:处理登录验证
import requests
from typing import Optional
from bs4 import BeautifulSoup
class LoginCrawler:
"""登录爬虫"""
def __init__(self):
self.session = requests.Session()
def get_csrf_token(self, login_url: str) -> Optional[str]:
"""获取CSRF令牌"""
try:
response = self.session.get(login_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': 'csrf_token'})
if csrf_token:
return csrf_token.get('value')
return None
except Exception as e:
print(f"获取CSRF令牌失败: {e}")
return None
def login(self, login_url: str, username: str, password: str) -> bool:
"""登录"""
csrf_token = self.get_csrf_token(login_url)
if not csrf_token:
print("无法获取CSRF令牌")
return False
login_data = {
'username': username,
'password': password,
'csrf_token': csrf_token,
}
try:
response = self.session.post(login_url, data=login_data)
response.raise_for_status()
# 检查是否登录成功
if 'logout' in response.text.lower():
print("登录成功")
return True
else:
print("登录失败")
return False
except Exception as e:
print(f"登录失败: {e}")
return False
def fetch_protected_page(self, url: str) -> Optional[str]:
"""获取需要登录的页面"""
try:
response = self.session.get(url)
response.raise_for_status()
return response.text
except Exception as e:
print(f"获取页面失败: {e}")
return None
# 使用示例
crawler = LoginCrawler()
if crawler.login('https://www.example.com/login', 'username', 'password'):
html = crawler.fetch_protected_page('https://www.example.com/protected')
高级技巧
1. 分布式爬虫
import requests
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional
class DistributedCrawler:
"""分布式爬虫"""
def __init__(self, proxies: List[str], max_workers=5):
self.proxies = proxies
self.max_workers = max_workers
def fetch_single(self, url: str) -> Optional[str]:
"""获取单个页面"""
proxy = random.choice(self.proxies)
headers = {'User-Agent': 'Mozilla/5.0'}
try:
response = requests.get(
url,
proxies={'http': proxy, 'https': proxy},
headers=headers,
timeout=10
)
response.raise_for_status()
return response.text
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None
def fetch_multiple(self, urls: List[str]) -> List[Optional[str]]:
"""批量获取页面"""
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_url = {
executor.submit(self.fetch_single, url): url
for url in urls
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"处理 {url} 时出错: {e}")
results.append(None)
return results
# 使用示例
proxies = ['http://proxy1.example.com:8080', 'http://proxy2.example.com:8080']
crawler = DistributedCrawler(proxies, max_workers=3)
urls = ['https://www.example.com/page1', 'https://www.example.com/page2']
results = crawler.fetch_multiple(urls)
2. 智能重试机制
import requests
import time
from functools import wraps
from typing import Callable
def smart_retry(
    max_retries: int = 3,
    backoff_factor: float = 2,
    initial_delay: float = 1
):
    """智能重试装饰器:对429/503及网络异常按指数退避重试"""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code in (429, 503) and attempt < max_retries - 1:
                        print(f"遇到 {e.response.status_code} 错误,等待 {delay:.2f} 秒后重试...")
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        raise
                except requests.exceptions.RequestException:
                    if attempt < max_retries - 1:
                        print(f"请求失败,等待 {delay:.2f} 秒后重试...")
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        raise
        return wrapper
    return decorator
# 使用示例
@smart_retry(max_retries=3, backoff_factor=2, initial_delay=1)
def fetch_with_retry(url: str) -> str:
"""带重试的获取"""
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.text
html = fetch_with_retry('https://www.example.com')
3. 反爬虫检测绕过
import requests
import random
import time
from typing import Optional
class AntiDetectionCrawler:
"""反检测爬虫"""
def __init__(self):
self.session = requests.Session()
self.setup_session()
def setup_session(self):
"""设置会话"""
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
def random_delay(self, min_delay=1, max_delay=3):
"""随机延迟"""
delay = random.uniform(min_delay, max_delay)
time.sleep(delay)
def simulate_human_behavior(self):
"""模拟人类行为"""
# 随机延迟
self.random_delay()
# 随机User-Agent
from fake_useragent import UserAgent
ua = UserAgent()
self.session.headers['User-Agent'] = ua.random
def fetch(self, url: str) -> Optional[str]:
"""获取页面"""
self.simulate_human_behavior()
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response.text
except Exception as e:
print(f"获取失败: {e}")
return None
# 使用示例
crawler = AntiDetectionCrawler()
html = crawler.fetch('https://www.example.com')
最佳实践
1. 遵守robots.txt
import urllib.robotparser
from typing import Optional
class RobotsTxtChecker:
"""robots.txt检查器"""
def __init__(self, base_url: str):
self.base_url = base_url
self.rp = urllib.robotparser.RobotFileParser()
self.rp.set_url(f"{base_url}/robots.txt")
self.rp.read()
def can_fetch(self, url: str, user_agent='*') -> bool:
"""检查是否可以抓取"""
return self.rp.can_fetch(user_agent, url)
    def crawl_delay(self, user_agent='*') -> Optional[float]:
        """获取抓取延迟(robots.txt未指定时返回None)"""
        return self.rp.crawl_delay(user_agent)
# 使用示例
checker = RobotsTxtChecker('https://www.example.com')
if checker.can_fetch('https://www.example.com/page'):
print("可以抓取")
else:
print("不允许抓取")
2. 错误处理和日志
import logging
class CrawlerLogger:
"""爬虫日志记录器"""
def __init__(self, log_file='crawler.log'):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('Crawler')
def log_request(self, url: str, status_code: int):
"""记录请求"""
self.logger.info(f"请求: {url}, 状态码: {status_code}")
def log_error(self, url: str, error: Exception):
"""记录错误"""
self.logger.error(f"错误: {url}, 异常: {error}")
def log_success(self, url: str, data_size: int):
"""记录成功"""
self.logger.info(f"成功: {url}, 数据大小: {data_size}")
# 使用示例
logger = CrawlerLogger()
try:
response = requests.get('https://www.example.com')
logger.log_request('https://www.example.com', response.status_code)
logger.log_success('https://www.example.com', len(response.content))
except Exception as e:
logger.log_error('https://www.example.com', e)
3. 数据存储
import json
import csv
from pathlib import Path
class DataStorage:
"""数据存储"""
def __init__(self, output_dir='output'):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def save_json(self, data, filename: str):
"""保存为JSON"""
filepath = self.output_dir / f"{filename}.json"
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def save_csv(self, data, filename: str, fieldnames: list):
"""保存为CSV"""
filepath = self.output_dir / f"{filename}.csv"
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
# 使用示例
storage = DataStorage()
data = [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]
storage.save_json(data, 'users')
storage.save_csv(data, 'users', ['name', 'age'])
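爬虫通常是边抓边存,一次性写整份JSON在中途出错时容易丢数据。一个常见做法是按行追加写入JSON Lines文件,下面是一个最小示意(文件名 output/results.jsonl 为示例假设):
import json
from pathlib import Path
def append_jsonl(record: dict, path='output/results.jsonl'):
    """把单条记录追加写入JSON Lines文件,每行一个JSON对象"""
    filepath = Path(path)
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')
# 使用示例:每抓到一条数据就落盘一次
append_jsonl({'name': 'Alice', 'age': 25})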
总结
完整的反反爬解决方案:
解决403:
- 完善请求头
- 使用随机User-Agent
- 添加Referer
- 管理Cookie
解决429:
- 控制请求频率
- 使用智能重试
- 遵守robots.txt
- 添加随机延迟
解决验证码:
- 使用OCR识别
- 调用第三方API
- 手动输入
- 避免触发验证码
解决IP封禁:
- 使用代理IP池
- 代理轮换
- 分布式爬虫
- 降低请求频率
最佳实践:
- 遵守网站规则
- 合理设置延迟
- 完善错误处理
- 记录详细日志
- 尊重网站资源
通过本文的解决方案,你可以有效应对各种反爬机制,让爬虫更加稳定可靠。记住,合理使用爬虫技术,遵守法律法规和网站规则。