AI-Driven Smart Crawler Architecture and Applications
As a programming enthusiast, I have long been exploring how to bring artificial intelligence into traditional web crawlers to make them smarter, more efficient, and more human-like. In this post I would like to share what I have learned from exploring and building AI-driven smart crawlers.
Introduction: The Limitations of Traditional Crawlers and the Opportunity for AI
In an era of exploding online information, traditional crawlers face many challenges: increasingly sophisticated anti-crawling mechanisms, frequently changing page structures, and uneven data quality. As a developer, I have felt these limitations firsthand, and I also see the enormous opportunity that AI brings.
An AI-driven smart crawler can automatically adapt to changes in page structure, intelligently recognize anti-crawling strategies, and even understand the semantics of page content, achieving truly "intelligent crawling". At the same time, the ever-evolving defenses deployed by websites remind me that AI is no silver bullet. This article takes a deep look at the architecture, core algorithms, and practical applications of AI-powered crawlers.
Part 1: Architecture of an AI Smart Crawler
1.1 Overall Architecture
The AI smart crawler uses a layered architecture in which every layer has a clear responsibility and boundary. Let me illustrate the architecture in code:
# Core architecture of the AI smart crawler
class AISmartCrawler:
    def __init__(self):
        # Initialize the core components
        self.nlp_engine = NLPEngine()                # natural language processing engine
        self.vision_engine = VisionEngine()          # computer vision engine
        self.behavior_engine = BehaviorEngine()      # behavior simulation engine
        self.adaptation_engine = AdaptationEngine()  # self-adaptation engine

    def crawl(self, target_url, strategy='intelligent'):
        """Main flow of an intelligent crawl."""
        try:
            # 1. Intelligent page analysis
            page_info = self.analyze_page(target_url)
            # 2. Dynamic strategy selection
            if strategy == 'intelligent':
                strategy = self.select_strategy(page_info)
            # 3. Execute the crawling strategy
            result = self.execute_strategy(target_url, strategy, page_info)
            # 4. Intelligent data extraction
            extracted_data = self.extract_data_intelligently(result)
            # 5. Quality assessment and optimization
            quality_score = self.evaluate_quality(extracted_data)
            return {
                'data': extracted_data,
                'quality_score': quality_score,
                'strategy_used': strategy,
                'metadata': page_info
            }
        except Exception as e:
            self.handle_error(e)
            return None
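Assuming the helper methods above (analyze_page, execute_strategy, and so on) are implemented, a minimal usage sketch might look like this; the URL and the printed fields are purely illustrative:

# Hypothetical usage sketch; the crawler's helper methods are assumed to exist.
crawler = AISmartCrawler()
result = crawler.crawl("https://example.com/products", strategy='intelligent')
if result:
    print("Strategy used:", result['strategy_used'])
    print("Quality score:", result['quality_score'])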
1.2 Core Components in Detail
1.2.1 The Natural Language Processing Engine
The NLP engine is the "brain" of the AI crawler. It is responsible for understanding the semantics and structure of page content:
import spacy
from transformers import pipeline
import re

class NLPEngine:
    def __init__(self):
        # Load the pre-trained models
        self.nlp = spacy.load("zh_core_web_sm")
        self.sentiment_analyzer = pipeline("sentiment-analysis", model="uer/roberta-base-finetuned-jd-binary-chinese")
        self.text_classifier = pipeline("text-classification", model="uer/roberta-base-chinese-cluener")

    def analyze_content(self, html_content):
        """Analyze page content intelligently."""
        # Extract the plain text
        text_content = self.extract_text(html_content)
        # Named-entity recognition
        entities = self.extract_entities(text_content)
        # Sentiment analysis
        sentiment = self.analyze_sentiment(text_content)
        # Topic classification
        topic = self.classify_topic(text_content)
        return {
            'text': text_content,
            'entities': entities,
            'sentiment': sentiment,
            'topic': topic,
            'structure': self.analyze_structure(html_content)
        }

    def extract_entities(self, text):
        """Extract named entities."""
        doc = self.nlp(text)
        entities = {}
        for ent in doc.ents:
            if ent.label_ not in entities:
                entities[ent.label_] = []
            entities[ent.label_].append(ent.text)
        return entities

    def analyze_structure(self, html_content):
        """Analyze structural features of the HTML."""
        # Profile the HTML structure with regular expressions
        structure_info = {
            'title_tags': len(re.findall(r'<title[^>]*>(.*?)</title>', html_content, re.I)),
            'heading_tags': len(re.findall(r'<h[1-6][^>]*>(.*?)</h[1-6]>', html_content, re.I)),
            'link_density': len(re.findall(r'<a[^>]*>', html_content)) / max(len(html_content), 1),
            'form_count': len(re.findall(r'<form[^>]*>', html_content)),
            'script_count': len(re.findall(r'<script[^>]*>', html_content))
        }
        return structure_info
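The extract_text, analyze_sentiment, and classify_topic helpers are not shown above. A minimal sketch of extract_text, assuming BeautifulSoup is available as the HTML parser, could be added to the class like this:

    def extract_text(self, html_content):
        """Hypothetical helper: strip tags, scripts, and styles to get plain text."""
        from bs4 import BeautifulSoup  # assumed dependency
        soup = BeautifulSoup(html_content, "html.parser")
        for tag in soup(["script", "style"]):
            tag.decompose()  # drop non-content elements
        return soup.get_text(separator=" ", strip=True)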
1.2.2 The Computer Vision Engine
The vision engine lets the crawler "see" the page and understand its visual elements and layout:
import io

import cv2
import numpy as np
from PIL import Image
import pytesseract
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

class VisionEngine:
    def __init__(self):
        self.driver = self.setup_driver()

    def setup_driver(self):
        """Configure a headless browser."""
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        return webdriver.Chrome(options=chrome_options)

    def capture_page_screenshot(self, url):
        """Capture a screenshot of the page."""
        try:
            self.driver.get(url)
            # Wait for the page to load
            self.driver.implicitly_wait(5)
            # Read the full page dimensions
            page_width = self.driver.execute_script("return document.body.scrollWidth")
            page_height = self.driver.execute_script("return document.body.scrollHeight")
            # Resize the window to cover the whole page
            self.driver.set_window_size(page_width, page_height)
            # Take the screenshot
            screenshot = self.driver.get_screenshot_as_png()
            return Image.open(io.BytesIO(screenshot))
        except Exception as e:
            print(f"Screenshot failed: {e}")
            return None
    def analyze_layout(self, screenshot):
        """Analyze the page layout."""
        # Convert to OpenCV format
        img = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
        # Edge detection
        edges = cv2.Canny(img, 50, 150)
        # Contour detection
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Summarize the layout structure
        layout_info = {
            'content_blocks': len(contours),
            'text_regions': self.detect_text_regions(img),
            'image_regions': self.detect_image_regions(img),
            'navigation_elements': self.detect_navigation(img)
        }
        return layout_info

    def detect_text_regions(self, img):
        """Detect text regions via Tesseract OCR."""
        text_regions = []
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Text detection
        try:
            text = pytesseract.image_to_data(gray, output_type=pytesseract.Output.DICT)
            for i, conf in enumerate(text['conf']):
                # conf may be a string or an int depending on the pytesseract version
                if float(conf) > 60:  # confidence threshold
                    x, y, w, h = text['left'][i], text['top'][i], text['width'][i], text['height'][i]
                    text_regions.append({
                        'bbox': (x, y, w, h),
                        'text': text['text'][i],
                        'confidence': conf
                    })
        except Exception:
            pass
        return text_regions
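Putting the two pieces together, a small hypothetical run of the vision engine (detect_image_regions and detect_navigation are assumed to exist, and Chrome, chromedriver, and Tesseract must be installed) would be:

# Hypothetical usage sketch.
vision = VisionEngine()
shot = vision.capture_page_screenshot("https://example.com")
if shot is not None:
    layout = vision.analyze_layout(shot)
    print("Content blocks:", layout['content_blocks'])
    print("Text regions found:", len(layout['text_regions']))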
1.2.3 The Behavior Simulation Engine
The behavior engine simulates human browsing behavior to avoid detection by anti-crawling systems:
import random
import time

from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By

class BehaviorEngine:
    def __init__(self):
        self.behavior_patterns = self.load_behavior_patterns()

    def load_behavior_patterns(self):
        """Load the library of behavior patterns."""
        return {
            'human_like': {
                'scroll_patterns': ['smooth', 'random', 'bounce'],
                'click_patterns': ['direct', 'hover_first', 'double_click'],
                'typing_patterns': ['natural', 'fast', 'slow'],
                'wait_patterns': ['random', 'fixed', 'adaptive']
            },
            'bot_like': {
                'scroll_patterns': ['instant'],
                'click_patterns': ['instant'],
                'typing_patterns': ['instant'],
                'wait_patterns': ['minimal']
            }
        }

    def simulate_human_behavior(self, driver, behavior_type='human_like'):
        """Simulate human-like behavior."""
        pattern = self.behavior_patterns[behavior_type]
        # Randomized scrolling
        self.simulate_scrolling(driver, pattern['scroll_patterns'])
        # Randomized waiting
        self.simulate_waiting(pattern['wait_patterns'])
        # Mouse movement
        self.simulate_mouse_movement(driver)

    def simulate_scrolling(self, driver, scroll_patterns):
        """Simulate scrolling behavior."""
        pattern = random.choice(scroll_patterns)
        if pattern == 'smooth':
            # Smooth scrolling from top to bottom
            total_height = driver.execute_script("return document.body.scrollHeight")
            current_position = 0
            step = 100
            while current_position < total_height:
                driver.execute_script(f"window.scrollTo(0, {current_position})")
                current_position += step
                time.sleep(random.uniform(0.1, 0.3))
        elif pattern == 'random':
            # Random scrolling
            for _ in range(random.randint(3, 8)):
                scroll_y = random.randint(100, 800)
                driver.execute_script(f"window.scrollBy(0, {scroll_y})")
                time.sleep(random.uniform(0.5, 2.0))
        elif pattern == 'bounce':
            # Bouncing scroll: jump back and forth
            positions = [100, 300, 200, 500, 400, 700, 600]
            for pos in positions:
                driver.execute_script(f"window.scrollTo(0, {pos})")
                time.sleep(random.uniform(0.3, 1.0))

    def simulate_mouse_movement(self, driver):
        """Simulate mouse movement."""
        try:
            # Pick a random page element
            elements = driver.find_elements(By.TAG_NAME, "a")[:10]
            if elements:
                element = random.choice(elements)
                # Build an action chain
                actions = ActionChains(driver)
                # Move the cursor to the element
                actions.move_to_element(element)
                actions.perform()
                # Random pause
                time.sleep(random.uniform(0.5, 2.0))
        except Exception as e:
            print(f"Mouse-movement simulation failed: {e}")

    def simulate_waiting(self, wait_patterns):
        """Simulate waiting behavior."""
        pattern = random.choice(wait_patterns)
        if pattern == 'random':
            wait_time = random.uniform(1.0, 5.0)
        elif pattern == 'fixed':
            wait_time = 2.0
        elif pattern == 'adaptive':
            wait_time = random.uniform(0.5, 3.0)
        else:
            wait_time = 1.0
        time.sleep(wait_time)
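Wired up to the headless driver from the vision engine, a hypothetical invocation looks like this:

# Hypothetical usage sketch, reusing VisionEngine's headless driver.
vision = VisionEngine()
behavior = BehaviorEngine()
vision.driver.get("https://example.com")
behavior.simulate_human_behavior(vision.driver, behavior_type='human_like')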
Part 2: Core Algorithms and Intelligent Strategies
2.1 Adaptive Crawling Strategy
The AI crawler can automatically pick the most suitable crawling strategy based on the characteristics of the target site:
class AdaptiveStrategySelector:
    def __init__(self):
        self.strategy_weights = {
            'selenium': 0.3,
            'requests': 0.4,
            'playwright': 0.2,
            'scrapy': 0.1
        }

    def select_strategy(self, page_info, historical_data=None):
        """Select a crawling strategy intelligently."""
        # Score every strategy
        strategy_scores = {}
        # Selenium
        strategy_scores['selenium'] = self.calculate_selenium_score(page_info)
        # Requests
        strategy_scores['requests'] = self.calculate_requests_score(page_info)
        # Playwright
        strategy_scores['playwright'] = self.calculate_playwright_score(page_info)
        # Scrapy
        strategy_scores['scrapy'] = self.calculate_scrapy_score(page_info)
        # Adjust the scores with historical data, if available
        if historical_data:
            strategy_scores = self.adjust_with_history(strategy_scores, historical_data)
        # Pick the highest-scoring strategy
        best_strategy = max(strategy_scores, key=strategy_scores.get)
        return best_strategy, strategy_scores

    def calculate_selenium_score(self, page_info):
        """Score the Selenium strategy."""
        score = 0
        # Heavy JavaScript dependence
        if page_info.get('javascript_heavy', False):
            score += 30
        # Dynamic content
        if page_info.get('dynamic_content', False):
            score += 25
        # Anti-crawling measures present
        if page_info.get('anti_crawler', False):
            score += 20
        # Page complexity
        complexity = page_info.get('complexity', 0)
        score += min(complexity * 2, 25)
        return score

    def calculate_requests_score(self, page_info):
        """Score the Requests strategy."""
        score = 0
        # Static content
        if page_info.get('static_content', True):
            score += 30
        # An API endpoint is available
        if page_info.get('has_api', False):
            score += 25
        # Simple structure
        if page_info.get('complexity', 0) < 5:
            score += 25
        # No anti-crawling measures
        if not page_info.get('anti_crawler', False):
            score += 20
        return score
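calculate_playwright_score and calculate_scrapy_score follow the same pattern as the two scorers above. Here is a hypothetical sketch, continuing the class, in which the feature keys (needs_stealth, large_scale) and point values are my own assumptions:

    def calculate_playwright_score(self, page_info):
        """Hypothetical sketch: score the Playwright strategy."""
        score = 0
        if page_info.get('javascript_heavy', False):
            score += 30  # Playwright also handles JS-heavy pages well
        if page_info.get('needs_stealth', False):
            score += 25  # assumed flag for stealth requirements
        return score

    def calculate_scrapy_score(self, page_info):
        """Hypothetical sketch: score the Scrapy strategy."""
        score = 0
        if page_info.get('static_content', True):
            score += 25
        if page_info.get('large_scale', False):
            score += 30  # assumed flag: many pages favor Scrapy's scheduler
        return score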
2.2 Intelligent Anti-Crawler Countermeasures
The AI crawler can learn about and adapt to a variety of anti-crawling strategies:
class AntiCrawlerDetector:
    def __init__(self):
        self.detection_patterns = self.load_detection_patterns()
        self.evasion_strategies = self.load_evasion_strategies()

    def load_detection_patterns(self):
        """Load the detection patterns."""
        return {
            'behavior_patterns': [
                'too_fast',
                'too_regular',
                'no_mouse_movement',
                'no_scroll_behavior'
            ],
            'technical_patterns': [
                'user_agent_suspicious',
                'ip_frequency',
                'request_headers',
                'javascript_execution'
            ],
            'content_patterns': [
                'honeypot_links',
                'hidden_fields',
                'javascript_challenges',
                'captcha_verification'
            ]
        }
    def detect_anti_crawler(self, response, page_content):
        """Detect anti-crawling mechanisms."""
        detection_results = {
            'behavior_detected': False,
            'technical_detected': False,
            'content_detected': False,
            'risk_level': 'low',
            'recommendations': []
        }
        # Behavioral signals
        behavior_detected = self.detect_behavior_patterns(response)
        if behavior_detected:
            detection_results['behavior_detected'] = True
            detection_results['recommendations'].append('Adjust request rate and behavior pattern')
        # Technical signals
        technical_detected = self.detect_technical_patterns(response)
        if technical_detected:
            detection_results['technical_detected'] = True
            detection_results['recommendations'].append('Tune request headers and user agent')
        # Content signals
        content_detected = self.detect_content_patterns(page_content)
        if content_detected:
            detection_results['content_detected'] = True
            detection_results['recommendations'].append('Handle captchas and JavaScript challenges')
        # Compute the risk level
        detection_results['risk_level'] = self.calculate_risk_level(detection_results)
        return detection_results
    def detect_behavior_patterns(self, response):
        """Detect behavioral signals."""
        suspicious_patterns = []
        # Abnormally fast response
        if response.elapsed.total_seconds() < 0.1:
            suspicious_patterns.append('response_too_fast')
        # Suspicious status codes
        if response.status_code in [403, 429, 503]:
            suspicious_patterns.append('status_code_suspicious')
        # Suspicious response headers
        headers = response.headers
        if 'cf-ray' in headers or 'x-powered-by' in headers:
            suspicious_patterns.append('cloudflare_detected')
        return len(suspicious_patterns) > 0

    def detect_content_patterns(self, page_content):
        """Detect content-level signals."""
        suspicious_patterns = []
        # Honeypot links
        honeypot_patterns = [
            'style="display:none"',
            'class="hidden"',
            'visibility:hidden'
        ]
        for pattern in honeypot_patterns:
            if pattern in page_content:
                suspicious_patterns.append('honeypot_detected')
                break
        # Captchas (the Chinese strings match captcha prompts on Chinese sites)
        captcha_patterns = [
            'captcha',
            'recaptcha',
            '验证码',
            '请输入验证码'
        ]
        for pattern in captcha_patterns:
            if pattern.lower() in page_content.lower():
                suspicious_patterns.append('captcha_detected')
                break
        # JavaScript challenges
        js_challenge_patterns = [
            'javascript:void(0)',
            'onclick="return false"',
            'function challenge()'
        ]
        for pattern in js_challenge_patterns:
            if pattern in page_content:
                suspicious_patterns.append('js_challenge_detected')
                break
        return len(suspicious_patterns) > 0

    def calculate_risk_level(self, detection_results):
        """Compute the risk level."""
        risk_score = 0
        if detection_results['behavior_detected']:
            risk_score += 30
        if detection_results['technical_detected']:
            risk_score += 40
        if detection_results['content_detected']:
            risk_score += 30
        if risk_score >= 80:
            return 'high'
        elif risk_score >= 50:
            return 'medium'
        else:
            return 'low'
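A hypothetical end-to-end check against a live response using requests (detect_technical_patterns and load_evasion_strategies are assumed to be implemented elsewhere):

import requests

# Hypothetical usage sketch.
detector = AntiCrawlerDetector()
response = requests.get("https://example.com", timeout=10)
results = detector.detect_anti_crawler(response, response.text)
print("Risk level:", results['risk_level'])
for tip in results['recommendations']:
    print("Recommendation:", tip)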
2.3 Intelligent Data Extraction
The AI crawler can understand the page structure and extract exactly the data it needs:
class IntelligentDataExtractor:
    def __init__(self):
        self.extraction_models = self.load_extraction_models()
        self.schema_validator = SchemaValidator()

    def load_extraction_models(self):
        """Load the data-extraction models."""
        return {
            'product_info': ProductExtractionModel(),
            'article_content': ArticleExtractionModel(),
            'ecommerce_data': EcommerceExtractionModel(),
            'social_media': SocialMediaExtractionModel()
        }

    def extract_data_intelligently(self, page_content, target_schema):
        """Extract data intelligently."""
        # Classify the page type
        page_type = self.classify_page_type(page_content)
        # Pick the matching extraction model
        if page_type in self.extraction_models:
            extractor = self.extraction_models[page_type]
            extracted_data = extractor.extract(page_content, target_schema)
        else:
            # Fall back to a generic extractor (assumed to be set up elsewhere)
            extracted_data = self.generic_extractor.extract(page_content, target_schema)
        # Clean and validate the data
        cleaned_data = self.clean_and_validate_data(extracted_data, target_schema)
        return cleaned_data

    def classify_page_type(self, page_content):
        """Classify the page type."""
        # Derive features for a machine-learning classifier
        features = self.extract_page_features(page_content)
        # Simple rule-based classification (a real project should use a trained model)
        if 'product' in page_content.lower() or 'price' in page_content.lower():
            return 'product_info'
        elif 'article' in page_content.lower() or 'content' in page_content.lower():
            return 'article_content'
        elif 'shop' in page_content.lower() or 'cart' in page_content.lower():
            return 'ecommerce_data'
        elif 'social' in page_content.lower() or 'share' in page_content.lower():
            return 'social_media'
        else:
            return 'generic'

    def extract_page_features(self, page_content):
        """Extract page features."""
        features = {
            'text_length': len(page_content),
            'link_count': page_content.count('<a'),
            'image_count': page_content.count('<img'),
            'form_count': page_content.count('<form'),
            'script_count': page_content.count('<script'),
            'style_count': page_content.count('<style'),
            'div_count': page_content.count('<div'),
            'span_count': page_content.count('<span')
        }
        return features
import re

class ProductExtractionModel:
    """Product-information extraction model."""

    def extract(self, page_content, target_schema):
        """Extract product information."""
        extracted_data = {}
        # Product name
        product_name = self.extract_product_name(page_content)
        if product_name:
            extracted_data['product_name'] = product_name
        # Price
        price_info = self.extract_price_info(page_content)
        if price_info:
            extracted_data['price'] = price_info
        # Description
        description = self.extract_description(page_content)
        if description:
            extracted_data['description'] = description
        # Product images
        images = self.extract_product_images(page_content)
        if images:
            extracted_data['images'] = images
        # Specifications
        specifications = self.extract_specifications(page_content)
        if specifications:
            extracted_data['specifications'] = specifications
        return extracted_data

    def extract_product_name(self, page_content):
        """Extract the product name."""
        # Try several strategies in turn
        strategies = [
            self.extract_by_h1_tag,
            self.extract_by_title_tag,
            self.extract_by_class_name,
            self.extract_by_id_name
        ]
        for strategy in strategies:
            result = strategy(page_content)
            if result:
                return result
        return None

    def extract_by_h1_tag(self, page_content):
        """Extract from the H1 tag."""
        h1_pattern = r'<h1[^>]*>(.*?)</h1>'
        matches = re.findall(h1_pattern, page_content, re.I)
        if matches:
            # Strip any nested HTML tags
            clean_text = re.sub(r'<[^>]+>', '', matches[0])
            return clean_text.strip()
        return None

    def extract_price_info(self, page_content):
        """Extract price information."""
        # Price patterns (note: '$' must be escaped, or the pattern never matches)
        price_patterns = [
            r'¥\s*(\d+(?:\.\d{2})?)',            # RMB
            r'\$\s*(\d+(?:\.\d{2})?)',           # USD
            r'(\d+(?:\.\d{2})?)\s*元',           # Chinese yuan suffix
            r'价格[::]\s*(\d+(?:\.\d{2})?)',     # Chinese "price:" label
            r'Price[::]\s*(\d+(?:\.\d{2})?)'     # English "Price:" label
        ]
        for pattern in price_patterns:
            matches = re.findall(pattern, page_content)
            if matches:
                try:
                    price = float(matches[0])
                    return {
                        'amount': price,
                        'currency': self.detect_currency(pattern),
                        'original_text': matches[0]
                    }
                except ValueError:
                    continue
        return None

    def detect_currency(self, pattern):
        """Detect the currency from the matched pattern."""
        if '¥' in pattern or '元' in pattern:
            return 'CNY'
        elif '$' in pattern:
            return 'USD'
        else:
            return 'Unknown'
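A quick hypothetical test of the two extractors implemented above:

# Hypothetical usage sketch.
model = ProductExtractionModel()
sample_html = '<h1>Demo Widget</h1><span class="price">¥ 199.00</span>'
print(model.extract_by_h1_tag(sample_html))   # -> "Demo Widget"
print(model.extract_price_info(sample_html))  # -> {'amount': 199.0, 'currency': 'CNY', ...}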
Part 3: Practical Applications and Performance Optimization
3.1 Distributed Crawler Architecture
To handle large-scale crawling workloads, we need a distributed architecture:
import hashlib
import json
from datetime import datetime, timedelta

import redis
from celery import Celery

class DistributedCrawlerManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.celery_app = Celery('crawler_tasks', broker='redis://localhost:6379/0')
        self.task_queue = 'crawler_queue'

    def distribute_task(self, task_config):
        """Distribute a crawling task."""
        # Generate the task ID
        task_id = self.generate_task_id(task_config)
        # Skip if the task already exists
        if self.redis_client.exists(f"task:{task_id}"):
            return {"status": "exists", "task_id": task_id}
        # Deduplicate
        if self.is_duplicate_task(task_config):
            return {"status": "duplicate", "message": "Task already exists"}
        # Compute the task priority
        priority = self.calculate_priority(task_config)
        # Pick a suitable worker
        worker_id = self.select_worker(task_config)
        # Build the task record (Redis hashes only store flat values,
        # so the config dict is serialized to JSON)
        task_info = {
            'id': task_id,
            'config': json.dumps(task_config),
            'priority': priority,
            'worker_id': worker_id or '',
            'status': 'pending',
            'created_at': datetime.now().isoformat(),
            'estimated_duration': self.estimate_duration(task_config)
        }
        # Persist the task record
        self.redis_client.hset(f"task:{task_id}", mapping=task_info)
        # Push onto the Celery queue
        task = self.celery_app.send_task(
            'crawler_tasks.execute_crawl',
            args=[task_config],
            kwargs={'task_id': task_id},
            queue=self.task_queue,
            priority=priority
        )
        return {
            "status": "distributed",
            "task_id": task_id,
            "celery_task_id": task.id
        }
    def generate_task_id(self, task_config):
        """Generate a task ID."""
        # Derive a unique ID from the configuration contents
        config_str = json.dumps(task_config, sort_keys=True)
        return hashlib.md5(config_str.encode()).hexdigest()

    def is_duplicate_task(self, task_config):
        """Check whether the task is a duplicate."""
        # Check whether the URL appeared in a recent task
        url = task_config.get('url')
        if not url:
            return False
        # Look back over the last 24 hours
        yesterday = datetime.now() - timedelta(hours=24)
        # Scan all stored tasks
        task_keys = self.redis_client.keys("task:*")
        for key in task_keys:
            task_data = self.redis_client.hgetall(key)
            if task_data.get(b'config'):
                try:
                    config = json.loads(task_data[b'config'].decode())
                    if config.get('url') == url:
                        created_at = datetime.fromisoformat(task_data[b'created_at'].decode())
                        if created_at > yesterday:
                            return True
                except (ValueError, KeyError):
                    continue
        return False

    def calculate_priority(self, task_config):
        """Compute the task priority."""
        priority = 5  # default priority
        # URL importance
        url_importance = task_config.get('importance', 'normal')
        if url_importance == 'high':
            priority += 3
        elif url_importance == 'low':
            priority -= 2
        # Task type
        task_type = task_config.get('type', 'general')
        if task_type == 'urgent':
            priority += 4
        elif task_type == 'scheduled':
            priority -= 1
        # Resource demand
        resource_demand = task_config.get('resource_demand', 'normal')
        if resource_demand == 'high':
            priority -= 1  # resource-hungry tasks get a lower priority
        # Deadline pressure
        deadline = task_config.get('deadline')
        if deadline:
            time_left = datetime.fromisoformat(deadline) - datetime.now()
            if time_left.total_seconds() < 3600:     # less than 1 hour left
                priority += 5
            elif time_left.total_seconds() < 86400:  # less than 24 hours left
                priority += 2
        return max(1, min(10, priority))  # clamp to the range 1-10
    def select_worker(self, task_config):
        """Select a suitable worker node."""
        # Fetch the list of available workers
        available_workers = self.get_available_workers()
        if not available_workers:
            return None
        # Pick the worker that best matches the task's needs
        best_worker = None
        best_score = -1
        for worker in available_workers:
            score = self.calculate_worker_score(worker, task_config)
            if score > best_score:
                best_score = score
                best_worker = worker
        return best_worker

    def get_available_workers(self):
        """Fetch the list of available workers."""
        worker_keys = self.redis_client.keys("worker:*")
        available_workers = []
        for key in worker_keys:
            worker_data = self.redis_client.hgetall(key)
            if worker_data.get(b'status') == b'available':
                available_workers.append({
                    'id': worker_data[b'id'].decode(),
                    'capabilities': json.loads(worker_data[b'capabilities'].decode()),
                    'current_load': int(worker_data[b'current_load']),
                    'max_load': int(worker_data[b'max_load'])
                })
        return available_workers

    def calculate_worker_score(self, worker, task_config):
        """Score how well a worker fits a task."""
        score = 0
        # Load headroom
        load_ratio = worker['current_load'] / worker['max_load']
        if load_ratio < 0.5:
            score += 30
        elif load_ratio < 0.8:
            score += 20
        else:
            score += 10
        # Capability match
        required_capabilities = task_config.get('required_capabilities', [])
        worker_capabilities = worker['capabilities']
        for capability in required_capabilities:
            if capability in worker_capabilities:
                score += 20
        # Geographic affinity (when relevant)
        if 'location' in task_config and 'location' in worker:
            if task_config['location'] == worker['location']:
                score += 15
        return score
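The manager pushes work to a Celery task named crawler_tasks.execute_crawl that is never shown in this post. A minimal hypothetical worker-side sketch, under the assumption that status updates flow back into the same Redis hashes, might be:

# Hypothetical worker-side sketch (crawler_tasks.py).
import redis
from celery import Celery

app = Celery('crawler_tasks', broker='redis://localhost:6379/0')
redis_client = redis.Redis(host='localhost', port=6379, db=0)

@app.task(name='crawler_tasks.execute_crawl')
def execute_crawl(task_config, task_id=None):
    """Run one crawl job and record its status in Redis."""
    if task_id:
        redis_client.hset(f"task:{task_id}", 'status', 'running')
    # Reuse the crawler from Part 1 (assumed importable here)
    result = AISmartCrawler().crawl(task_config['url'])
    if task_id:
        redis_client.hset(f"task:{task_id}", 'status', 'done' if result else 'failed')
    return result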
3.2 Performance Monitoring and Optimization
Monitor the crawler's performance in real time and tune its parameters automatically:
import threading
import time
from collections import deque

import psutil

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'requests_per_second': deque(maxlen=100),
            'success_rate': deque(maxlen=100),
            'response_time': deque(maxlen=100),
            'memory_usage': deque(maxlen=100),
            'cpu_usage': deque(maxlen=100),
            'error_rate': deque(maxlen=100)
        }
        self.optimization_rules = self.load_optimization_rules()
        self.monitoring_thread = None
        self.is_monitoring = False

    def start_monitoring(self):
        """Start performance monitoring."""
        if not self.is_monitoring:
            self.is_monitoring = True
            self.monitoring_thread = threading.Thread(target=self._monitor_loop)
            self.monitoring_thread.daemon = True
            self.monitoring_thread.start()

    def stop_monitoring(self):
        """Stop performance monitoring."""
        self.is_monitoring = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitor_loop(self):
        """The monitoring loop."""
        while self.is_monitoring:
            try:
                # Collect system metrics
                self.collect_system_metrics()
                # Analyze performance
                self.analyze_performance()
                # Apply optimizations
                self.execute_optimizations()
                # Wait until the next cycle
                time.sleep(5)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(10)

    def collect_system_metrics(self):
        """Collect system metrics."""
        # CPU utilization
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics['cpu_usage'].append({
            'timestamp': time.time(),
            'value': cpu_percent
        })
        # Memory utilization
        memory = psutil.virtual_memory()
        self.metrics['memory_usage'].append({
            'timestamp': time.time(),
            'value': memory.percent
        })
        # Network counters (collected here, not yet analyzed)
        network = psutil.net_io_counters()
        # Disk I/O counters (collected here, not yet analyzed)
        disk = psutil.disk_io_counters()
    def analyze_performance(self):
        """Analyze the performance metrics."""
        # Average response time
        if self.metrics['response_time']:
            avg_response_time = sum(item['value'] for item in self.metrics['response_time']) / len(self.metrics['response_time'])
            # Response-time trend analysis
            recent_times = [item['value'] for item in list(self.metrics['response_time'])[-10:]]
            if len(recent_times) >= 2:
                trend = (recent_times[-1] - recent_times[0]) / len(recent_times)
                if trend > 0.1:  # response time is climbing
                    self.trigger_optimization('response_time_increasing', {
                        'current_avg': avg_response_time,
                        'trend': trend
                    })
        # Success-rate analysis
        if self.metrics['success_rate']:
            recent_success_rate = [item['value'] for item in list(self.metrics['success_rate'])[-20:]]
            avg_success_rate = sum(recent_success_rate) / len(recent_success_rate)
            if avg_success_rate < 0.8:  # success rate below 80%
                self.trigger_optimization('low_success_rate', {
                    'current_rate': avg_success_rate,
                    'threshold': 0.8
                })
        # Error-rate analysis
        if self.metrics['error_rate']:
            recent_error_rate = [item['value'] for item in list(self.metrics['error_rate'])[-20:]]
            avg_error_rate = sum(recent_error_rate) / len(recent_error_rate)
            if avg_error_rate > 0.2:  # error rate above 20%
                self.trigger_optimization('high_error_rate', {
                    'current_rate': avg_error_rate,
                    'threshold': 0.2
                })
    def trigger_optimization(self, issue_type, context):
        """Trigger an optimization."""
        print(f"Performance issue detected: {issue_type}, context: {context}")
        # Look up the matching optimization rule
        if issue_type in self.optimization_rules:
            rule = self.optimization_rules[issue_type]
            # Check whether the rule's conditions are met
            if self.should_execute_optimization(rule, context):
                self.execute_optimization(rule, context)

    def should_execute_optimization(self, rule, context):
        """Decide whether an optimization should run."""
        # Respect the cooldown window
        last_execution = rule.get('last_execution', 0)
        current_time = time.time()
        if current_time - last_execution < rule.get('cooldown', 300):  # default cooldown: 5 minutes
            return False
        # Evaluate the trigger conditions
        conditions = rule.get('conditions', [])
        for condition in conditions:
            if not self.evaluate_condition(condition, context):
                return False
        return True

    def execute_optimization(self, rule, context):
        """Run an optimization."""
        try:
            print(f"Executing optimization: {rule['name']}")
            # Run the optimization actions
            actions = rule.get('actions', [])
            for action in actions:
                self.execute_action(action, context)
            # Record the execution time
            rule['last_execution'] = time.time()
            # Log the optimization
            self.log_optimization(rule, context)
        except Exception as e:
            print(f"Optimization failed: {e}")

    def execute_action(self, action, context):
        """Run a single optimization action."""
        action_type = action['type']
        if action_type == 'adjust_request_rate':
            self.adjust_request_rate(action['params'])
        elif action_type == 'change_user_agent':
            self.change_user_agent(action['params'])
        elif action_type == 'enable_proxy':
            self.enable_proxy(action['params'])
        elif action_type == 'adjust_timeout':
            self.adjust_timeout(action['params'])
        else:
            print(f"Unknown optimization action type: {action_type}")

    def adjust_request_rate(self, params):
        """Adjust the request rate."""
        new_rate = params.get('new_rate')
        if new_rate:
            # In a real system this would notify the crawler engine
            print(f"Adjusting request rate to: {new_rate} requests/second")

    def change_user_agent(self, params):
        """Switch the user agent."""
        new_user_agent = params.get('user_agent')
        if new_user_agent:
            # In a real system this would notify the crawler engine
            print(f"Switching user agent to: {new_user_agent}")

    def enable_proxy(self, params):
        """Enable a proxy."""
        proxy_config = params.get('proxy_config')
        if proxy_config:
            # In a real system this would notify the crawler engine
            print(f"Enabling proxy: {proxy_config}")