在网络爬虫的世界里,IP封禁就像是一道"封印术",让我们的爬虫程序寸步难行。而动态IP轮换技术,就像是破解这道封印的"金钥匙"。今天我们就来深入探讨如何通过智能的IP轮换策略,让爬虫在反爬的重重包围中优雅地穿行!

🎯 反爬机制深度解析

常见反爬检测手段

想象一下,网站的反爬系统就像是一个经验丰富的"门卫大叔",他会通过各种蛛丝马迹来识别"可疑人员":

用户请求进入反爬检测系统后,会分别经过以下几项检查,任意一项命中即触发对应的处置:

  • IP频率检测 —— 超过阈值? → IP封禁
  • User-Agent检测 —— 异常特征? → 请求拒绝
  • 行为模式分析 —— 机器行为? → 验证码挑战
  • JavaScript验证 —— 执行失败? → JavaScript检测失败
  • Cookie/Session检测 —— 缺失/异常? → 会话验证失败
  • 全部通过 → 正常访问

反爬检测的核心指标

  1. 请求频率:单IP在时间窗口内的请求数量
  2. 访问模式:请求间隔、访问路径的规律性
  3. 浏览器指纹:User-Agent、HTTP头信息
  4. 会话状态:Cookie、Session的持续性
  5. 交互行为:是否执行JavaScript、处理验证码

🔄 IP轮换策略设计

1. 基础轮换策略

让我们从最简单的IP轮换开始,就像轮流值班一样:

import random
import time
import threading
from collections import deque, defaultdict
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class RotationStrategy(Enum):
    """Proxy-selection strategies dispatched on by IPRotationManager.get_proxy."""
    RANDOM = "random"                # uniform random pick among available proxies
    ROUND_ROBIN = "round_robin"      # cycle through the available list with a cursor
    WEIGHTED = "weighted"            # random pick weighted by success rate, latency and usage
    LEAST_USED = "least_used"        # pick the proxy handed out the fewest times
    RESPONSE_TIME = "response_time"  # random pick among the fastest ~20% by average latency

@dataclass
class ProxyInfo:
    """Connection details and running statistics for a single HTTP proxy.

    The counters, averages and scores are updated by the rotation manager;
    this class only derives values (URL, success rate, cooling state) from them.
    """

    ip: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None
    success_count: int = 0
    failure_count: int = 0
    last_used: float = 0            # epoch seconds of the last time the proxy was handed out
    response_time_avg: float = 0    # average latency; callers in this file report milliseconds
    quality_score: float = 0.5      # composite score compared against the manager's threshold
    cooling_until: float = 0        # epoch seconds; the proxy is skipped until this moment

    @property
    def proxy_url(self) -> str:
        """Return the ``http://`` proxy URL, embedding credentials when both are set.

        Username and password are percent-encoded so that characters such as
        ``@``, ``:`` or ``/`` cannot corrupt the URL's authority section.
        """
        from urllib.parse import quote

        if self.username and self.password:
            user = quote(self.username, safe="")
            secret = quote(self.password, safe="")
            return f"http://{user}:{secret}@{self.ip}:{self.port}"
        return f"http://{self.ip}:{self.port}"

    @property
    def success_rate(self) -> float:
        """Fraction of reported requests that succeeded (0.0 when nothing was reported)."""
        total = self.success_count + self.failure_count
        return self.success_count / max(1, total)

    @property
    def is_cooling(self) -> bool:
        """True while the proxy is inside its post-failure cooling period."""
        return time.time() < self.cooling_until

class IPRotationManager:
    """Thread-safe proxy pool that hands out ProxyInfo objects by strategy.

    Proxies that are cooling down, score below ``quality_threshold`` or are
    temporarily disabled are filtered out before the configured strategy picks
    one. Callers report every outcome through ``report_success`` /
    ``report_failure`` so the statistics driving selection stay current.
    """

    def __init__(self, strategy: RotationStrategy = RotationStrategy.WEIGHTED):
        self.strategy = strategy
        self.proxies: List[ProxyInfo] = []
        self.current_index = 0                  # cursor for ROUND_ROBIN
        self.usage_stats = defaultdict(int)     # "ip:port" -> times handed out
        self.failed_proxies = set()             # "ip:port" keys currently disabled
        self.lock = threading.RLock()

        # Per-proxy bookkeeping, keyed by "ip:port".
        self._failure_times = defaultdict(deque)     # failure timestamps inside failure_window
        self._disabled_until: Dict[str, float] = {}  # when a disabled key is re-enabled
        self._rt_samples = defaultdict(int)          # number of latency samples averaged so far

        # Rotation settings
        self.config = {
            'cooling_time': 300,       # seconds a proxy rests after any failure
            'max_failures': 3,         # failures within failure_window that trigger a disable
            'failure_window': 600,     # sliding window for counting failures; also the disable duration (seconds)
            'min_interval': 1,         # minimum request interval (seconds); not enforced by this class
            'quality_threshold': 0.3,  # proxies scoring below this are skipped
        }

    @staticmethod
    def _key(proxy: ProxyInfo) -> str:
        """Return the "ip:port" key used by all per-proxy bookkeeping."""
        return f"{proxy.ip}:{proxy.port}"

    @staticmethod
    def _time_score(avg_ms: float, floor: float) -> float:
        """Map an average latency (ms) to a score in ``[floor, 1]``.

        1000 ms or faster scores 1.0 and the score falls linearly towards 0 at
        5000 ms. The upper clamp keeps fast (or not yet measured, avg 0)
        proxies from scoring above 1.
        """
        return min(1.0, max(floor, 1 - (avg_ms - 1000) / 4000))

    def add_proxy(self, proxy: ProxyInfo):
        """Add *proxy* unless one with the same ip and port is already pooled."""
        with self.lock:
            key = self._key(proxy)
            if any(self._key(p) == key for p in self.proxies):
                return
            self.proxies.append(proxy)
            print(f"✅ 添加代理: {proxy.ip}:{proxy.port}")

    def remove_proxy(self, proxy: ProxyInfo):
        """Remove *proxy* from the pool and forget its failure/disable state."""
        with self.lock:
            if proxy in self.proxies:
                self.proxies.remove(proxy)
                key = self._key(proxy)
                self.failed_proxies.discard(key)
                self._disabled_until.pop(key, None)
                self._failure_times.pop(key, None)
                print(f"❌ 移除代理: {proxy.ip}:{proxy.port}")

    def get_proxy(self, exclude_ips: Optional[set] = None) -> Optional[ProxyInfo]:
        """Pick an available proxy using the configured strategy.

        Args:
            exclude_ips: IP addresses (bare IPs, not "ip:port") that must not be returned.

        Returns:
            The selected proxy, with ``last_used`` and its usage count updated,
            or ``None`` when no proxy passes the availability filters.
        """
        with self.lock:
            available_proxies = self._get_available_proxies(exclude_ips)

            if not available_proxies:
                print("⚠️  没有可用的代理")
                return None

            if self.strategy == RotationStrategy.RANDOM:
                selected = self._random_select(available_proxies)
            elif self.strategy == RotationStrategy.ROUND_ROBIN:
                selected = self._round_robin_select(available_proxies)
            elif self.strategy == RotationStrategy.WEIGHTED:
                selected = self._weighted_select(available_proxies)
            elif self.strategy == RotationStrategy.LEAST_USED:
                selected = self._least_used_select(available_proxies)
            else:  # RotationStrategy.RESPONSE_TIME
                selected = self._response_time_select(available_proxies)

            if selected:
                selected.last_used = time.time()
                self.usage_stats[self._key(selected)] += 1
                print(f"🔄 选择代理: {selected.ip}:{selected.port} (策略: {self.strategy.value})")

            return selected

    def _release_expired_disables(self, now: float):
        """Re-enable proxies whose temporary disable period has elapsed."""
        expired = [key for key, until in self._disabled_until.items() if until <= now]
        for key in expired:
            del self._disabled_until[key]
            self.failed_proxies.discard(key)
            self._failure_times.pop(key, None)

    def _get_available_proxies(self, exclude_ips: Optional[set] = None) -> List[ProxyInfo]:
        """Return proxies that are not excluded, cooling, low-quality or disabled.

        Expired disables are lifted first. Called with ``self.lock`` held.
        """
        self._release_expired_disables(time.time())
        exclude_ips = exclude_ips or set()
        threshold = self.config['quality_threshold']
        return [
            proxy for proxy in self.proxies
            if proxy.ip not in exclude_ips
            and not proxy.is_cooling
            and proxy.quality_score >= threshold
            and self._key(proxy) not in self.failed_proxies
        ]

    def _random_select(self, proxies: List[ProxyInfo]) -> ProxyInfo:
        """Uniform random choice."""
        return random.choice(proxies)

    def _round_robin_select(self, proxies: List[ProxyInfo]) -> ProxyInfo:
        """Advance a shared cursor over the current available list."""
        selected = proxies[self.current_index % len(proxies)]
        self.current_index += 1
        return selected

    def _weighted_select(self, proxies: List[ProxyInfo]) -> ProxyInfo:
        """Random choice weighted by success rate (40%), latency (30%) and low usage (30%)."""
        weights = []
        for proxy in proxies:
            success_weight = proxy.success_rate
            # Faster proxies weigh more; floor of 0.1 keeps slow proxies selectable.
            time_weight = self._time_score(proxy.response_time_avg, 0.1)
            # Less-used proxies weigh more.
            usage_count = self.usage_stats.get(self._key(proxy), 0)
            usage_weight = max(0.1, 1 / (1 + usage_count * 0.1))
            weights.append(success_weight * 0.4 + time_weight * 0.3 + usage_weight * 0.3)

        return random.choices(proxies, weights=weights)[0]

    def _least_used_select(self, proxies: List[ProxyInfo]) -> ProxyInfo:
        """Pick the proxy handed out the fewest times (first wins on ties)."""
        return min(proxies, key=lambda p: self.usage_stats.get(self._key(p), 0))

    def _response_time_select(self, proxies: List[ProxyInfo]) -> ProxyInfo:
        """Random choice among the fastest 20% (at least one) by average latency."""
        sorted_proxies = sorted(proxies, key=lambda p: p.response_time_avg)
        top_count = max(1, len(sorted_proxies) // 5)
        return random.choice(sorted_proxies[:top_count])

    def report_success(self, proxy: ProxyInfo, response_time: float = 0):
        """Record a successful request through *proxy*.

        Args:
            proxy: the proxy that served the request.
            response_time: observed latency (ms in this file's callers); values
                <= 0 are not folded into the average.

        A success also lifts any temporary disable on the proxy.
        """
        with self.lock:
            key = self._key(proxy)
            proxy.success_count += 1

            if response_time > 0:
                # Incremental mean over latency samples only: failures carry no
                # latency, so they must not inflate the denominator.
                self._rt_samples[key] += 1
                proxy.response_time_avg += (response_time - proxy.response_time_avg) / self._rt_samples[key]

            self._update_quality_score(proxy)

            self.failed_proxies.discard(key)
            self._disabled_until.pop(key, None)

    def report_failure(self, proxy: ProxyInfo, error_type: str = "unknown"):
        """Record a failed request through *proxy*.

        The proxy always enters a ``cooling_time`` rest. If ``max_failures``
        failures fall within the last ``failure_window`` seconds it is also
        disabled for ``failure_window`` seconds (or until a success is reported).
        ``error_type`` is accepted for callers but not currently used.
        """
        with self.lock:
            now = time.time()
            key = self._key(proxy)
            proxy.failure_count += 1

            self._update_quality_score(proxy)

            proxy.cooling_until = now + self.config['cooling_time']

            # Sliding window of recent failures.
            window = self._failure_times[key]
            window.append(now)
            cutoff = now - self.config['failure_window']
            while window and window[0] < cutoff:
                window.popleft()

            if len(window) >= self.config['max_failures'] and key not in self.failed_proxies:
                failed_key = key
                self.failed_proxies.add(failed_key)
                self._disabled_until[failed_key] = now + self.config['failure_window']
                print(f"⚠️  代理失败次数过多,暂时禁用: {failed_key}")

    def _update_quality_score(self, proxy: ProxyInfo):
        """Recompute quality as 70% success rate + 30% latency score (both in [0, 1])."""
        time_score = self._time_score(proxy.response_time_avg, 0.0)
        proxy.quality_score = proxy.success_rate * 0.7 + time_score * 0.3

    def get_stats(self) -> Dict:
        """Return pool counters: total, available, cooling, disabled and the strategy name."""
        with self.lock:
            total_proxies = len(self.proxies)
            # Computed first: it also lifts expired disables before they are counted.
            available_proxies = len(self._get_available_proxies())
            cooling_proxies = sum(1 for p in self.proxies if p.is_cooling)
            failed_proxies = len(self.failed_proxies)

            return {
                'total_proxies': total_proxies,
                'available_proxies': available_proxies,
                'cooling_proxies': cooling_proxies,
                'failed_proxies': failed_proxies,
                'strategy': self.strategy.value
            }

# 使用示例
def demo_ip_rotation():
    """Exercise IPRotationManager with four sample proxies and simulated outcomes."""
    manager = IPRotationManager(strategy=RotationStrategy.WEIGHTED)

    # (address, quality_score, response_time_avg); every sample listens on port 8080.
    samples = (
        ("192.168.1.100", 0.8, 1200),
        ("192.168.1.101", 0.6, 2000),
        ("192.168.1.102", 0.9, 800),
        ("192.168.1.103", 0.7, 1500),
    )
    for address, score, latency in samples:
        manager.add_proxy(ProxyInfo(address, 8080, quality_score=score, response_time_avg=latency))

    print("🔄 开始IP轮换测试...")
    for _ in range(10):
        chosen = manager.get_proxy()
        if chosen is not None:
            # Roughly 80% of simulated requests succeed.
            succeeded = random.random() > 0.2
            if succeeded:
                manager.report_success(chosen, random.randint(800, 2000))
            else:
                manager.report_failure(chosen)

        time.sleep(0.1)

    print(f"\n📊 轮换统计: {manager.get_stats()}")

if __name__ == "__main__":
    # Run the single-process rotation demo when this file is executed directly.
    demo_ip_rotation()

2. 智能频率控制系统

频率控制就像是给爬虫安装了一个"智能刹车系统":

import asyncio
import time
from collections import defaultdict, deque
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging

@dataclass
class FrequencyRule:
    """Frequency-control rule: global limits that the frequency controller applies to each IP."""
    requests_per_minute: int = 60   # nominal per-minute request limit
    requests_per_hour: int = 1000   # cap on requests per IP over a rolling hour
    burst_limit: int = 5            # max requests per IP within a 10-second window
    cool_down_seconds: int = 30     # pause imposed on an IP after a failed request

class AdaptiveFrequencyController:
    """Per-IP rate limiter whose per-minute limit adapts to the recent success rate.

    For each IP it keeps timestamps of requests, successes and failures
    (pruned to the last hour), a post-failure cool-down deadline and the
    current per-minute limit.
    """

    def __init__(self):
        self.global_rules = FrequencyRule()
        self.ip_stats = defaultdict(lambda: {
            'requests': deque(),
            'successes': deque(),
            'failures': deque(),
            'last_request': 0,
            'cooling_until': 0,
            # Every IP starts at the global per-minute limit.
            'current_limit': self.global_rules.requests_per_minute,
        })

        self.adaptive_config = {
            'success_rate_threshold': 0.8,
            'increase_factor': 1.2,
            'decrease_factor': 0.7,
            'min_requests_per_minute': 10,
            'max_requests_per_minute': 120,
            'adaptation_window': 300  # 5-minute adaptation window (seconds)
        }

    async def can_request(self, ip: str) -> tuple[bool, float]:
        """Check whether *ip* may send a request now.

        Returns:
            ``(True, 0)`` when allowed, otherwise ``(False, seconds_to_wait)``,
            where the wait is until the blocking condition clears.
        """
        now = time.time()
        stats = self.ip_stats[ip]

        # Post-failure cool-down.
        if now < stats['cooling_until']:
            return False, stats['cooling_until'] - now

        self._clean_expired_records(stats, now)
        requests = stats['requests']

        # A sliding window frees up when its OLDEST request ages out, so every
        # wait below is measured from the minimum timestamp in the window.
        minute_window = [r for r in requests if now - r <= 60]
        if len(minute_window) >= stats['current_limit']:
            return False, max(0.0, 60 - (now - min(minute_window, default=now)))

        hour_window = [r for r in requests if now - r <= 3600]
        if len(hour_window) >= self.global_rules.requests_per_hour:
            return False, max(0.0, 3600 - (now - min(hour_window, default=now)))

        burst_window = [r for r in requests if now - r <= 10]
        if len(burst_window) >= self.global_rules.burst_limit:
            return False, max(0.0, 10 - (now - min(burst_window, default=now)))

        # Minimum spacing between consecutive requests, based on recent success.
        if stats['last_request'] > 0:
            min_interval = self._calculate_adaptive_interval(ip)
            elapsed = now - stats['last_request']
            if elapsed < min_interval:
                return False, min_interval - elapsed

        return True, 0

    def record_request(self, ip: str, success: bool, response_time: float = 0):
        """Record the outcome of a request sent from *ip* and adapt its limit.

        A failure puts the IP into a ``cool_down_seconds`` cool-down.
        ``response_time`` is accepted for callers but not currently used.
        """
        now = time.time()
        stats = self.ip_stats[ip]

        stats['requests'].append(now)
        stats['last_request'] = now

        if success:
            stats['successes'].append(now)
        else:
            stats['failures'].append(now)
            stats['cooling_until'] = now + self.global_rules.cool_down_seconds

        # Adapt synchronously: the adjustment does no I/O, and asyncio.create_task
        # would raise RuntimeError when no event loop is running (and leave
        # unreferenced tasks behind when one is).
        self._apply_adaptive_adjustment(ip)

    def _clean_expired_records(self, stats: Dict, current_time: float):
        """Drop timestamps older than one hour from the request/success/failure deques."""
        cutoff_time = current_time - 3600

        for name in ('requests', 'successes', 'failures'):
            records = stats[name]
            while records and records[0] < cutoff_time:
                records.popleft()

    def _calculate_adaptive_interval(self, ip: str) -> float:
        """Return the minimum seconds between requests, based on the 5-minute success rate."""
        stats = self.ip_stats[ip]
        current_time = time.time()

        recent_successes = len([s for s in stats['successes'] if current_time - s <= 300])
        recent_failures = len([f for f in stats['failures'] if current_time - f <= 300])
        total_recent = recent_successes + recent_failures

        if total_recent == 0:
            return 1.0  # default: 1-second spacing

        success_rate = recent_successes / total_recent

        if success_rate >= 0.9:
            return 0.5  # high success: short spacing
        elif success_rate >= 0.7:
            return 1.0  # normal spacing
        elif success_rate >= 0.5:
            return 2.0  # slow down
        else:
            return 5.0  # slow down considerably

    def _apply_adaptive_adjustment(self, ip: str):
        """Scale the IP's per-minute limit up or down from its windowed success rate.

        No change is made with fewer than 10 outcomes in the adaptation window.
        """
        stats = self.ip_stats[ip]
        current_time = time.time()

        window_start = current_time - self.adaptive_config['adaptation_window']

        recent_successes = len([s for s in stats['successes'] if s >= window_start])
        recent_failures = len([f for f in stats['failures'] if f >= window_start])
        total_recent = recent_successes + recent_failures

        if total_recent < 10:  # too few samples to adapt
            return

        success_rate = recent_successes / total_recent
        current_limit = stats['current_limit']

        if success_rate >= self.adaptive_config['success_rate_threshold']:
            # High success rate: allow more requests.
            new_limit = min(
                int(current_limit * self.adaptive_config['increase_factor']),
                self.adaptive_config['max_requests_per_minute']
            )
        else:
            # Low success rate: back off.
            new_limit = max(
                int(current_limit * self.adaptive_config['decrease_factor']),
                self.adaptive_config['min_requests_per_minute']
            )

        if new_limit != current_limit:
            stats['current_limit'] = new_limit
            logging.info(f"📊 IP {ip} 频率限制调整: {current_limit} -> {new_limit} (成功率: {success_rate:.2%})")

    async def _adaptive_adjust(self, ip: str):
        """Coroutine wrapper around ``_apply_adaptive_adjustment`` kept for existing awaiters."""
        self._apply_adaptive_adjustment(ip)

    def get_ip_stats(self, ip: str) -> Dict:
        """Return a snapshot of *ip*'s counters without registering unknown IPs."""
        stats = self.ip_stats[ip] if ip in self.ip_stats else self.ip_stats.default_factory()
        current_time = time.time()

        minute_count = len([r for r in stats['requests'] if current_time - r <= 60])
        hour_count = len([r for r in stats['requests'] if current_time - r <= 3600])

        recent_successes = len([s for s in stats['successes'] if current_time - s <= 300])
        recent_failures = len([f for f in stats['failures'] if current_time - f <= 300])
        total_recent = recent_successes + recent_failures

        success_rate = recent_successes / total_recent if total_recent > 0 else 0

        return {
            'ip': ip,
            'requests_last_minute': minute_count,
            'requests_last_hour': hour_count,
            'current_limit_per_minute': stats['current_limit'],
            'success_rate_5min': success_rate,
            'is_cooling': current_time < stats['cooling_until'],
            'cooling_remaining': max(0, stats['cooling_until'] - current_time)
        }

3. 分布式IP轮换系统

在多服务器环境中,我们需要一个"中央调度员"来协调所有的IP使用:

import redis
import json
import asyncio
from typing import Dict, List, Optional
from dataclasses import asdict

class DistributedIPManager:
    """Redis-backed proxy pool shared by several crawler instances.

    Redis layout (all keys under ``key_prefix``):
      * ``<prefix>:ip_list``          hash: "ip:port" -> JSON proxy description
      * ``<prefix>:usage``            hash: "ip:port" -> JSON lease (acquired_at, instance_id)
      * ``<prefix>:stats:<ip:port>``  hash: counters, timestamps, cooling_until
      * ``<prefix>:locks:acquire``    string lock serialising ``acquire_ip``
    """

    # Delete the lock only if this instance still owns it. An instance whose
    # lock already expired must not release a lock another instance now holds.
    _RELEASE_LOCK_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
        self.instance_id = self._generate_instance_id()

        # Redis key prefixes
        self.key_prefix = "distributed_ip_pool"
        self.usage_key = f"{self.key_prefix}:usage"
        self.stats_key = f"{self.key_prefix}:stats"
        self.lock_key = f"{self.key_prefix}:locks"

    def _generate_instance_id(self) -> str:
        """Return "<hostname>-<8 hex chars>" identifying this process in Redis."""
        import socket
        import uuid
        hostname = socket.gethostname()
        return f"{hostname}-{str(uuid.uuid4())[:8]}"

    def _stats_key_for(self, ip_info: Dict) -> str:
        """Return the Redis stats-hash key for a proxy description dict."""
        return f"{self.stats_key}:{ip_info['ip']}:{ip_info['port']}"

    @staticmethod
    def _avg_response_time(stats: Dict) -> float:
        """Mean latency from accumulated samples, else the last sample, else 1000."""
        samples = int(stats.get("response_time_samples", 0))
        if samples > 0:
            return float(stats.get("response_time_total", 0)) / samples
        return float(stats.get("last_response_time", 1000))

    def _release_lock(self, lock_key: str):
        """Release *lock_key* only if this instance still holds it; report but do not raise on errors."""
        try:
            self.redis_client.eval(self._RELEASE_LOCK_SCRIPT, 1, lock_key, self.instance_id)
        except Exception as e:
            print(f"❌ 释放锁失败: {e}")

    async def acquire_ip(self, strategy: str = "least_used", max_attempts: int = 50) -> Optional[Dict]:
        """Lease an available IP from the shared pool.

        Args:
            strategy: "random", "least_used" or "best_performance"; anything else picks randomly.
            max_attempts: how many times to try the acquire lock (0.1 s apart) before giving up.

        Returns:
            The proxy description dict, or ``None`` when nothing is available,
            the lock could not be obtained, or a Redis error occurred.
        """
        lock_key = f"{self.lock_key}:acquire"

        try:
            for _ in range(max(1, max_attempts)):
                # SET NX EX: the lock auto-expires after 5 s if its holder dies.
                if self.redis_client.set(lock_key, self.instance_id, nx=True, ex=5):
                    try:
                        available_ips = await self._get_available_ips()
                        if not available_ips:
                            return None

                        selected_ip = await self._select_ip_by_strategy(available_ips, strategy)
                        if not selected_ip:
                            return None

                        await self._record_ip_acquisition(selected_ip)
                        return selected_ip
                    finally:
                        self._release_lock(lock_key)

                # Another instance holds the lock: back off briefly, then retry
                # (a loop, so heavy contention cannot exhaust the recursion limit).
                await asyncio.sleep(0.1)

            print(f"⚠️  获取分布式锁超时: {lock_key}")
            return None

        except Exception as e:
            print(f"❌ 获取IP失败: {e}")
            return None

    async def release_ip(self, ip_info: Dict, success: bool = True, response_time: float = 0):
        """Return a leased IP and fold the request outcome into its shared stats.

        A failure puts the IP into a 5-minute cooling period. All updates are
        batched in one Redis pipeline; errors are printed, not raised.
        """
        ip_key = f"{ip_info['ip']}:{ip_info['port']}"
        stats_key = f"{self.stats_key}:{ip_key}"

        try:
            now = time.time()
            pipe = self.redis_client.pipeline()

            if success:
                pipe.hincrby(stats_key, "success_count", 1)
                pipe.hset(stats_key, "last_success", now)

                if response_time > 0:
                    pipe.hset(stats_key, "last_response_time", response_time)
                    # Running totals so readers can compute a true average latency.
                    pipe.hincrbyfloat(stats_key, "response_time_total", response_time)
                    pipe.hincrby(stats_key, "response_time_samples", 1)
            else:
                pipe.hincrby(stats_key, "failure_count", 1)
                pipe.hset(stats_key, "last_failure", now)
                pipe.hset(stats_key, "cooling_until", now + 300)  # 5-minute cooling

            # Drop the lease so the IP becomes selectable again.
            pipe.hdel(self.usage_key, ip_key)

            # The client is redis-py's synchronous one: execute() returns a list,
            # which is not awaitable.
            pipe.execute()

        except Exception as e:
            print(f"❌ 释放IP失败: {e}")

    async def _get_available_ips(self) -> List[Dict]:
        """Return the pooled proxy dicts that are not leased, cooling or low-quality."""
        try:
            ip_list_key = f"{self.key_prefix}:ip_list"
            ip_data = self.redis_client.hgetall(ip_list_key)

            available_ips = []
            current_time = time.time()

            for ip_key, ip_json in ip_data.items():
                try:
                    ip_info = json.loads(ip_json)
                except json.JSONDecodeError:
                    continue  # skip corrupt entries

                if await self._is_ip_available(ip_key, current_time):
                    available_ips.append(ip_info)

            return available_ips

        except Exception as e:
            print(f"❌ 获取可用IP列表失败: {e}")
            return []

    async def _is_ip_available(self, ip_key: str, current_time: float) -> bool:
        """True if *ip_key* is not leased, not cooling and has a success rate >= 30%."""
        try:
            if self.redis_client.hexists(self.usage_key, ip_key):
                return False

            stats = self.redis_client.hgetall(f"{self.stats_key}:{ip_key}")
            if stats.get("cooling_until"):
                if current_time < float(stats["cooling_until"]):
                    return False

            success_count = int(stats.get("success_count", 0))
            failure_count = int(stats.get("failure_count", 0))
            total_requests = success_count + failure_count

            if total_requests > 0 and success_count / total_requests < 0.3:
                return False

            return True

        except Exception:
            return False

    async def _select_ip_by_strategy(self, available_ips: List[Dict], strategy: str) -> Optional[Dict]:
        """Pick one proxy dict from *available_ips* according to *strategy*."""
        if not available_ips:
            return None

        try:
            if strategy == "random":
                return random.choice(available_ips)

            if strategy == "least_used":
                # Usage is tracked per "ip:port", so ports of one IP are distinct.
                def total_used(ip_info: Dict) -> int:
                    stats = self.redis_client.hgetall(self._stats_key_for(ip_info))
                    return int(stats.get("success_count", 0)) + int(stats.get("failure_count", 0))

                return min(available_ips, key=total_used)

            if strategy == "best_performance":
                best_ip = None
                best_score = 0

                for ip_info in available_ips:
                    stats = self.redis_client.hgetall(self._stats_key_for(ip_info))

                    success_count = int(stats.get("success_count", 0))
                    failure_count = int(stats.get("failure_count", 0))
                    total_requests = success_count + failure_count
                    if total_requests == 0:
                        continue

                    success_rate = success_count / total_requests
                    avg_response_time = self._avg_response_time(stats)

                    # Score: 70% success rate + 30% latency score clamped to [0.1, 1].
                    time_score = min(1.0, max(0.1, 1 - (avg_response_time - 1000) / 4000))
                    score = success_rate * 0.7 + time_score * 0.3

                    if score > best_score:
                        best_score = score
                        best_ip = ip_info

                return best_ip or available_ips[0]

            # Unknown strategy: random choice.
            return random.choice(available_ips)

        except Exception as e:
            print(f"❌ IP选择策略执行失败: {e}")
            return random.choice(available_ips) if available_ips else None

    async def _record_ip_acquisition(self, ip_info: Dict):
        """Mark the IP as leased by this instance in the usage hash."""
        ip_key = f"{ip_info['ip']}:{ip_info['port']}"

        try:
            usage_info = {
                'acquired_at': time.time(),
                'instance_id': self.instance_id
            }
            self.redis_client.hset(self.usage_key, ip_key, json.dumps(usage_info))

        except Exception as e:
            print(f"❌ 记录IP获取失败: {e}")

    async def add_ip_to_pool(self, ip_info: Dict):
        """Store (or overwrite) a proxy description in the shared pool."""
        try:
            ip_key = f"{ip_info['ip']}:{ip_info['port']}"
            ip_list_key = f"{self.key_prefix}:ip_list"

            self.redis_client.hset(ip_list_key, ip_key, json.dumps(ip_info))
            print(f"✅ IP已添加到分布式池: {ip_key}")

        except Exception as e:
            print(f"❌ 添加IP到分布式池失败: {e}")

    async def get_pool_stats(self) -> Dict:
        """Return total, leased and currently available IP counts ({} on error)."""
        try:
            ip_list_key = f"{self.key_prefix}:ip_list"

            total_ips = self.redis_client.hlen(ip_list_key)
            using_ips = self.redis_client.hlen(self.usage_key)
            available_ips = len(await self._get_available_ips())

            return {
                'total_ips': total_ips,
                'using_ips': using_ips,
                'available_ips': available_ips,
                'instance_id': self.instance_id
            }

        except Exception as e:
            print(f"❌ 获取池统计失败: {e}")
            return {}

# 使用示例
async def demo_distributed_ip_manager():
    """Seed the Redis-backed pool with three proxies and run five acquire/release cycles."""
    manager = DistributedIPManager()

    # Sample pool: 192.168.1.100-102, all HTTP on port 8080.
    for last_octet in (100, 101, 102):
        await manager.add_ip_to_pool({'ip': f'192.168.1.{last_octet}', 'port': 8080, 'type': 'HTTP'})

    for round_no in range(1, 6):
        print(f"\n🔄 第 {round_no} 次获取IP")

        leased = await manager.acquire_ip(strategy="least_used")
        if not leased:
            print("❌ 没有可用的IP")
            continue

        print(f"📍 获取到IP: {leased['ip']}:{leased['port']}")

        # Pretend to use the proxy, then fabricate an outcome (~80% success).
        await asyncio.sleep(0.1)
        ok = random.random() > 0.2
        latency = random.randint(500, 2000)

        await manager.release_ip(leased, ok, latency)
        print(f"✅ IP已释放,成功: {ok}, 响应时间: {latency}ms")

    print(f"\n📊 池统计: {await manager.get_pool_stats()}")

if __name__ == "__main__":
    # Run the Redis-backed demo; it needs a reachable Redis server (localhost:6379 by default).
    asyncio.run(demo_distributed_ip_manager())

🛡️ 反反爬高级策略

行为模拟和伪装技术

import random
import time
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class BehaviorPattern:
    """Tunable parameters for simulated human browsing behaviour."""
    min_think_time: float = 1.0      # lower bound of the random base pause (seconds)
    max_think_time: float = 5.0      # upper bound of the random base pause (seconds)
    scroll_probability: float = 0.3  # chance per page of simulating 1-3 scrolls
    click_probability: float = 0.2   # chance per page of simulating one click
    back_probability: float = 0.1    # chance of a "back" navigation; not read by the visible simulator code

class HumanBehaviorSimulator:
    """Produce browser-like request headers and human-like pauses for one crawl session.

    ``session_state`` tracks pages visited and timestamps; pauses grow as the
    session goes on.
    """

    def __init__(self):
        # Pool of desktop User-Agent strings sampled per request.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0"
        ]

        self.session_state = {
            'pages_visited': 0,
            'session_start': time.time(),
            'last_action': time.time()
        }

    def get_random_headers(self) -> Dict[str, str]:
        """Return a header dict with a random User-Agent and Accept-Language."""
        agent = random.choice(self.user_agents)
        language = random.choice(['en-US,en;q=0.5', 'zh-CN,zh;q=0.9', 'en-GB,en;q=0.5'])

        headers = {
            'User-Agent': agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': language,
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        return headers

    def calculate_think_time(self, pattern: BehaviorPattern) -> float:
        """Return a pause length: a random base scaled by page count and session age.

        The page multiplier grows 0.1 per visited page up to 1.5x. The fatigue
        multiplier grows with session duration up to 1.5x after one hour.
        """
        state = self.session_state
        think = random.uniform(pattern.min_think_time, pattern.max_think_time)

        page_multiplier = min(1.5, 1 + state['pages_visited'] * 0.1)
        elapsed = time.time() - state['session_start']
        fatigue_multiplier = 1 + min(0.5, elapsed / 3600)

        return think * page_multiplier * fatigue_multiplier

    async def simulate_page_interaction(self, pattern: BehaviorPattern):
        """Sleep through random scroll/click actions and return their names in order.

        Also counts the page as visited and stamps ``last_action``.
        """
        performed = []

        if random.random() < pattern.scroll_probability:
            for _ in range(random.randint(1, 3)):
                await asyncio.sleep(random.uniform(0.5, 2.0))
                performed.append("scroll")

        if random.random() < pattern.click_probability:
            await asyncio.sleep(random.uniform(0.3, 1.0))
            performed.append("click")

        state = self.session_state
        state['pages_visited'] += 1
        state['last_action'] = time.time()

        return performed

# 反反爬策略整合器
class AntiAntiCrawlManager:
    """Combine proxy rotation, frequency control and behaviour simulation per request.

    Collaborators used here:
        ip_manager: provides proxies via ``get_proxy()`` and receives outcome
            reports via ``report_success`` / ``report_failure``.
        freq_controller: gates requests per proxy IP via ``can_request`` and
            records outcomes via ``record_request``.
    """

    def __init__(self, ip_manager: IPRotationManager, freq_controller: AdaptiveFrequencyController):
        self.ip_manager = ip_manager
        self.freq_controller = freq_controller
        self.behavior_simulator = HumanBehaviorSimulator()
        self.behavior_pattern = BehaviorPattern()

    async def make_safe_request(self, url: str, **kwargs) -> Optional[Dict]:
        """GET ``url`` through a rotated proxy, with up to 3 attempts.

        Args:
            url: Target URL.
            **kwargs: Extra keyword arguments forwarded to ``session.get``.
                ``proxy``, ``headers`` and ``timeout`` are set here. Passing
                them again raises ``TypeError``, which is caught and counted
                as a failed attempt.

        Returns:
            On HTTP 200, a dict with ``status``, ``content`` (decoded body),
            ``response_time`` (ms, measured before the body is read) and
            ``proxy_used`` (``"ip:port"``). Returns ``None`` if no proxy is
            available or every attempt fails.

        Notes:
            - A rate-limit wait consumes one of the attempts.
            - Non-200 responses retry immediately with the next proxy. Only
              exceptions trigger exponential backoff (1 s, 2 s, ...).
        """
        max_retries = 3

        for attempt in range(max_retries):
            # Reset every attempt. If something raises before a new proxy is
            # obtained, the failure must not hit an unbound name (first
            # attempt) or be blamed on the previous attempt's proxy.
            proxy = None
            try:
                proxy = self.ip_manager.get_proxy()
                if not proxy:
                    print("❌ 没有可用的代理")
                    return None

                # Per-IP rate limiting: wait, then move on to the next attempt.
                can_request, wait_time = await self.freq_controller.can_request(proxy.ip)
                if not can_request:
                    print(f"⏳ 需要等待 {wait_time:.1f} 秒")
                    await asyncio.sleep(wait_time)
                    continue

                think_time = self.behavior_simulator.calculate_think_time(self.behavior_pattern)
                await asyncio.sleep(think_time)

                headers = self.behavior_simulator.get_random_headers()

                start_time = time.time()
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        url,
                        proxy=proxy.proxy_url,
                        headers=headers,
                        timeout=aiohttp.ClientTimeout(total=30),
                        **kwargs
                    ) as response:
                        response_time = (time.time() - start_time) * 1000
                        status = response.status
                        # Read the body BEFORE reporting success. If the transfer
                        # or decoding fails, the except below is then the only
                        # report for this attempt. Previously the same request
                        # was counted as both a success and a failure.
                        content = await response.text() if status == 200 else None

                if status != 200:
                    # Penalise the proxy and retry immediately with the next one.
                    self.ip_manager.report_failure(proxy, f"HTTP_{status}")
                    self.freq_controller.record_request(proxy.ip, False)
                    continue

                self.ip_manager.report_success(proxy, response_time)
                self.freq_controller.record_request(proxy.ip, True, response_time)

                # Done after the session is closed, so the simulated pauses
                # don't keep the HTTP connection open.
                await self.behavior_simulator.simulate_page_interaction(self.behavior_pattern)

                return {
                    'status': status,
                    'content': content,
                    'response_time': response_time,
                    'proxy_used': f"{proxy.ip}:{proxy.port}"
                }

            except Exception as e:
                if proxy is not None:
                    self.ip_manager.report_failure(proxy, str(e))
                    self.freq_controller.record_request(proxy.ip, False)

                print(f"❌ 请求失败 (尝试 {attempt + 1}): {e}")

                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff

        return None

🚀 性能优化和最佳实践

关键优化策略

  1. 预加载和缓存:提前加载高质量代理
  2. 连接池复用:避免频繁建立连接
  3. 异步并发:提高处理效率
  4. 智能重试:指数退避策略
  5. 监控告警:及时发现问题
# 性能监控示例
class PerformanceMonitor:
    """Accumulate request counters and derive a simple performance report.

    ``metrics`` keeps its original public keys. ``metrics['start_time']`` is
    still the wall-clock creation time. Elapsed runtime, however, is measured
    with ``time.monotonic()``, so system clock adjustments cannot distort it.
    """

    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0,
            'ip_rotation_count': 0,
            'start_time': time.time()
        }
        # Monotonic reference point used for runtime calculations.
        self._start_monotonic = time.monotonic()
        # Running sum and count of *measured* response times only. Successes
        # recorded without a time (response_time <= 0) must not dilute the average.
        self._response_time_sum = 0.0
        self._timed_successes = 0

    def record_request(self, success: bool, response_time: float = 0, ip_rotated: bool = False):
        """Record one request outcome.

        Args:
            success: Whether the request succeeded.
            response_time: Latency of a successful request, in whatever unit
                the caller uses. Values <= 0 mean "not measured" and are
                excluded from ``avg_response_time``.
            ip_rotated: When true, increments ``ip_rotation_count``.
        """
        m = self.metrics
        m['total_requests'] += 1

        if success:
            m['successful_requests'] += 1
            if response_time > 0:
                # Average over timed successes only. The old formula divided by
                # all successes, biasing the mean whenever a success had no time.
                self._response_time_sum += response_time
                self._timed_successes += 1
                m['avg_response_time'] = self._response_time_sum / self._timed_successes
        else:
            m['failed_requests'] += 1

        if ip_rotated:
            m['ip_rotation_count'] += 1

    def get_performance_report(self) -> Dict:
        """Return a snapshot of derived performance figures.

        Returns:
            Dict with ``runtime_seconds``, ``total_requests``, ``success_rate``
            (0.0 when no requests), ``avg_response_time``,
            ``requests_per_minute`` (0.0 when runtime is zero),
            ``ip_rotations`` and ``efficiency_score``. The score is
            ``success_rate`` scaled by throughput, capped at 60 requests/minute.
        """
        m = self.metrics
        runtime = max(0.0, time.monotonic() - self._start_monotonic)
        total = m['total_requests']
        success_rate = m['successful_requests'] / max(1, total)
        # Guard against a zero runtime, e.g. a report requested immediately
        # after construction. Otherwise this raises ZeroDivisionError.
        requests_per_minute = (total / runtime) * 60 if runtime > 0 else 0.0

        return {
            'runtime_seconds': runtime,
            'total_requests': total,
            'success_rate': success_rate,
            'avg_response_time': m['avg_response_time'],
            'requests_per_minute': requests_per_minute,
            'ip_rotations': m['ip_rotation_count'],
            'efficiency_score': success_rate * min(1.0, requests_per_minute / 60)
        }

📋 总结

通过本文的学习,我们掌握了动态IP轮换的核心技术:

核心技术要点

  1. 多策略IP轮换:随机、轮询、加权、最少使用等
  2. 自适应频率控制:根据成功率动态调整请求频率
  3. 分布式IP管理:多服务器协调IP使用
  4. 行为模拟技术:模拟真实用户行为模式

实战应用建议

  • 🎯 策略组合:根据场景选择合适的轮换策略
  • ⚖️ 平衡取舍:在效率和隐蔽性之间找到平衡点
  • 📊 监控调优:持续监控性能指标,及时调整策略
  • 🔒 合规使用:遵守robots.txt、目标网站的服务条款及相关法律法规,并控制访问频率,避免给对方服务器造成负担

避免常见陷阱

  • 不要过度频繁切换IP
  • 避免使用低质量代理
  • 注意模拟真实的用户行为
  • 建立完善的监控和告警机制

动态IP轮换技术是现代网络爬虫的核心技能,但请记住:技术是中性的,关键在于如何合理合法地使用它们。希望这篇文章能帮助你构建更加智能和稳定的爬虫系统!

🎉 温馨提示:在使用这些技术时,请务必遵守网站的robots.txt协议和相关法律法规,做一个有道德的程序员!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐