AI-Driven Intelligent Crawler Architecture and Applications

As a programming enthusiast, I have long been exploring how to bring artificial intelligence into traditional web crawlers so that they become smarter, more efficient, and more human-like. In this post I want to share what I have learned, and what I have built, while working on AI-driven intelligent crawlers.

Introduction: The Limits of Traditional Crawlers and the Opportunity for AI

In an era of information overload, traditional crawlers face many challenges: increasingly sophisticated anti-crawling mechanisms, frequently changing page structures, and uneven data quality. As a programmer I have felt these limitations first-hand, and I have also seen the enormous opportunity that AI brings.

An AI-driven intelligent crawler can adapt automatically to changes in page structure, recognize anti-crawling strategies, and even understand the semantics of page content, achieving truly "intelligent crawling". At the same time, the constantly evolving defenses deployed by websites remind me that AI is not a silver bullet. This article takes a deep look at the architecture, core algorithms, and practical applications of AI-powered intelligent crawlers.

Part 1: Architecture of an AI-Powered Intelligent Crawler

1.1 Architecture Overview

The AI crawler uses a layered architecture in which every layer has a clear responsibility and boundary. Let me illustrate the architecture in code:

# Core architecture of the AI-powered crawler
class AISmartCrawler:
    def __init__(self):
        # Initialize the core components
        self.nlp_engine = NLPEngine()                # natural language processing engine
        self.vision_engine = VisionEngine()          # computer vision engine
        self.behavior_engine = BehaviorEngine()      # intelligent behavior engine
        self.adaptation_engine = AdaptationEngine()  # self-adaptation engine

    def crawl(self, target_url, strategy='intelligent'):
        """Main intelligent crawling workflow."""
        try:
            # 1. Intelligent page analysis
            page_info = self.analyze_page(target_url)

            # 2. Dynamic strategy selection
            if strategy == 'intelligent':
                strategy = self.select_strategy(page_info)

            # 3. Execute the crawling strategy
            result = self.execute_strategy(target_url, strategy, page_info)

            # 4. Intelligent data extraction
            extracted_data = self.extract_data_intelligently(result)

            # 5. Quality evaluation and optimization
            quality_score = self.evaluate_quality(extracted_data)

            return {
                'data': extracted_data,
                'quality_score': quality_score,
                'strategy_used': strategy,
                'metadata': page_info
            }

        except Exception as e:
            self.handle_error(e)
            return None
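
For orientation, a successful crawl() call is meant to return a payload shaped like the sketch below. The concrete values are fabricated for illustration; only the four keys come from the return statement above.

# Hypothetical example of the result payload returned by crawl()
example_result = {
    'data': {'title': 'Example product', 'price': 199.0},    # fabricated extracted fields
    'quality_score': 0.92,                                    # fabricated quality estimate
    'strategy_used': 'selenium',                              # one of the supported strategies
    'metadata': {'javascript_heavy': True, 'complexity': 8}   # fabricated page_info
}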

1.2 Core Components in Detail

1.2.1 Natural Language Processing Engine

The NLP engine is the "brain" of the AI crawler; it is responsible for understanding the semantics and structure of page content:

import spacy
from transformers import pipeline
import re

class NLPEngine:
    def __init__(self):
        # Load pre-trained models
        self.nlp = spacy.load("zh_core_web_sm")
        self.sentiment_analyzer = pipeline("sentiment-analysis", model="uer/roberta-base-finetuned-jd-binary-chinese")
        self.text_classifier = pipeline("text-classification", model="uer/roberta-base-chinese-cluener")

    def analyze_content(self, html_content):
        """Analyze page content intelligently."""
        # Extract the plain text
        text_content = self.extract_text(html_content)

        # Named entity recognition
        entities = self.extract_entities(text_content)

        # Sentiment analysis
        sentiment = self.analyze_sentiment(text_content)

        # Topic classification
        topic = self.classify_topic(text_content)

        return {
            'text': text_content,
            'entities': entities,
            'sentiment': sentiment,
            'topic': topic,
            'structure': self.analyze_structure(html_content)
        }

    def extract_entities(self, text):
        """Extract named entities."""
        doc = self.nlp(text)
        entities = {}

        for ent in doc.ents:
            if ent.label_ not in entities:
                entities[ent.label_] = []
            entities[ent.label_].append(ent.text)

        return entities

    def analyze_structure(self, html_content):
        """Analyze structural features of the HTML."""
        # Profile the HTML structure with regular expressions
        structure_info = {
            'title_tags': len(re.findall(r'<title[^>]*>(.*?)</title>', html_content, re.I)),
            'heading_tags': len(re.findall(r'<h[1-6][^>]*>(.*?)</h[1-6]>', html_content, re.I)),
            'link_density': len(re.findall(r'<a[^>]*>', html_content)) / max(len(html_content), 1),
            'form_count': len(re.findall(r'<form[^>]*>', html_content)),
            'script_count': len(re.findall(r'<script[^>]*>', html_content))
        }

        return structure_info
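
Before moving on, here is a quick sanity check of the regex-based structure profiling. To avoid downloading the spaCy and transformers models, the instance is created with __new__ so that __init__ is skipped; the sample HTML is made up.

# Create the engine without running __init__ (analyze_structure only needs the re module)
nlp_engine = NLPEngine.__new__(NLPEngine)

sample_html = """
<html><head><title>Demo Page</title></head>
<body>
  <h1>Hello</h1>
  <a href="/a">one</a> <a href="/b">two</a>
  <form action="/search"></form>
  <script>console.log('hi')</script>
</body></html>
"""

print(nlp_engine.analyze_structure(sample_html))
# Expected: title_tags=1, heading_tags=1, form_count=1, script_count=1,
# and link_density = 2 links divided by the document length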

1.2.2 Computer Vision Engine

The vision engine lets the crawler "see" the page and understand its visual elements and layout:

import io
import cv2
import numpy as np
from PIL import Image
import pytesseract
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

class VisionEngine:
    def __init__(self):
        self.driver = self.setup_driver()

    def setup_driver(self):
        """Configure a headless browser."""
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        return webdriver.Chrome(options=chrome_options)

    def capture_page_screenshot(self, url):
        """Capture a screenshot of the page."""
        try:
            self.driver.get(url)
            # Wait for the page to load
            self.driver.implicitly_wait(5)

            # Get the full page dimensions
            page_width = self.driver.execute_script("return document.body.scrollWidth")
            page_height = self.driver.execute_script("return document.body.scrollHeight")

            # Resize the window to cover the whole page
            self.driver.set_window_size(page_width, page_height)

            # Take the screenshot
            screenshot = self.driver.get_screenshot_as_png()
            return Image.open(io.BytesIO(screenshot))

        except Exception as e:
            print(f"Screenshot failed: {e}")
            return None

    def analyze_layout(self, screenshot):
        """Analyze the page layout."""
        # Convert to OpenCV format
        img = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)

        # Edge detection
        edges = cv2.Canny(img, 50, 150)

        # Contour detection
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        # Summarize the layout structure
        layout_info = {
            'content_blocks': len(contours),
            'text_regions': self.detect_text_regions(img),
            'image_regions': self.detect_image_regions(img),
            'navigation_elements': self.detect_navigation(img)
        }

        return layout_info

    def detect_text_regions(self, img):
        """Detect text regions."""
        # Use Tesseract for OCR
        text_regions = []

        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Text detection
        try:
            text = pytesseract.image_to_data(gray, output_type=pytesseract.Output.DICT)

            for i, conf in enumerate(text['conf']):
                if float(conf) > 60:  # confidence threshold (conf may be a string in some versions)
                    x, y, w, h = text['left'][i], text['top'][i], text['width'][i], text['height'][i]
                    text_regions.append({
                        'bbox': (x, y, w, h),
                        'text': text['text'][i],
                        'confidence': conf
                    })
        except Exception:
            pass

        return text_regions
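
As a usage sketch, the snippet below captures a screenshot and runs only the OCR-based text-region detector shown above (the image and navigation detectors are omitted from the listing). It assumes chromedriver and the Tesseract binary are installed; the URL is a placeholder.

engine = VisionEngine()
try:
    screenshot = engine.capture_page_screenshot("https://example.com")
    if screenshot is not None:
        # Convert the PIL image to OpenCV format, as analyze_layout does
        img = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
        regions = engine.detect_text_regions(img)
        print(f"Detected {len(regions)} text regions above the confidence threshold")
finally:
    engine.driver.quit()  # always release the headless browser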

1.2.3 Intelligent Behavior Engine

The behavior engine simulates human browsing behavior to avoid detection by anti-bot systems:

import random
import time
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class BehaviorEngine:
    def __init__(self):
        self.behavior_patterns = self.load_behavior_patterns()

    def load_behavior_patterns(self):
        """Load the library of behavior patterns."""
        return {
            'human_like': {
                'scroll_patterns': ['smooth', 'random', 'bounce'],
                'click_patterns': ['direct', 'hover_first', 'double_click'],
                'typing_patterns': ['natural', 'fast', 'slow'],
                'wait_patterns': ['random', 'fixed', 'adaptive']
            },
            'bot_like': {
                'scroll_patterns': ['instant'],
                'click_patterns': ['instant'],
                'typing_patterns': ['instant'],
                'wait_patterns': ['minimal']
            }
        }

    def simulate_human_behavior(self, driver, behavior_type='human_like'):
        """Simulate human behavior."""
        pattern = self.behavior_patterns[behavior_type]

        # Random scrolling
        self.simulate_scrolling(driver, pattern['scroll_patterns'])

        # Random waiting
        self.simulate_waiting(pattern['wait_patterns'])

        # Mouse movement
        self.simulate_mouse_movement(driver)

    def simulate_scrolling(self, driver, scroll_patterns):
        """Simulate scrolling behavior."""
        pattern = random.choice(scroll_patterns)

        if pattern == 'smooth':
            # Smooth scrolling
            total_height = driver.execute_script("return document.body.scrollHeight")
            current_position = 0
            step = 100

            while current_position < total_height:
                driver.execute_script(f"window.scrollTo(0, {current_position})")
                current_position += step
                time.sleep(random.uniform(0.1, 0.3))

        elif pattern == 'random':
            # Random scrolling
            for _ in range(random.randint(3, 8)):
                scroll_y = random.randint(100, 800)
                driver.execute_script(f"window.scrollBy(0, {scroll_y})")
                time.sleep(random.uniform(0.5, 2.0))

        elif pattern == 'bounce':
            # Bouncing scroll (back and forth)
            positions = [100, 300, 200, 500, 400, 700, 600]
            for pos in positions:
                driver.execute_script(f"window.scrollTo(0, {pos})")
                time.sleep(random.uniform(0.3, 1.0))

    def simulate_mouse_movement(self, driver):
        """Simulate mouse movement."""
        try:
            # Pick a random element on the page
            elements = driver.find_elements(By.TAG_NAME, "a")[:10]
            if elements:
                element = random.choice(elements)

                # Build an action chain
                actions = ActionChains(driver)

                # Move to the element
                actions.move_to_element(element)
                actions.perform()

                # Random wait
                time.sleep(random.uniform(0.5, 2.0))

        except Exception as e:
            print(f"Mouse movement simulation failed: {e}")

    def simulate_waiting(self, wait_patterns):
        """Simulate waiting behavior."""
        pattern = random.choice(wait_patterns)

        if pattern == 'random':
            wait_time = random.uniform(1.0, 5.0)
        elif pattern == 'fixed':
            wait_time = 2.0
        elif pattern == 'adaptive':
            wait_time = random.uniform(0.5, 3.0)
        else:
            wait_time = 1.0

        time.sleep(wait_time)
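
A short usage sketch: drive a headless Chrome session and layer human-like behavior on top of an ordinary page visit. It assumes chromedriver is available; the target URL is a placeholder.

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
try:
    driver.get("https://example.com")
    behavior = BehaviorEngine()
    # Scroll, pause, and move the mouse the way a person might
    behavior.simulate_human_behavior(driver, behavior_type='human_like')
finally:
    driver.quit()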

Part 2: Core Algorithms and Intelligent Strategies

2.1 Adaptive Crawling Strategy

The AI crawler can automatically pick the crawling strategy best suited to the characteristics of the target site:

class AdaptiveStrategySelector:
    def __init__(self):
        self.strategy_weights = {
            'selenium': 0.3,
            'requests': 0.4,
            'playwright': 0.2,
            'scrapy': 0.1
        }

    def select_strategy(self, page_info, historical_data=None):
        """Intelligently select a crawling strategy."""
        # Score every strategy
        strategy_scores = {}

        # Selenium score
        strategy_scores['selenium'] = self.calculate_selenium_score(page_info)

        # Requests score
        strategy_scores['requests'] = self.calculate_requests_score(page_info)

        # Playwright score
        strategy_scores['playwright'] = self.calculate_playwright_score(page_info)

        # Scrapy score
        strategy_scores['scrapy'] = self.calculate_scrapy_score(page_info)

        # Adjust the scores with historical data
        if historical_data:
            strategy_scores = self.adjust_with_history(strategy_scores, historical_data)

        # Pick the highest-scoring strategy
        best_strategy = max(strategy_scores, key=strategy_scores.get)

        return best_strategy, strategy_scores

    def calculate_selenium_score(self, page_info):
        """Compute the score for the Selenium strategy."""
        score = 0

        # JavaScript dependency
        if page_info.get('javascript_heavy', False):
            score += 30

        # Dynamic content
        if page_info.get('dynamic_content', False):
            score += 25

        # Anti-crawler measures present
        if page_info.get('anti_crawler', False):
            score += 20

        # Page complexity
        complexity = page_info.get('complexity', 0)
        score += min(complexity * 2, 25)

        return score

    def calculate_requests_score(self, page_info):
        """Compute the score for the Requests strategy."""
        score = 0

        # Static content
        if page_info.get('static_content', True):
            score += 30

        # API endpoints available
        if page_info.get('has_api', False):
            score += 25

        # Simple structure
        if page_info.get('complexity', 0) < 5:
            score += 25

        # No anti-crawler measures
        if not page_info.get('anti_crawler', False):
            score += 20

        return score
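
Here is a worked example of the scoring logic on a hypothetical page profile. Only the two scoring functions shown above are exercised, since the Playwright and Scrapy scorers are omitted from the listing; the page_info fields are assumptions.

selector = AdaptiveStrategySelector()
page_info = {
    'javascript_heavy': True,   # +30 for Selenium
    'dynamic_content': True,    # +25 for Selenium
    'anti_crawler': False,      # +20 for Requests
    'static_content': False,
    'has_api': False,
    'complexity': 8             # +16 for Selenium (capped at 25), too complex for the Requests bonus
}

print(selector.calculate_selenium_score(page_info))  # 30 + 25 + 16 = 71
print(selector.calculate_requests_score(page_info))  # 20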

2.2 Intelligent Anti-Crawler Countermeasures

The AI crawler can learn about and adapt to a variety of anti-crawling strategies:

class AntiCrawlerDetector:
    def __init__(self):
        self.detection_patterns = self.load_detection_patterns()
        self.evasion_strategies = self.load_evasion_strategies()

    def load_detection_patterns(self):
        """Load detection patterns."""
        return {
            'behavior_patterns': [
                'too_fast',
                'too_regular',
                'no_mouse_movement',
                'no_scroll_behavior'
            ],
            'technical_patterns': [
                'user_agent_suspicious',
                'ip_frequency',
                'request_headers',
                'javascript_execution'
            ],
            'content_patterns': [
                'honeypot_links',
                'hidden_fields',
                'javascript_challenges',
                'captcha_verification'
            ]
        }

    def detect_anti_crawler(self, response, page_content):
        """Detect anti-crawling mechanisms."""
        detection_results = {
            'behavior_detected': False,
            'technical_detected': False,
            'content_detected': False,
            'risk_level': 'low',
            'recommendations': []
        }

        # Check behavioral patterns
        behavior_detected = self.detect_behavior_patterns(response)
        if behavior_detected:
            detection_results['behavior_detected'] = True
            detection_results['recommendations'].append('Adjust the request rate and behavior patterns')

        # Check technical patterns
        technical_detected = self.detect_technical_patterns(response)
        if technical_detected:
            detection_results['technical_detected'] = True
            detection_results['recommendations'].append('Tune the request headers and user agent')

        # Check content patterns
        content_detected = self.detect_content_patterns(page_content)
        if content_detected:
            detection_results['content_detected'] = True
            detection_results['recommendations'].append('Handle CAPTCHAs and JavaScript challenges')

        # Compute the risk level
        detection_results['risk_level'] = self.calculate_risk_level(detection_results)

        return detection_results

    def detect_behavior_patterns(self, response):
        """Check behavioral patterns."""
        suspicious_patterns = []

        # Abnormally fast response times
        if response.elapsed.total_seconds() < 0.1:
            suspicious_patterns.append('response_too_fast')

        # Suspicious status codes
        if response.status_code in [403, 429, 503]:
            suspicious_patterns.append('status_code_suspicious')

        # Suspicious response headers
        headers = response.headers
        if 'cf-ray' in headers or 'x-powered-by' in headers:
            suspicious_patterns.append('cloudflare_detected')

        return len(suspicious_patterns) > 0

    def detect_content_patterns(self, page_content):
        """Check content patterns."""
        suspicious_patterns = []

        # Honeypot links
        honeypot_patterns = [
            'style="display:none"',
            'class="hidden"',
            'visibility:hidden'
        ]

        for pattern in honeypot_patterns:
            if pattern in page_content:
                suspicious_patterns.append('honeypot_detected')
                break

        # CAPTCHAs (Chinese patterns kept as literal page text to match)
        captcha_patterns = [
            'captcha',
            'recaptcha',
            '验证码',
            '请输入验证码'
        ]

        for pattern in captcha_patterns:
            if pattern.lower() in page_content.lower():
                suspicious_patterns.append('captcha_detected')
                break

        # JavaScript challenges
        js_challenge_patterns = [
            'javascript:void(0)',
            'onclick="return false"',
            'function challenge()'
        ]

        for pattern in js_challenge_patterns:
            if pattern in page_content:
                suspicious_patterns.append('js_challenge_detected')
                break

        return len(suspicious_patterns) > 0

    def calculate_risk_level(self, detection_results):
        """Compute the risk level."""
        risk_score = 0

        if detection_results['behavior_detected']:
            risk_score += 30
        if detection_results['technical_detected']:
            risk_score += 40
        if detection_results['content_detected']:
            risk_score += 30

        if risk_score >= 80:
            return 'high'
        elif risk_score >= 50:
            return 'medium'
        else:
            return 'low'
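
The snippet below demonstrates the content-pattern check on a synthetic HTML fragment. Because load_evasion_strategies is not part of the listing above, the detector is created with __new__ to skip __init__; the HTML is made up.

detector = AntiCrawlerDetector.__new__(AntiCrawlerDetector)

suspicious_html = (
    '<div style="display:none"><a href="/trap">bait</a></div>'
    '<div id="captcha">请输入验证码</div>'
)

print(detector.detect_content_patterns(suspicious_html))  # True: honeypot and CAPTCHA patterns match
print(detector.calculate_risk_level({
    'behavior_detected': False,
    'technical_detected': False,
    'content_detected': True
}))  # 'low' -- content detection alone contributes 30 of the 100 risk points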

2.3 Intelligent Data Extraction

The AI crawler can understand page structure and intelligently extract the data it needs:

class IntelligentDataExtractor:
    def __init__(self):
        self.extraction_models = self.load_extraction_models()
        self.schema_validator = SchemaValidator()

    def load_extraction_models(self):
        """Load the data extraction models."""
        return {
            'product_info': ProductExtractionModel(),
            'article_content': ArticleExtractionModel(),
            'ecommerce_data': EcommerceExtractionModel(),
            'social_media': SocialMediaExtractionModel()
        }

    def extract_data_intelligently(self, page_content, target_schema):
        """Extract data intelligently."""
        # Identify the page type
        page_type = self.classify_page_type(page_content)

        # Pick the matching extraction model
        if page_type in self.extraction_models:
            extractor = self.extraction_models[page_type]
            extracted_data = extractor.extract(page_content, target_schema)
        else:
            # Fall back to the generic extractor
            extracted_data = self.generic_extractor.extract(page_content, target_schema)

        # Clean and validate the data
        cleaned_data = self.clean_and_validate_data(extracted_data, target_schema)

        return cleaned_data

    def classify_page_type(self, page_content):
        """Classify the page type."""
        # Extract features for a machine learning classifier
        features = self.extract_page_features(page_content)

        # Simple rule-based classification (a real project should use a trained model)
        if 'product' in page_content.lower() or 'price' in page_content.lower():
            return 'product_info'
        elif 'article' in page_content.lower() or 'content' in page_content.lower():
            return 'article_content'
        elif 'shop' in page_content.lower() or 'cart' in page_content.lower():
            return 'ecommerce_data'
        elif 'social' in page_content.lower() or 'share' in page_content.lower():
            return 'social_media'
        else:
            return 'generic'

    def extract_page_features(self, page_content):
        """Extract page features."""
        features = {
            'text_length': len(page_content),
            'link_count': page_content.count('<a'),
            'image_count': page_content.count('<img'),
            'form_count': page_content.count('<form'),
            'script_count': page_content.count('<script'),
            'style_count': page_content.count('<style'),
            'div_count': page_content.count('<div'),
            'span_count': page_content.count('<span')
        }

        return features

class ProductExtractionModel:
    """Product information extraction model."""

    def extract(self, page_content, target_schema):
        """Extract product information."""
        extracted_data = {}

        # Product name
        product_name = self.extract_product_name(page_content)
        if product_name:
            extracted_data['product_name'] = product_name

        # Price information
        price_info = self.extract_price_info(page_content)
        if price_info:
            extracted_data['price'] = price_info

        # Product description
        description = self.extract_description(page_content)
        if description:
            extracted_data['description'] = description

        # Product images
        images = self.extract_product_images(page_content)
        if images:
            extracted_data['images'] = images

        # Specifications
        specifications = self.extract_specifications(page_content)
        if specifications:
            extracted_data['specifications'] = specifications

        return extracted_data

    def extract_product_name(self, page_content):
        """Extract the product name."""
        # Try several strategies in turn
        strategies = [
            self.extract_by_h1_tag,
            self.extract_by_title_tag,
            self.extract_by_class_name,
            self.extract_by_id_name
        ]

        for strategy in strategies:
            result = strategy(page_content)
            if result:
                return result

        return None

    def extract_by_h1_tag(self, page_content):
        """Extract via the H1 tag."""
        import re
        h1_pattern = r'<h1[^>]*>(.*?)</h1>'
        matches = re.findall(h1_pattern, page_content, re.I)

        if matches:
            # Strip any nested HTML tags
            clean_text = re.sub(r'<[^>]+>', '', matches[0])
            return clean_text.strip()

        return None

    def extract_price_info(self, page_content):
        """Extract price information."""
        import re

        # Price patterns
        price_patterns = [
            r'¥\s*(\d+(?:\.\d{2})?)',           # CNY symbol
            r'\$\s*(\d+(?:\.\d{2})?)',          # USD symbol (escaped so it is not an end-of-string anchor)
            r'(\d+(?:\.\d{2})?)\s*元',          # Chinese "yuan"
            r'价格[::]\s*(\d+(?:\.\d{2})?)',    # Chinese "price:" label
            r'Price[::]\s*(\d+(?:\.\d{2})?)'    # English "Price:" label
        ]

        for pattern in price_patterns:
            matches = re.findall(pattern, page_content)
            if matches:
                try:
                    price = float(matches[0])
                    return {
                        'amount': price,
                        'currency': self.detect_currency(pattern),
                        'original_text': matches[0]
                    }
                except ValueError:
                    continue

        return None

    def detect_currency(self, pattern):
        """Detect the currency type."""
        if '¥' in pattern or '元' in pattern:
            return 'CNY'
        elif '$' in pattern:
            return 'USD'
        else:
            return 'Unknown'
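
As a quick check, the snippet below runs two of the extraction helpers on a fabricated product page. Only the helpers shown above are called, since the remaining extractors are omitted from the listing.

model = ProductExtractionModel()
page = '<h1 class="title">Acme <b>Widget</b> Pro</h1><span class="price">¥ 199.00</span>'

print(model.extract_by_h1_tag(page))   # "Acme Widget Pro" -- nested tags are stripped
print(model.extract_price_info(page))  # {'amount': 199.0, 'currency': 'CNY', 'original_text': '199.00'}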

Part 3: Practical Applications and Performance Optimization

3.1 Distributed Crawler Architecture

To handle large-scale crawling workloads, we need a distributed architecture:

import redis
import json
import hashlib
from celery import Celery
from datetime import datetime, timedelta

class DistributedCrawlerManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.celery_app = Celery('crawler_tasks', broker='redis://localhost:6379/0')
        self.task_queue = 'crawler_queue'

    def distribute_task(self, task_config):
        """Distribute a crawling task."""
        # Generate the task ID
        task_id = self.generate_task_id(task_config)

        # Check whether the task already exists
        if self.redis_client.exists(f"task:{task_id}"):
            return {"status": "exists", "task_id": task_id}

        # Deduplicate tasks
        if self.is_duplicate_task(task_config):
            return {"status": "duplicate", "message": "Task already exists"}

        # Compute the task priority
        priority = self.calculate_priority(task_config)

        # Pick a suitable worker
        worker = self.select_worker(task_config)

        # Build the task record
        task_info = {
            'id': task_id,
            'config': json.dumps(task_config),  # Redis hashes cannot hold nested dicts, so serialize the config
            'priority': priority,
            'worker_id': worker['id'] if worker else '',  # store only the worker's id
            'status': 'pending',
            'created_at': datetime.now().isoformat(),
            'estimated_duration': self.estimate_duration(task_config)
        }

        # Persist the task record
        self.redis_client.hset(f"task:{task_id}", mapping=task_info)

        # Push the task onto the Celery queue
        task = self.celery_app.send_task(
            'crawler_tasks.execute_crawl',
            args=[task_config],
            kwargs={'task_id': task_id},
            queue=self.task_queue,
            priority=priority
        )

        return {
            "status": "distributed",
            "task_id": task_id,
            "celery_task_id": task.id
        }

    def generate_task_id(self, task_config):
        """Generate a task ID."""
        # Derive a unique ID from the task configuration
        config_str = json.dumps(task_config, sort_keys=True)
        return hashlib.md5(config_str.encode()).hexdigest()

    def is_duplicate_task(self, task_config):
        """Check whether the task is a duplicate."""
        # Has this URL been scheduled recently?
        url = task_config.get('url')
        if not url:
            return False

        # Look at tasks from the last 24 hours
        yesterday = datetime.now() - timedelta(hours=24)

        # Scan all stored tasks
        task_keys = self.redis_client.keys("task:*")

        for key in task_keys:
            task_data = self.redis_client.hgetall(key)
            if task_data.get(b'config'):
                try:
                    config = json.loads(task_data[b'config'].decode())
                    if config.get('url') == url:
                        created_at = datetime.fromisoformat(task_data[b'created_at'].decode())
                        if created_at > yesterday:
                            return True
                except (ValueError, KeyError):
                    continue

        return False

    def calculate_priority(self, task_config):
        """Compute the task priority."""
        priority = 5  # default priority

        # URL importance
        url_importance = task_config.get('importance', 'normal')
        if url_importance == 'high':
            priority += 3
        elif url_importance == 'low':
            priority -= 2

        # Task type
        task_type = task_config.get('type', 'general')
        if task_type == 'urgent':
            priority += 4
        elif task_type == 'scheduled':
            priority -= 1

        # Resource demand
        resource_demand = task_config.get('resource_demand', 'normal')
        if resource_demand == 'high':
            priority -= 1  # resource-hungry tasks get a lower priority

        # Deadline pressure
        deadline = task_config.get('deadline')
        if deadline:
            time_left = datetime.fromisoformat(deadline) - datetime.now()
            if time_left.total_seconds() < 3600:  # within 1 hour
                priority += 5
            elif time_left.total_seconds() < 86400:  # within 24 hours
                priority += 2

        return max(1, min(10, priority))  # clamp to the 1-10 range

    def select_worker(self, task_config):
        """Select a suitable worker node."""
        # Fetch the list of available workers
        available_workers = self.get_available_workers()

        if not available_workers:
            return None

        # Pick the worker that best matches the task
        best_worker = None
        best_score = -1

        for worker in available_workers:
            score = self.calculate_worker_score(worker, task_config)
            if score > best_score:
                best_score = score
                best_worker = worker

        return best_worker

    def get_available_workers(self):
        """Fetch the list of available workers."""
        worker_keys = self.redis_client.keys("worker:*")
        available_workers = []

        for key in worker_keys:
            worker_data = self.redis_client.hgetall(key)
            if worker_data.get(b'status') == b'available':
                available_workers.append({
                    'id': worker_data[b'id'].decode(),
                    'capabilities': json.loads(worker_data[b'capabilities'].decode()),
                    'current_load': int(worker_data[b'current_load']),
                    'max_load': int(worker_data[b'max_load'])
                })

        return available_workers

    def calculate_worker_score(self, worker, task_config):
        """Score how well a worker fits a task."""
        score = 0

        # Load headroom
        load_ratio = worker['current_load'] / worker['max_load']
        if load_ratio < 0.5:
            score += 30
        elif load_ratio < 0.8:
            score += 20
        else:
            score += 10

        # Capability match
        required_capabilities = task_config.get('required_capabilities', [])
        worker_capabilities = worker['capabilities']

        for capability in required_capabilities:
            if capability in worker_capabilities:
                score += 20

        # Geographic locality (when relevant)
        if 'location' in task_config and 'location' in worker:
            if task_config['location'] == worker['location']:
                score += 15

        return score
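
For illustration, the snippet below computes the deduplication key and priority for a sample task. The config fields are assumptions based on the code above; constructing the manager does not contact Redis or Celery yet, so this runs without a broker.

manager = DistributedCrawlerManager()
task_config = {
    'url': 'https://example.com/products',
    'importance': 'high',   # +3
    'type': 'urgent'        # +4
}

print(manager.generate_task_id(task_config))    # md5 of the sorted JSON config
print(manager.calculate_priority(task_config))  # 5 + 3 + 4 = 12, clamped to the maximum of 10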

3.2 Performance Monitoring and Optimization

Monitor crawler performance in real time and tune its parameters automatically:

import psutil
import time
from collections import deque
import threading

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'requests_per_second': deque(maxlen=100),
            'success_rate': deque(maxlen=100),
            'response_time': deque(maxlen=100),
            'memory_usage': deque(maxlen=100),
            'cpu_usage': deque(maxlen=100),
            'error_rate': deque(maxlen=100)
        }

        self.optimization_rules = self.load_optimization_rules()
        self.monitoring_thread = None
        self.is_monitoring = False

    def start_monitoring(self):
        """Start performance monitoring."""
        if not self.is_monitoring:
            self.is_monitoring = True
            self.monitoring_thread = threading.Thread(target=self._monitor_loop)
            self.monitoring_thread.daemon = True
            self.monitoring_thread.start()

    def stop_monitoring(self):
        """Stop performance monitoring."""
        self.is_monitoring = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitor_loop(self):
        """Monitoring loop."""
        while self.is_monitoring:
            try:
                # Collect system metrics
                self.collect_system_metrics()

                # Analyze performance
                self.analyze_performance()

                # Apply optimizations
                self.execute_optimizations()

                # Wait until the next cycle
                time.sleep(5)

            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(10)

    def collect_system_metrics(self):
        """Collect system metrics."""
        # CPU utilization
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics['cpu_usage'].append({
            'timestamp': time.time(),
            'value': cpu_percent
        })

        # Memory utilization
        memory = psutil.virtual_memory()
        self.metrics['memory_usage'].append({
            'timestamp': time.time(),
            'value': memory.percent
        })

        # Network counters
        network = psutil.net_io_counters()

        # Disk I/O counters
        disk = psutil.disk_io_counters()

    def analyze_performance(self):
        """Analyze the performance metrics."""
        current_time = time.time()

        # Average response time
        if self.metrics['response_time']:
            avg_response_time = sum(item['value'] for item in self.metrics['response_time']) / len(self.metrics['response_time'])

            # Response-time trend analysis
            recent_times = [item['value'] for item in list(self.metrics['response_time'])[-10:]]
            if len(recent_times) >= 2:
                trend = (recent_times[-1] - recent_times[0]) / len(recent_times)

                if trend > 0.1:  # response times are rising
                    self.trigger_optimization('response_time_increasing', {
                        'current_avg': avg_response_time,
                        'trend': trend
                    })

        # Success-rate analysis
        if self.metrics['success_rate']:
            recent_success_rate = [item['value'] for item in list(self.metrics['success_rate'])[-20:]]
            avg_success_rate = sum(recent_success_rate) / len(recent_success_rate)

            if avg_success_rate < 0.8:  # success rate below 80%
                self.trigger_optimization('low_success_rate', {
                    'current_rate': avg_success_rate,
                    'threshold': 0.8
                })

        # Error-rate analysis
        if self.metrics['error_rate']:
            recent_error_rate = [item['value'] for item in list(self.metrics['error_rate'])[-20:]]
            avg_error_rate = sum(recent_error_rate) / len(recent_error_rate)

            if avg_error_rate > 0.2:  # error rate above 20%
                self.trigger_optimization('high_error_rate', {
                    'current_rate': avg_error_rate,
                    'threshold': 0.2
                })

    def trigger_optimization(self, issue_type, context):
        """Trigger an optimization."""
        print(f"Performance issue detected: {issue_type}, context: {context}")

        # Look up the matching optimization rule
        if issue_type in self.optimization_rules:
            rule = self.optimization_rules[issue_type]

            # Check whether the rule may run now
            if self.should_execute_optimization(rule, context):
                self.execute_optimization(rule, context)

    def should_execute_optimization(self, rule, context):
        """Decide whether an optimization should run."""
        # Respect the cooldown period
        last_execution = rule.get('last_execution', 0)
        current_time = time.time()

        if current_time - last_execution < rule.get('cooldown', 300):  # default cooldown: 5 minutes
            return False

        # Evaluate the trigger conditions
        conditions = rule.get('conditions', [])
        for condition in conditions:
            if not self.evaluate_condition(condition, context):
                return False

        return True

    def execute_optimization(self, rule, context):
        """Run an optimization."""
        try:
            print(f"Running optimization: {rule['name']}")

            # Execute the optimization actions
            actions = rule.get('actions', [])
            for action in actions:
                self.execute_action(action, context)

            # Record the execution time
            rule['last_execution'] = time.time()

            # Log the optimization
            self.log_optimization(rule, context)

        except Exception as e:
            print(f"Optimization failed: {e}")

    def execute_action(self, action, context):
        """Execute a concrete optimization action."""
        action_type = action['type']

        if action_type == 'adjust_request_rate':
            self.adjust_request_rate(action['params'])
        elif action_type == 'change_user_agent':
            self.change_user_agent(action['params'])
        elif action_type == 'enable_proxy':
            self.enable_proxy(action['params'])
        elif action_type == 'adjust_timeout':
            self.adjust_timeout(action['params'])
        else:
            print(f"Unknown optimization action type: {action_type}")

    def adjust_request_rate(self, params):
        """Adjust the request rate."""
        new_rate = params.get('new_rate')
        if new_rate:
            # In a real deployment this would notify the crawler engine
            print(f"Adjusting request rate to {new_rate} requests/second")

    def change_user_agent(self, params):
        """Switch the user agent."""
        new_user_agent = params.get('user_agent')
        if new_user_agent:
            # In a real deployment this would notify the crawler engine
            print(f"Switching user agent to: {new_user_agent}")

    def enable_proxy(self, params):
        """Enable a proxy."""
        proxy_config = params.get('proxy_config')
        if proxy_config:
            # In a real deployment this would notify the crawler engine
            print(f"Enabling proxy: {proxy_config}")